implement chunked worker thread messaging

This commit is contained in:
Mononaut 2023-06-09 19:51:03 -04:00
parent 9a99ee6486
commit f0c8d27435
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
2 changed files with 52 additions and 17 deletions

View File

@ -5,6 +5,8 @@ import config from '../config';
import { Worker } from 'worker_threads';
import path from 'path';
const MAX_WORKER_MESSAGE_SIZE = 100_000;
class MempoolBlocks {
private mempoolBlocks: MempoolBlockWithTransactions[] = [];
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
@ -216,10 +218,15 @@ class MempoolBlocks {
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const strippedMempool: Map<number, CompactThreadTransaction> = new Map();
const strippedMempoolChunks: CompactThreadTransaction[][] = [];
let strippedMempoolChunk: CompactThreadTransaction[] = [];
Object.values(newMempool).forEach(entry => {
if (strippedMempoolChunk.length >= MAX_WORKER_MESSAGE_SIZE) {
strippedMempoolChunks.push(strippedMempoolChunk);
strippedMempoolChunk = [];
}
if (entry.uid != null) {
strippedMempool.set(entry.uid, {
strippedMempoolChunk.push({
uid: entry.uid,
fee: entry.fee,
weight: (entry.adjustedVsize * 4),
@ -230,6 +237,7 @@ class MempoolBlocks {
});
}
});
strippedMempoolChunks.push(strippedMempoolChunk);
// (re)initialize tx selection worker thread
if (!this.txSelectionWorker) {
@ -254,7 +262,13 @@ class MempoolBlocks {
});
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
this.txSelectionWorker.postMessage({ type: 'clear' });
for (const [index, chunk] of strippedMempoolChunks.entries()) {
const lastChunk = index === strippedMempoolChunks.length - 1;
this.txSelectionWorker.postMessage({ type: lastChunk ? 'execute' : 'chunk', added: chunk });
// yield back to the event loop
await Common.sleep$(0);
}
const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise);
// clean up thread error listener
@ -284,8 +298,14 @@ class MempoolBlocks {
const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[];
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const addedStripped: CompactThreadTransaction[] = added.filter(entry => entry.uid != null).map(entry => {
return {
const addedStrippedChunks: CompactThreadTransaction[][] = [];
let addedStripped: CompactThreadTransaction[] = [];
added.filter(entry => entry.uid != null).forEach(entry => {
if (addedStripped.length >= MAX_WORKER_MESSAGE_SIZE) {
addedStrippedChunks.push(addedStripped);
addedStripped = [];
}
addedStripped.push({
uid: entry.uid || 0,
fee: entry.fee,
weight: (entry.adjustedVsize * 4),
@ -293,8 +313,9 @@ class MempoolBlocks {
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
effectiveFeePerVsize: entry.effectiveFeePerVsize || entry.adjustedFeePerVsize || entry.feePerVsize,
inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[],
};
});
});
addedStrippedChunks.push(addedStripped);
// run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener;
@ -306,7 +327,19 @@ class MempoolBlocks {
});
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids });
const multipleChunks = addedStrippedChunks.length > 1;
if (multipleChunks) {
for (const chunk of addedStrippedChunks) {
this.txSelectionWorker.postMessage({ type: 'chunk', added: chunk });
// yield back to the event loop
await Common.sleep$(0);
}
}
this.txSelectionWorker.postMessage({
type: 'execute',
added: multipleChunks ? [] : addedStrippedChunks[0] || [],
removed: removedUids
});
const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise);
this.removeUids(removedUids);

View File

@ -8,22 +8,24 @@ let mempool: Map<number, CompactThreadTransaction> = new Map();
if (parentPort) {
parentPort.on('message', (params) => {
if (params.type === 'set') {
mempool = params.mempool;
} else if (params.type === 'update') {
params.added.forEach(tx => {
if (params.type === 'clear') {
mempool = new Map();
} else {
params.added?.forEach(tx => {
mempool.set(tx.uid, tx);
});
params.removed.forEach(uid => {
params.removed?.forEach(uid => {
mempool.delete(uid);
});
}
const { blocks, rates, clusters } = makeBlockTemplates(mempool);
// return the result to main thread.
if (parentPort) {
parentPort.postMessage({ blocks, rates, clusters });
if (params.type === 'execute') {
const { blocks, rates, clusters } = makeBlockTemplates(mempool);
// return the result to main thread.
if (parentPort) {
parentPort.postMessage({ blocks, rates, clusters });
}
}
});
}