Unbork unfurler concurrency implementation
This commit is contained in:
		
							parent
							
								
									0631f357b6
								
							
						
					
					
						commit
						1a903d3efb
					
				| @ -1,5 +1,5 @@ | ||||
| import * as puppeteer from 'puppeteer'; | ||||
| import ConcurrencyImplementation, { ResourceData } from 'puppeteer-cluster/dist/concurrency/ConcurrencyImplementation'; | ||||
| import ConcurrencyImplementation from 'puppeteer-cluster/dist/concurrency/ConcurrencyImplementation'; | ||||
| import { timeoutExecute } from 'puppeteer-cluster/dist/util'; | ||||
| 
 | ||||
| import config from '../config'; | ||||
| @ -8,17 +8,24 @@ const mempoolHost = config.MEMPOOL.HTTP_HOST + (config.MEMPOOL.HTTP_PORT ? ':' + | ||||
| const BROWSER_TIMEOUT = 8000; | ||||
| // maximum lifetime of a single page session
 | ||||
| const maxAgeMs = (config.PUPPETEER.MAX_PAGE_AGE || (24 * 60 * 60)) * 1000; | ||||
| const maxConcurrency = config.PUPPETEER.CLUSTER_SIZE; | ||||
| 
 | ||||
| interface repairablePage extends puppeteer.Page { | ||||
| interface RepairablePage extends puppeteer.Page { | ||||
|   repairRequested?: boolean; | ||||
|   language?: string | null; | ||||
|   createdAt?: number; | ||||
|   free?: boolean; | ||||
|   index?: number; | ||||
| } | ||||
| 
 | ||||
| interface ResourceData { | ||||
|   page: RepairablePage; | ||||
| } | ||||
| 
 | ||||
| export default class ReusablePage extends ConcurrencyImplementation { | ||||
| 
 | ||||
|   protected browser: puppeteer.Browser | null = null; | ||||
|   protected currentPage: repairablePage | null = null; | ||||
|   protected pageCreatedAt: number = 0; | ||||
|   protected pages: RepairablePage[] = []; | ||||
|   private repairing: boolean = false; | ||||
|   private repairRequested: boolean = false; | ||||
|   private openInstances: number = 0; | ||||
| @ -46,40 +53,70 @@ export default class ReusablePage extends ConcurrencyImplementation { | ||||
|     } | ||||
| 
 | ||||
|     try { | ||||
|       this.browser = await this.puppeteer.launch(this.options) as puppeteer.Browser; | ||||
|       await this.init(); | ||||
|     } catch (err) { | ||||
|       throw new Error('Unable to restart chrome.'); | ||||
|     } | ||||
|     this.currentPage = null; | ||||
|     this.repairRequested = false; | ||||
|     this.repairing = false; | ||||
|     this.waitingForRepairResolvers.forEach(resolve => resolve()); | ||||
|     this.waitingForRepairResolvers = []; | ||||
|     await this.createResources(); | ||||
|   } | ||||
| 
 | ||||
|   public async init() { | ||||
|     this.browser = await this.puppeteer.launch(this.options); | ||||
|     const promises = [] | ||||
|     for (let i = 0; i < maxConcurrency; i++) { | ||||
|       const newPage = await this.initPage(); | ||||
|       newPage.index = this.pages.length; | ||||
|       console.log('initialized page ', newPage.index); | ||||
|       this.pages.push(newPage); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public async close() { | ||||
|     await (this.browser as puppeteer.Browser).close(); | ||||
|   } | ||||
| 
 | ||||
|   protected async initPage(): Promise<RepairablePage> { | ||||
|     const page = await (this.browser as puppeteer.Browser).newPage() as RepairablePage; | ||||
|     page.language = null; | ||||
|     page.createdAt = Date.now(); | ||||
|     const defaultUrl = mempoolHost + '/preview/block/1'; | ||||
|     page.on('pageerror', (err) => { | ||||
|       page.repairRequested = true; | ||||
|     }); | ||||
|     await page.goto(defaultUrl, { waitUntil: "load" }); | ||||
|     page.free = true; | ||||
|     return page | ||||
|   } | ||||
| 
 | ||||
|   protected async createResources(): Promise<ResourceData> { | ||||
|     if (!this.currentPage) { | ||||
|       this.currentPage = await (this.browser as puppeteer.Browser).newPage(); | ||||
|       this.currentPage.language = null; | ||||
|       this.pageCreatedAt = Date.now(); | ||||
|       const defaultUrl = mempoolHost + '/preview/block/1'; | ||||
|       this.currentPage.on('pageerror', (err) => { | ||||
|         this.repairRequested = true; | ||||
|       }); | ||||
|       await this.currentPage.goto(defaultUrl, { waitUntil: "load" }); | ||||
|     const page = this.pages.find(p => p.free); | ||||
|     if (!page) { | ||||
|       console.log('no free pages!') | ||||
|       throw new Error('no pages available'); | ||||
|     } else { | ||||
|       page.free = false; | ||||
|       return { page }; | ||||
|     } | ||||
|     return { | ||||
|       page: this.currentPage | ||||
|   } | ||||
| 
 | ||||
|   protected async repairPage(page) { | ||||
|     // create a new page
 | ||||
|     const newPage = await this.initPage(); | ||||
|     newPage.free = true; | ||||
|     // replace the old page
 | ||||
|     newPage.index = page.index; | ||||
|     this.pages.splice(page.index, 1, newPage); | ||||
|     // clean up the old page
 | ||||
|     try { | ||||
|       await page.goto('about:blank', {timeout: 200}); // prevents memory leak (maybe?)
 | ||||
|     } catch (e) { | ||||
|       console.log('unexpected page repair error'); | ||||
|     } | ||||
|     await page.close(); | ||||
|     return newPage; | ||||
|   } | ||||
| 
 | ||||
|   public async workerInstance() { | ||||
| @ -97,8 +134,15 @@ export default class ReusablePage extends ConcurrencyImplementation { | ||||
| 
 | ||||
|           close: async () => { | ||||
|             this.openInstances -= 1; // decrement first in case of error
 | ||||
|             if (resources?.page != null) { | ||||
|               if (resources.page.repairRequested || (Date.now() - (resources.page.createdAt || 0) > maxAgeMs)) { | ||||
|                 resources.page = await this.repairPage(resources.page); | ||||
|               } else { | ||||
|                 resources.page.free = true; | ||||
|               } | ||||
|             } | ||||
| 
 | ||||
|             if (this.repairRequested || this.currentPage?.repairRequested || (Date.now() - this.pageCreatedAt > maxAgeMs)) { | ||||
|             if (this.repairRequested) { | ||||
|               await this.repair(); | ||||
|             } | ||||
|           }, | ||||
| @ -108,9 +152,7 @@ export default class ReusablePage extends ConcurrencyImplementation { | ||||
|       close: async () => {}, | ||||
| 
 | ||||
|       repair: async () => { | ||||
|         console.log('Repair requested'); | ||||
|         this.repairRequested = true; | ||||
|         await this.repair(); | ||||
|         await this.repairPage(resources.page); | ||||
|       }, | ||||
|     }; | ||||
|   } | ||||
|  | ||||
| @ -51,8 +51,6 @@ class Server { | ||||
|     this.server.listen(config.SERVER.HTTP_PORT, () => { | ||||
|       console.log(`Mempool Unfurl Server is running on port ${config.SERVER.HTTP_PORT}`); | ||||
|     }); | ||||
| 
 | ||||
|     this.initClusterPages(); | ||||
|   } | ||||
| 
 | ||||
|   async stopServer() { | ||||
| @ -70,16 +68,7 @@ class Server { | ||||
|     this.app.get('*', (req, res) => { return this.renderHTML(req, res) }) | ||||
|   } | ||||
| 
 | ||||
|   async initClusterPages() { | ||||
|     for (let i = 0; i < config.PUPPETEER.CLUSTER_SIZE; i++) { | ||||
|       this.cluster?.execute({ action: 'init' }); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async clusterTask({ page, data: { url, path, action } }) { | ||||
|     if (action === 'init') { | ||||
|       return; | ||||
|     } | ||||
|     try { | ||||
|       const urlParts = parseLanguageUrl(path); | ||||
|       if (page.language !== urlParts.lang) { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user