From f0c8d27435b75e57cc31fe0852c411cddb0ebffc Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 9 Jun 2023 19:51:03 -0400 Subject: [PATCH] implement chunked worker thread messaging --- backend/src/api/mempool-blocks.ts | 47 ++++++++++++++++++++++---- backend/src/api/tx-selection-worker.ts | 22 ++++++------ 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index 9b5da8b3b..07f5bc44c 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -5,6 +5,8 @@ import config from '../config'; import { Worker } from 'worker_threads'; import path from 'path'; +const MAX_WORKER_MESSAGE_SIZE = 100_000; + class MempoolBlocks { private mempoolBlocks: MempoolBlockWithTransactions[] = []; private mempoolBlockDeltas: MempoolBlockDelta[] = []; @@ -216,10 +218,15 @@ class MempoolBlocks { // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread - const strippedMempool: Map = new Map(); + const strippedMempoolChunks: CompactThreadTransaction[][] = []; + let strippedMempoolChunk: CompactThreadTransaction[] = []; Object.values(newMempool).forEach(entry => { + if (strippedMempoolChunk.length >= MAX_WORKER_MESSAGE_SIZE) { + strippedMempoolChunks.push(strippedMempoolChunk); + strippedMempoolChunk = []; + } if (entry.uid != null) { - strippedMempool.set(entry.uid, { + strippedMempoolChunk.push({ uid: entry.uid, fee: entry.fee, weight: (entry.adjustedVsize * 4), @@ -230,6 +237,7 @@ class MempoolBlocks { }); } }); + strippedMempoolChunks.push(strippedMempoolChunk); // (re)initialize tx selection worker thread if (!this.txSelectionWorker) { @@ -254,7 +262,13 @@ class MempoolBlocks { }); this.txSelectionWorker?.once('error', reject); }); - this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); + this.txSelectionWorker.postMessage({ type: 'clear' }); + for (const [index, chunk] of strippedMempoolChunks.entries()) { + const lastChunk = index === strippedMempoolChunks.length - 1; + this.txSelectionWorker.postMessage({ type: lastChunk ? 'execute' : 'chunk', added: chunk }); + // yield back to the event loop + await Common.sleep$(0); + } const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise); // clean up thread error listener @@ -284,8 +298,14 @@ class MempoolBlocks { const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[]; // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread - const addedStripped: CompactThreadTransaction[] = added.filter(entry => entry.uid != null).map(entry => { - return { + const addedStrippedChunks: CompactThreadTransaction[][] = []; + let addedStripped: CompactThreadTransaction[] = []; + added.filter(entry => entry.uid != null).forEach(entry => { + if (addedStripped.length >= MAX_WORKER_MESSAGE_SIZE) { + addedStrippedChunks.push(addedStripped); + addedStripped = []; + } + addedStripped.push({ uid: entry.uid || 0, fee: entry.fee, weight: (entry.adjustedVsize * 4), @@ -293,8 +313,9 @@ class MempoolBlocks { feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize, effectiveFeePerVsize: entry.effectiveFeePerVsize || entry.adjustedFeePerVsize || entry.feePerVsize, inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], - }; + }); }); + addedStrippedChunks.push(addedStripped); // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; @@ -306,7 +327,19 @@ class MempoolBlocks { }); this.txSelectionWorker?.once('error', reject); }); - this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids }); + const multipleChunks = addedStrippedChunks.length > 1; + if (multipleChunks) { + for (const chunk of addedStrippedChunks) { + this.txSelectionWorker.postMessage({ type: 'chunk', added: chunk }); + // yield back to the event loop + await Common.sleep$(0); + } + } + this.txSelectionWorker.postMessage({ + type: 'execute', + added: multipleChunks ? [] : addedStrippedChunks[0] || [], + removed: removedUids + }); const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise); this.removeUids(removedUids); diff --git a/backend/src/api/tx-selection-worker.ts b/backend/src/api/tx-selection-worker.ts index 0acc2f65e..7a281a704 100644 --- a/backend/src/api/tx-selection-worker.ts +++ b/backend/src/api/tx-selection-worker.ts @@ -8,22 +8,24 @@ let mempool: Map = new Map(); if (parentPort) { parentPort.on('message', (params) => { - if (params.type === 'set') { - mempool = params.mempool; - } else if (params.type === 'update') { - params.added.forEach(tx => { + if (params.type === 'clear') { + mempool = new Map(); + } else { + params.added?.forEach(tx => { mempool.set(tx.uid, tx); }); - params.removed.forEach(uid => { + params.removed?.forEach(uid => { mempool.delete(uid); }); } - - const { blocks, rates, clusters } = makeBlockTemplates(mempool); - // return the result to main thread. - if (parentPort) { - parentPort.postMessage({ blocks, rates, clusters }); + if (params.type === 'execute') { + const { blocks, rates, clusters } = makeBlockTemplates(mempool); + + // return the result to main thread. + if (parentPort) { + parentPort.postMessage({ blocks, rates, clusters }); + } } }); }