From df596ab5bfe91c001b348cc2deee60afe5af948d Mon Sep 17 00:00:00 2001 From: Mononaut Date: Wed, 2 Aug 2023 20:04:29 +0900 Subject: [PATCH] Parallelize block indexing with worker threads --- backend/src/api/blocks.ts | 91 ++++++++++++++++++----- backend/src/index-workers/block-worker.ts | 25 +++++++ 2 files changed, 96 insertions(+), 20 deletions(-) create mode 100644 backend/src/index-workers/block-worker.ts diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index 73b010b91..79612d1d5 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -29,6 +29,10 @@ import websocketHandler from './websocket-handler'; import redisCache from './redis-cache'; import rbfCache from './rbf-cache'; import { calcBitsDifference } from './difficulty-adjustment'; +import os from 'os'; +import { Worker } from 'worker_threads'; +import path from 'path'; + class Blocks { private blocks: BlockExtended[] = []; @@ -557,6 +561,7 @@ class Blocks { * [INDEXING] Index all blocks metadata for the mining dashboard */ public async $generateBlockDatabase(): Promise { + const workerPool: Worker[] = []; try { const blockchainInfo = await bitcoinClient.getBlockchainInfo(); let currentBlockHeight = blockchainInfo.blocks; @@ -575,12 +580,18 @@ class Blocks { let totalIndexed = await blocksRepository.$blockCountBetweenHeight(currentBlockHeight, lastBlockToIndex); let indexedThisRun = 0; let newlyIndexed = 0; - const startedAt = Date.now() / 1000; - let timer = Date.now() / 1000; + const startedAt = Date.now(); + let timer = Date.now(); + + if (currentBlockHeight >= lastBlockToIndex) { + const numWorkers = Math.max(1, os.cpus().length - 1); + for (let i = 0; i < numWorkers; i++) { + workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-worker.js'))); + } + } while (currentBlockHeight >= lastBlockToIndex) { const endBlock = Math.max(0, lastBlockToIndex, currentBlockHeight - chunkSize + 1); - const missingBlockHeights: number[] = await blocksRepository.$getMissingBlocksBetweenHeights( currentBlockHeight, endBlock); if (missingBlockHeights.length <= 0) { @@ -590,33 +601,65 @@ class Blocks { logger.info(`Indexing ${missingBlockHeights.length} blocks from #${currentBlockHeight} to #${endBlock}`, logger.tags.mining); - for (const blockHeight of missingBlockHeights) { - if (blockHeight < lastBlockToIndex) { - break; + const promises: Promise[] = []; + + // This function assigns a task to a worker + const assignTask = (worker: Worker): boolean => { + if (missingBlockHeights.length === 0) { + return false; + } else { + worker.postMessage({ height: missingBlockHeights.shift() }); + return true; } - ++indexedThisRun; - ++totalIndexed; - const elapsedSeconds = (Date.now() / 1000) - timer; - if (elapsedSeconds > 5 || blockHeight === lastBlockToIndex) { - const runningFor = (Date.now() / 1000) - startedAt; - const blockPerSeconds = indexedThisRun / elapsedSeconds; + }; + + const handleResult = (height: number): void => { + indexedThisRun++; + totalIndexed++; + newlyIndexed++; + const elapsed = Date.now() - timer; + if (elapsed > 5000 || height === lastBlockToIndex) { + const runningFor = Date.now() - startedAt; + const blockPerSeconds = indexedThisRun / (elapsed / 1000); const progress = Math.round(totalIndexed / indexingBlockAmount * 10000) / 100; - logger.debug(`Indexing block #${blockHeight} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining); - timer = Date.now() / 1000; + logger.debug(`Indexing block #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining); + timer = Date.now(); indexedThisRun = 0; loadingIndicators.setProgress('block-indexing', progress, false); } - const blockHash = await bitcoinApi.$getBlockHash(blockHeight); - const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash); - const transactions = await this.$getTransactionsExtended(blockHash, block.height, true, null, true); - const blockExtended = await this.$getBlockExtended(block, transactions); + }; - newlyIndexed++; - await blocksRepository.$saveBlockInDatabase(blockExtended); + // Start a task on each worker + for (const worker of workerPool) { + promises.push(new Promise((resolve, reject) => { + worker.removeAllListeners(); + worker.on('message', (result) => { + // Handle the result, then assign a new task to the worker + handleResult(result); + if (!assignTask(worker)) { + resolve(); + }; + }); + worker.on('error', reject); + if (!assignTask(worker)) { + resolve(); + } + })); } + await Promise.all(promises); + currentBlockHeight -= chunkSize; } + + for (const worker of workerPool) { + if (worker) { + // clean up the workers + worker.removeAllListeners(); + worker.terminate(); + } + } + if (newlyIndexed > 0) { logger.notice(`Block indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining); } else { @@ -627,6 +670,14 @@ class Blocks { logger.err('Block indexing failed. Trying again in 10 seconds. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining); loadingIndicators.setProgress('block-indexing', 100); throw e; + } finally { + for (const worker of workerPool) { + if (worker) { + // clean up the workers + worker.removeAllListeners(); + worker.terminate(); + } + } } return await BlocksRepository.$validateChain(); diff --git a/backend/src/index-workers/block-worker.ts b/backend/src/index-workers/block-worker.ts new file mode 100644 index 000000000..30db427f2 --- /dev/null +++ b/backend/src/index-workers/block-worker.ts @@ -0,0 +1,25 @@ +import { parentPort } from 'worker_threads'; +import bitcoinApi from '../api/bitcoin/bitcoin-api-factory'; +import blocksRepository from '../repositories/BlocksRepository'; +import blocks from '../api/blocks'; +import { IEsploraApi } from '../api/bitcoin/esplora-api.interface'; + +if (parentPort) { + parentPort.on('message', async (params) => { + if (params.height != null) { + await indexBlock(params.height); + } + + if (parentPort) { + parentPort.postMessage(params.height); + } + }); +} + +async function indexBlock(blockHeight: number): Promise { + const blockHash = await bitcoinApi.$getBlockHash(blockHeight); + const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash); + const transactions = await blocks['$getTransactionsExtended'](blockHash, block.height, true, null, true); + const blockExtended = await blocks['$getBlockExtended'](block, transactions); + await blocksRepository.$saveBlockInDatabase(blockExtended); +} \ No newline at end of file