diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index fdaa8c466..af23a6376 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -1,5 +1,5 @@ import logger from '../logger'; -import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces'; +import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction } from '../mempool.interfaces'; import { Common } from './common'; import config from '../config'; import { Worker } from 'worker_threads'; @@ -10,6 +10,9 @@ class MempoolBlocks { private mempoolBlockDeltas: MempoolBlockDelta[] = []; private txSelectionWorker: Worker | null = null; + private nextUid: number = 1; + private uidMap: Map = new Map(); // map short numerical uids to full txids + constructor() {} public getMempoolBlocks(): MempoolBlock[] { @@ -175,18 +178,26 @@ class MempoolBlocks { } public async $makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise { + // reset mempool short ids + this.resetUids(); + for (const tx of Object.values(newMempool)) { + this.setUid(tx); + } + // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread - const strippedMempool: { [txid: string]: ThreadTransaction } = {}; + const strippedMempool: Map = new Map(); Object.values(newMempool).forEach(entry => { - strippedMempool[entry.txid] = { - txid: entry.txid, - fee: entry.fee, - weight: entry.weight, - feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), - vin: entry.vin.map(v => v.txid), - }; + if (entry.uid != null) { + strippedMempool.set(entry.uid, { + uid: entry.uid, + fee: entry.fee, + weight: entry.weight, + feePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.fee / (entry.weight / 4), + inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], + }); + } }); // (re)initialize tx selection worker thread @@ -205,7 +216,7 @@ class MempoolBlocks { // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; try { - const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => { + const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map }>((resolve, reject) => { threadErrorListener = reject; this.txSelectionWorker?.once('message', (result): void => { resolve(result); @@ -213,7 +224,7 @@ class MempoolBlocks { this.txSelectionWorker?.once('error', reject); }); this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool }); - let { blocks, clusters } = await workerResultPromise; + let { blocks, clusters } = this.convertResultTxids(await workerResultPromise); // filter out stale transactions const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool))); @@ -232,37 +243,42 @@ class MempoolBlocks { return this.mempoolBlocks; } - public async $updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false): Promise { + public async $updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: TransactionExtended[], saveResults: boolean = false): Promise { if (!this.txSelectionWorker) { // need to reset the worker await this.$makeBlockTemplates(newMempool, saveResults); return; } + + for (const tx of Object.values(added)) { + this.setUid(tx); + } + const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[]; // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread - const addedStripped: ThreadTransaction[] = added.map(entry => { + const addedStripped: CompactThreadTransaction[] = added.filter(entry => entry.uid != null).map(entry => { return { - txid: entry.txid, + uid: entry.uid || 0, fee: entry.fee, weight: entry.weight, feePerVsize: entry.fee / (entry.weight / 4), effectiveFeePerVsize: entry.fee / (entry.weight / 4), - vin: entry.vin.map(v => v.txid), + inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[], }; }); // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; try { - const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => { + const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map }>((resolve, reject) => { threadErrorListener = reject; this.txSelectionWorker?.once('message', (result): void => { resolve(result); }); this.txSelectionWorker?.once('error', reject); }); - this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); - let { blocks, clusters } = await workerResultPromise; + this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids }); + let { blocks, clusters } = this.convertResultTxids(await workerResultPromise); // filter out stale transactions const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0); blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool))); @@ -271,6 +287,8 @@ class MempoolBlocks { logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from updateBlockTemplates`); } + this.removeUids(removedUids); + // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); @@ -280,7 +298,7 @@ class MempoolBlocks { } } - private processBlockTemplates(mempool, blocks, clusters, saveResults): MempoolBlockWithTransactions[] { + private processBlockTemplates(mempool, blocks: ThreadTransaction[][], clusters, saveResults): MempoolBlockWithTransactions[] { // update this thread's mempool with the results blocks.forEach((block, blockIndex) => { let runningVsize = 0; @@ -371,6 +389,54 @@ class MempoolBlocks { transactions: fitTransactions.map((tx) => Common.stripTransaction(tx)), }; } + + private resetUids(): void { + this.uidMap.clear(); + this.nextUid = 1; + } + + private setUid(tx: TransactionExtended): number { + const uid = this.nextUid; + this.nextUid++; + this.uidMap.set(uid, tx.txid); + tx.uid = uid; + return uid; + } + + private getUid(tx: TransactionExtended): number | void { + if (tx?.uid != null && this.uidMap.has(tx.uid)) { + return tx.uid; + } + } + + private removeUids(uids: number[]): void { + for (const uid of uids) { + this.uidMap.delete(uid); + } + } + + private convertResultTxids({ blocks, clusters }: { blocks: any[][], clusters: Map}) + : { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }} { + for (const block of blocks) { + for (const tx of block) { + tx.txid = this.uidMap.get(tx.uid); + if (tx.cpfpRoot) { + tx.cpfpRoot = this.uidMap.get(tx.cpfpRoot); + } + } + } + const convertedClusters = {}; + for (const rootUid of clusters.keys()) { + const rootTxid = this.uidMap.get(rootUid); + if (rootTxid) { + const members = clusters.get(rootUid)?.map(uid => { + return this.uidMap.get(uid); + }); + convertedClusters[rootTxid] = members; + } + } + return { blocks, clusters: convertedClusters } as { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }}; + } } export default new MempoolBlocks(); diff --git a/backend/src/api/tx-selection-worker.ts b/backend/src/api/tx-selection-worker.ts index c035099a3..1635acac4 100644 --- a/backend/src/api/tx-selection-worker.ts +++ b/backend/src/api/tx-selection-worker.ts @@ -1,10 +1,10 @@ import config from '../config'; import logger from '../logger'; -import { ThreadTransaction, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces'; +import { CompactThreadTransaction, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces'; import { PairingHeap } from '../utils/pairing-heap'; import { parentPort } from 'worker_threads'; -let mempool: { [txid: string]: ThreadTransaction } = {}; +let mempool: Map = new Map(); if (parentPort) { parentPort.on('message', (params) => { @@ -12,10 +12,10 @@ if (parentPort) { mempool = params.mempool; } else if (params.type === 'update') { params.added.forEach(tx => { - mempool[tx.txid] = tx; + mempool.set(tx.uid, tx); }); - params.removed.forEach(txid => { - delete mempool[txid]; + params.removed.forEach(uid => { + mempool.delete(uid); }); } @@ -23,7 +23,7 @@ if (parentPort) { // return the result to main thread. if (parentPort) { - parentPort.postMessage({ blocks, clusters }); + parentPort.postMessage({ blocks, clusters }); } }); } @@ -32,26 +32,25 @@ if (parentPort) { * Build projected mempool blocks using an approximation of the transaction selection algorithm from Bitcoin Core * (see BlockAssembler in https://github.com/bitcoin/bitcoin/blob/master/src/node/miner.cpp) */ -function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) - : { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } } { +function makeBlockTemplates(mempool: Map) + : { blocks: CompactThreadTransaction[][], clusters: Map } { const start = Date.now(); - const auditPool: { [txid: string]: AuditTransaction } = {}; + const auditPool: Map = new Map(); const mempoolArray: AuditTransaction[] = []; - const restOfArray: ThreadTransaction[] = []; - const cpfpClusters: { [root: string]: string[] } = {}; + const restOfArray: CompactThreadTransaction[] = []; + const cpfpClusters: Map = new Map(); - // grab the top feerate txs up to maxWeight - Object.values(mempool).sort((a, b) => b.feePerVsize - a.feePerVsize).forEach(tx => { + mempool.forEach(tx => { // initializing everything up front helps V8 optimize property access later - auditPool[tx.txid] = { - txid: tx.txid, + auditPool.set(tx.uid, { + uid: tx.uid, fee: tx.fee, weight: tx.weight, feePerVsize: tx.feePerVsize, effectiveFeePerVsize: tx.feePerVsize, - vin: tx.vin, + inputs: tx.inputs || [], relativesSet: false, - ancestorMap: new Map(), + ancestorMap: new Map(), children: new Set(), ancestorFee: 0, ancestorWeight: 0, @@ -59,8 +58,8 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) used: false, modified: false, modifiedNode: null, - }; - mempoolArray.push(auditPool[tx.txid]); + }); + mempoolArray.push(auditPool.get(tx.uid) as AuditTransaction); }); // Build relatives graph & calculate ancestor scores @@ -73,8 +72,8 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) // Sort by descending ancestor score mempoolArray.sort((a, b) => { if (b.score === a.score) { - // tie-break by lexicographic txid order for stability - return a.txid < b.txid ? -1 : 1; + // tie-break by uid for stability + return a.uid < b.uid ? -1 : 1; } else { return (b.score || 0) - (a.score || 0); } @@ -82,14 +81,14 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) // Build blocks by greedily choosing the highest feerate package // (i.e. the package rooted in the transaction with the best ancestor score) - const blocks: ThreadTransaction[][] = []; + const blocks: CompactThreadTransaction[][] = []; let blockWeight = 4000; let blockSize = 0; let transactions: AuditTransaction[] = []; const modified: PairingHeap = new PairingHeap((a, b): boolean => { if (a.score === b.score) { - // tie-break by lexicographic txid order for stability - return a.txid > b.txid; + // tie-break by uid for stability + return a.uid > b.uid; } else { return (a.score || 0) > (b.score || 0); } @@ -126,20 +125,23 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) const sortedTxSet = [...ancestors.sort((a, b) => { return (a.ancestorMap.size || 0) - (b.ancestorMap.size || 0); }), nextTx]; let isCluster = false; if (sortedTxSet.length > 1) { - cpfpClusters[nextTx.txid] = sortedTxSet.map(tx => tx.txid); + cpfpClusters.set(nextTx.uid, sortedTxSet.map(tx => tx.uid)); isCluster = true; } const effectiveFeeRate = nextTx.ancestorFee / (nextTx.ancestorWeight / 4); const used: AuditTransaction[] = []; while (sortedTxSet.length) { const ancestor = sortedTxSet.pop(); - const mempoolTx = mempool[ancestor.txid]; + const mempoolTx = mempool.get(ancestor.uid); + if (!mempoolTx) { + continue; + } ancestor.used = true; - ancestor.usedBy = nextTx.txid; + ancestor.usedBy = nextTx.uid; // update original copy of this tx with effective fee rate & relatives data mempoolTx.effectiveFeePerVsize = effectiveFeeRate; if (isCluster) { - mempoolTx.cpfpRoot = nextTx.txid; + mempoolTx.cpfpRoot = nextTx.uid; } mempoolTx.cpfpChecked = true; transactions.push(ancestor); @@ -169,7 +171,7 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) if ((exceededPackageTries || queueEmpty) && blocks.length < 7) { // construct this block if (transactions.length) { - blocks.push(transactions.map(t => mempool[t.txid])); + blocks.push(transactions.map(t => mempool.get(t.uid) as CompactThreadTransaction)); } // reset for the next block transactions = []; @@ -194,7 +196,7 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) } // add the final unbounded block if it contains any transactions if (transactions.length > 0) { - blocks.push(transactions.map(t => mempool[t.txid])); + blocks.push(transactions.map(t => mempool.get(t.uid) as CompactThreadTransaction)); } const end = Date.now(); @@ -208,10 +210,10 @@ function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction }) // recursion unavoidable, but should be limited to depth < 25 by mempool policy function setRelatives( tx: AuditTransaction, - mempool: { [txid: string]: AuditTransaction }, + mempool: Map, ): void { - for (const parent of tx.vin) { - const parentTx = mempool[parent]; + for (const parent of tx.inputs) { + const parentTx = mempool.get(parent); if (parentTx && !tx.ancestorMap?.has(parent)) { tx.ancestorMap.set(parent, parentTx); parentTx.children.add(tx); @@ -220,7 +222,7 @@ function setRelatives( setRelatives(parentTx, mempool); } parentTx.ancestorMap.forEach((ancestor) => { - tx.ancestorMap.set(ancestor.txid, ancestor); + tx.ancestorMap.set(ancestor.uid, ancestor); }); } }; @@ -238,7 +240,7 @@ function setRelatives( // avoids recursion to limit call stack depth function updateDescendants( rootTx: AuditTransaction, - mempool: { [txid: string]: AuditTransaction }, + mempool: Map, modified: PairingHeap, ): void { const descendantSet: Set = new Set(); @@ -254,9 +256,9 @@ function updateDescendants( }); while (descendants.length) { descendantTx = descendants.pop(); - if (descendantTx && descendantTx.ancestorMap && descendantTx.ancestorMap.has(rootTx.txid)) { + if (descendantTx && descendantTx.ancestorMap && descendantTx.ancestorMap.has(rootTx.uid)) { // remove tx as ancestor - descendantTx.ancestorMap.delete(rootTx.txid); + descendantTx.ancestorMap.delete(rootTx.uid); descendantTx.ancestorFee -= rootTx.fee; descendantTx.ancestorWeight -= rootTx.weight; tmpScore = descendantTx.score; diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index dc773742a..bc0fb8ea5 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -282,7 +282,7 @@ class WebsocketHandler { this.printLogs(); if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { - await mempoolBlocks.$updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true); + await mempoolBlocks.$updateBlockTemplates(newMempool, newTransactions, deletedTransactions, true); } else { mempoolBlocks.updateMempoolBlocks(newMempool, true); } diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 7ba44fb91..53bd3ff33 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -84,17 +84,18 @@ export interface TransactionExtended extends IEsploraApi.Transaction { block: number, vsize: number, }; + uid?: number; } export interface AuditTransaction { - txid: string; + uid: number; fee: number; weight: number; feePerVsize: number; effectiveFeePerVsize: number; - vin: string[]; + inputs: number[]; relativesSet: boolean; - ancestorMap: Map; + ancestorMap: Map; children: Set; ancestorFee: number; ancestorWeight: number; @@ -104,13 +105,24 @@ export interface AuditTransaction { modifiedNode: HeapNode; } +export interface CompactThreadTransaction { + uid: number; + fee: number; + weight: number; + feePerVsize: number; + effectiveFeePerVsize?: number; + inputs: number[]; + cpfpRoot?: string; + cpfpChecked?: boolean; +} + export interface ThreadTransaction { txid: string; fee: number; weight: number; feePerVsize: number; effectiveFeePerVsize?: number; - vin: string[]; + inputs: number[]; cpfpRoot?: string; cpfpChecked?: boolean; }