improve thread error handling

This commit is contained in:
Mononaut 2022-12-07 14:51:26 -06:00
parent 4d0637768d
commit 56b6f79f97
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E

View File

@ -158,8 +158,11 @@ class MempoolBlocks {
}; };
}); });
// (re)initialize tx selection worker thread
if (!this.txSelectionWorker) { if (!this.txSelectionWorker) {
this.txSelectionWorker = new Worker(path.resolve(__dirname, './tx-selection-worker.js')); this.txSelectionWorker = new Worker(path.resolve(__dirname, './tx-selection-worker.js'));
// if the thread throws an unexpected error, or exits for any other reason,
// reset worker state so that it will be re-initialized on the next run
this.txSelectionWorker.once('error', () => { this.txSelectionWorker.once('error', () => {
this.txSelectionWorker = null; this.txSelectionWorker = null;
}); });
@ -169,15 +172,25 @@ class MempoolBlocks {
} }
// run the block construction algorithm in a separate thread, and wait for a result // run the block construction algorithm in a separate thread, and wait for a result
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => { let threadErrorListener;
this.txSelectionWorker?.once('message', (result): void => { try {
resolve(result); const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => {
threadErrorListener = reject;
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
this.txSelectionWorker?.once('error', reject);
}); });
}); this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); const { blocks, clusters } = await workerResultPromise;
const { blocks, clusters } = await workerResultPromise;
this.processBlockTemplates(newMempool, blocks, clusters); this.processBlockTemplates(newMempool, blocks, clusters);
// clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener);
} catch (e) {
logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
}
} }
public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[]): Promise<void> { public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[]): Promise<void> {
@ -199,15 +212,25 @@ class MempoolBlocks {
}); });
// run the block construction algorithm in a separate thread, and wait for a result // run the block construction algorithm in a separate thread, and wait for a result
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => { let threadErrorListener;
this.txSelectionWorker?.once('message', (result): void => { try {
resolve(result); const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => {
threadErrorListener = reject;
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
this.txSelectionWorker?.once('error', reject);
}); });
}); this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed });
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); const { blocks, clusters } = await workerResultPromise;
const { blocks, clusters } = await workerResultPromise;
this.processBlockTemplates(newMempool, blocks, clusters); this.processBlockTemplates(newMempool, blocks, clusters);
// clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener);
} catch (e) {
logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
}
} }
private processBlockTemplates(mempool, blocks, clusters): void { private processBlockTemplates(mempool, blocks, clusters): void {