Merge pull request #3560 from mempool/mononaut/missing-tx-bug
Fix thread inconsistency / lazy deletion race condition bugs
This commit is contained in:
		
						commit
						4c4a91ae95
					
				@ -1,4 +1,5 @@
 | 
			
		||||
import config from '../config';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import { TransactionExtended, MempoolBlockWithTransactions } from '../mempool.interfaces';
 | 
			
		||||
 | 
			
		||||
const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners
 | 
			
		||||
@ -39,17 +40,19 @@ class Audit {
 | 
			
		||||
        } else {
 | 
			
		||||
          isCensored[txid] = true;
 | 
			
		||||
        }
 | 
			
		||||
        displacedWeight += mempool[txid].weight;
 | 
			
		||||
        displacedWeight += mempool[txid]?.weight || 0;
 | 
			
		||||
      } else {
 | 
			
		||||
        matchedWeight += mempool[txid].weight;
 | 
			
		||||
        matchedWeight += mempool[txid]?.weight || 0;
 | 
			
		||||
      }
 | 
			
		||||
      projectedWeight += mempool[txid].weight;
 | 
			
		||||
      projectedWeight += mempool[txid]?.weight || 0;
 | 
			
		||||
      inTemplate[txid] = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (transactions[0]) {
 | 
			
		||||
      displacedWeight += (4000 - transactions[0].weight);
 | 
			
		||||
      projectedWeight += transactions[0].weight;
 | 
			
		||||
      matchedWeight += transactions[0].weight;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // we can expect an honest miner to include 'displaced' transactions in place of recent arrivals and censored txs
 | 
			
		||||
    // these displaced transactions should occupy the first N weight units of the next projected block
 | 
			
		||||
@ -59,20 +62,25 @@ class Audit {
 | 
			
		||||
    let failures = 0;
 | 
			
		||||
    while (projectedBlocks[1] && index < projectedBlocks[1].transactionIds.length && failures < 500) {
 | 
			
		||||
      const txid = projectedBlocks[1].transactionIds[index];
 | 
			
		||||
      const fits = (mempool[txid].weight - displacedWeightRemaining) < 4000;
 | 
			
		||||
      const feeMatches = mempool[txid].effectiveFeePerVsize >= lastFeeRate;
 | 
			
		||||
      const tx = mempool[txid];
 | 
			
		||||
      if (tx) {
 | 
			
		||||
        const fits = (tx.weight - displacedWeightRemaining) < 4000;
 | 
			
		||||
        const feeMatches = tx.effectiveFeePerVsize >= lastFeeRate;
 | 
			
		||||
        if (fits || feeMatches) {
 | 
			
		||||
          isDisplaced[txid] = true;
 | 
			
		||||
          if (fits) {
 | 
			
		||||
          lastFeeRate = Math.min(lastFeeRate, mempool[txid].effectiveFeePerVsize);
 | 
			
		||||
            lastFeeRate = Math.min(lastFeeRate, tx.effectiveFeePerVsize);
 | 
			
		||||
          }
 | 
			
		||||
        if (mempool[txid].firstSeen == null || (now - (mempool[txid]?.firstSeen || 0)) > PROPAGATION_MARGIN) {
 | 
			
		||||
          displacedWeightRemaining -= mempool[txid].weight;
 | 
			
		||||
          if (tx.firstSeen == null || (now - (tx?.firstSeen || 0)) > PROPAGATION_MARGIN) {
 | 
			
		||||
            displacedWeightRemaining -= tx.weight;
 | 
			
		||||
          }
 | 
			
		||||
          failures = 0;
 | 
			
		||||
        } else {
 | 
			
		||||
          failures++;
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        logger.warn('projected transaction missing from mempool cache');
 | 
			
		||||
      }
 | 
			
		||||
      index++;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -108,20 +116,25 @@ class Audit {
 | 
			
		||||
    index = projectedBlocks[0].transactionIds.length - 1;
 | 
			
		||||
    while (index >= 0) {
 | 
			
		||||
      const txid = projectedBlocks[0].transactionIds[index];
 | 
			
		||||
      const tx = mempool[txid];
 | 
			
		||||
      if (tx) {
 | 
			
		||||
        if (overflowWeightRemaining > 0) {
 | 
			
		||||
          if (isCensored[txid]) {
 | 
			
		||||
            delete isCensored[txid];
 | 
			
		||||
          }
 | 
			
		||||
        if (mempool[txid].effectiveFeePerVsize > maxOverflowRate) {
 | 
			
		||||
          maxOverflowRate = mempool[txid].effectiveFeePerVsize;
 | 
			
		||||
          if (tx.effectiveFeePerVsize > maxOverflowRate) {
 | 
			
		||||
            maxOverflowRate = tx.effectiveFeePerVsize;
 | 
			
		||||
            rateThreshold = (Math.ceil(maxOverflowRate * 100) / 100) + 0.005;
 | 
			
		||||
          }
 | 
			
		||||
      } else if (mempool[txid].effectiveFeePerVsize <= rateThreshold) { // tolerance of 0.01 sat/vb + rounding
 | 
			
		||||
        } else if (tx.effectiveFeePerVsize <= rateThreshold) { // tolerance of 0.01 sat/vb + rounding
 | 
			
		||||
          if (isCensored[txid]) {
 | 
			
		||||
            delete isCensored[txid];
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        overflowWeightRemaining -= (mempool[txid]?.weight || 0);
 | 
			
		||||
      } else {
 | 
			
		||||
        logger.warn('projected transaction missing from mempool cache');
 | 
			
		||||
      }
 | 
			
		||||
      index--;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -43,8 +43,10 @@ class DiskCache {
 | 
			
		||||
      const mempool = memPool.getMempool();
 | 
			
		||||
      const mempoolArray: TransactionExtended[] = [];
 | 
			
		||||
      for (const tx in mempool) {
 | 
			
		||||
        if (mempool[tx] && !mempool[tx].deleteAfter) {
 | 
			
		||||
          mempoolArray.push(mempool[tx]);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      Common.shuffleArray(mempoolArray);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -151,7 +151,7 @@ class MempoolBlocks {
 | 
			
		||||
    // prepare a stripped down version of the mempool with only the minimum necessary data
 | 
			
		||||
    // to reduce the overhead of passing this data to the worker thread
 | 
			
		||||
    const strippedMempool: { [txid: string]: ThreadTransaction } = {};
 | 
			
		||||
    Object.values(newMempool).forEach(entry => {
 | 
			
		||||
    Object.values(newMempool).filter(tx => !tx.deleteAfter).forEach(entry => {
 | 
			
		||||
      strippedMempool[entry.txid] = {
 | 
			
		||||
        txid: entry.txid,
 | 
			
		||||
        fee: entry.fee,
 | 
			
		||||
@ -186,7 +186,14 @@ class MempoolBlocks {
 | 
			
		||||
        this.txSelectionWorker?.once('error', reject);
 | 
			
		||||
      });
 | 
			
		||||
      this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
 | 
			
		||||
      const { blocks, clusters } = await workerResultPromise;
 | 
			
		||||
      let { blocks, clusters } = await workerResultPromise;
 | 
			
		||||
      // filter out stale transactions
 | 
			
		||||
      const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
 | 
			
		||||
      blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool)));
 | 
			
		||||
      const filteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
 | 
			
		||||
      if (filteredCount < unfilteredCount) {
 | 
			
		||||
        logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from makeBlockTemplates`);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // clean up thread error listener
 | 
			
		||||
      this.txSelectionWorker?.removeListener('error', threadErrorListener);
 | 
			
		||||
@ -228,7 +235,14 @@ class MempoolBlocks {
 | 
			
		||||
        this.txSelectionWorker?.once('error', reject);
 | 
			
		||||
      });
 | 
			
		||||
      this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed });
 | 
			
		||||
      const { blocks, clusters } = await workerResultPromise;
 | 
			
		||||
      let { blocks, clusters } = await workerResultPromise;
 | 
			
		||||
      // filter out stale transactions
 | 
			
		||||
      const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
 | 
			
		||||
      blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool)));
 | 
			
		||||
      const filteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
 | 
			
		||||
      if (filteredCount < unfilteredCount) {
 | 
			
		||||
        logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from updateBlockTemplates`);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // clean up thread error listener
 | 
			
		||||
      this.txSelectionWorker?.removeListener('error', threadErrorListener);
 | 
			
		||||
@ -243,7 +257,7 @@ class MempoolBlocks {
 | 
			
		||||
    // update this thread's mempool with the results
 | 
			
		||||
    blocks.forEach(block => {
 | 
			
		||||
      block.forEach(tx => {
 | 
			
		||||
        if (tx.txid in mempool) {
 | 
			
		||||
        if (tx.txid && tx.txid in mempool) {
 | 
			
		||||
          if (tx.effectiveFeePerVsize != null) {
 | 
			
		||||
            mempool[tx.txid].effectiveFeePerVsize = tx.effectiveFeePerVsize;
 | 
			
		||||
          }
 | 
			
		||||
@ -253,6 +267,10 @@ class MempoolBlocks {
 | 
			
		||||
            const cluster = clusters[tx.cpfpRoot];
 | 
			
		||||
            let matched = false;
 | 
			
		||||
            cluster.forEach(txid => {
 | 
			
		||||
              if (!txid || !mempool[txid]) {
 | 
			
		||||
                logger.warn('projected transaction ancestor missing from mempool cache');
 | 
			
		||||
                return;
 | 
			
		||||
              }
 | 
			
		||||
              if (txid === tx.txid) {
 | 
			
		||||
                matched = true;
 | 
			
		||||
              } else {
 | 
			
		||||
@ -273,6 +291,8 @@ class MempoolBlocks {
 | 
			
		||||
            mempool[tx.txid].bestDescendant = null;
 | 
			
		||||
          }
 | 
			
		||||
          mempool[tx.txid].cpfpChecked = tx.cpfpChecked;
 | 
			
		||||
        } else {
 | 
			
		||||
          logger.warn('projected transaction missing from mempool cache');
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
@ -38,7 +38,6 @@ class Mempool {
 | 
			
		||||
 | 
			
		||||
  constructor() {
 | 
			
		||||
    setInterval(this.updateTxPerSecond.bind(this), 1000);
 | 
			
		||||
    setInterval(this.deleteExpiredTransactions.bind(this), 20000);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
@ -256,7 +255,7 @@ class Mempool {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private deleteExpiredTransactions() {
 | 
			
		||||
  public deleteExpiredTransactions() {
 | 
			
		||||
    const now = new Date().getTime();
 | 
			
		||||
    for (const tx in this.mempoolCache) {
 | 
			
		||||
      const lazyDeleteAt = this.mempoolCache[tx].deleteAfter;
 | 
			
		||||
 | 
			
		||||
@ -178,6 +178,7 @@ class Server {
 | 
			
		||||
          logger.debug(msg);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      memPool.deleteExpiredTransactions();
 | 
			
		||||
      await blocks.$updateBlocks();
 | 
			
		||||
      await memPool.$updateMempool();
 | 
			
		||||
      indexer.$run();
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user