Parallelize block summary/cpfp indexing with worker threads

This commit is contained in:
Mononaut 2023-08-02 20:07:00 +09:00
parent e02637718f
commit 82383d112c
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
2 changed files with 98 additions and 22 deletions

View File

@ -410,6 +410,8 @@ class Blocks {
return;
}
const workerPool: Worker[] = [];
try {
// Get all indexed block hash
const indexedBlocks = await blocksRepository.$getIndexedBlocks();
@ -424,39 +426,67 @@ class Blocks {
let newlyIndexed = 0;
let totalIndexed = indexedBlockSummariesHashesArray.length;
let indexedThisRun = 0;
let timer = Date.now() / 1000;
const startedAt = Date.now() / 1000;
let timer = Date.now();
const startedAt = Date.now();
for (const block of indexedBlocks) {
if (indexedBlockSummariesHashes[block.hash] === true) {
continue;
}
const blocksToIndex = indexedBlocks.filter(block => !indexedBlockSummariesHashes[block.hash]);
// Logging
const elapsedSeconds = (Date.now() / 1000) - timer;
if (elapsedSeconds > 5) {
const runningFor = (Date.now() / 1000) - startedAt;
const blockPerSeconds = indexedThisRun / elapsedSeconds;
const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
logger.debug(`Indexing block summary for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining);
timer = Date.now() / 1000;
indexedThisRun = 0;
}
if (!blocksToIndex.length) {
return;
}
const numWorkers = Math.max(1, os.cpus().length - 1);
for (let i = 0; i < numWorkers; i++) {
workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-summary-worker.js')));
}
if (config.MEMPOOL.BACKEND === 'esplora') {
const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx));
const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs);
await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary
const promises: Promise<void>[] = [];
// This function assigns a task to a worker
const assignTask = (worker: Worker): boolean => {
if (blocksToIndex.length === 0) {
return false;
} else {
await this.$getStrippedBlockTransactions(block.hash, true, true); // This will index the block summary
worker.postMessage(blocksToIndex.shift());
return true;
}
};
// Logging
const handleResult = (height: number): void => {
indexedThisRun++;
totalIndexed++;
newlyIndexed++;
const elapsed = Date.now() - timer;
if (elapsed > 5000) {
const runningFor = Date.now() - startedAt;
const blockPerSeconds = indexedThisRun / (elapsed / 1000);
const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
logger.debug(`Indexing block summary for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining);
timer = Date.now();
indexedThisRun = 0;
}
};
// Start a task on each worker
for (const worker of workerPool) {
promises.push(new Promise((resolve, reject) => {
worker.removeAllListeners();
worker.on('message', (result) => {
// Handle the result, then assign a new task to the worker
handleResult(result);
if (!assignTask(worker)) {
resolve();
};
});
worker.on('error', reject);
if (!assignTask(worker)) {
resolve();
}
}));
}
await Promise.all(promises);
if (newlyIndexed > 0) {
logger.notice(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
} else {
@ -465,6 +495,14 @@ class Blocks {
} catch (e) {
logger.err(`Blocks summaries indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining);
throw e;
} finally {
for (const worker of workerPool) {
if (worker) {
// clean up the workers
worker.removeAllListeners();
worker.terminate();
}
}
}
}

View File

@ -0,0 +1,38 @@
import { parentPort } from 'worker_threads';
import bitcoinApi from '../api/bitcoin/bitcoin-api-factory';
import blocks from '../api/blocks';
import config from '../config';
import transactionUtils from '../api/transaction-utils';
import bitcoinClient from '../api/bitcoin/bitcoin-client';
if (parentPort) {
parentPort.on('message', async ({ hash, height }) => {
if (hash != null && height != null) {
await indexBlockSummary(hash, height);
}
if (parentPort) {
parentPort.postMessage(height);
}
});
}
async function indexBlockSummary(hash: string, height: number): Promise<void> {
let txs;
if (config.MEMPOOL.BACKEND === 'esplora') {
txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx));
} else {
const block = await bitcoinClient.getBlock(hash, 2);
txs = block.tx.map(tx => {
tx.fee = Math.round(tx.fee * 100_000_000);
tx.vout.forEach((vout) => {
vout.value = Math.round(vout.value * 100000000);
});
tx.vsize = Math.round(tx.weight / 4); // required for backwards compatibility
return tx;
});
}
const cpfpSummary = await blocks.$indexCPFP(hash, height, txs);
await blocks.$getStrippedBlockTransactions(hash, true, true, cpfpSummary, height); // This will index the block summary
}