From 93f1a4e6d411c780f6c3db642f35305d594f0e7a Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 8 May 2023 19:03:39 -0600 Subject: [PATCH] Optimize main thread processing of GBT updates --- backend/src/api/mempool-blocks.ts | 202 +++++++++++++++---------- backend/src/api/tx-selection-worker.ts | 39 +++-- backend/src/mempool.interfaces.ts | 1 + 3 files changed, 145 insertions(+), 97 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index 68da823c9..758e7e050 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -1,5 +1,5 @@ import logger from '../logger'; -import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction } from '../mempool.interfaces'; +import { MempoolBlock, TransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction } from '../mempool.interfaces'; import { Common } from './common'; import config from '../config'; import { Worker } from 'worker_threads'; @@ -90,19 +90,34 @@ class MempoolBlocks { private calculateMempoolBlocks(transactionsSorted: TransactionExtended[], prevBlocks: MempoolBlockWithTransactions[]): MempoolBlockWithTransactions[] { const mempoolBlocks: MempoolBlockWithTransactions[] = []; - let blockWeight = 0; let blockSize = 0; + let blockWeight = 0; + let blockVsize = 0; + let blockFees = 0; + const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2; + let transactionIds: string[] = []; let transactions: TransactionExtended[] = []; transactionsSorted.forEach((tx) => { if (blockWeight + tx.weight <= config.MEMPOOL.BLOCK_WEIGHT_UNITS || mempoolBlocks.length === config.MEMPOOL.MEMPOOL_BLOCKS_AMOUNT - 1) { blockWeight += tx.weight; + blockVsize += tx.vsize; blockSize += tx.size; + blockFees += tx.fee; transactions.push(tx); + transactionIds.push(tx.txid); } else { mempoolBlocks.push(this.dataToMempoolBlocks(transactions, mempoolBlocks.length)); + blockVsize = 0; + tx.position = { + block: mempoolBlocks.length, + vsize: blockVsize + (tx.vsize / 2), + }; + blockVsize += tx.vsize; blockWeight = tx.weight; blockSize = tx.size; + blockFees = tx.fee; + transactionIds = [tx.txid]; transactions = [tx]; } }); @@ -151,6 +166,8 @@ class MempoolBlocks { } public async $makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise { + const start = Date.now(); + // reset mempool short ids this.resetUids(); for (const tx of Object.values(newMempool)) { @@ -167,7 +184,7 @@ class MempoolBlocks { fee: entry.fee, weight: entry.weight, feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.effectiveFeePerVsize || (entry.fee / (entry.weight / 4)), inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], }); } @@ -189,7 +206,7 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; try { - const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map }>((resolve, reject) => { + const workerResultPromise = new Promise<{ blocks: number[][], rates: Map, clusters: Map }>((resolve, reject) => { threadErrorListener = reject; this.txSelectionWorker?.once('message', (result): void => { resolve(result); @@ -197,19 +214,14 @@ class MempoolBlocks { this.txSelectionWorker?.once('error', reject); }); this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); - let { blocks, clusters } = this.convertResultTxids(await workerResultPromise); - // filter out stale transactions - const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool))); - const filteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - if (filteredCount < unfilteredCount) { - logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from makeBlockTemplates`); - } + const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise); // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); - return this.processBlockTemplates(newMempool, blocks, clusters, saveResults); + const processed = this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); + logger.debug(`makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`); + return processed; } catch (e) { logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } @@ -223,6 +235,8 @@ class MempoolBlocks { return; } + const start = Date.now(); + for (const tx of Object.values(added)) { this.setUid(tx); } @@ -235,7 +249,7 @@ class MempoolBlocks { fee: entry.fee, weight: entry.weight, feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.effectiveFeePerVsize || (entry.fee / (entry.weight / 4)), inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], }; }); @@ -243,7 +257,7 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; try { - const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map }>((resolve, reject) => { + const workerResultPromise = new Promise<{ blocks: number[][], rates: Map, clusters: Map }>((resolve, reject) => { threadErrorListener = reject; this.txSelectionWorker?.once('message', (result): void => { resolve(result); @@ -251,76 +265,98 @@ class MempoolBlocks { this.txSelectionWorker?.once('error', reject); }); this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids }); - let { blocks, clusters } = this.convertResultTxids(await workerResultPromise); - // filter out stale transactions - const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool))); - const filteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); - if (filteredCount < unfilteredCount) { - logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from updateBlockTemplates`); - } + const { blocks, rates, clusters } = this.convertResultTxids(await workerResultPromise); this.removeUids(removedUids); // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); - this.processBlockTemplates(newMempool, blocks, clusters, saveResults); + this.processBlockTemplates(newMempool, blocks, rates, clusters, saveResults); + logger.debug(`updateBlockTemplates completed in ${(Date.now() - start) / 1000} seconds`); } catch (e) { logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } } - private processBlockTemplates(mempool, blocks: ThreadTransaction[][], clusters, saveResults): MempoolBlockWithTransactions[] { - // update this thread's mempool with the results - blocks.forEach(block => { - block.forEach(tx => { - if (tx.txid && tx.txid in mempool) { - if (tx.effectiveFeePerVsize != null) { - mempool[tx.txid].effectiveFeePerVsize = tx.effectiveFeePerVsize; - } - if (tx.cpfpRoot && tx.cpfpRoot in clusters) { - const ancestors: Ancestor[] = []; - const descendants: Ancestor[] = []; - const cluster = clusters[tx.cpfpRoot]; - let matched = false; - cluster.forEach(txid => { - if (!txid || !mempool[txid]) { - logger.warn('projected transaction ancestor missing from mempool cache'); - return; - } - if (txid === tx.txid) { - matched = true; - } else { - const relative = { - txid: txid, - fee: mempool[txid].fee, - weight: mempool[txid].weight, - }; - if (matched) { - descendants.push(relative); - } else { - ancestors.push(relative); - } - } - }); - mempool[tx.txid].ancestors = ancestors; - mempool[tx.txid].descendants = descendants; - mempool[tx.txid].bestDescendant = null; - } - mempool[tx.txid].cpfpChecked = tx.cpfpChecked; - } else { - logger.warn('projected transaction missing from mempool cache'); - } - }); - }); + private processBlockTemplates(mempool, blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }, saveResults): MempoolBlockWithTransactions[] { + for (const txid of Object.keys(rates)) { + if (txid in mempool) { + mempool[txid].effectiveFeePerVsize = rates[txid]; + } + } - // unpack the condensed blocks into proper mempool blocks - const mempoolBlocks = blocks.map((transactions, blockIndex) => { - return this.dataToMempoolBlocks(transactions.map(tx => { - return mempool[tx.txid] || null; - }).filter(tx => !!tx), blockIndex); - }); + const readyBlocks: { transactionIds, transactions, totalSize, totalWeight, totalFees }[] = []; + const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2; + // update this thread's mempool with the results + for (let blockIndex = 0; blockIndex < blocks.length; blockIndex++) { + const block: string[] = blocks[blockIndex]; + let txid: string; + let mempoolTx: TransactionExtended; + let totalSize = 0; + let totalVsize = 0; + let totalWeight = 0; + let totalFees = 0; + const transactions: TransactionExtended[] = []; + for (let txIndex = 0; txIndex < block.length; txIndex++) { + txid = block[txIndex]; + if (txid) { + mempoolTx = mempool[txid]; + // save position in projected blocks + mempoolTx.position = { + block: blockIndex, + vsize: totalVsize + (mempoolTx.vsize / 2), + }; + mempoolTx.cpfpChecked = true; + + totalSize += mempoolTx.size; + totalVsize += mempoolTx.vsize; + totalWeight += mempoolTx.weight; + totalFees += mempoolTx.fee; + + transactions.push(mempoolTx); + } + } + readyBlocks.push({ + transactionIds: block, + transactions, + totalSize, + totalWeight, + totalFees + }); + } + + for (const cluster of Object.values(clusters)) { + for (const memberTxid of cluster) { + if (memberTxid in mempool) { + const mempoolTx = mempool[memberTxid]; + const ancestors: Ancestor[] = []; + const descendants: Ancestor[] = []; + let matched = false; + cluster.forEach(txid => { + if (txid === memberTxid) { + matched = true; + } else { + const relative = { + txid: txid, + fee: mempool[txid].fee, + weight: mempool[txid].weight, + }; + if (matched) { + descendants.push(relative); + } else { + ancestors.push(relative); + } + } + }); + mempoolTx.ancestors = ancestors; + mempoolTx.descendants = descendants; + mempoolTx.bestDescendant = null; + } + } + } + + const mempoolBlocks = readyBlocks.map((b, i) => this.dataToMempoolBlocks(b.transactions, i)); if (saveResults) { const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks); @@ -388,14 +424,16 @@ class MempoolBlocks { } } - private convertResultTxids({ blocks, clusters }: { blocks: any[][], clusters: Map}) - : { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }} { - for (const block of blocks) { - for (const tx of block) { - tx.txid = this.uidMap.get(tx.uid); - if (tx.cpfpRoot) { - tx.cpfpRoot = this.uidMap.get(tx.cpfpRoot); - } + private convertResultTxids({ blocks, rates, clusters }: { blocks: number[][], rates: Map, clusters: Map}) + : { blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }} { + const convertedBlocks: string[][] = blocks.map(block => block.map(uid => { + return this.uidMap.get(uid) || ''; + })); + const convertedRates = {}; + for (const rateUid of rates.keys()) { + const rateTxid = this.uidMap.get(rateUid); + if (rateTxid) { + convertedRates[rateTxid] = rates.get(rateUid); } } const convertedClusters = {}; @@ -408,7 +446,7 @@ class MempoolBlocks { convertedClusters[rootTxid] = members; } } - return { blocks, clusters: convertedClusters } as { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }}; + return { blocks: convertedBlocks, rates: convertedRates, clusters: convertedClusters } as { blocks: string[][], rates: { [root: string]: number }, clusters: { [root: string]: string[] }}; } } diff --git a/backend/src/api/tx-selection-worker.ts b/backend/src/api/tx-selection-worker.ts index 15aac51d2..21fd97a1b 100644 --- a/backend/src/api/tx-selection-worker.ts +++ b/backend/src/api/tx-selection-worker.ts @@ -1,6 +1,6 @@ import config from '../config'; import logger from '../logger'; -import { CompactThreadTransaction, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces'; +import { CompactThreadTransaction, AuditTransaction } from '../mempool.interfaces'; import { PairingHeap } from '../utils/pairing-heap'; import { Common } from './common'; import { parentPort } from 'worker_threads'; @@ -20,11 +20,11 @@ if (parentPort) { }); } - const { blocks, clusters } = makeBlockTemplates(mempool); + const { blocks, rates, clusters } = makeBlockTemplates(mempool); // return the result to main thread. if (parentPort) { - parentPort.postMessage({ blocks, clusters }); + parentPort.postMessage({ blocks, rates, clusters }); } }); } @@ -34,14 +34,14 @@ if (parentPort) { * (see BlockAssembler in https://github.com/bitcoin/bitcoin/blob/master/src/node/miner.cpp) */ function makeBlockTemplates(mempool: Map) - : { blocks: CompactThreadTransaction[][], clusters: Map } { + : { blocks: number[][], rates: Map, clusters: Map } { const start = Date.now(); const auditPool: Map = new Map(); const mempoolArray: AuditTransaction[] = []; - const restOfArray: CompactThreadTransaction[] = []; const cpfpClusters: Map = new Map(); mempool.forEach(tx => { + tx.dirty = false; // initializing everything up front helps V8 optimize property access later auditPool.set(tx.uid, { uid: tx.uid, @@ -82,9 +82,8 @@ function makeBlockTemplates(mempool: Map) // Build blocks by greedily choosing the highest feerate package // (i.e. the package rooted in the transaction with the best ancestor score) - const blocks: CompactThreadTransaction[][] = []; + const blocks: number[][] = []; let blockWeight = 4000; - let blockSize = 0; let transactions: AuditTransaction[] = []; const modified: PairingHeap = new PairingHeap((a, b): boolean => { if (a.score === b.score) { @@ -140,13 +139,16 @@ function makeBlockTemplates(mempool: Map) ancestor.used = true; ancestor.usedBy = nextTx.uid; // update original copy of this tx with effective fee rate & relatives data - mempoolTx.effectiveFeePerVsize = effectiveFeeRate; - if (isCluster) { - mempoolTx.cpfpRoot = nextTx.uid; + if (mempoolTx.effectiveFeePerVsize !== effectiveFeeRate) { + mempoolTx.effectiveFeePerVsize = effectiveFeeRate; + mempoolTx.dirty = true; + } + if (mempoolTx.cpfpRoot !== nextTx.uid) { + mempoolTx.cpfpRoot = isCluster ? nextTx.uid : null; + mempoolTx.dirty; } mempoolTx.cpfpChecked = true; transactions.push(ancestor); - blockSize += ancestor.size; blockWeight += ancestor.weight; used.push(ancestor); } @@ -172,11 +174,10 @@ function makeBlockTemplates(mempool: Map) if ((exceededPackageTries || queueEmpty) && blocks.length < 7) { // construct this block if (transactions.length) { - blocks.push(transactions.map(t => mempool.get(t.uid) as CompactThreadTransaction)); + blocks.push(transactions.map(t => t.uid)); } // reset for the next block transactions = []; - blockSize = 0; blockWeight = 4000; // 'overflow' packages didn't fit in this block, but are valid candidates for the next @@ -197,14 +198,22 @@ function makeBlockTemplates(mempool: Map) } // add the final unbounded block if it contains any transactions if (transactions.length > 0) { - blocks.push(transactions.map(t => mempool.get(t.uid) as CompactThreadTransaction)); + blocks.push(transactions.map(t => t.uid)); + } + + // get map of dirty transactions + const rates = new Map(); + for (const tx of mempool.values()) { + if (tx?.dirty) { + rates.set(tx.uid, tx.effectiveFeePerVsize || tx.feePerVsize); + } } const end = Date.now(); const time = end - start; logger.debug('Mempool templates calculated in ' + time / 1000 + ' seconds'); - return { blocks, clusters: cpfpClusters }; + return { blocks, rates, clusters: cpfpClusters }; } // traverse in-mempool ancestors diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index a306e9061..c625b87b5 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -114,6 +114,7 @@ export interface CompactThreadTransaction { inputs: number[]; cpfpRoot?: string; cpfpChecked?: boolean; + dirty?: boolean; } export interface ThreadTransaction {