Refactor advanced gbt to minimize inter-thread comms

This commit is contained in:
Mononaut
2022-12-06 05:51:44 +09:00
parent 07987ff4b6
commit 4d0637768d
5 changed files with 211 additions and 136 deletions

View File

@@ -1,17 +1,14 @@
import logger from '../logger';
import { MempoolBlock, TransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta } from '../mempool.interfaces';
import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces';
import { Common } from './common';
import config from '../config';
import { StaticPool } from 'node-worker-threads-pool';
import { Worker } from 'worker_threads';
import path from 'path';
class MempoolBlocks {
private mempoolBlocks: MempoolBlockWithTransactions[] = [];
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
private makeTemplatesPool = new StaticPool({
size: 1,
task: path.resolve(__dirname, './tx-selection-worker.js'),
});
private txSelectionWorker: Worker | null = null;
constructor() {}
@@ -146,27 +143,136 @@ class MempoolBlocks {
return mempoolBlockDeltas;
}
public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit: number | null = null, condenseRest = false): Promise<void> {
const { mempool, blocks } = await this.makeTemplatesPool.exec({ mempool: newMempool, blockLimit, weightLimit, condenseRest });
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, blocks);
// copy CPFP info across to main thread's mempool
Object.keys(newMempool).forEach((txid) => {
if (newMempool[txid] && mempool[txid]) {
newMempool[txid].effectiveFeePerVsize = mempool[txid].effectiveFeePerVsize;
newMempool[txid].ancestors = mempool[txid].ancestors;
newMempool[txid].descendants = mempool[txid].descendants;
newMempool[txid].bestDescendant = mempool[txid].bestDescendant;
newMempool[txid].cpfpChecked = mempool[txid].cpfpChecked;
}
public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }): Promise<void> {
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const strippedMempool: { [txid: string]: ThreadTransaction } = {};
Object.values(newMempool).forEach(entry => {
strippedMempool[entry.txid] = {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
});
this.mempoolBlocks = blocks;
if (!this.txSelectionWorker) {
this.txSelectionWorker = new Worker(path.resolve(__dirname, './tx-selection-worker.js'));
this.txSelectionWorker.once('error', () => {
this.txSelectionWorker = null;
});
this.txSelectionWorker.once('exit', () => {
this.txSelectionWorker = null;
});
}
// run the block construction algorithm in a separate thread, and wait for a result
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => {
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
});
this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
const { blocks, clusters } = await workerResultPromise;
this.processBlockTemplates(newMempool, blocks, clusters);
}
public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[]): Promise<void> {
if (!this.txSelectionWorker) {
// need to reset the worker
return this.makeBlockTemplates(newMempool);
}
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const addedStripped: ThreadTransaction[] = added.map(entry => {
return {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
});
// run the block construction algorithm in a separate thread, and wait for a result
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => {
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
});
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed });
const { blocks, clusters } = await workerResultPromise;
this.processBlockTemplates(newMempool, blocks, clusters);
}
private processBlockTemplates(mempool, blocks, clusters): void {
// update this thread's mempool with the results
blocks.forEach(block => {
block.forEach(tx => {
if (tx.txid in mempool) {
if (tx.effectiveFeePerVsize != null) {
mempool[tx.txid].effectiveFeePerVsize = tx.effectiveFeePerVsize;
}
if (tx.cpfpRoot && tx.cpfpRoot in clusters) {
const ancestors: Ancestor[] = [];
const descendants: Ancestor[] = [];
const cluster = clusters[tx.cpfpRoot];
let matched = false;
cluster.forEach(txid => {
if (txid === tx.txid) {
matched = true;
} else {
const relative = {
txid: txid,
fee: mempool[txid].fee,
weight: mempool[txid].weight,
};
if (matched) {
descendants.push(relative);
} else {
ancestors.push(relative);
}
}
});
mempool[tx.txid].ancestors = ancestors;
mempool[tx.txid].descendants = descendants;
mempool[tx.txid].bestDescendant = null;
}
mempool[tx.txid].cpfpChecked = tx.cpfpChecked;
}
});
});
// unpack the condensed blocks into proper mempool blocks
const mempoolBlocks = blocks.map((transactions, blockIndex) => {
return this.dataToMempoolBlocks(transactions.map(tx => {
return mempool[tx.txid] || null;
}).filter(tx => !!tx), undefined, undefined, blockIndex);
});
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks);
this.mempoolBlocks = mempoolBlocks;
this.mempoolBlockDeltas = deltas;
}
private dataToMempoolBlocks(transactions: TransactionExtended[],
blockSize: number, blockWeight: number, blocksIndex: number): MempoolBlockWithTransactions {
blockSize: number | undefined, blockWeight: number | undefined, blocksIndex: number): MempoolBlockWithTransactions {
let totalSize = blockSize || 0;
let totalWeight = blockWeight || 0;
if (blockSize === undefined && blockWeight === undefined) {
totalSize = 0;
totalWeight = 0;
transactions.forEach(tx => {
totalSize += tx.size;
totalWeight += tx.weight;
});
}
let rangeLength = 4;
if (blocksIndex === 0) {
rangeLength = 8;
@@ -177,8 +283,8 @@ class MempoolBlocks {
rangeLength = 8;
}
return {
blockSize: blockSize,
blockVSize: blockWeight / 4,
blockSize: totalSize,
blockVSize: totalWeight / 4,
nTx: transactions.length,
totalFees: transactions.reduce((acc, cur) => acc + cur.fee, 0),
medianFee: Common.percentile(transactions.map((tx) => tx.effectiveFeePerVsize), config.MEMPOOL.RECOMMENDED_FEE_PERCENTILE),