Merge pull request #4271 from mempool/mononaut/refactor-task-scheduler
Refactor indexer scheduling to avoid accumulating identical tasks
This commit is contained in:
		
						commit
						3a8d19062f
					
				@ -776,9 +776,7 @@ class Blocks {
 | 
			
		||||
            this.updateTimerProgress(timer, `saved prices for ${this.currentBlockHeight}`);
 | 
			
		||||
          } else {
 | 
			
		||||
            logger.debug(`Cannot save block price for ${blockExtended.height} because the price updater hasnt completed yet. Trying again in 10 seconds.`, logger.tags.mining);
 | 
			
		||||
            setTimeout(() => {
 | 
			
		||||
              indexer.runSingleTask('blocksPrices');
 | 
			
		||||
            }, 10000);
 | 
			
		||||
            indexer.scheduleSingleTask('blocksPrices', 10000);
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          // Save blocks summary for visualization if it's enabled
 | 
			
		||||
 | 
			
		||||
@ -206,7 +206,7 @@ class Server {
 | 
			
		||||
      }
 | 
			
		||||
      const newMempool = await bitcoinApi.$getRawMempool();
 | 
			
		||||
      const numHandledBlocks = await blocks.$updateBlocks();
 | 
			
		||||
      const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1);
 | 
			
		||||
      const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerIsRunning() ? 10 : 1);
 | 
			
		||||
      if (numHandledBlocks === 0) {
 | 
			
		||||
        await memPool.$updateMempool(newMempool, pollRate);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -15,11 +15,18 @@ export interface CoreIndex {
 | 
			
		||||
  best_block_height: number;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type TaskName = 'blocksPrices' | 'coinStatsIndex';
 | 
			
		||||
 | 
			
		||||
class Indexer {
 | 
			
		||||
  runIndexer = true;
 | 
			
		||||
  indexerRunning = false;
 | 
			
		||||
  tasksRunning: string[] = [];
 | 
			
		||||
  coreIndexes: CoreIndex[] = [];
 | 
			
		||||
  private runIndexer = true;
 | 
			
		||||
  private indexerRunning = false;
 | 
			
		||||
  private tasksRunning: { [key in TaskName]?: boolean; } = {};
 | 
			
		||||
  private tasksScheduled: { [key in TaskName]?: NodeJS.Timeout; } = {};
 | 
			
		||||
  private coreIndexes: CoreIndex[] = [];
 | 
			
		||||
 | 
			
		||||
  public indexerIsRunning(): boolean {
 | 
			
		||||
    return this.indexerRunning;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Check which core index is available for indexing
 | 
			
		||||
@ -69,38 +76,69 @@ class Indexer {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async runSingleTask(task: 'blocksPrices' | 'coinStatsIndex'): Promise<void> {
 | 
			
		||||
    if (!Common.indexingEnabled()) {
 | 
			
		||||
  /**
 | 
			
		||||
   * schedules a single task to run in `timeout` ms
 | 
			
		||||
   * only one task of each type may be scheduled
 | 
			
		||||
   *
 | 
			
		||||
   * @param {TaskName} task - the type of task
 | 
			
		||||
   * @param {number} timeout - delay in ms
 | 
			
		||||
   * @param {boolean} replace - `true` replaces any already scheduled task (works like a debounce), `false` ignores subsequent requests (works like a throttle)
 | 
			
		||||
   */
 | 
			
		||||
  public scheduleSingleTask(task: TaskName, timeout: number = 10000, replace = false): void {
 | 
			
		||||
    if (this.tasksScheduled[task]) {
 | 
			
		||||
      if (!replace) { //throttle
 | 
			
		||||
        return;
 | 
			
		||||
      } else { // debounce
 | 
			
		||||
        clearTimeout(this.tasksScheduled[task]);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    this.tasksScheduled[task] = setTimeout(async () => {
 | 
			
		||||
      try {
 | 
			
		||||
        await this.runSingleTask(task);
 | 
			
		||||
      } catch (e) {
 | 
			
		||||
        logger.err(`Unexpected error in scheduled task ${task}: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
      } finally {
 | 
			
		||||
        clearTimeout(this.tasksScheduled[task]);
 | 
			
		||||
      }
 | 
			
		||||
    }, timeout);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Runs a single task immediately
 | 
			
		||||
   *
 | 
			
		||||
   * (use `scheduleSingleTask` instead to queue a task to run after some timeout)
 | 
			
		||||
   */
 | 
			
		||||
  public async runSingleTask(task: TaskName): Promise<void> {
 | 
			
		||||
    if (!Common.indexingEnabled() || this.tasksRunning[task]) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    this.tasksRunning[task] = true;
 | 
			
		||||
 | 
			
		||||
    if (task === 'blocksPrices' && !this.tasksRunning.includes(task) && !['testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) {
 | 
			
		||||
      this.tasksRunning.push(task);
 | 
			
		||||
      let lastestPriceId;
 | 
			
		||||
      try {
 | 
			
		||||
        lastestPriceId = await PricesRepository.$getLatestPriceId();
 | 
			
		||||
      } catch (e) {
 | 
			
		||||
        logger.debug('failed to fetch latest price id from db: ' + (e instanceof Error ? e.message : e));
 | 
			
		||||
      }
 | 
			
		||||
      if (priceUpdater.historyInserted === false || lastestPriceId === null) {
 | 
			
		||||
        logger.debug(`Blocks prices indexer is waiting for the price updater to complete`, logger.tags.mining);
 | 
			
		||||
        setTimeout(() => {
 | 
			
		||||
          this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task);
 | 
			
		||||
          this.runSingleTask('blocksPrices');
 | 
			
		||||
        }, 10000);
 | 
			
		||||
      } else {
 | 
			
		||||
        logger.debug(`Blocks prices indexer will run now`, logger.tags.mining);
 | 
			
		||||
        await mining.$indexBlockPrices();
 | 
			
		||||
        this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task);
 | 
			
		||||
      }
 | 
			
		||||
    switch (task) {
 | 
			
		||||
      case 'blocksPrices': {
 | 
			
		||||
        if (!['testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) {
 | 
			
		||||
          let lastestPriceId;
 | 
			
		||||
          try {
 | 
			
		||||
            lastestPriceId = await PricesRepository.$getLatestPriceId();
 | 
			
		||||
          } catch (e) {
 | 
			
		||||
            logger.debug('failed to fetch latest price id from db: ' + (e instanceof Error ? e.message : e));
 | 
			
		||||
          }          if (priceUpdater.historyInserted === false || lastestPriceId === null) {
 | 
			
		||||
            logger.debug(`Blocks prices indexer is waiting for the price updater to complete`, logger.tags.mining);
 | 
			
		||||
            this.scheduleSingleTask(task, 10000);
 | 
			
		||||
          } else {
 | 
			
		||||
            logger.debug(`Blocks prices indexer will run now`, logger.tags.mining);
 | 
			
		||||
            await mining.$indexBlockPrices();
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      } break;
 | 
			
		||||
 | 
			
		||||
      case 'coinStatsIndex': {
 | 
			
		||||
        logger.debug(`Indexing coinStatsIndex now`);
 | 
			
		||||
        await mining.$indexCoinStatsIndex();
 | 
			
		||||
      } break;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (task === 'coinStatsIndex' && !this.tasksRunning.includes(task)) {
 | 
			
		||||
      this.tasksRunning.push(task);
 | 
			
		||||
      logger.debug(`Indexing coinStatsIndex now`);
 | 
			
		||||
      await mining.$indexCoinStatsIndex();
 | 
			
		||||
      this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task);
 | 
			
		||||
    }
 | 
			
		||||
    this.tasksRunning[task] = false;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $run(): Promise<void> {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user