Compare commits

...

5 Commits

Author     SHA1          Message                                                        Date
Mononaut   e3bc33e41b    Always use Core verbose block for summaries indexing          2023-08-24 22:01:20 +09:00
Mononaut   20a8725450    Avoid initializing rbf cache in worker threads                2023-08-24 22:01:20 +09:00
Mononaut   a509a52993    Avoid initializing redis in worker threads                    2023-08-24 22:01:20 +09:00
Mononaut   82383d112c    Parallelize block summary/cpfp indexing with worker threads   2023-08-24 22:01:20 +09:00
Mononaut   e02637718f    Parallelize block indexing with worker threads                2023-08-24 22:01:20 +09:00
6 changed files with 206 additions and 59 deletions

api/blocks.ts

@@ -29,6 +29,10 @@ import websocketHandler from './websocket-handler';
 import redisCache from './redis-cache';
 import rbfCache from './rbf-cache';
 import { calcBitsDifference } from './difficulty-adjustment';
+import os from 'os';
+import { Worker } from 'worker_threads';
+import path from 'path';

 class Blocks {
   private blocks: BlockExtended[] = [];
@@ -406,6 +410,8 @@ class Blocks {
       return;
     }
+    const workerPool: Worker[] = [];
     try {
       // Get all indexed block hash
       const indexedBlocks = await blocksRepository.$getIndexedBlocks();
@@ -420,39 +426,67 @@ class Blocks {
       let newlyIndexed = 0;
       let totalIndexed = indexedBlockSummariesHashesArray.length;
       let indexedThisRun = 0;
-      let timer = Date.now() / 1000;
-      const startedAt = Date.now() / 1000;
-      for (const block of indexedBlocks) {
-        if (indexedBlockSummariesHashes[block.hash] === true) {
-          continue;
-        }
-        // Logging
-        const elapsedSeconds = (Date.now() / 1000) - timer;
-        if (elapsedSeconds > 5) {
-          const runningFor = (Date.now() / 1000) - startedAt;
-          const blockPerSeconds = indexedThisRun / elapsedSeconds;
-          const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
-          logger.debug(`Indexing block summary for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining);
-          timer = Date.now() / 1000;
-          indexedThisRun = 0;
-        }
-        if (config.MEMPOOL.BACKEND === 'esplora') {
-          const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx));
-          const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs);
-          await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary
-        } else {
-          await this.$getStrippedBlockTransactions(block.hash, true, true); // This will index the block summary
-        }
-        // Logging
-        indexedThisRun++;
-        totalIndexed++;
-        newlyIndexed++;
-      }
+      let timer = Date.now();
+      const startedAt = Date.now();
+      const blocksToIndex = indexedBlocks.filter(block => !indexedBlockSummariesHashes[block.hash]);
+      if (!blocksToIndex.length) {
+        return;
+      }
+      const numWorkers = Math.max(1, os.cpus().length - 1);
+      for (let i = 0; i < numWorkers; i++) {
+        workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-summary-worker.js')));
+      }
+      const promises: Promise<void>[] = [];
+      // This function assigns a task to a worker
+      const assignTask = (worker: Worker): boolean => {
+        if (blocksToIndex.length === 0) {
+          return false;
+        } else {
+          worker.postMessage(blocksToIndex.shift());
+          return true;
+        }
+      };
+      const handleResult = (height: number): void => {
+        indexedThisRun++;
+        totalIndexed++;
+        newlyIndexed++;
+        const elapsed = Date.now() - timer;
+        if (elapsed > 5000) {
+          const runningFor = Date.now() - startedAt;
+          const blockPerSeconds = indexedThisRun / (elapsed / 1000);
+          const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
+          logger.debug(`Indexing block summary for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining);
+          timer = Date.now();
+          indexedThisRun = 0;
+        }
+      };
+      // Start a task on each worker
+      for (const worker of workerPool) {
+        promises.push(new Promise((resolve, reject) => {
+          worker.removeAllListeners();
+          worker.on('message', (result) => {
+            // Handle the result, then assign a new task to the worker
+            handleResult(result);
+            if (!assignTask(worker)) {
+              resolve();
+            };
+          });
+          worker.on('error', reject);
+          if (!assignTask(worker)) {
+            resolve();
+          }
+        }));
+      }
+      await Promise.all(promises);
       if (newlyIndexed > 0) {
         logger.notice(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
       } else {
@@ -461,6 +495,14 @@ class Blocks {
     } catch (e) {
       logger.err(`Blocks summaries indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining);
       throw e;
+    } finally {
+      for (const worker of workerPool) {
+        if (worker) {
+          // clean up the workers
+          worker.removeAllListeners();
+          worker.terminate();
+        }
+      }
     }
   }
@@ -557,6 +599,7 @@ class Blocks {
   * [INDEXING] Index all blocks metadata for the mining dashboard
   */
  public async $generateBlockDatabase(): Promise<boolean> {
+    const workerPool: Worker[] = [];
    try {
      const blockchainInfo = await bitcoinClient.getBlockchainInfo();
      let currentBlockHeight = blockchainInfo.blocks;
@@ -575,12 +618,18 @@ class Blocks {
       let totalIndexed = await blocksRepository.$blockCountBetweenHeight(currentBlockHeight, lastBlockToIndex);
       let indexedThisRun = 0;
       let newlyIndexed = 0;
-      const startedAt = Date.now() / 1000;
-      let timer = Date.now() / 1000;
+      const startedAt = Date.now();
+      let timer = Date.now();
+      if (currentBlockHeight >= lastBlockToIndex) {
+        const numWorkers = Math.max(1, os.cpus().length - 1);
+        for (let i = 0; i < numWorkers; i++) {
+          workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-worker.js')));
+        }
+      }
       while (currentBlockHeight >= lastBlockToIndex) {
         const endBlock = Math.max(0, lastBlockToIndex, currentBlockHeight - chunkSize + 1);
         const missingBlockHeights: number[] = await blocksRepository.$getMissingBlocksBetweenHeights(
           currentBlockHeight, endBlock);
         if (missingBlockHeights.length <= 0) {
@@ -590,33 +639,65 @@ class Blocks {
         logger.info(`Indexing ${missingBlockHeights.length} blocks from #${currentBlockHeight} to #${endBlock}`, logger.tags.mining);
-        for (const blockHeight of missingBlockHeights) {
-          if (blockHeight < lastBlockToIndex) {
-            break;
-          }
-          ++indexedThisRun;
-          ++totalIndexed;
-          const elapsedSeconds = (Date.now() / 1000) - timer;
-          if (elapsedSeconds > 5 || blockHeight === lastBlockToIndex) {
-            const runningFor = (Date.now() / 1000) - startedAt;
-            const blockPerSeconds = indexedThisRun / elapsedSeconds;
-            const progress = Math.round(totalIndexed / indexingBlockAmount * 10000) / 100;
-            logger.debug(`Indexing block #${blockHeight} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining);
-            timer = Date.now() / 1000;
-            indexedThisRun = 0;
-            loadingIndicators.setProgress('block-indexing', progress, false);
-          }
-          const blockHash = await bitcoinApi.$getBlockHash(blockHeight);
-          const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash);
-          const transactions = await this.$getTransactionsExtended(blockHash, block.height, true, null, true);
-          const blockExtended = await this.$getBlockExtended(block, transactions);
-          newlyIndexed++;
-          await blocksRepository.$saveBlockInDatabase(blockExtended);
-        }
+        const promises: Promise<void>[] = [];
+        // This function assigns a task to a worker
+        const assignTask = (worker: Worker): boolean => {
+          if (missingBlockHeights.length === 0) {
+            return false;
+          } else {
+            worker.postMessage({ height: missingBlockHeights.shift() });
+            return true;
+          }
+        };
+        const handleResult = (height: number): void => {
+          indexedThisRun++;
+          totalIndexed++;
+          newlyIndexed++;
+          const elapsed = Date.now() - timer;
+          if (elapsed > 5000 || height === lastBlockToIndex) {
+            const runningFor = Date.now() - startedAt;
+            const blockPerSeconds = indexedThisRun / (elapsed / 1000);
+            const progress = Math.round(totalIndexed / indexingBlockAmount * 10000) / 100;
+            logger.debug(`Indexing block #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining);
+            timer = Date.now();
+            indexedThisRun = 0;
+            loadingIndicators.setProgress('block-indexing', progress, false);
+          }
+        };
+        // Start a task on each worker
+        for (const worker of workerPool) {
+          promises.push(new Promise((resolve, reject) => {
+            worker.removeAllListeners();
+            worker.on('message', (result) => {
+              // Handle the result, then assign a new task to the worker
+              handleResult(result);
+              if (!assignTask(worker)) {
+                resolve();
+              };
+            });
+            worker.on('error', reject);
+            if (!assignTask(worker)) {
+              resolve();
+            }
+          }));
+        }
+        await Promise.all(promises);
         currentBlockHeight -= chunkSize;
       }
+      for (const worker of workerPool) {
+        if (worker) {
+          // clean up the workers
+          worker.removeAllListeners();
+          worker.terminate();
+        }
+      }
       if (newlyIndexed > 0) {
         logger.notice(`Block indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
       } else {
@@ -627,6 +708,14 @@ class Blocks {
       logger.err('Block indexing failed. Trying again in 10 seconds. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining);
       loadingIndicators.setProgress('block-indexing', 100);
       throw e;
+    } finally {
+      for (const worker of workerPool) {
+        if (worker) {
+          // clean up the workers
+          worker.removeAllListeners();
+          worker.terminate();
+        }
+      }
     }
     return await BlocksRepository.$validateChain();

api/rbf-cache.ts

@@ -53,7 +53,7 @@ class RbfCache {
   private expiring: Map<string, number> = new Map();
   private cacheQueue: CacheEvent[] = [];

-  constructor() {
+  public init(): void {
     setInterval(this.cleanup.bind(this), 1000 * 60 * 10);
   }

api/redis-cache.ts

@@ -23,24 +23,21 @@ class RedisCache {
   private cacheQueue: MempoolTransactionExtended[] = [];
   private txFlushLimit: number = 10000;

-  constructor() {
-    if (config.REDIS.ENABLED) {
-      const redisConfig = {
-        socket: {
-          path: config.REDIS.UNIX_SOCKET_PATH
-        },
-        database: NetworkDB[config.MEMPOOL.NETWORK],
-      };
-      this.client = createClient(redisConfig);
-      this.client.on('error', (e) => {
-        logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
-      });
-      this.$ensureConnected();
-    }
-  }
-
   private async $ensureConnected(): Promise<void> {
     if (!this.connected && config.REDIS.ENABLED) {
+      if (!this.client) {
+        const redisConfig = {
+          socket: {
+            path: config.REDIS.UNIX_SOCKET_PATH
+          },
+          database: NetworkDB[config.MEMPOOL.NETWORK],
+        };
+        this.client = createClient(redisConfig);
+        this.client.on('error', (e) => {
+          logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
+        });
+      }
       return this.client.connect().then(async () => {
         this.connected = true;
         logger.info(`Redis client connected`);
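The Redis change follows the same idea, but lazily: instead of creating and connecting the client in the constructor, `$ensureConnected()` now builds the client the first time something actually needs Redis, so worker threads that import the module never open a connection. A minimal sketch of lazy client construction, assuming the node-redis v4 API; the `getClient` helper and the placeholder socket path are illustrative, not the module's real interface:

import { createClient } from 'redis';

let client: ReturnType<typeof createClient> | null = null;
let connected = false;

// Create and connect the client only when something actually needs Redis.
async function getClient(): Promise<NonNullable<typeof client>> {
  if (!client) {
    client = createClient({ socket: { path: '/tmp/redis.sock' } }); // path is a placeholder
    client.on('error', (e) => console.error('Redis client error', e));
  }
  if (!connected) {
    await client.connect();
    connected = true;
  }
  return client;
}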

index-workers/block-summary-worker.ts

@@ -0,0 +1,33 @@
+import { parentPort } from 'worker_threads';
+import bitcoinApi from '../api/bitcoin/bitcoin-api-factory';
+import blocks from '../api/blocks';
+import config from '../config';
+import transactionUtils from '../api/transaction-utils';
+import bitcoinClient from '../api/bitcoin/bitcoin-client';
+
+if (parentPort) {
+  parentPort.on('message', async ({ hash, height }) => {
+    if (hash != null && height != null) {
+      await indexBlockSummary(hash, height);
+    }
+    if (parentPort) {
+      parentPort.postMessage(height);
+    }
+  });
+}
+
+async function indexBlockSummary(hash: string, height: number): Promise<void> {
+  const block = await bitcoinClient.getBlock(hash, 2);
+  const txs = block.tx.map(tx => {
+    tx.fee = Math.round(tx.fee * 100_000_000);
+    tx.vout.forEach((vout) => {
+      vout.value = Math.round(vout.value * 100000000);
+    });
+    tx.vsize = Math.round(tx.weight / 4); // required for backwards compatibility
+    return tx;
+  });
+  const cpfpSummary = await blocks.$indexCPFP(hash, height, txs);
+  await blocks.$getStrippedBlockTransactions(hash, true, true, cpfpSummary, height); // This will index the block summary
+}
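This is the "Always use Core verbose block for summaries indexing" commit: the worker fetches the block with `bitcoinClient.getBlock(hash, 2)` (verbose `getblock`, verbosity 2), whose BTC-denominated amounts and `weight` field are then normalized into the satoshi values and vsize the indexer expects. A small worked example of that conversion on a hypothetical transaction (all numbers made up for illustration):

// Rough shape of one entry of block.tx from `getblock <hash> 2` (fields trimmed).
const tx = {
  fee: 0.00014100,                      // BTC
  weight: 565,
  vout: [{ value: 0.05000000 }],        // BTC
};

const feeSat = Math.round(tx.fee * 100_000_000);     // 14100 sats
const valueSat = Math.round(tx.vout[0].value * 1e8); // 5_000_000 sats
const vsize = Math.round(tx.weight / 4);             // 141 vbytes, as the worker computes it

Note that the worker rounds `weight / 4` rather than taking the ceiling; its in-line comment says this is required for backwards compatibility.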

index-workers/block-worker.ts

@@ -0,0 +1,25 @@
+import { parentPort } from 'worker_threads';
+import bitcoinApi from '../api/bitcoin/bitcoin-api-factory';
+import blocksRepository from '../repositories/BlocksRepository';
+import blocks from '../api/blocks';
+import { IEsploraApi } from '../api/bitcoin/esplora-api.interface';
+
+if (parentPort) {
+  parentPort.on('message', async (params) => {
+    if (params.height != null) {
+      await indexBlock(params.height);
+    }
+    if (parentPort) {
+      parentPort.postMessage(params.height);
+    }
+  });
+}
+
+async function indexBlock(blockHeight: number): Promise<void> {
+  const blockHash = await bitcoinApi.$getBlockHash(blockHeight);
+  const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash);
+  const transactions = await blocks['$getTransactionsExtended'](blockHash, block.height, true, null, true);
+  const blockExtended = await blocks['$getBlockExtended'](block, transactions);
+  await blocksRepository.$saveBlockInDatabase(blockExtended);
+}
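Both worker scripts speak the same one-message-in, one-message-out protocol: the parent posts a task object (a block with `hash` and `height`, or `{ height }`) and the worker replies with the height it just indexed, which is what drives `handleResult`/`assignTask` in `blocks.ts`. For exercising a single worker outside the pooled loop, that round trip can be promisified; this `runOnce` helper is an illustration, not part of the changeset:

import { Worker } from 'worker_threads';
import path from 'path';

// Send a single task to a worker script and wait for its single reply.
function runOnce<T>(scriptPath: string, task: unknown): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const worker = new Worker(path.resolve(__dirname, scriptPath));
    worker.once('message', (result: T) => {
      worker.terminate();
      resolve(result);
    });
    worker.once('error', (err) => {
      worker.terminate();
      reject(err);
    });
    worker.postMessage(task); // queued until the worker attaches its 'message' listener
  });
}

// e.g. await runOnce<number>('./index-workers/block-worker.js', { height: 800000 });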

index.ts

@@ -43,6 +43,7 @@ import { AxiosError } from 'axios';
 import v8 from 'v8';
 import { formatBytes, getBytesUnit } from './utils/format';
 import redisCache from './api/redis-cache';
+import rbfCache from './api/rbf-cache';

 class Server {
   private wss: WebSocket.Server | undefined;
@@ -107,6 +108,8 @@ class Server {
       }
     }

+    rbfCache.init();
+
     this.app
       .use((req: Request, res: Response, next: NextFunction) => {
         res.setHeader('Access-Control-Allow-Origin', '*');