diff --git a/backend/src/api/common.ts b/backend/src/api/common.ts index 13fc86147..f3d3e43b5 100644 --- a/backend/src/api/common.ts +++ b/backend/src/api/common.ts @@ -80,8 +80,8 @@ export class Common { return arr; } - static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: MempoolTransactionExtended[] } { - const matches: { [txid: string]: MempoolTransactionExtended[] } = {}; + static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} { + const matches: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = {}; // For small N, a naive nested loop is extremely fast, but it doesn't scale if (added.length < 1000 && deleted.length < 50 && !forceScalable) { @@ -96,7 +96,7 @@ export class Common { addedTx.vin.some((vin) => vin.txid === deletedVin.txid && vin.vout === deletedVin.vout)); }); if (foundMatches?.length) { - matches[addedTx.txid] = [...new Set(foundMatches)]; + matches[addedTx.txid] = { replaced: [...new Set(foundMatches)], replacedBy: addedTx }; } }); } else { @@ -124,7 +124,7 @@ export class Common { foundMatches.add(deletedTx); } if (foundMatches.size) { - matches[addedTx.txid] = [...foundMatches]; + matches[addedTx.txid] = { replaced: [...foundMatches], replacedBy: addedTx }; } } } @@ -139,17 +139,17 @@ export class Common { const replaced: Set = new Set(); for (let i = 0; i < tx.vin.length; i++) { const vin = tx.vin[i]; - const match = spendMap.get(`${vin.txid}:${vin.vout}`); + const key = `${vin.txid}:${vin.vout}`; + const match = spendMap.get(key); if (match && match.txid !== tx.txid) { replaced.add(match); // remove this tx from the spendMap // prevents the same tx being replaced more than once for (const replacedVin of match.vin) { - const key = `${replacedVin.txid}:${replacedVin.vout}`; - spendMap.delete(key); + const replacedKey = `${replacedVin.txid}:${replacedVin.vout}`; + spendMap.delete(replacedKey); } } - const key = `${vin.txid}:${vin.vout}`; spendMap.delete(key); } if (replaced.size) { diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 1f55179fb..1442b05fa 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -19,12 +19,13 @@ class Mempool { private mempoolCache: { [txId: string]: MempoolTransactionExtended } = {}; private mempoolCandidates: { [txid: string ]: boolean } = {}; private spendMap = new Map(); + private recentlyDeleted: MempoolTransactionExtended[][] = []; // buffer of transactions deleted in recent mempool updates private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0, maxmempool: 300000000, mempoolminfee: Common.isLiquid() ? 0.00000100 : 0.00001000, minrelaytxfee: Common.isLiquid() ? 0.00000100 : 0.00001000 }; private mempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, newTransactions: MempoolTransactionExtended[], - deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void) | undefined; + deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void) | undefined; private $asyncMempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, mempoolSize: number, newTransactions: MempoolTransactionExtended[], - deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], candidates?: GbtCandidates) => Promise) | undefined; + deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates) => Promise) | undefined; private accelerations: { [txId: string]: Acceleration } = {}; private accelerationPositions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] } = {}; @@ -74,12 +75,12 @@ class Mempool { } public setMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; }, - newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void): void { + newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void): void { this.mempoolChangedCallback = fn; } public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; }, mempoolSize: number, - newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], + newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates) => Promise): void { this.$asyncMempoolChangedCallback = fn; } @@ -362,12 +363,15 @@ class Mempool { const candidatesChanged = candidates?.added?.length || candidates?.removed?.length; - if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { - this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions, accelerationDelta); + this.recentlyDeleted.unshift(deletedTransactions); + this.recentlyDeleted.length = Math.min(this.recentlyDeleted.length, 10); // truncate to the last 10 mempool updates + + if (this.mempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length)) { + this.mempoolChangedCallback(this.mempoolCache, newTransactions, this.recentlyDeleted, accelerationDelta); } - if (this.$asyncMempoolChangedCallback && (hasChange || deletedTransactions.length || candidatesChanged)) { + if (this.$asyncMempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length || candidatesChanged)) { this.updateTimerProgress(timer, 'running async mempool callback'); - await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, deletedTransactions, accelerationDelta, candidates); + await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, this.recentlyDeleted, accelerationDelta, candidates); this.updateTimerProgress(timer, 'completed async mempool callback'); } @@ -541,16 +545,7 @@ class Mempool { } } - public handleRbfTransactions(rbfTransactions: { [txid: string]: MempoolTransactionExtended[]; }): void { - for (const rbfTransaction in rbfTransactions) { - if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) { - // Store replaced transactions - rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]); - } - } - } - - public handleMinedRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void { + public handleRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void { for (const rbfTransaction in rbfTransactions) { if (rbfTransactions[rbfTransaction].replacedBy && rbfTransactions[rbfTransaction]?.replaced?.length) { // Store replaced transactions diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 79a783f88..2a047472e 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -520,8 +520,17 @@ class WebsocketHandler { } } + /** + * + * @param newMempool + * @param mempoolSize + * @param newTransactions array of transactions added this mempool update. + * @param recentlyDeletedTransactions array of arrays of transactions removed in the last N mempool updates, most recent first. + * @param accelerationDelta + * @param candidates + */ async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number, - newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], + newTransactions: MempoolTransactionExtended[], recentlyDeletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates): Promise { if (!this.webSocketServers.length) { throw new Error('No WebSocket.Server have been set'); @@ -529,6 +538,8 @@ class WebsocketHandler { this.printLogs(); + const deletedTransactions = recentlyDeletedTransactions.length ? recentlyDeletedTransactions[0] : []; + const transactionIds = (memPool.limitGBT && candidates) ? Object.keys(candidates?.txs || {}) : Object.keys(newMempool); let added = newTransactions; let removed = deletedTransactions; @@ -547,7 +558,7 @@ class WebsocketHandler { const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); const mempoolInfo = memPool.getMempoolInfo(); const vBytesPerSecond = memPool.getVBytesPerSecond(); - const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); + const rbfTransactions = Common.findRbfTransactions(newTransactions, recentlyDeletedTransactions.flat()); const da = difficultyAdjustment.getDifficultyAdjustment(); const accelerations = memPool.getAccelerations(); memPool.handleRbfTransactions(rbfTransactions); @@ -578,7 +589,7 @@ class WebsocketHandler { const replacedTransactions: { replaced: string, by: TransactionExtended }[] = []; for (const tx of newTransactions) { if (rbfTransactions[tx.txid]) { - for (const replaced of rbfTransactions[tx.txid]) { + for (const replaced of rbfTransactions[tx.txid].replaced) { replacedTransactions.push({ replaced: replaced.txid, by: tx }); } } @@ -947,7 +958,7 @@ class WebsocketHandler { await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, structuredClone(transactions)); const rbfTransactions = Common.findMinedRbfTransactions(transactions, memPool.getSpendMap()); - memPool.handleMinedRbfTransactions(rbfTransactions); + memPool.handleRbfTransactions(rbfTransactions); memPool.removeFromSpendMap(transactions); if (config.MEMPOOL.AUDIT && memPool.isInSync()) {