diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index 79612d1d5..7465fc9e9 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -410,6 +410,8 @@ class Blocks { return; } + const workerPool: Worker[] = []; + try { // Get all indexed block hash const indexedBlocks = await blocksRepository.$getIndexedBlocks(); @@ -424,39 +426,67 @@ class Blocks { let newlyIndexed = 0; let totalIndexed = indexedBlockSummariesHashesArray.length; let indexedThisRun = 0; - let timer = Date.now() / 1000; - const startedAt = Date.now() / 1000; + let timer = Date.now(); + const startedAt = Date.now(); - for (const block of indexedBlocks) { - if (indexedBlockSummariesHashes[block.hash] === true) { - continue; - } + const blocksToIndex = indexedBlocks.filter(block => !indexedBlockSummariesHashes[block.hash]); - // Logging - const elapsedSeconds = (Date.now() / 1000) - timer; - if (elapsedSeconds > 5) { - const runningFor = (Date.now() / 1000) - startedAt; - const blockPerSeconds = indexedThisRun / elapsedSeconds; - const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100; - logger.debug(`Indexing block summary for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining); - timer = Date.now() / 1000; - indexedThisRun = 0; - } + if (!blocksToIndex.length) { + return; + } + const numWorkers = Math.max(1, os.cpus().length - 1); + for (let i = 0; i < numWorkers; i++) { + workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-summary-worker.js'))); + } - if (config.MEMPOOL.BACKEND === 'esplora') { - const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx)); - const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs); - await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary + const promises: Promise[] = []; + + // This function assigns a task to a worker + const assignTask = (worker: Worker): boolean => { + if (blocksToIndex.length === 0) { + return false; } else { - await this.$getStrippedBlockTransactions(block.hash, true, true); // This will index the block summary + worker.postMessage(blocksToIndex.shift()); + return true; } + }; - // Logging + const handleResult = (height: number): void => { indexedThisRun++; totalIndexed++; newlyIndexed++; + const elapsed = Date.now() - timer; + if (elapsed > 5000) { + const runningFor = Date.now() - startedAt; + const blockPerSeconds = indexedThisRun / (elapsed / 1000); + const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100; + logger.debug(`Indexing block summary for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining); + timer = Date.now(); + indexedThisRun = 0; + } + }; + + // Start a task on each worker + for (const worker of workerPool) { + promises.push(new Promise((resolve, reject) => { + worker.removeAllListeners(); + worker.on('message', (result) => { + // Handle the result, then assign a new task to the worker + handleResult(result); + if (!assignTask(worker)) { + resolve(); + }; + }); + worker.on('error', reject); + if (!assignTask(worker)) { + resolve(); + } + })); } + + await Promise.all(promises); + if (newlyIndexed > 0) { logger.notice(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining); } else { @@ -465,6 +495,14 @@ class Blocks { } catch (e) { logger.err(`Blocks summaries indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining); throw e; + } finally { + for (const worker of workerPool) { + if (worker) { + // clean up the workers + worker.removeAllListeners(); + worker.terminate(); + } + } } } diff --git a/backend/src/index-workers/block-summary-worker.ts b/backend/src/index-workers/block-summary-worker.ts new file mode 100644 index 000000000..9ceb7bd88 --- /dev/null +++ b/backend/src/index-workers/block-summary-worker.ts @@ -0,0 +1,38 @@ +import { parentPort } from 'worker_threads'; +import bitcoinApi from '../api/bitcoin/bitcoin-api-factory'; +import blocks from '../api/blocks'; +import config from '../config'; +import transactionUtils from '../api/transaction-utils'; +import bitcoinClient from '../api/bitcoin/bitcoin-client'; + +if (parentPort) { + parentPort.on('message', async ({ hash, height }) => { + if (hash != null && height != null) { + await indexBlockSummary(hash, height); + } + + if (parentPort) { + parentPort.postMessage(height); + } + }); +} + +async function indexBlockSummary(hash: string, height: number): Promise { + let txs; + if (config.MEMPOOL.BACKEND === 'esplora') { + txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx)); + } else { + const block = await bitcoinClient.getBlock(hash, 2); + txs = block.tx.map(tx => { + tx.fee = Math.round(tx.fee * 100_000_000); + tx.vout.forEach((vout) => { + vout.value = Math.round(vout.value * 100000000); + }); + tx.vsize = Math.round(tx.weight / 4); // required for backwards compatibility + return tx; + }); + } + + const cpfpSummary = await blocks.$indexCPFP(hash, height, txs); + await blocks.$getStrippedBlockTransactions(hash, true, true, cpfpSummary, height); // This will index the block summary +} \ No newline at end of file