Compare commits
	
		
			4 Commits
		
	
	
		
			master
			...
			mononaut/p
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					7569c6a394 | ||
| 
						 | 
					d67285c683 | ||
| 
						 | 
					2b7ac32c22 | ||
| 
						 | 
					df596ab5bf | 
@ -29,6 +29,10 @@ import websocketHandler from './websocket-handler';
 | 
			
		||||
import redisCache from './redis-cache';
 | 
			
		||||
import rbfCache from './rbf-cache';
 | 
			
		||||
import { calcBitsDifference } from './difficulty-adjustment';
 | 
			
		||||
import os from 'os';
 | 
			
		||||
import { Worker } from 'worker_threads';
 | 
			
		||||
import path from 'path';
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Blocks {
 | 
			
		||||
  private blocks: BlockExtended[] = [];
 | 
			
		||||
@ -406,6 +410,8 @@ class Blocks {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const workerPool: Worker[] = [];
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      // Get all indexed block hash
 | 
			
		||||
      const indexedBlocks = await blocksRepository.$getIndexedBlocks();
 | 
			
		||||
@ -420,39 +426,67 @@ class Blocks {
 | 
			
		||||
      let newlyIndexed = 0;
 | 
			
		||||
      let totalIndexed = indexedBlockSummariesHashesArray.length;
 | 
			
		||||
      let indexedThisRun = 0;
 | 
			
		||||
      let timer = Date.now() / 1000;
 | 
			
		||||
      const startedAt = Date.now() / 1000;
 | 
			
		||||
      let timer = Date.now();
 | 
			
		||||
      const startedAt = Date.now();
 | 
			
		||||
 | 
			
		||||
      for (const block of indexedBlocks) {
 | 
			
		||||
        if (indexedBlockSummariesHashes[block.hash] === true) {
 | 
			
		||||
          continue;
 | 
			
		||||
        }
 | 
			
		||||
      const blocksToIndex = indexedBlocks.filter(block => !indexedBlockSummariesHashes[block.hash]);
 | 
			
		||||
 | 
			
		||||
        // Logging
 | 
			
		||||
        const elapsedSeconds = (Date.now() / 1000) - timer;
 | 
			
		||||
        if (elapsedSeconds > 5) {
 | 
			
		||||
          const runningFor = (Date.now() / 1000) - startedAt;
 | 
			
		||||
          const blockPerSeconds = indexedThisRun / elapsedSeconds;
 | 
			
		||||
          const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
 | 
			
		||||
          logger.debug(`Indexing block summary for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining);
 | 
			
		||||
          timer = Date.now() / 1000;
 | 
			
		||||
          indexedThisRun = 0;
 | 
			
		||||
        }
 | 
			
		||||
      if (!blocksToIndex.length) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const numWorkers = Math.max(1, os.cpus().length - 1);
 | 
			
		||||
      for (let i = 0; i < numWorkers; i++) {
 | 
			
		||||
        workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-summary-worker.js')));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
        if (config.MEMPOOL.BACKEND === 'esplora') {
 | 
			
		||||
          const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx));
 | 
			
		||||
          const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs);
 | 
			
		||||
          await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary
 | 
			
		||||
      const promises: Promise<void>[] = [];
 | 
			
		||||
 | 
			
		||||
      // This function assigns a task to a worker
 | 
			
		||||
      const assignTask = (worker: Worker): boolean => {
 | 
			
		||||
        if (blocksToIndex.length === 0) {
 | 
			
		||||
          return false;
 | 
			
		||||
        } else {
 | 
			
		||||
          await this.$getStrippedBlockTransactions(block.hash, true, true); // This will index the block summary
 | 
			
		||||
          worker.postMessage(blocksToIndex.shift());
 | 
			
		||||
          return true;
 | 
			
		||||
        }
 | 
			
		||||
      };
 | 
			
		||||
 | 
			
		||||
        // Logging
 | 
			
		||||
      const handleResult = (height: number): void => {
 | 
			
		||||
        indexedThisRun++;
 | 
			
		||||
        totalIndexed++;
 | 
			
		||||
        newlyIndexed++;
 | 
			
		||||
        const elapsed = Date.now() - timer;
 | 
			
		||||
        if (elapsed > 5000) {
 | 
			
		||||
          const runningFor = Date.now() - startedAt;
 | 
			
		||||
          const blockPerSeconds = indexedThisRun / (elapsed / 1000);
 | 
			
		||||
          const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
 | 
			
		||||
          logger.debug(`Indexing block summary for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining);
 | 
			
		||||
          timer = Date.now();
 | 
			
		||||
          indexedThisRun = 0;
 | 
			
		||||
        }
 | 
			
		||||
      };
 | 
			
		||||
 | 
			
		||||
      // Start a task on each worker
 | 
			
		||||
      for (const worker of workerPool) {
 | 
			
		||||
        promises.push(new Promise((resolve, reject) => {
 | 
			
		||||
          worker.removeAllListeners();
 | 
			
		||||
          worker.on('message', (result) => {
 | 
			
		||||
            // Handle the result, then assign a new task to the worker
 | 
			
		||||
            handleResult(result);
 | 
			
		||||
            if (!assignTask(worker)) {
 | 
			
		||||
              resolve();
 | 
			
		||||
            };
 | 
			
		||||
          });
 | 
			
		||||
          worker.on('error', reject);
 | 
			
		||||
          if (!assignTask(worker)) {
 | 
			
		||||
            resolve();
 | 
			
		||||
          }
 | 
			
		||||
        }));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      await Promise.all(promises);
 | 
			
		||||
 | 
			
		||||
      if (newlyIndexed > 0) {
 | 
			
		||||
        logger.notice(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
 | 
			
		||||
      } else {
 | 
			
		||||
@ -461,6 +495,14 @@ class Blocks {
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.err(`Blocks summaries indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining);
 | 
			
		||||
      throw e;
 | 
			
		||||
    } finally {
 | 
			
		||||
      for (const worker of workerPool) {
 | 
			
		||||
        if (worker) {
 | 
			
		||||
          // clean up the workers
 | 
			
		||||
          worker.removeAllListeners();
 | 
			
		||||
          worker.terminate();
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -557,6 +599,7 @@ class Blocks {
 | 
			
		||||
   * [INDEXING] Index all blocks metadata for the mining dashboard
 | 
			
		||||
   */
 | 
			
		||||
  public async $generateBlockDatabase(): Promise<boolean> {
 | 
			
		||||
    const workerPool: Worker[] = [];
 | 
			
		||||
    try {
 | 
			
		||||
      const blockchainInfo = await bitcoinClient.getBlockchainInfo();
 | 
			
		||||
      let currentBlockHeight = blockchainInfo.blocks;
 | 
			
		||||
@ -575,12 +618,18 @@ class Blocks {
 | 
			
		||||
      let totalIndexed = await blocksRepository.$blockCountBetweenHeight(currentBlockHeight, lastBlockToIndex);
 | 
			
		||||
      let indexedThisRun = 0;
 | 
			
		||||
      let newlyIndexed = 0;
 | 
			
		||||
      const startedAt = Date.now() / 1000;
 | 
			
		||||
      let timer = Date.now() / 1000;
 | 
			
		||||
      const startedAt = Date.now();
 | 
			
		||||
      let timer = Date.now();
 | 
			
		||||
 | 
			
		||||
      if (currentBlockHeight >= lastBlockToIndex) {
 | 
			
		||||
        const numWorkers = Math.max(1, os.cpus().length - 1);
 | 
			
		||||
        for (let i = 0; i < numWorkers; i++) {
 | 
			
		||||
          workerPool.push(new Worker(path.resolve(__dirname, '../index-workers/block-worker.js')));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      while (currentBlockHeight >= lastBlockToIndex) {
 | 
			
		||||
        const endBlock = Math.max(0, lastBlockToIndex, currentBlockHeight - chunkSize + 1);
 | 
			
		||||
 | 
			
		||||
        const missingBlockHeights: number[] = await blocksRepository.$getMissingBlocksBetweenHeights(
 | 
			
		||||
          currentBlockHeight, endBlock);
 | 
			
		||||
        if (missingBlockHeights.length <= 0) {
 | 
			
		||||
@ -590,33 +639,65 @@ class Blocks {
 | 
			
		||||
 | 
			
		||||
        logger.info(`Indexing ${missingBlockHeights.length} blocks from #${currentBlockHeight} to #${endBlock}`, logger.tags.mining);
 | 
			
		||||
 | 
			
		||||
        for (const blockHeight of missingBlockHeights) {
 | 
			
		||||
          if (blockHeight < lastBlockToIndex) {
 | 
			
		||||
            break;
 | 
			
		||||
        const promises: Promise<void>[] = [];
 | 
			
		||||
 | 
			
		||||
        // This function assigns a task to a worker
 | 
			
		||||
        const assignTask = (worker: Worker): boolean => {
 | 
			
		||||
          if (missingBlockHeights.length === 0) {
 | 
			
		||||
            return false;
 | 
			
		||||
          } else {
 | 
			
		||||
            worker.postMessage({ height: missingBlockHeights.shift() });
 | 
			
		||||
            return true;
 | 
			
		||||
          }
 | 
			
		||||
          ++indexedThisRun;
 | 
			
		||||
          ++totalIndexed;
 | 
			
		||||
          const elapsedSeconds = (Date.now() / 1000) - timer;
 | 
			
		||||
          if (elapsedSeconds > 5 || blockHeight === lastBlockToIndex) {
 | 
			
		||||
            const runningFor = (Date.now() / 1000) - startedAt;
 | 
			
		||||
            const blockPerSeconds = indexedThisRun / elapsedSeconds;
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        const handleResult = (height: number): void => {
 | 
			
		||||
          indexedThisRun++;
 | 
			
		||||
          totalIndexed++;
 | 
			
		||||
          newlyIndexed++;
 | 
			
		||||
          const elapsed = Date.now() - timer;
 | 
			
		||||
          if (elapsed > 5000 || height === lastBlockToIndex) {
 | 
			
		||||
            const runningFor = Date.now() - startedAt;
 | 
			
		||||
            const blockPerSeconds = indexedThisRun / (elapsed / 1000);
 | 
			
		||||
            const progress = Math.round(totalIndexed / indexingBlockAmount * 10000) / 100;
 | 
			
		||||
            logger.debug(`Indexing block #${blockHeight} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${runningFor.toFixed(2)} seconds`, logger.tags.mining);
 | 
			
		||||
            timer = Date.now() / 1000;
 | 
			
		||||
            logger.debug(`Indexing block #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress.toFixed(2)}%) | elapsed: ${(runningFor / 1000).toFixed(2)} seconds`, logger.tags.mining);
 | 
			
		||||
            timer = Date.now();
 | 
			
		||||
            indexedThisRun = 0;
 | 
			
		||||
            loadingIndicators.setProgress('block-indexing', progress, false);
 | 
			
		||||
          }
 | 
			
		||||
          const blockHash = await bitcoinApi.$getBlockHash(blockHeight);
 | 
			
		||||
          const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash);
 | 
			
		||||
          const transactions = await this.$getTransactionsExtended(blockHash, block.height, true, null, true);
 | 
			
		||||
          const blockExtended = await this.$getBlockExtended(block, transactions);
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
          newlyIndexed++;
 | 
			
		||||
          await blocksRepository.$saveBlockInDatabase(blockExtended);
 | 
			
		||||
        // Start a task on each worker
 | 
			
		||||
        for (const worker of workerPool) {
 | 
			
		||||
          promises.push(new Promise((resolve, reject) => {
 | 
			
		||||
            worker.removeAllListeners();
 | 
			
		||||
            worker.on('message', (result) => {
 | 
			
		||||
              // Handle the result, then assign a new task to the worker
 | 
			
		||||
              handleResult(result);
 | 
			
		||||
              if (!assignTask(worker)) {
 | 
			
		||||
                resolve();
 | 
			
		||||
              };
 | 
			
		||||
            });
 | 
			
		||||
            worker.on('error', reject);
 | 
			
		||||
            if (!assignTask(worker)) {
 | 
			
		||||
              resolve();
 | 
			
		||||
            }
 | 
			
		||||
          }));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        await Promise.all(promises);
 | 
			
		||||
 | 
			
		||||
        currentBlockHeight -= chunkSize;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      for (const worker of workerPool) {
 | 
			
		||||
        if (worker) {
 | 
			
		||||
          // clean up the workers
 | 
			
		||||
          worker.removeAllListeners();
 | 
			
		||||
          worker.terminate();
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (newlyIndexed > 0) {
 | 
			
		||||
        logger.notice(`Block indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
 | 
			
		||||
      } else {
 | 
			
		||||
@ -627,6 +708,14 @@ class Blocks {
 | 
			
		||||
      logger.err('Block indexing failed. Trying again in 10 seconds. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining);
 | 
			
		||||
      loadingIndicators.setProgress('block-indexing', 100);
 | 
			
		||||
      throw e;
 | 
			
		||||
    } finally {
 | 
			
		||||
      for (const worker of workerPool) {
 | 
			
		||||
        if (worker) {
 | 
			
		||||
          // clean up the workers
 | 
			
		||||
          worker.removeAllListeners();
 | 
			
		||||
          worker.terminate();
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return await BlocksRepository.$validateChain();
 | 
			
		||||
 | 
			
		||||
@ -53,7 +53,7 @@ class RbfCache {
 | 
			
		||||
  private expiring: Map<string, number> = new Map();
 | 
			
		||||
  private cacheQueue: CacheEvent[] = [];
 | 
			
		||||
 | 
			
		||||
  constructor() {
 | 
			
		||||
  public init(): void {
 | 
			
		||||
    setInterval(this.cleanup.bind(this), 1000 * 60 * 10);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -23,24 +23,21 @@ class RedisCache {
 | 
			
		||||
  private cacheQueue: MempoolTransactionExtended[] = [];
 | 
			
		||||
  private txFlushLimit: number = 10000;
 | 
			
		||||
 | 
			
		||||
  constructor() {
 | 
			
		||||
    if (config.REDIS.ENABLED) {
 | 
			
		||||
      const redisConfig = {
 | 
			
		||||
        socket: {
 | 
			
		||||
          path: config.REDIS.UNIX_SOCKET_PATH
 | 
			
		||||
        },
 | 
			
		||||
        database: NetworkDB[config.MEMPOOL.NETWORK],
 | 
			
		||||
      };
 | 
			
		||||
      this.client = createClient(redisConfig);
 | 
			
		||||
      this.client.on('error', (e) => {
 | 
			
		||||
        logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      });
 | 
			
		||||
      this.$ensureConnected();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async $ensureConnected(): Promise<void> {
 | 
			
		||||
    if (!this.connected && config.REDIS.ENABLED) {
 | 
			
		||||
      if (!this.client) {
 | 
			
		||||
        const redisConfig = {
 | 
			
		||||
          socket: {
 | 
			
		||||
            path: config.REDIS.UNIX_SOCKET_PATH
 | 
			
		||||
          },
 | 
			
		||||
          database: NetworkDB[config.MEMPOOL.NETWORK],
 | 
			
		||||
        };
 | 
			
		||||
        this.client = createClient(redisConfig);
 | 
			
		||||
        this.client.on('error', (e) => {
 | 
			
		||||
          logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
        });
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      return this.client.connect().then(async () => {
 | 
			
		||||
        this.connected = true;
 | 
			
		||||
        logger.info(`Redis client connected`);
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										38
									
								
								backend/src/index-workers/block-summary-worker.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								backend/src/index-workers/block-summary-worker.ts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,38 @@
 | 
			
		||||
import { parentPort } from 'worker_threads';
 | 
			
		||||
import bitcoinApi from '../api/bitcoin/bitcoin-api-factory';
 | 
			
		||||
import blocks from '../api/blocks';
 | 
			
		||||
import config from '../config';
 | 
			
		||||
import transactionUtils from '../api/transaction-utils';
 | 
			
		||||
import bitcoinClient from '../api/bitcoin/bitcoin-client';
 | 
			
		||||
 | 
			
		||||
if (parentPort) {
 | 
			
		||||
  parentPort.on('message', async ({ hash, height }) => {
 | 
			
		||||
    if (hash != null && height != null) {
 | 
			
		||||
      await indexBlockSummary(hash, height);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (parentPort) {
 | 
			
		||||
      parentPort.postMessage(height);
 | 
			
		||||
    }
 | 
			
		||||
  });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function indexBlockSummary(hash: string, height: number): Promise<void> {
 | 
			
		||||
  let txs;
 | 
			
		||||
  if (config.MEMPOOL.BACKEND === 'esplora') {
 | 
			
		||||
    txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx));
 | 
			
		||||
  } else {
 | 
			
		||||
    const block = await bitcoinClient.getBlock(hash, 2);
 | 
			
		||||
    txs = block.tx.map(tx => {
 | 
			
		||||
      tx.fee = Math.round(tx.fee * 100_000_000);
 | 
			
		||||
      tx.vout.forEach((vout) => {
 | 
			
		||||
        vout.value = Math.round(vout.value * 100000000);
 | 
			
		||||
      });
 | 
			
		||||
      tx.vsize = Math.round(tx.weight / 4); // required for backwards compatibility
 | 
			
		||||
      return tx;
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  const cpfpSummary = await blocks.$indexCPFP(hash, height, txs);
 | 
			
		||||
  await blocks.$getStrippedBlockTransactions(hash, true, true, cpfpSummary, height); // This will index the block summary
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										25
									
								
								backend/src/index-workers/block-worker.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								backend/src/index-workers/block-worker.ts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,25 @@
 | 
			
		||||
import { parentPort } from 'worker_threads';
 | 
			
		||||
import bitcoinApi from '../api/bitcoin/bitcoin-api-factory';
 | 
			
		||||
import blocksRepository from '../repositories/BlocksRepository';
 | 
			
		||||
import blocks from '../api/blocks';
 | 
			
		||||
import { IEsploraApi } from '../api/bitcoin/esplora-api.interface';
 | 
			
		||||
 | 
			
		||||
if (parentPort) {
 | 
			
		||||
  parentPort.on('message', async (params) => {
 | 
			
		||||
    if (params.height != null) {
 | 
			
		||||
      await indexBlock(params.height);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (parentPort) {
 | 
			
		||||
      parentPort.postMessage(params.height);
 | 
			
		||||
    }
 | 
			
		||||
  });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function indexBlock(blockHeight: number): Promise<void> {
 | 
			
		||||
  const blockHash = await bitcoinApi.$getBlockHash(blockHeight);
 | 
			
		||||
  const block: IEsploraApi.Block = await bitcoinApi.$getBlock(blockHash);
 | 
			
		||||
  const transactions = await blocks['$getTransactionsExtended'](blockHash, block.height, true, null, true);
 | 
			
		||||
  const blockExtended = await blocks['$getBlockExtended'](block, transactions);
 | 
			
		||||
  await blocksRepository.$saveBlockInDatabase(blockExtended);
 | 
			
		||||
}
 | 
			
		||||
@ -43,6 +43,7 @@ import { AxiosError } from 'axios';
 | 
			
		||||
import v8 from 'v8';
 | 
			
		||||
import { formatBytes, getBytesUnit } from './utils/format';
 | 
			
		||||
import redisCache from './api/redis-cache';
 | 
			
		||||
import rbfCache from './api/rbf-cache';
 | 
			
		||||
 | 
			
		||||
class Server {
 | 
			
		||||
  private wss: WebSocket.Server | undefined;
 | 
			
		||||
@ -107,6 +108,8 @@ class Server {
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    rbfCache.init();
 | 
			
		||||
 | 
			
		||||
    this.app
 | 
			
		||||
      .use((req: Request, res: Response, next: NextFunction) => {
 | 
			
		||||
        res.setHeader('Access-Control-Allow-Origin', '*');
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user