Merge pull request #3655 from mempool/mononaut/mempool-delete-race-condition
Fix new block mempool eviction race condition
This commit is contained in:
		
						commit
						64b3e7ad50
					
				@ -529,13 +529,14 @@ class Blocks {
 | 
				
			|||||||
    return await BlocksRepository.$validateChain();
 | 
					    return await BlocksRepository.$validateChain();
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public async $updateBlocks() {
 | 
					  public async $updateBlocks(): Promise<number> {
 | 
				
			||||||
    // warn if this run stalls the main loop for more than 2 minutes
 | 
					    // warn if this run stalls the main loop for more than 2 minutes
 | 
				
			||||||
    const timer = this.startTimer();
 | 
					    const timer = this.startTimer();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    diskCache.lock();
 | 
					    diskCache.lock();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let fastForwarded = false;
 | 
					    let fastForwarded = false;
 | 
				
			||||||
 | 
					    let handledBlocks = 0;
 | 
				
			||||||
    const blockHeightTip = await bitcoinApi.$getBlockHeightTip();
 | 
					    const blockHeightTip = await bitcoinApi.$getBlockHeightTip();
 | 
				
			||||||
    this.updateTimerProgress(timer, 'got block height tip');
 | 
					    this.updateTimerProgress(timer, 'got block height tip');
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -697,11 +698,15 @@ class Blocks {
 | 
				
			|||||||
      this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
 | 
					      this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
 | 
				
			||||||
      await Promise.all(callbackPromises);
 | 
					      await Promise.all(callbackPromises);
 | 
				
			||||||
      this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
 | 
					      this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      handledBlocks++;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    diskCache.unlock();
 | 
					    diskCache.unlock();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    this.clearTimer(timer);
 | 
					    this.clearTimer(timer);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return handledBlocks;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private startTimer() {
 | 
					  private startTimer() {
 | 
				
			||||||
 | 
				
			|||||||
@ -52,7 +52,7 @@ class DiskCache {
 | 
				
			|||||||
      const mempool = memPool.getMempool();
 | 
					      const mempool = memPool.getMempool();
 | 
				
			||||||
      const mempoolArray: TransactionExtended[] = [];
 | 
					      const mempoolArray: TransactionExtended[] = [];
 | 
				
			||||||
      for (const tx in mempool) {
 | 
					      for (const tx in mempool) {
 | 
				
			||||||
        if (mempool[tx] && !mempool[tx].deleteAfter) {
 | 
					        if (mempool[tx]) {
 | 
				
			||||||
          mempoolArray.push(mempool[tx]);
 | 
					          mempoolArray.push(mempool[tx]);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
				
			|||||||
@ -178,7 +178,7 @@ class MempoolBlocks {
 | 
				
			|||||||
    // prepare a stripped down version of the mempool with only the minimum necessary data
 | 
					    // prepare a stripped down version of the mempool with only the minimum necessary data
 | 
				
			||||||
    // to reduce the overhead of passing this data to the worker thread
 | 
					    // to reduce the overhead of passing this data to the worker thread
 | 
				
			||||||
    const strippedMempool: { [txid: string]: ThreadTransaction } = {};
 | 
					    const strippedMempool: { [txid: string]: ThreadTransaction } = {};
 | 
				
			||||||
    Object.values(newMempool).filter(tx => !tx.deleteAfter).forEach(entry => {
 | 
					    Object.values(newMempool).forEach(entry => {
 | 
				
			||||||
      strippedMempool[entry.txid] = {
 | 
					      strippedMempool[entry.txid] = {
 | 
				
			||||||
        txid: entry.txid,
 | 
					        txid: entry.txid,
 | 
				
			||||||
        fee: entry.fee,
 | 
					        fee: entry.fee,
 | 
				
			||||||
 | 
				
			|||||||
@ -12,7 +12,6 @@ import rbfCache from './rbf-cache';
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
class Mempool {
 | 
					class Mempool {
 | 
				
			||||||
  private static WEBSOCKET_REFRESH_RATE_MS = 10000;
 | 
					  private static WEBSOCKET_REFRESH_RATE_MS = 10000;
 | 
				
			||||||
  private static LAZY_DELETE_AFTER_SECONDS = 30;
 | 
					 | 
				
			||||||
  private inSync: boolean = false;
 | 
					  private inSync: boolean = false;
 | 
				
			||||||
  private mempoolCacheDelta: number = -1;
 | 
					  private mempoolCacheDelta: number = -1;
 | 
				
			||||||
  private mempoolCache: { [txId: string]: TransactionExtended } = {};
 | 
					  private mempoolCache: { [txId: string]: TransactionExtended } = {};
 | 
				
			||||||
@ -119,7 +118,7 @@ class Mempool {
 | 
				
			|||||||
    return txTimes;
 | 
					    return txTimes;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public async $updateMempool(): Promise<void> {
 | 
					  public async $updateMempool(transactions: string[]): Promise<void> {
 | 
				
			||||||
    logger.debug(`Updating mempool...`);
 | 
					    logger.debug(`Updating mempool...`);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // warn if this run stalls the main loop for more than 2 minutes
 | 
					    // warn if this run stalls the main loop for more than 2 minutes
 | 
				
			||||||
@ -128,7 +127,6 @@ class Mempool {
 | 
				
			|||||||
    const start = new Date().getTime();
 | 
					    const start = new Date().getTime();
 | 
				
			||||||
    let hasChange: boolean = false;
 | 
					    let hasChange: boolean = false;
 | 
				
			||||||
    const currentMempoolSize = Object.keys(this.mempoolCache).length;
 | 
					    const currentMempoolSize = Object.keys(this.mempoolCache).length;
 | 
				
			||||||
    const transactions = await bitcoinApi.$getRawMempool();
 | 
					 | 
				
			||||||
    this.updateTimerProgress(timer, 'got raw mempool');
 | 
					    this.updateTimerProgress(timer, 'got raw mempool');
 | 
				
			||||||
    const diff = transactions.length - currentMempoolSize;
 | 
					    const diff = transactions.length - currentMempoolSize;
 | 
				
			||||||
    const newTransactions: TransactionExtended[] = [];
 | 
					    const newTransactions: TransactionExtended[] = [];
 | 
				
			||||||
@ -207,13 +205,15 @@ class Mempool {
 | 
				
			|||||||
      const transactionsObject = {};
 | 
					      const transactionsObject = {};
 | 
				
			||||||
      transactions.forEach((txId) => transactionsObject[txId] = true);
 | 
					      transactions.forEach((txId) => transactionsObject[txId] = true);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      // Flag transactions for lazy deletion
 | 
					      // Delete evicted transactions from mempool
 | 
				
			||||||
      for (const tx in this.mempoolCache) {
 | 
					      for (const tx in this.mempoolCache) {
 | 
				
			||||||
        if (!transactionsObject[tx] && !this.mempoolCache[tx].deleteAfter) {
 | 
					        if (!transactionsObject[tx]) {
 | 
				
			||||||
          deletedTransactions.push(this.mempoolCache[tx]);
 | 
					          deletedTransactions.push(this.mempoolCache[tx]);
 | 
				
			||||||
          this.mempoolCache[tx].deleteAfter = new Date().getTime() + Mempool.LAZY_DELETE_AFTER_SECONDS * 1000;
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					      for (const tx of deletedTransactions) {
 | 
				
			||||||
 | 
					        delete this.mempoolCache[tx.txid];
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
 | 
					    const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
 | 
				
			||||||
@ -270,10 +270,6 @@ class Mempool {
 | 
				
			|||||||
      if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
 | 
					      if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
 | 
				
			||||||
        // Store replaced transactions
 | 
					        // Store replaced transactions
 | 
				
			||||||
        rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
 | 
					        rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
 | 
				
			||||||
        // Erase the replaced transactions from the local mempool
 | 
					 | 
				
			||||||
        for (const replaced of rbfTransactions[rbfTransaction]) {
 | 
					 | 
				
			||||||
          delete this.mempoolCache[replaced.txid];
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
@ -291,17 +287,6 @@ class Mempool {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public deleteExpiredTransactions() {
 | 
					 | 
				
			||||||
    const now = new Date().getTime();
 | 
					 | 
				
			||||||
    for (const tx in this.mempoolCache) {
 | 
					 | 
				
			||||||
      const lazyDeleteAt = this.mempoolCache[tx].deleteAfter;
 | 
					 | 
				
			||||||
      if (lazyDeleteAt && lazyDeleteAt < now) {
 | 
					 | 
				
			||||||
        delete this.mempoolCache[tx];
 | 
					 | 
				
			||||||
        rbfCache.evict(tx);
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  private $getMempoolInfo() {
 | 
					  private $getMempoolInfo() {
 | 
				
			||||||
    if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
 | 
					    if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
 | 
				
			||||||
      return Promise.all([
 | 
					      return Promise.all([
 | 
				
			||||||
 | 
				
			|||||||
@ -163,7 +163,7 @@ class RbfCache {
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // flag a transaction as removed from the mempool
 | 
					  // flag a transaction as removed from the mempool
 | 
				
			||||||
  public evict(txid, fast: boolean = false): void {
 | 
					  public evict(txid: string, fast: boolean = false): void {
 | 
				
			||||||
    if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) {
 | 
					    if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) {
 | 
				
			||||||
      this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours
 | 
					      this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -301,6 +301,9 @@ class WebsocketHandler {
 | 
				
			|||||||
      rbfReplacements = rbfCache.getRbfTrees(false);
 | 
					      rbfReplacements = rbfCache.getRbfTrees(false);
 | 
				
			||||||
      fullRbfReplacements = rbfCache.getRbfTrees(true);
 | 
					      fullRbfReplacements = rbfCache.getRbfTrees(true);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    for (const deletedTx of deletedTransactions) {
 | 
				
			||||||
 | 
					      rbfCache.evict(deletedTx.txid);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    const recommendedFees = feeApi.getRecommendedFee();
 | 
					    const recommendedFees = feeApi.getRecommendedFee();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    this.wss.clients.forEach(async (client) => {
 | 
					    this.wss.clients.forEach(async (client) => {
 | 
				
			||||||
 | 
				
			|||||||
@ -2,6 +2,7 @@ import express from 'express';
 | 
				
			|||||||
import { Application, Request, Response, NextFunction } from 'express';
 | 
					import { Application, Request, Response, NextFunction } from 'express';
 | 
				
			||||||
import * as http from 'http';
 | 
					import * as http from 'http';
 | 
				
			||||||
import * as WebSocket from 'ws';
 | 
					import * as WebSocket from 'ws';
 | 
				
			||||||
 | 
					import bitcoinApi from './api/bitcoin/bitcoin-api-factory';
 | 
				
			||||||
import cluster from 'cluster';
 | 
					import cluster from 'cluster';
 | 
				
			||||||
import DB from './database';
 | 
					import DB from './database';
 | 
				
			||||||
import config from './config';
 | 
					import config from './config';
 | 
				
			||||||
@ -179,12 +180,15 @@ class Server {
 | 
				
			|||||||
          logger.debug(msg);
 | 
					          logger.debug(msg);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      await blocks.$updateBlocks();
 | 
					      const newMempool = await bitcoinApi.$getRawMempool();
 | 
				
			||||||
      memPool.deleteExpiredTransactions();
 | 
					      const numHandledBlocks = await blocks.$updateBlocks();
 | 
				
			||||||
      await memPool.$updateMempool();
 | 
					      if (numHandledBlocks === 0) {
 | 
				
			||||||
 | 
					        await memPool.$updateMempool(newMempool);
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
      indexer.$run();
 | 
					      indexer.$run();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      setTimeout(this.runMainUpdateLoop.bind(this), config.MEMPOOL.POLL_RATE_MS);
 | 
					      // rerun immediately if we skipped the mempool update, otherwise wait POLL_RATE_MS
 | 
				
			||||||
 | 
					      setTimeout(this.runMainUpdateLoop.bind(this), numHandledBlocks > 0 ? 1 : config.MEMPOOL.POLL_RATE_MS);
 | 
				
			||||||
      this.backendRetryCount = 0;
 | 
					      this.backendRetryCount = 0;
 | 
				
			||||||
    } catch (e: any) {
 | 
					    } catch (e: any) {
 | 
				
			||||||
      this.backendRetryCount++;
 | 
					      this.backendRetryCount++;
 | 
				
			||||||
 | 
				
			|||||||
@ -80,7 +80,6 @@ export interface TransactionExtended extends IEsploraApi.Transaction {
 | 
				
			|||||||
  descendants?: Ancestor[];
 | 
					  descendants?: Ancestor[];
 | 
				
			||||||
  bestDescendant?: BestDescendant | null;
 | 
					  bestDescendant?: BestDescendant | null;
 | 
				
			||||||
  cpfpChecked?: boolean;
 | 
					  cpfpChecked?: boolean;
 | 
				
			||||||
  deleteAfter?: number;
 | 
					 | 
				
			||||||
  position?: {
 | 
					  position?: {
 | 
				
			||||||
    block: number,
 | 
					    block: number,
 | 
				
			||||||
    vsize: number,
 | 
					    vsize: number,
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user