From 202d4122b4d811dab03ab9eaf0b79c4df6b64be0 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Sat, 22 Jul 2023 14:09:11 +0900 Subject: [PATCH] load mempool txs in bulk from esplora --- .../bitcoin/bitcoin-api-abstract-factory.ts | 1 + backend/src/api/bitcoin/bitcoin-api.ts | 4 + backend/src/api/bitcoin/esplora-api.ts | 23 +++++ backend/src/api/mempool.ts | 93 ++++++++++++------- 4 files changed, 86 insertions(+), 35 deletions(-) diff --git a/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts b/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts index c233ed5d7..d195b0eeb 100644 --- a/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts +++ b/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts @@ -3,6 +3,7 @@ import { IEsploraApi } from './esplora-api.interface'; export interface AbstractBitcoinApi { $getRawMempool(): Promise; $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise; + $getMempoolTransactions(expectedCount: number); $getTransactionHex(txId: string): Promise; $getBlockHeightTip(): Promise; $getBlockHashTip(): Promise; diff --git a/backend/src/api/bitcoin/bitcoin-api.ts b/backend/src/api/bitcoin/bitcoin-api.ts index c045d8664..237c69834 100644 --- a/backend/src/api/bitcoin/bitcoin-api.ts +++ b/backend/src/api/bitcoin/bitcoin-api.ts @@ -59,6 +59,10 @@ class BitcoinApi implements AbstractBitcoinApi { }); } + $getMempoolTransactions(expectedCount: number): Promise { + return Promise.resolve([]); + } + $getTransactionHex(txId: string): Promise { return this.$getRawTransaction(txId, true) .then((tx) => tx.hex || ''); diff --git a/backend/src/api/bitcoin/esplora-api.ts b/backend/src/api/bitcoin/esplora-api.ts index 5bfff5730..34de4f94f 100644 --- a/backend/src/api/bitcoin/esplora-api.ts +++ b/backend/src/api/bitcoin/esplora-api.ts @@ -5,6 +5,8 @@ import { AbstractBitcoinApi } from './bitcoin-api-abstract-factory'; import { IEsploraApi } from './esplora-api.interface'; import logger from '../../logger'; +import JsonStream from 'JSONStream'; + const axiosConnection = axios.create({ httpAgent: new http.Agent({ keepAlive: true, }) }); @@ -69,6 +71,27 @@ class ElectrsApi implements AbstractBitcoinApi { return this.$queryWrapper(config.ESPLORA.REST_API_URL + '/tx/' + txId); } + async $getMempoolTransactions(expectedCount: number): Promise { + const transactions: IEsploraApi.Transaction[] = []; + let count = 0; + return new Promise((resolve, reject) => { + axiosConnection.get(config.ESPLORA.REST_API_URL + '/mempool/txs', { ...this.activeAxiosConfig, timeout: 60000, responseType: 'stream' }).then(response => { + response.data.pipe(JsonStream.parse('*')).on('data', transaction => { + count++; + if (count % 10000 === 0) { + logger.info(`Fetched ${count} of ${expectedCount} mempool transactions from esplora`); + } + transactions.push(transaction); + }).on('end', () => { + logger.info(`Fetched all ${count} of ${expectedCount} mempool transactions from esplora`); + resolve(transactions); + }).on('error', (err) => { + reject(err); + }); + }); + }); + } + $getTransactionHex(txId: string): Promise { return this.$queryWrapper(config.ESPLORA.REST_API_URL + '/tx/' + txId + '/hex'); } diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index d988ea47a..d0e63ae78 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -1,5 +1,5 @@ import config from '../config'; -import bitcoinApi from './bitcoin/bitcoin-api-factory'; +import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; import { MempoolTransactionExtended, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; import logger from '../logger'; import { Common } from './common'; @@ -103,6 +103,16 @@ class Mempool { this.addToSpendMap(Object.values(this.mempoolCache)); } + public async $reloadMempool(expectedCount: number): Promise { + const rawTransactions = await bitcoinApi.$getMempoolTransactions(expectedCount); + logger.info(`Inserting loaded mempool transactions into local cache`); + for (const transaction of rawTransactions) { + const extendedTransaction = transactionUtils.extendMempoolTransaction(transaction); + this.mempoolCache[extendedTransaction.txid] = extendedTransaction; + } + logger.info(`Done inserting loaded mempool transactions into local cache`); + } + public async $updateMemPoolInfo() { this.mempoolInfo = await this.$getMempoolInfo(); } @@ -162,41 +172,54 @@ class Mempool { }; let intervalTimer = Date.now(); - for (const txid of transactions) { - if (!this.mempoolCache[txid]) { - try { - const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); - this.updateTimerProgress(timer, 'fetched new transaction'); - this.mempoolCache[txid] = transaction; - if (this.inSync) { - this.txPerSecondArray.push(new Date().getTime()); - this.vBytesPerSecondArray.push({ - unixTime: new Date().getTime(), - vSize: transaction.vsize, - }); - } - hasChange = true; - newTransactions.push(transaction); - } catch (e: any) { - if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { - this.missingTxCount++; - } - logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); - } - } - if (Date.now() - intervalTimer > 5_000) { - - if (this.inSync) { - // Break and restart mempool loop if we spend too much time processing - // new transactions that may lead to falling behind on block height - logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); - break; - } else { - const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; - logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); - loadingIndicators.setProgress('mempool', progress); - intervalTimer = Date.now() + let loaded = false; + if (config.MEMPOOL.BACKEND === 'esplora' && currentMempoolSize < transactions.length * 0.5 && transactions.length > 20_000) { + logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`); + try { + await this.$reloadMempool(transactions.length); + loaded = true; + } catch (e) { + logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions'); + } + } + + if (!loaded) { + for (const txid of transactions) { + if (!this.mempoolCache[txid]) { + try { + const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); + this.updateTimerProgress(timer, 'fetched new transaction'); + this.mempoolCache[txid] = transaction; + if (this.inSync) { + this.txPerSecondArray.push(new Date().getTime()); + this.vBytesPerSecondArray.push({ + unixTime: new Date().getTime(), + vSize: transaction.vsize, + }); + } + hasChange = true; + newTransactions.push(transaction); + } catch (e: any) { + if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { + this.missingTxCount++; + } + logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); + } + } + + if (Date.now() - intervalTimer > 5_000) { + if (this.inSync) { + // Break and restart mempool loop if we spend too much time processing + // new transactions that may lead to falling behind on block height + logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); + break; + } else { + const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; + logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); + loadingIndicators.setProgress('mempool', progress); + intervalTimer = Date.now() + } } } }