From 4d0637768d98c20dc8eb2a67d8be99d49b9062d6 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 6 Dec 2022 05:51:44 +0900 Subject: [PATCH] Refactor advanced gbt to minimize inter-thread comms --- backend/src/api/mempool-blocks.ts | 152 ++++++++++++++++++---- backend/src/api/mempool.ts | 2 +- backend/src/api/tx-selection-worker.ts | 170 +++++++++---------------- backend/src/api/websocket-handler.ts | 8 +- backend/src/mempool.interfaces.ts | 15 ++- 5 files changed, 211 insertions(+), 136 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index e8ab48230..62bdc8f1b 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -1,17 +1,14 @@ import logger from '../logger'; -import { MempoolBlock, TransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta } from '../mempool.interfaces'; +import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces'; import { Common } from './common'; import config from '../config'; -import { StaticPool } from 'node-worker-threads-pool'; +import { Worker } from 'worker_threads'; import path from 'path'; class MempoolBlocks { private mempoolBlocks: MempoolBlockWithTransactions[] = []; private mempoolBlockDeltas: MempoolBlockDelta[] = []; - private makeTemplatesPool = new StaticPool({ - size: 1, - task: path.resolve(__dirname, './tx-selection-worker.js'), - }); + private txSelectionWorker: Worker | null = null; constructor() {} @@ -146,27 +143,136 @@ class MempoolBlocks { return mempoolBlockDeltas; } - public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit: number | null = null, condenseRest = false): Promise { - const { mempool, blocks } = await this.makeTemplatesPool.exec({ mempool: newMempool, blockLimit, weightLimit, condenseRest }); - const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, blocks); - - // copy CPFP info across to main thread's mempool - Object.keys(newMempool).forEach((txid) => { - if (newMempool[txid] && mempool[txid]) { - newMempool[txid].effectiveFeePerVsize = mempool[txid].effectiveFeePerVsize; - newMempool[txid].ancestors = mempool[txid].ancestors; - newMempool[txid].descendants = mempool[txid].descendants; - newMempool[txid].bestDescendant = mempool[txid].bestDescendant; - newMempool[txid].cpfpChecked = mempool[txid].cpfpChecked; - } + public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }): Promise { + // prepare a stripped down version of the mempool with only the minimum necessary data + // to reduce the overhead of passing this data to the worker thread + const strippedMempool: { [txid: string]: ThreadTransaction } = {}; + Object.values(newMempool).forEach(entry => { + strippedMempool[entry.txid] = { + txid: entry.txid, + fee: entry.fee, + weight: entry.weight, + feePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.fee / (entry.weight / 4), + vin: entry.vin.map(v => v.txid), + }; }); - this.mempoolBlocks = blocks; + if (!this.txSelectionWorker) { + this.txSelectionWorker = new Worker(path.resolve(__dirname, './tx-selection-worker.js')); + this.txSelectionWorker.once('error', () => { + this.txSelectionWorker = null; + }); + this.txSelectionWorker.once('exit', () => { + this.txSelectionWorker = null; + }); + } + + // run the block construction algorithm in a separate thread, and wait for a result + const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => { + this.txSelectionWorker?.once('message', (result): void => { + resolve(result); + }); + }); + this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); + const { blocks, clusters } = await workerResultPromise; + + this.processBlockTemplates(newMempool, blocks, clusters); + } + + public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[]): Promise { + if (!this.txSelectionWorker) { + // need to reset the worker + return this.makeBlockTemplates(newMempool); + } + // prepare a stripped down version of the mempool with only the minimum necessary data + // to reduce the overhead of passing this data to the worker thread + const addedStripped: ThreadTransaction[] = added.map(entry => { + return { + txid: entry.txid, + fee: entry.fee, + weight: entry.weight, + feePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.fee / (entry.weight / 4), + vin: entry.vin.map(v => v.txid), + }; + }); + + // run the block construction algorithm in a separate thread, and wait for a result + const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve) => { + this.txSelectionWorker?.once('message', (result): void => { + resolve(result); + }); + }); + this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); + const { blocks, clusters } = await workerResultPromise; + + this.processBlockTemplates(newMempool, blocks, clusters); + } + + private processBlockTemplates(mempool, blocks, clusters): void { + // update this thread's mempool with the results + blocks.forEach(block => { + block.forEach(tx => { + if (tx.txid in mempool) { + if (tx.effectiveFeePerVsize != null) { + mempool[tx.txid].effectiveFeePerVsize = tx.effectiveFeePerVsize; + } + if (tx.cpfpRoot && tx.cpfpRoot in clusters) { + const ancestors: Ancestor[] = []; + const descendants: Ancestor[] = []; + const cluster = clusters[tx.cpfpRoot]; + let matched = false; + cluster.forEach(txid => { + if (txid === tx.txid) { + matched = true; + } else { + const relative = { + txid: txid, + fee: mempool[txid].fee, + weight: mempool[txid].weight, + }; + if (matched) { + descendants.push(relative); + } else { + ancestors.push(relative); + } + } + }); + mempool[tx.txid].ancestors = ancestors; + mempool[tx.txid].descendants = descendants; + mempool[tx.txid].bestDescendant = null; + } + mempool[tx.txid].cpfpChecked = tx.cpfpChecked; + } + }); + }); + + // unpack the condensed blocks into proper mempool blocks + const mempoolBlocks = blocks.map((transactions, blockIndex) => { + return this.dataToMempoolBlocks(transactions.map(tx => { + return mempool[tx.txid] || null; + }).filter(tx => !!tx), undefined, undefined, blockIndex); + }); + + const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks); + + this.mempoolBlocks = mempoolBlocks; this.mempoolBlockDeltas = deltas; } private dataToMempoolBlocks(transactions: TransactionExtended[], - blockSize: number, blockWeight: number, blocksIndex: number): MempoolBlockWithTransactions { + blockSize: number | undefined, blockWeight: number | undefined, blocksIndex: number): MempoolBlockWithTransactions { + let totalSize = blockSize || 0; + let totalWeight = blockWeight || 0; + if (blockSize === undefined && blockWeight === undefined) { + totalSize = 0; + totalWeight = 0; + transactions.forEach(tx => { + totalSize += tx.size; + totalWeight += tx.weight; + }); + } let rangeLength = 4; if (blocksIndex === 0) { rangeLength = 8; @@ -177,8 +283,8 @@ class MempoolBlocks { rangeLength = 8; } return { - blockSize: blockSize, - blockVSize: blockWeight / 4, + blockSize: totalSize, + blockVSize: totalWeight / 4, nTx: transactions.length, totalFees: transactions.reduce((acc, cur) => acc + cur.fee, 0), medianFee: Common.percentile(transactions.map((tx) => tx.effectiveFeePerVsize), config.MEMPOOL.RECOMMENDED_FEE_PERCENTILE), diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 584ddf816..717f4eebb 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -21,7 +21,7 @@ class Mempool { private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => void) | undefined; private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], - deletedTransactions: TransactionExtended[]) => void) | undefined; + deletedTransactions: TransactionExtended[]) => Promise) | undefined; private txPerSecondArray: number[] = []; private txPerSecond: number = 0; diff --git a/backend/src/api/tx-selection-worker.ts b/backend/src/api/tx-selection-worker.ts index ca40af84f..7297cbe88 100644 --- a/backend/src/api/tx-selection-worker.ts +++ b/backend/src/api/tx-selection-worker.ts @@ -1,17 +1,30 @@ import config from '../config'; import logger from '../logger'; -import { TransactionExtended, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces'; +import { ThreadTransaction, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces'; import { PairingHeap } from '../utils/pairing-heap'; import { Common } from './common'; import { parentPort } from 'worker_threads'; +let mempool: { [txid: string]: ThreadTransaction } = {}; + if (parentPort) { - parentPort.on('message', (params: { mempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit: number | null, condenseRest: boolean}) => { - const { mempool, blocks } = makeBlockTemplates(params); + parentPort.on('message', (params) => { + if (params.type === 'set') { + mempool = params.mempool; + } else if (params.type === 'update') { + params.added.forEach(tx => { + mempool[tx.txid] = tx; + }); + params.removed.forEach(txid => { + delete mempool[txid]; + }); + } + + const { blocks, clusters } = makeBlockTemplates(mempool); // return the result to main thread. if (parentPort) { - parentPort.postMessage({ mempool, blocks }); + parentPort.postMessage({ blocks, clusters }); } }); } @@ -19,35 +32,24 @@ if (parentPort) { /* * Build projected mempool blocks using an approximation of the transaction selection algorithm from Bitcoin Core * (see BlockAssembler in https://github.com/bitcoin/bitcoin/blob/master/src/node/miner.cpp) -* -* blockLimit: number of blocks to build in total. -* weightLimit: maximum weight of transactions to consider using the selection algorithm. -* if weightLimit is significantly lower than the mempool size, results may start to diverge from getBlockTemplate -* condenseRest: whether to ignore excess transactions or append them to the final block. */ -function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: { mempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit?: number | null, condenseRest?: boolean | null }) - : { mempool: { [txid: string]: TransactionExtended }, blocks: MempoolBlockWithTransactions[] } { +function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) + : { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } } { const start = Date.now(); const auditPool: { [txid: string]: AuditTransaction } = {}; const mempoolArray: AuditTransaction[] = []; - const restOfArray: TransactionExtended[] = []; + const restOfArray: ThreadTransaction[] = []; + const cpfpClusters: { [root: string]: string[] } = {}; - let weight = 0; - const maxWeight = weightLimit ? Math.max(4_000_000 * blockLimit, weightLimit) : Infinity; // grab the top feerate txs up to maxWeight Object.values(mempool).sort((a, b) => b.feePerVsize - a.feePerVsize).forEach(tx => { - weight += tx.weight; - if (weight >= maxWeight) { - restOfArray.push(tx); - return; - } // initializing everything up front helps V8 optimize property access later auditPool[tx.txid] = { txid: tx.txid, fee: tx.fee, - size: tx.size, weight: tx.weight, feePerVsize: tx.feePerVsize, + effectiveFeePerVsize: tx.feePerVsize, vin: tx.vin, relativesSet: false, ancestorMap: new Map(), @@ -74,7 +76,7 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: // Build blocks by greedily choosing the highest feerate package // (i.e. the package rooted in the transaction with the best ancestor score) - const blocks: MempoolBlockWithTransactions[] = []; + const blocks: ThreadTransaction[][] = []; let blockWeight = 4000; let blockSize = 0; let transactions: AuditTransaction[] = []; @@ -82,7 +84,7 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: let overflow: AuditTransaction[] = []; let failures = 0; let top = 0; - while ((top < mempoolArray.length || !modified.isEmpty()) && (condenseRest || blocks.length < blockLimit)) { + while ((top < mempoolArray.length || !modified.isEmpty())) { // skip invalid transactions while (top < mempoolArray.length && (mempoolArray[top].used || mempoolArray[top].modified)) { top++; @@ -107,9 +109,13 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: // Check if the package fits into this block if (blockWeight + nextTx.ancestorWeight < config.MEMPOOL.BLOCK_WEIGHT_UNITS) { const ancestors: AuditTransaction[] = Array.from(nextTx.ancestorMap.values()); - const descendants: AuditTransaction[] = []; // sort ancestors by dependency graph (equivalent to sorting by ascending ancestor count) const sortedTxSet = [...ancestors.sort((a, b) => { return (a.ancestorMap.size || 0) - (b.ancestorMap.size || 0); }), nextTx]; + let isCluster = false; + if (sortedTxSet.length > 1) { + cpfpClusters[nextTx.txid] = sortedTxSet.map(tx => tx.txid); + isCluster = true; + } const effectiveFeeRate = nextTx.ancestorFee / (nextTx.ancestorWeight / 4); const used: AuditTransaction[] = []; while (sortedTxSet.length) { @@ -119,21 +125,9 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: ancestor.usedBy = nextTx.txid; // update original copy of this tx with effective fee rate & relatives data mempoolTx.effectiveFeePerVsize = effectiveFeeRate; - mempoolTx.ancestors = sortedTxSet.map((a) => { - return { - txid: a.txid, - fee: a.fee, - weight: a.weight, - }; - }).reverse(); - mempoolTx.descendants = descendants.map((a) => { - return { - txid: a.txid, - fee: a.fee, - weight: a.weight, - }; - }); - descendants.push(ancestor); + if (isCluster) { + mempoolTx.cpfpRoot = nextTx.txid; + } mempoolTx.cpfpChecked = true; transactions.push(ancestor); blockSize += ancestor.size; @@ -159,10 +153,10 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: // this block is full const exceededPackageTries = failures > 1000 && blockWeight > (config.MEMPOOL.BLOCK_WEIGHT_UNITS - 4000); const queueEmpty = top >= mempoolArray.length && modified.isEmpty(); - if ((exceededPackageTries || queueEmpty) && (!condenseRest || blocks.length < blockLimit - 1)) { + if ((exceededPackageTries || queueEmpty) && blocks.length < 7) { // construct this block if (transactions.length) { - blocks.push(dataToMempoolBlocks(transactions.map(t => mempool[t.txid]), blockSize, blockWeight, blocks.length)); + blocks.push(transactions.map(t => mempool[t.txid])); } // reset for the next block transactions = []; @@ -181,55 +175,40 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: overflow = []; } } - if (condenseRest) { - // pack any leftover transactions into the last block - for (const tx of overflow) { - if (!tx || tx?.used) { - continue; - } - blockWeight += tx.weight; - blockSize += tx.size; - const mempoolTx = mempool[tx.txid]; - // update original copy of this tx with effective fee rate & relatives data - mempoolTx.effectiveFeePerVsize = tx.score; - mempoolTx.ancestors = (Array.from(tx.ancestorMap?.values()) as AuditTransaction[]).map((a) => { - return { - txid: a.txid, - fee: a.fee, - weight: a.weight, - }; - }); - mempoolTx.bestDescendant = null; - mempoolTx.cpfpChecked = true; - transactions.push(tx); - tx.used = true; + // pack any leftover transactions into the last block + for (const tx of overflow) { + if (!tx || tx?.used) { + continue; } - const blockTransactions = transactions.map(t => mempool[t.txid]); - restOfArray.forEach(tx => { - blockWeight += tx.weight; - blockSize += tx.size; - tx.effectiveFeePerVsize = tx.feePerVsize; - tx.cpfpChecked = false; - tx.ancestors = []; - tx.bestDescendant = null; - blockTransactions.push(tx); - }); - if (blockTransactions.length) { - blocks.push(dataToMempoolBlocks(blockTransactions, blockSize, blockWeight, blocks.length)); + blockWeight += tx.weight; + const mempoolTx = mempool[tx.txid]; + // update original copy of this tx with effective fee rate & relatives data + mempoolTx.effectiveFeePerVsize = tx.score; + if (tx.ancestorMap.size > 0) { + cpfpClusters[tx.txid] = Array.from(tx.ancestorMap?.values()).map(a => a.txid); + mempoolTx.cpfpRoot = tx.txid; } - transactions = []; - } else if (transactions.length) { - blocks.push(dataToMempoolBlocks(transactions.map(t => mempool[t.txid]), blockSize, blockWeight, blocks.length)); + mempoolTx.cpfpChecked = true; + transactions.push(tx); + tx.used = true; } + const blockTransactions = transactions.map(t => mempool[t.txid]); + restOfArray.forEach(tx => { + blockWeight += tx.weight; + tx.effectiveFeePerVsize = tx.feePerVsize; + tx.cpfpChecked = false; + blockTransactions.push(tx); + }); + if (blockTransactions.length) { + blocks.push(blockTransactions); + } + transactions = []; const end = Date.now(); const time = end - start; logger.debug('Mempool templates calculated in ' + time / 1000 + ' seconds'); - return { - mempool, - blocks - }; + return { blocks, clusters: cpfpClusters }; } // traverse in-mempool ancestors @@ -239,9 +218,9 @@ function setRelatives( mempool: { [txid: string]: AuditTransaction }, ): void { for (const parent of tx.vin) { - const parentTx = mempool[parent.txid]; - if (parentTx && !tx.ancestorMap?.has(parent.txid)) { - tx.ancestorMap.set(parent.txid, parentTx); + const parentTx = mempool[parent]; + if (parentTx && !tx.ancestorMap?.has(parent)) { + tx.ancestorMap.set(parent, parentTx); parentTx.children.add(tx); // visit each node only once if (!parentTx.relativesSet) { @@ -312,27 +291,4 @@ function updateDescendants( }); } } -} - -function dataToMempoolBlocks(transactions: TransactionExtended[], - blockSize: number, blockWeight: number, blocksIndex: number): MempoolBlockWithTransactions { - let rangeLength = 4; - if (blocksIndex === 0) { - rangeLength = 8; - } - if (transactions.length > 4000) { - rangeLength = 6; - } else if (transactions.length > 10000) { - rangeLength = 8; - } - return { - blockSize: blockSize, - blockVSize: blockWeight / 4, - nTx: transactions.length, - totalFees: transactions.reduce((acc, cur) => acc + cur.fee, 0), - medianFee: Common.percentile(transactions.map((tx) => tx.effectiveFeePerVsize), config.MEMPOOL.RECOMMENDED_FEE_PERCENTILE), - feeRange: Common.getFeesInRange(transactions, rangeLength), - transactionIds: transactions.map((tx) => tx.txid), - transactions: transactions.map((tx) => Common.stripTransaction(tx)), - }; } \ No newline at end of file diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index c78c93544..b6f32aa05 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -251,7 +251,7 @@ class WebsocketHandler { } if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { - await mempoolBlocks.makeBlockTemplates(newMempool, 8, null, true); + await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid)); } else { mempoolBlocks.updateMempoolBlocks(newMempool); } @@ -419,7 +419,7 @@ class WebsocketHandler { const _memPool = memPool.getMempool(); if (config.MEMPOOL.ADVANCED_GBT_AUDIT) { - await mempoolBlocks.makeBlockTemplates(_memPool, 2); + await mempoolBlocks.makeBlockTemplates(_memPool); } else { mempoolBlocks.updateMempoolBlocks(_memPool); } @@ -462,13 +462,15 @@ class WebsocketHandler { } } + const removed: string[] = []; // Update mempool to remove transactions included in the new block for (const txId of txIds) { delete _memPool[txId]; + removed.push(txId); } if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { - await mempoolBlocks.makeBlockTemplates(_memPool, 8, null, true); + await mempoolBlocks.updateBlockTemplates(_memPool, [], removed); } else { mempoolBlocks.updateMempoolBlocks(_memPool); } diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 11de304b8..046083322 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -81,10 +81,10 @@ export interface TransactionExtended extends IEsploraApi.Transaction { export interface AuditTransaction { txid: string; fee: number; - size: number; weight: number; feePerVsize: number; - vin: IEsploraApi.Vin[]; + effectiveFeePerVsize: number; + vin: string[]; relativesSet: boolean; ancestorMap: Map; children: Set; @@ -96,6 +96,17 @@ export interface AuditTransaction { modifiedNode: HeapNode; } +export interface ThreadTransaction { + txid: string; + fee: number; + weight: number; + feePerVsize: number; + effectiveFeePerVsize?: number; + vin: string[]; + cpfpRoot?: string; + cpfpChecked?: boolean; +} + export interface Ancestor { txid: string; weight: number;