From d322c6b5b548e30e6b4d57d6ffe70729b3e32157 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 7 Apr 2023 09:41:25 +0900 Subject: [PATCH 1/2] Fix new block mempool deletion race condition --- backend/src/api/blocks.ts | 7 ++++++- backend/src/api/disk-cache.ts | 2 +- backend/src/api/mempool-blocks.ts | 2 +- backend/src/api/mempool.ts | 27 ++++++--------------------- backend/src/index.ts | 12 ++++++++---- backend/src/mempool.interfaces.ts | 1 - 6 files changed, 22 insertions(+), 29 deletions(-) diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index c50c38107..23814a87e 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -529,13 +529,14 @@ class Blocks { return await BlocksRepository.$validateChain(); } - public async $updateBlocks() { + public async $updateBlocks(): Promise { // warn if this run stalls the main loop for more than 2 minutes const timer = this.startTimer(); diskCache.lock(); let fastForwarded = false; + let handledBlocks = 0; const blockHeightTip = await bitcoinApi.$getBlockHeightTip(); this.updateTimerProgress(timer, 'got block height tip'); @@ -697,11 +698,15 @@ class Blocks { this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`); await Promise.all(callbackPromises); this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`); + + handledBlocks++; } diskCache.unlock(); this.clearTimer(timer); + + return handledBlocks; } private startTimer() { diff --git a/backend/src/api/disk-cache.ts b/backend/src/api/disk-cache.ts index 220d22b8e..0264fe1a3 100644 --- a/backend/src/api/disk-cache.ts +++ b/backend/src/api/disk-cache.ts @@ -52,7 +52,7 @@ class DiskCache { const mempool = memPool.getMempool(); const mempoolArray: TransactionExtended[] = []; for (const tx in mempool) { - if (mempool[tx] && !mempool[tx].deleteAfter) { + if (mempool[tx]) { mempoolArray.push(mempool[tx]); } } diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index 49fda543b..fdaa8c466 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -178,7 +178,7 @@ class MempoolBlocks { // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread const strippedMempool: { [txid: string]: ThreadTransaction } = {}; - Object.values(newMempool).filter(tx => !tx.deleteAfter).forEach(entry => { + Object.values(newMempool).forEach(entry => { strippedMempool[entry.txid] = { txid: entry.txid, fee: entry.fee, diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index d476d6bca..127314218 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -12,7 +12,6 @@ import rbfCache from './rbf-cache'; class Mempool { private static WEBSOCKET_REFRESH_RATE_MS = 10000; - private static LAZY_DELETE_AFTER_SECONDS = 30; private inSync: boolean = false; private mempoolCacheDelta: number = -1; private mempoolCache: { [txId: string]: TransactionExtended } = {}; @@ -119,7 +118,7 @@ class Mempool { return txTimes; } - public async $updateMempool(): Promise { + public async $updateMempool(transactions: string[]): Promise { logger.debug(`Updating mempool...`); // warn if this run stalls the main loop for more than 2 minutes @@ -128,7 +127,6 @@ class Mempool { const start = new Date().getTime(); let hasChange: boolean = false; const currentMempoolSize = Object.keys(this.mempoolCache).length; - const transactions = await bitcoinApi.$getRawMempool(); this.updateTimerProgress(timer, 'got raw mempool'); const diff = transactions.length - currentMempoolSize; const newTransactions: TransactionExtended[] = []; @@ -207,13 +205,15 @@ class Mempool { const transactionsObject = {}; transactions.forEach((txId) => transactionsObject[txId] = true); - // Flag transactions for lazy deletion + // Delete evicted transactions from mempool for (const tx in this.mempoolCache) { - if (!transactionsObject[tx] && !this.mempoolCache[tx].deleteAfter) { + if (!transactionsObject[tx]) { deletedTransactions.push(this.mempoolCache[tx]); - this.mempoolCache[tx].deleteAfter = new Date().getTime() + Mempool.LAZY_DELETE_AFTER_SECONDS * 1000; } } + for (const tx of deletedTransactions) { + delete this.mempoolCache[tx.txid]; + } } const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx)); @@ -270,10 +270,6 @@ class Mempool { if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) { // Store replaced transactions rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]); - // Erase the replaced transactions from the local mempool - for (const replaced of rbfTransactions[rbfTransaction]) { - delete this.mempoolCache[replaced.txid]; - } } } } @@ -291,17 +287,6 @@ class Mempool { } } - public deleteExpiredTransactions() { - const now = new Date().getTime(); - for (const tx in this.mempoolCache) { - const lazyDeleteAt = this.mempoolCache[tx].deleteAfter; - if (lazyDeleteAt && lazyDeleteAt < now) { - delete this.mempoolCache[tx]; - rbfCache.evict(tx); - } - } - } - private $getMempoolInfo() { if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) { return Promise.all([ diff --git a/backend/src/index.ts b/backend/src/index.ts index 3887aac2c..9f543d644 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -2,6 +2,7 @@ import express from 'express'; import { Application, Request, Response, NextFunction } from 'express'; import * as http from 'http'; import * as WebSocket from 'ws'; +import bitcoinApi from './api/bitcoin/bitcoin-api-factory'; import cluster from 'cluster'; import DB from './database'; import config from './config'; @@ -179,12 +180,15 @@ class Server { logger.debug(msg); } } - await blocks.$updateBlocks(); - memPool.deleteExpiredTransactions(); - await memPool.$updateMempool(); + const newMempool = await bitcoinApi.$getRawMempool(); + const numHandledBlocks = await blocks.$updateBlocks(); + if (numHandledBlocks === 0) { + await memPool.$updateMempool(newMempool); + } indexer.$run(); - setTimeout(this.runMainUpdateLoop.bind(this), config.MEMPOOL.POLL_RATE_MS); + // rerun immediately if we skipped the mempool update, otherwise wait POLL_RATE_MS + setTimeout(this.runMainUpdateLoop.bind(this), numHandledBlocks > 0 ? 1 : config.MEMPOOL.POLL_RATE_MS); this.backendRetryCount = 0; } catch (e: any) { this.backendRetryCount++; diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 25cd8b95d..7ba44fb91 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -80,7 +80,6 @@ export interface TransactionExtended extends IEsploraApi.Transaction { descendants?: Ancestor[]; bestDescendant?: BestDescendant | null; cpfpChecked?: boolean; - deleteAfter?: number; position?: { block: number, vsize: number, From 3d0f7d6855dd68e65511151ca58d93a692f7cd32 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Thu, 4 May 2023 19:10:53 -0400 Subject: [PATCH 2/2] add missing rbf eviction --- backend/src/api/rbf-cache.ts | 2 +- backend/src/api/websocket-handler.ts | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/src/api/rbf-cache.ts b/backend/src/api/rbf-cache.ts index d75fb0ba8..6c5afc146 100644 --- a/backend/src/api/rbf-cache.ts +++ b/backend/src/api/rbf-cache.ts @@ -163,7 +163,7 @@ class RbfCache { } // flag a transaction as removed from the mempool - public evict(txid, fast: boolean = false): void { + public evict(txid: string, fast: boolean = false): void { if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) { this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours } diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index f28f284c7..dc773742a 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -301,6 +301,9 @@ class WebsocketHandler { rbfReplacements = rbfCache.getRbfTrees(false); fullRbfReplacements = rbfCache.getRbfTrees(true); } + for (const deletedTx of deletedTransactions) { + rbfCache.evict(deletedTx.txid); + } const recommendedFees = feeApi.getRecommendedFee(); this.wss.clients.forEach(async (client) => {