|
|
|
@@ -5,6 +5,8 @@ import config from '../config';
|
|
|
|
import { Worker } from 'worker_threads';
|
|
|
|
import { Worker } from 'worker_threads';
|
|
|
|
import path from 'path';
|
|
|
|
import path from 'path';
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const MAX_WORKER_MESSAGE_SIZE = 100_000;
|
|
|
|
|
|
|
|
|
|
|
|
class MempoolBlocks {
|
|
|
|
class MempoolBlocks {
|
|
|
|
private mempoolBlocks: MempoolBlockWithTransactions[] = [];
|
|
|
|
private mempoolBlocks: MempoolBlockWithTransactions[] = [];
|
|
|
|
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
|
|
|
|
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
|
|
|
|
@@ -216,10 +218,15 @@ class MempoolBlocks {
|
|
|
|
|
|
|
|
|
|
|
|
// prepare a stripped down version of the mempool with only the minimum necessary data
|
|
|
|
// prepare a stripped down version of the mempool with only the minimum necessary data
|
|
|
|
// to reduce the overhead of passing this data to the worker thread
|
|
|
|
// to reduce the overhead of passing this data to the worker thread
|
|
|
|
const strippedMempool: Map<number, CompactThreadTransaction> = new Map();
|
|
|
|
const strippedMempoolChunks: CompactThreadTransaction[][] = [];
|
|
|
|
|
|
|
|
let strippedMempoolChunk: CompactThreadTransaction[] = [];
|
|
|
|
Object.values(newMempool).forEach(entry => {
|
|
|
|
Object.values(newMempool).forEach(entry => {
|
|
|
|
|
|
|
|
if (strippedMempoolChunk.length >= MAX_WORKER_MESSAGE_SIZE) {
|
|
|
|
|
|
|
|
strippedMempoolChunks.push(strippedMempoolChunk);
|
|
|
|
|
|
|
|
strippedMempoolChunk = [];
|
|
|
|
|
|
|
|
}
|
|
|
|
if (entry.uid != null) {
|
|
|
|
if (entry.uid != null) {
|
|
|
|
strippedMempool.set(entry.uid, {
|
|
|
|
strippedMempoolChunk.push({
|
|
|
|
uid: entry.uid,
|
|
|
|
uid: entry.uid,
|
|
|
|
fee: entry.fee,
|
|
|
|
fee: entry.fee,
|
|
|
|
weight: (entry.adjustedVsize * 4),
|
|
|
|
weight: (entry.adjustedVsize * 4),
|
|
|
|
@@ -230,6 +237,7 @@ class MempoolBlocks {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
strippedMempoolChunks.push(strippedMempoolChunk);
|
|
|
|
|
|
|
|
|
|
|
|
// (re)initialize tx selection worker thread
|
|
|
|
// (re)initialize tx selection worker thread
|
|
|
|
if (!this.txSelectionWorker) {
|
|
|
|
if (!this.txSelectionWorker) {
|
|
|
|
@@ -254,7 +262,13 @@ class MempoolBlocks {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
this.txSelectionWorker?.once('error', reject);
|
|
|
|
this.txSelectionWorker?.once('error', reject);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
|
|
|
|
this.txSelectionWorker.postMessage({ type: 'clear' });
|
|
|
|
|
|
|
|
for (const [index, chunk] of strippedMempoolChunks.entries()) {
|
|
|
|
|
|
|
|
const lastChunk = index === strippedMempoolChunks.length - 1;
|
|
|
|
|
|
|
|
this.txSelectionWorker.postMessage({ type: lastChunk ? 'execute' : 'chunk', added: chunk });
|
|
|
|
|
|
|
|
// yield back to the event loop
|
|
|
|
|
|
|
|
await Common.sleep$(0);
|
|
|
|
|
|
|
|
}
|
|
|
|
const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise);
|
|
|
|
const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise);
|
|
|
|
|
|
|
|
|
|
|
|
// clean up thread error listener
|
|
|
|
// clean up thread error listener
|
|
|
|
@@ -284,8 +298,14 @@ class MempoolBlocks {
|
|
|
|
const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[];
|
|
|
|
const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[];
|
|
|
|
// prepare a stripped down version of the mempool with only the minimum necessary data
|
|
|
|
// prepare a stripped down version of the mempool with only the minimum necessary data
|
|
|
|
// to reduce the overhead of passing this data to the worker thread
|
|
|
|
// to reduce the overhead of passing this data to the worker thread
|
|
|
|
const addedStripped: CompactThreadTransaction[] = added.filter(entry => entry.uid != null).map(entry => {
|
|
|
|
const addedStrippedChunks: CompactThreadTransaction[][] = [];
|
|
|
|
return {
|
|
|
|
let addedStripped: CompactThreadTransaction[] = [];
|
|
|
|
|
|
|
|
added.filter(entry => entry.uid != null).forEach(entry => {
|
|
|
|
|
|
|
|
if (addedStripped.length >= MAX_WORKER_MESSAGE_SIZE) {
|
|
|
|
|
|
|
|
addedStrippedChunks.push(addedStripped);
|
|
|
|
|
|
|
|
addedStripped = [];
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
addedStripped.push({
|
|
|
|
uid: entry.uid || 0,
|
|
|
|
uid: entry.uid || 0,
|
|
|
|
fee: entry.fee,
|
|
|
|
fee: entry.fee,
|
|
|
|
weight: (entry.adjustedVsize * 4),
|
|
|
|
weight: (entry.adjustedVsize * 4),
|
|
|
|
@@ -293,8 +313,9 @@ class MempoolBlocks {
|
|
|
|
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
|
|
|
|
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
|
|
|
|
effectiveFeePerVsize: entry.effectiveFeePerVsize || entry.adjustedFeePerVsize || entry.feePerVsize,
|
|
|
|
effectiveFeePerVsize: entry.effectiveFeePerVsize || entry.adjustedFeePerVsize || entry.feePerVsize,
|
|
|
|
inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[],
|
|
|
|
inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[],
|
|
|
|
};
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
addedStrippedChunks.push(addedStripped);
|
|
|
|
|
|
|
|
|
|
|
|
// run the block construction algorithm in a separate thread, and wait for a result
|
|
|
|
// run the block construction algorithm in a separate thread, and wait for a result
|
|
|
|
let threadErrorListener;
|
|
|
|
let threadErrorListener;
|
|
|
|
@@ -306,7 +327,19 @@ class MempoolBlocks {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
this.txSelectionWorker?.once('error', reject);
|
|
|
|
this.txSelectionWorker?.once('error', reject);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids });
|
|
|
|
const multipleChunks = addedStrippedChunks.length > 1;
|
|
|
|
|
|
|
|
if (multipleChunks) {
|
|
|
|
|
|
|
|
for (const chunk of addedStrippedChunks) {
|
|
|
|
|
|
|
|
this.txSelectionWorker.postMessage({ type: 'chunk', added: chunk });
|
|
|
|
|
|
|
|
// yield back to the event loop
|
|
|
|
|
|
|
|
await Common.sleep$(0);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
this.txSelectionWorker.postMessage({
|
|
|
|
|
|
|
|
type: 'execute',
|
|
|
|
|
|
|
|
added: multipleChunks ? [] : addedStrippedChunks[0] || [],
|
|
|
|
|
|
|
|
removed: removedUids
|
|
|
|
|
|
|
|
});
|
|
|
|
const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise);
|
|
|
|
const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise);
|
|
|
|
|
|
|
|
|
|
|
|
this.removeUids(removedUids);
|
|
|
|
this.removeUids(removedUids);
|
|
|
|
|