detect and log stall in main loop
This commit is contained in:
		
							parent
							
								
									f61f520a4b
								
							
						
					
					
						commit
						95df317f56
					
				@ -36,6 +36,8 @@ class Blocks {
 | 
			
		||||
  private newBlockCallbacks: ((block: BlockExtended, txIds: string[], transactions: TransactionExtended[]) => void)[] = [];
 | 
			
		||||
  private newAsyncBlockCallbacks: ((block: BlockExtended, txIds: string[], transactions: TransactionExtended[]) => Promise<void>)[] = [];
 | 
			
		||||
 | 
			
		||||
  private mainLoopTimeout: number = 120000;
 | 
			
		||||
 | 
			
		||||
  constructor() { }
 | 
			
		||||
 | 
			
		||||
  public getBlocks(): BlockExtended[] {
 | 
			
		||||
@ -528,8 +530,12 @@ class Blocks {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $updateBlocks() {
 | 
			
		||||
    // warn if this run stalls the main loop for more than 2 minutes
 | 
			
		||||
    const timer = this.startTimer();
 | 
			
		||||
 | 
			
		||||
    let fastForwarded = false;
 | 
			
		||||
    const blockHeightTip = await bitcoinApi.$getBlockHeightTip();
 | 
			
		||||
    this.updateTimerProgress(timer, 'got block height tip');
 | 
			
		||||
 | 
			
		||||
    if (this.blocks.length === 0) {
 | 
			
		||||
      this.currentBlockHeight = Math.max(blockHeightTip - config.MEMPOOL.INITIAL_BLOCKS_AMOUNT, -1);
 | 
			
		||||
@ -547,16 +553,21 @@ class Blocks {
 | 
			
		||||
 | 
			
		||||
    if (!this.lastDifficultyAdjustmentTime) {
 | 
			
		||||
      const blockchainInfo = await bitcoinClient.getBlockchainInfo();
 | 
			
		||||
      this.updateTimerProgress(timer, 'got blockchain info for initial difficulty adjustment');
 | 
			
		||||
      if (blockchainInfo.blocks === blockchainInfo.headers) {
 | 
			
		||||
        const heightDiff = blockHeightTip % 2016;
 | 
			
		||||
        const blockHash = await bitcoinApi.$getBlockHash(blockHeightTip - heightDiff);
 | 
			
		||||
        this.updateTimerProgress(timer, 'got block hash for initial difficulty adjustment');
 | 
			
		||||
        const block: IEsploraApi.Block = await bitcoinCoreApi.$getBlock(blockHash);
 | 
			
		||||
        this.updateTimerProgress(timer, 'got block for initial difficulty adjustment');
 | 
			
		||||
        this.lastDifficultyAdjustmentTime = block.timestamp;
 | 
			
		||||
        this.currentDifficulty = block.difficulty;
 | 
			
		||||
 | 
			
		||||
        if (blockHeightTip >= 2016) {
 | 
			
		||||
          const previousPeriodBlockHash = await bitcoinApi.$getBlockHash(blockHeightTip - heightDiff - 2016);
 | 
			
		||||
          this.updateTimerProgress(timer, 'got previous block hash for initial difficulty adjustment');
 | 
			
		||||
          const previousPeriodBlock: IEsploraApi.Block = await bitcoinCoreApi.$getBlock(previousPeriodBlockHash);
 | 
			
		||||
          this.updateTimerProgress(timer, 'got previous block for initial difficulty adjustment');
 | 
			
		||||
          this.previousDifficultyRetarget = (block.difficulty - previousPeriodBlock.difficulty) / previousPeriodBlock.difficulty * 100;
 | 
			
		||||
          logger.debug(`Initial difficulty adjustment data set.`);
 | 
			
		||||
        }
 | 
			
		||||
@ -571,9 +582,11 @@ class Blocks {
 | 
			
		||||
      } else {
 | 
			
		||||
        this.currentBlockHeight++;
 | 
			
		||||
        logger.debug(`New block found (#${this.currentBlockHeight})!`);
 | 
			
		||||
        this.updateTimerProgress(timer, `getting orphaned blocks for ${this.currentBlockHeight}`);
 | 
			
		||||
        await chainTips.updateOrphanedBlocks();
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      this.updateTimerProgress(timer, `getting block data for ${this.currentBlockHeight}`);
 | 
			
		||||
      const blockHash = await bitcoinApi.$getBlockHash(this.currentBlockHeight);
 | 
			
		||||
      const verboseBlock = await bitcoinClient.getBlock(blockHash, 2);
 | 
			
		||||
      const block = BitcoinApi.convertBlock(verboseBlock);
 | 
			
		||||
@ -582,39 +595,51 @@ class Blocks {
 | 
			
		||||
      const cpfpSummary: CpfpSummary = Common.calculateCpfp(block.height, transactions);
 | 
			
		||||
      const blockExtended: BlockExtended = await this.$getBlockExtended(block, cpfpSummary.transactions);
 | 
			
		||||
      const blockSummary: BlockSummary = this.summarizeBlock(verboseBlock);
 | 
			
		||||
      this.updateTimerProgress(timer, `got block data for ${this.currentBlockHeight}`);
 | 
			
		||||
 | 
			
		||||
      // start async callbacks
 | 
			
		||||
      this.updateTimerProgress(timer, `starting async callbacks for ${this.currentBlockHeight}`);
 | 
			
		||||
      const callbackPromises = this.newAsyncBlockCallbacks.map((cb) => cb(blockExtended, txIds, transactions));
 | 
			
		||||
 | 
			
		||||
      if (Common.indexingEnabled()) {
 | 
			
		||||
        if (!fastForwarded) {
 | 
			
		||||
          const lastBlock = await blocksRepository.$getBlockByHeight(blockExtended.height - 1);
 | 
			
		||||
          this.updateTimerProgress(timer, `got block by height for ${this.currentBlockHeight}`);
 | 
			
		||||
          if (lastBlock !== null && blockExtended.previousblockhash !== lastBlock.id) {
 | 
			
		||||
            logger.warn(`Chain divergence detected at block ${lastBlock.height}, re-indexing most recent data`, logger.tags.mining);
 | 
			
		||||
            // We assume there won't be a reorg with more than 10 block depth
 | 
			
		||||
            this.updateTimerProgress(timer, `rolling back diverged chain from ${this.currentBlockHeight}`);
 | 
			
		||||
            await BlocksRepository.$deleteBlocksFrom(lastBlock.height - 10);
 | 
			
		||||
            await HashratesRepository.$deleteLastEntries();
 | 
			
		||||
            await cpfpRepository.$deleteClustersFrom(lastBlock.height - 10);
 | 
			
		||||
            this.updateTimerProgress(timer, `rolled back chain divergence from ${this.currentBlockHeight}`);
 | 
			
		||||
            for (let i = 10; i >= 0; --i) {
 | 
			
		||||
              const newBlock = await this.$indexBlock(lastBlock.height - i);
 | 
			
		||||
              this.updateTimerProgress(timer, `reindexed block`);
 | 
			
		||||
              await this.$getStrippedBlockTransactions(newBlock.id, true, true);
 | 
			
		||||
              this.updateTimerProgress(timer, `reindexed block summary`);
 | 
			
		||||
              if (config.MEMPOOL.CPFP_INDEXING) {
 | 
			
		||||
                await this.$indexCPFP(newBlock.id, lastBlock.height - i);
 | 
			
		||||
                this.updateTimerProgress(timer, `reindexed block cpfp`);
 | 
			
		||||
              }
 | 
			
		||||
            }
 | 
			
		||||
            await mining.$indexDifficultyAdjustments();
 | 
			
		||||
            await DifficultyAdjustmentsRepository.$deleteLastAdjustment();
 | 
			
		||||
            this.updateTimerProgress(timer, `reindexed difficulty adjustments`);
 | 
			
		||||
            logger.info(`Re-indexed 10 blocks and summaries. Also re-indexed the last difficulty adjustments. Will re-index latest hashrates in a few seconds.`, logger.tags.mining);
 | 
			
		||||
            indexer.reindex();
 | 
			
		||||
          }
 | 
			
		||||
          await blocksRepository.$saveBlockInDatabase(blockExtended);
 | 
			
		||||
          this.updateTimerProgress(timer, `saved ${this.currentBlockHeight} to database`);
 | 
			
		||||
 | 
			
		||||
          const lastestPriceId = await PricesRepository.$getLatestPriceId();
 | 
			
		||||
          this.updateTimerProgress(timer, `got latest price id ${this.currentBlockHeight}`);
 | 
			
		||||
          if (priceUpdater.historyInserted === true && lastestPriceId !== null) {
 | 
			
		||||
            await blocksRepository.$saveBlockPrices([{
 | 
			
		||||
              height: blockExtended.height,
 | 
			
		||||
              priceId: lastestPriceId,
 | 
			
		||||
            }]);
 | 
			
		||||
            this.updateTimerProgress(timer, `saved prices for ${this.currentBlockHeight}`);
 | 
			
		||||
          } else {
 | 
			
		||||
            logger.debug(`Cannot save block price for ${blockExtended.height} because the price updater hasnt completed yet. Trying again in 10 seconds.`, logger.tags.mining);
 | 
			
		||||
            setTimeout(() => {
 | 
			
		||||
@ -625,9 +650,11 @@ class Blocks {
 | 
			
		||||
          // Save blocks summary for visualization if it's enabled
 | 
			
		||||
          if (Common.blocksSummariesIndexingEnabled() === true) {
 | 
			
		||||
            await this.$getStrippedBlockTransactions(blockExtended.id, true);
 | 
			
		||||
            this.updateTimerProgress(timer, `saved block summary for ${this.currentBlockHeight}`);
 | 
			
		||||
          }
 | 
			
		||||
          if (config.MEMPOOL.CPFP_INDEXING) {
 | 
			
		||||
            this.$saveCpfp(blockExtended.id, this.currentBlockHeight, cpfpSummary);
 | 
			
		||||
            this.updateTimerProgress(timer, `saved cpfp for ${this.currentBlockHeight}`);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
@ -640,6 +667,7 @@ class Blocks {
 | 
			
		||||
            difficulty: block.difficulty,
 | 
			
		||||
            adjustment: Math.round((block.difficulty / this.currentDifficulty) * 1000000) / 1000000, // Remove float point noise
 | 
			
		||||
          });
 | 
			
		||||
          this.updateTimerProgress(timer, `saved difficulty adjustment for ${this.currentBlockHeight}`);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        this.previousDifficultyRetarget = (block.difficulty - this.currentDifficulty) / this.currentDifficulty * 100;
 | 
			
		||||
@ -664,7 +692,33 @@ class Blocks {
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // wait for pending async callbacks to finish
 | 
			
		||||
      this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
 | 
			
		||||
      await Promise.all(callbackPromises);
 | 
			
		||||
      this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this.clearTimer(timer);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private startTimer() {
 | 
			
		||||
    const state: any = {
 | 
			
		||||
      start: Date.now(),
 | 
			
		||||
      progress: 'begin $updateBlocks',
 | 
			
		||||
      timer: null,
 | 
			
		||||
    };
 | 
			
		||||
    state.timer = setTimeout(() => {
 | 
			
		||||
      logger.err(`$updateBlocks stalled at "${state.progress}`);
 | 
			
		||||
    }, this.mainLoopTimeout);
 | 
			
		||||
    return state;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private updateTimerProgress(state, msg) {
 | 
			
		||||
    state.progress = msg;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private clearTimer(state) {
 | 
			
		||||
    if (state.timer) {
 | 
			
		||||
      clearTimeout(state.timer);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -36,6 +36,8 @@ class Mempool {
 | 
			
		||||
  private timer = new Date().getTime();
 | 
			
		||||
  private missingTxCount = 0;
 | 
			
		||||
 | 
			
		||||
  private mainLoopTimeout: number = 120000;
 | 
			
		||||
 | 
			
		||||
  constructor() {
 | 
			
		||||
    setInterval(this.updateTxPerSecond.bind(this), 1000);
 | 
			
		||||
  }
 | 
			
		||||
@ -119,10 +121,15 @@ class Mempool {
 | 
			
		||||
 | 
			
		||||
  public async $updateMempool(): Promise<void> {
 | 
			
		||||
    logger.debug(`Updating mempool...`);
 | 
			
		||||
 | 
			
		||||
    // warn if this run stalls the main loop for more than 2 minutes
 | 
			
		||||
    const timer = this.startTimer();
 | 
			
		||||
 | 
			
		||||
    const start = new Date().getTime();
 | 
			
		||||
    let hasChange: boolean = false;
 | 
			
		||||
    const currentMempoolSize = Object.keys(this.mempoolCache).length;
 | 
			
		||||
    const transactions = await bitcoinApi.$getRawMempool();
 | 
			
		||||
    this.updateTimerProgress(timer, 'got raw mempool');
 | 
			
		||||
    const diff = transactions.length - currentMempoolSize;
 | 
			
		||||
    const newTransactions: TransactionExtended[] = [];
 | 
			
		||||
 | 
			
		||||
@ -146,6 +153,7 @@ class Mempool {
 | 
			
		||||
      if (!this.mempoolCache[txid]) {
 | 
			
		||||
        try {
 | 
			
		||||
          const transaction = await transactionUtils.$getTransactionExtended(txid);
 | 
			
		||||
          this.updateTimerProgress(timer, 'fetched new transaction');
 | 
			
		||||
          this.mempoolCache[txid] = transaction;
 | 
			
		||||
          if (this.inSync) {
 | 
			
		||||
            this.txPerSecondArray.push(new Date().getTime());
 | 
			
		||||
@ -223,12 +231,38 @@ class Mempool {
 | 
			
		||||
      this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions);
 | 
			
		||||
    }
 | 
			
		||||
    if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length)) {
 | 
			
		||||
      this.updateTimerProgress(timer, 'running async mempool callback');
 | 
			
		||||
      await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions);
 | 
			
		||||
      this.updateTimerProgress(timer, 'completed async mempool callback');
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const end = new Date().getTime();
 | 
			
		||||
    const time = end - start;
 | 
			
		||||
    logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`);
 | 
			
		||||
 | 
			
		||||
    this.clearTimer(timer);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private startTimer() {
 | 
			
		||||
    const state: any = {
 | 
			
		||||
      start: Date.now(),
 | 
			
		||||
      progress: 'begin $updateMempool',
 | 
			
		||||
      timer: null,
 | 
			
		||||
    };
 | 
			
		||||
    state.timer = setTimeout(() => {
 | 
			
		||||
      logger.err(`$updateMempool stalled at "${state.progress}`);
 | 
			
		||||
    }, this.mainLoopTimeout);
 | 
			
		||||
    return state;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private updateTimerProgress(state, msg) {
 | 
			
		||||
    state.progress = msg;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private clearTimer(state) {
 | 
			
		||||
    if (state.timer) {
 | 
			
		||||
      clearTimeout(state.timer);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public handleRbfTransactions(rbfTransactions: { [txid: string]: TransactionExtended; }) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user