diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index 730e603f3..6b4a14a0e 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -776,9 +776,7 @@ class Blocks { this.updateTimerProgress(timer, `saved prices for ${this.currentBlockHeight}`); } else { logger.debug(`Cannot save block price for ${blockExtended.height} because the price updater hasnt completed yet. Trying again in 10 seconds.`, logger.tags.mining); - setTimeout(() => { - indexer.runSingleTask('blocksPrices'); - }, 10000); + indexer.scheduleSingleTask('blocksPrices', 10000); } // Save blocks summary for visualization if it's enabled diff --git a/backend/src/index.ts b/backend/src/index.ts index 039aad8af..0c28df0a8 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -206,7 +206,7 @@ class Server { } const newMempool = await bitcoinApi.$getRawMempool(); const numHandledBlocks = await blocks.$updateBlocks(); - const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1); + const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerIsRunning() ? 10 : 1); if (numHandledBlocks === 0) { await memPool.$updateMempool(newMempool, pollRate); } diff --git a/backend/src/indexer.ts b/backend/src/indexer.ts index add0abca3..2e2f8b037 100644 --- a/backend/src/indexer.ts +++ b/backend/src/indexer.ts @@ -15,11 +15,18 @@ export interface CoreIndex { best_block_height: number; } +type TaskName = 'blocksPrices' | 'coinStatsIndex'; + class Indexer { - runIndexer = true; - indexerRunning = false; - tasksRunning: string[] = []; - coreIndexes: CoreIndex[] = []; + private runIndexer = true; + private indexerRunning = false; + private tasksRunning: { [key in TaskName]?: boolean; } = {}; + private tasksScheduled: { [key in TaskName]?: NodeJS.Timeout; } = {}; + private coreIndexes: CoreIndex[] = []; + + public indexerIsRunning(): boolean { + return this.indexerRunning; + } /** * Check which core index is available for indexing @@ -69,38 +76,69 @@ class Indexer { } } - public async runSingleTask(task: 'blocksPrices' | 'coinStatsIndex'): Promise { - if (!Common.indexingEnabled()) { + /** + * schedules a single task to run in `timeout` ms + * only one task of each type may be scheduled + * + * @param {TaskName} task - the type of task + * @param {number} timeout - delay in ms + * @param {boolean} replace - `true` replaces any already scheduled task (works like a debounce), `false` ignores subsequent requests (works like a throttle) + */ + public scheduleSingleTask(task: TaskName, timeout: number = 10000, replace = false): void { + if (this.tasksScheduled[task]) { + if (!replace) { //throttle + return; + } else { // debounce + clearTimeout(this.tasksScheduled[task]); + } + } + this.tasksScheduled[task] = setTimeout(async () => { + try { + await this.runSingleTask(task); + } catch (e) { + logger.err(`Unexpected error in scheduled task ${task}: ` + (e instanceof Error ? e.message : e)); + } finally { + clearTimeout(this.tasksScheduled[task]); + } + }, timeout); + } + + /** + * Runs a single task immediately + * + * (use `scheduleSingleTask` instead to queue a task to run after some timeout) + */ + public async runSingleTask(task: TaskName): Promise { + if (!Common.indexingEnabled() || this.tasksRunning[task]) { return; } + this.tasksRunning[task] = true; - if (task === 'blocksPrices' && !this.tasksRunning.includes(task) && !['testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) { - this.tasksRunning.push(task); - let lastestPriceId; - try { - lastestPriceId = await PricesRepository.$getLatestPriceId(); - } catch (e) { - logger.debug('failed to fetch latest price id from db: ' + (e instanceof Error ? e.message : e)); - } - if (priceUpdater.historyInserted === false || lastestPriceId === null) { - logger.debug(`Blocks prices indexer is waiting for the price updater to complete`, logger.tags.mining); - setTimeout(() => { - this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task); - this.runSingleTask('blocksPrices'); - }, 10000); - } else { - logger.debug(`Blocks prices indexer will run now`, logger.tags.mining); - await mining.$indexBlockPrices(); - this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task); - } + switch (task) { + case 'blocksPrices': { + if (!['testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) { + let lastestPriceId; + try { + lastestPriceId = await PricesRepository.$getLatestPriceId(); + } catch (e) { + logger.debug('failed to fetch latest price id from db: ' + (e instanceof Error ? e.message : e)); + } if (priceUpdater.historyInserted === false || lastestPriceId === null) { + logger.debug(`Blocks prices indexer is waiting for the price updater to complete`, logger.tags.mining); + this.scheduleSingleTask(task, 10000); + } else { + logger.debug(`Blocks prices indexer will run now`, logger.tags.mining); + await mining.$indexBlockPrices(); + } + } + } break; + + case 'coinStatsIndex': { + logger.debug(`Indexing coinStatsIndex now`); + await mining.$indexCoinStatsIndex(); + } break; } - if (task === 'coinStatsIndex' && !this.tasksRunning.includes(task)) { - this.tasksRunning.push(task); - logger.debug(`Indexing coinStatsIndex now`); - await mining.$indexCoinStatsIndex(); - this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task); - } + this.tasksRunning[task] = false; } public async $run(): Promise {