From 56b6f79f97c6227eec960b5d7983277eca2d3d7d Mon Sep 17 00:00:00 2001 From: Mononaut Date: Wed, 7 Dec 2022 14:51:26 -0600 Subject: [PATCH] improve thread error handling --- backend/src/api/mempool-blocks.ts | 51 ++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index 62bdc8f1b..d94ed77bd 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -158,8 +158,11 @@ class MempoolBlocks { }; }); + // (re)initialize tx selection worker thread if (!this.txSelectionWorker) { this.txSelectionWorker = new Worker(path.resolve(__dirname, './tx-selection-worker.js')); + // if the thread throws an unexpected error, or exits for any other reason, + // reset worker state so that it will be re-initialized on the next run this.txSelectionWorker.once('error', () => { this.txSelectionWorker = null; }); @@ -169,15 +172,25 @@ class MempoolBlocks { } // run the block construction algorithm in a separate thread, and wait for a result - const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => { - this.txSelectionWorker?.once('message', (result): void => { - resolve(result); + let threadErrorListener; + try { + const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => { + threadErrorListener = reject; + this.txSelectionWorker?.once('message', (result): void => { + resolve(result); + }); + this.txSelectionWorker?.once('error', reject); }); - }); - this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); - const { blocks, clusters } = await workerResultPromise; + this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); + const { blocks, clusters } = await workerResultPromise; - this.processBlockTemplates(newMempool, blocks, clusters); + this.processBlockTemplates(newMempool, blocks, clusters); + + // clean up thread error listener + this.txSelectionWorker?.removeListener('error', threadErrorListener); + } catch (e) { + logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); + } } public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[]): Promise { @@ -199,15 +212,25 @@ class MempoolBlocks { }); // run the block construction algorithm in a separate thread, and wait for a result - const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => { - this.txSelectionWorker?.once('message', (result): void => { - resolve(result); + let threadErrorListener; + try { + const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => { + threadErrorListener = reject; + this.txSelectionWorker?.once('message', (result): void => { + resolve(result); + }); + this.txSelectionWorker?.once('error', reject); }); - }); - this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); - const { blocks, clusters } = await workerResultPromise; + this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); + const { blocks, clusters } = await workerResultPromise; - this.processBlockTemplates(newMempool, blocks, clusters); + this.processBlockTemplates(newMempool, blocks, clusters); + + // clean up thread error listener + this.txSelectionWorker?.removeListener('error', threadErrorListener); + } catch (e) { + logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); + } } private processBlockTemplates(mempool, blocks, clusters): void {