Parallelize block indexing with worker threads
This commit is contained in:
parent
f12fabe030
commit
df596ab5bf
@ -29,6 +29,10 @@ import websocketHandler from './websocket-handler';
|
||||
import redisCache from './redis-cache';
|
||||
import rbfCache from './rbf-cache';
|
||||
import { calcBitsDifference } from './difficulty-adjustment';
|
||||
import os from 'os';
|
||||
import { Worker } from 'worker_threads';
|
||||
import path from 'path';
|
||||
|
||||
|
||||
class Blocks {
|
||||
private blocks: BlockExtended[] = [];
|
||||
@ -557,6 +561,7 @@ class Blocks {
|
||||
* [INDEXING] Index all blocks metadata for the mining dashboard
|
||||
*/
|
||||
public async $generateBlockDatabase(): Promise<boolean> {
|
||||
const workerPool: Worker[] = [];
|
||||
try {
|
||||
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
|
||||
let currentBlockHeight = blockchainInfo.blocks;
|
||||
@ -575,12 +580,18 @@ class Blocks {
|
||||
let totalIndexed = await blocksRepository.$blockCountBetweenHeight(currentBlockHeight, lastBlockToIndex);
|
||||
let indexedThisRun = 0;
|
||||
let newlyIndexed = 0;
|
||||
const startedAt = Date.now() / 1000;
|
||||
let timer = Date.now() / 1000;
|
||||
const startedAt = Date.now();
|
||||
let timer = Date.now();
|
||||
|
||||
if (currentBlockHeight >= lastBlockToIndex) {
|
||||
const numWorkers = Math.max(1, os.cpus().length - 1);
|
||||
for (let i = 0; i < numWorkers; i++) {
|
||||
workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-worker.js')));
|
||||
}
|
||||
}
|
||||
|
||||
while (currentBlockHeight >= lastBlockToIndex) {
|
||||
const endBlock = Math.max(0, lastBlockToIndex, currentBlockHeight - chunkSize + 1);
|
||||
|
||||
const missingBlockHeights: number[] = await blocksRepository.$getMissingBlocksBetweenHeights(
|
||||
currentBlockHeight, endBlock);
|
||||
if (missingBlockHeights.length <= 0) {
|
||||
@ -590,33 +601,65 @@ class Blocks {
|
||||
|
||||
logger.info(`Indexing ${missingBlockHeights.length} blocks from #${currentBlockHeight} to #${endBlock}`, logger.tags.mining);
|
||||
|
||||
for (const blockHeight of missingBlockHeights) {
|
||||
if (blockHeight < lastBlockToIndex) {
|
||||
break;
|
||||
const promises: Promise<void>[] = [];
|
||||
|
||||
// This function assigns a task to a worker
|
||||
const assignTask = (worker: Worker): boolean => {
|
||||
if (missingBlockHeights.length === 0) {
|
||||
return false;
|
||||
} else {
|
||||
worker.postMessage({ height: missingBlockHeights.shift() });
|
||||
return true;
|
||||
}
|
||||
++indexedThisRun;
|
||||
++totalIndexed;
|
||||
const elapsedSeconds = (Date.now() / 1000) - timer;
|
||||
if (elapsedSeconds > 5 || blockHeight === lastBlockToIndex) {
|
||||
const runningFor = (Date.now() / 1000) - startedAt;
|
||||
const blockPerSeconds = indexedThisRun / elapsedSeconds;
|
||||
};
|
||||
|
||||
const handleResult = (height: number): void => {
|
||||
indexedThisRun++;
|
||||
totalIndexed++;
|
||||
newlyIndexed++;
|
||||
const elapsed = Date.now() - timer;
|
||||
if (elapsed > 5000 || height === lastBlockToIndex) {
|
||||
const runningFor = Date.now() - startedAt;
|
||||
const blockPerSeconds = indexedThisRun / (elapsed / 1000);
|
||||
const progress = Math.round(totalIndexed / indexingBlockAmount * 10000) / 100;
|
||||
logger.debug(`Indexing block #${blockHeight} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining);
|
||||
timer = Date.now() / 1000;
|
||||
logger.debug(`Indexing block #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining);
|
||||
timer = Date.now();
|
||||
indexedThisRun = 0;
|
||||
loadingIndicators.setProgress('block-indexing', progress, false);
|
||||
}
|
||||
const blockHash = await bitcoinApi.$getBlockHash(blockHeight);
|
||||
const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash);
|
||||
const transactions = await this.$getTransactionsExtended(blockHash, block.height, true, null, true);
|
||||
const blockExtended = await this.$getBlockExtended(block, transactions);
|
||||
};
|
||||
|
||||
newlyIndexed++;
|
||||
await blocksRepository.$saveBlockInDatabase(blockExtended);
|
||||
// Start a task on each worker
|
||||
for (const worker of workerPool) {
|
||||
promises.push(new Promise((resolve, reject) => {
|
||||
worker.removeAllListeners();
|
||||
worker.on('message', (result) => {
|
||||
// Handle the result, then assign a new task to the worker
|
||||
handleResult(result);
|
||||
if (!assignTask(worker)) {
|
||||
resolve();
|
||||
};
|
||||
});
|
||||
worker.on('error', reject);
|
||||
if (!assignTask(worker)) {
|
||||
resolve();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
await Promise.all(promises);
|
||||
|
||||
currentBlockHeight -= chunkSize;
|
||||
}
|
||||
|
||||
for (const worker of workerPool) {
|
||||
if (worker) {
|
||||
// clean up the workers
|
||||
worker.removeAllListeners();
|
||||
worker.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
if (newlyIndexed > 0) {
|
||||
logger.notice(`Block indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
|
||||
} else {
|
||||
@ -627,6 +670,14 @@ class Blocks {
|
||||
logger.err('Block indexing failed. Trying again in 10 seconds. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining);
|
||||
loadingIndicators.setProgress('block-indexing', 100);
|
||||
throw e;
|
||||
} finally {
|
||||
for (const worker of workerPool) {
|
||||
if (worker) {
|
||||
// clean up the workers
|
||||
worker.removeAllListeners();
|
||||
worker.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return await BlocksRepository.$validateChain();
|
||||
|
25
backend/src/index-workers/block-worker.ts
Normal file
25
backend/src/index-workers/block-worker.ts
Normal file
@ -0,0 +1,25 @@
|
||||
import { parentPort } from 'worker_threads';
|
||||
import bitcoinApi from '../api/bitcoin/bitcoin-api-factory';
|
||||
import blocksRepository from '../repositories/BlocksRepository';
|
||||
import blocks from '../api/blocks';
|
||||
import { IEsploraApi } from '../api/bitcoin/esplora-api.interface';
|
||||
|
||||
if (parentPort) {
|
||||
parentPort.on('message', async (params) => {
|
||||
if (params.height != null) {
|
||||
await indexBlock(params.height);
|
||||
}
|
||||
|
||||
if (parentPort) {
|
||||
parentPort.postMessage(params.height);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function indexBlock(blockHeight: number): Promise<void> {
|
||||
const blockHash = await bitcoinApi.$getBlockHash(blockHeight);
|
||||
const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash);
|
||||
const transactions = await blocks['$getTransactionsExtended'](blockHash, block.height, true, null, true);
|
||||
const blockExtended = await blocks['$getBlockExtended'](block, transactions);
|
||||
await blocksRepository.$saveBlockInDatabase(blockExtended);
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user