load mempool txs in bulk from esplora

This commit is contained in:
Mononaut 2023-07-22 14:09:11 +09:00
parent e2fdacfddd
commit 202d4122b4
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
4 changed files with 86 additions and 35 deletions

View File

@ -3,6 +3,7 @@ import { IEsploraApi } from './esplora-api.interface';
export interface AbstractBitcoinApi { export interface AbstractBitcoinApi {
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>; $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>;
$getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>; $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
$getMempoolTransactions(expectedCount: number);
$getTransactionHex(txId: string): Promise<string>; $getTransactionHex(txId: string): Promise<string>;
$getBlockHeightTip(): Promise<number>; $getBlockHeightTip(): Promise<number>;
$getBlockHashTip(): Promise<string>; $getBlockHashTip(): Promise<string>;

View File

@ -59,6 +59,10 @@ class BitcoinApi implements AbstractBitcoinApi {
}); });
} }
$getMempoolTransactions(expectedCount: number): Promise<IEsploraApi.Transaction[]> {
return Promise.resolve([]);
}
$getTransactionHex(txId: string): Promise<string> { $getTransactionHex(txId: string): Promise<string> {
return this.$getRawTransaction(txId, true) return this.$getRawTransaction(txId, true)
.then((tx) => tx.hex || ''); .then((tx) => tx.hex || '');

View File

@ -5,6 +5,8 @@ import { AbstractBitcoinApi } from './bitcoin-api-abstract-factory';
import { IEsploraApi } from './esplora-api.interface'; import { IEsploraApi } from './esplora-api.interface';
import logger from '../../logger'; import logger from '../../logger';
import JsonStream from 'JSONStream';
const axiosConnection = axios.create({ const axiosConnection = axios.create({
httpAgent: new http.Agent({ keepAlive: true, }) httpAgent: new http.Agent({ keepAlive: true, })
}); });
@ -69,6 +71,27 @@ class ElectrsApi implements AbstractBitcoinApi {
return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId); return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId);
} }
async $getMempoolTransactions(expectedCount: number): Promise<IEsploraApi.Transaction[]> {
const transactions: IEsploraApi.Transaction[] = [];
let count = 0;
return new Promise((resolve, reject) => {
axiosConnection.get(config.ESPLORA.REST_API_URL + '/mempool/txs', { ...this.activeAxiosConfig, timeout: 60000, responseType: 'stream' }).then(response => {
response.data.pipe(JsonStream.parse('*')).on('data', transaction => {
count++;
if (count % 10000 === 0) {
logger.info(`Fetched ${count} of ${expectedCount} mempool transactions from esplora`);
}
transactions.push(transaction);
}).on('end', () => {
logger.info(`Fetched all ${count} of ${expectedCount} mempool transactions from esplora`);
resolve(transactions);
}).on('error', (err) => {
reject(err);
});
});
});
}
$getTransactionHex(txId: string): Promise<string> { $getTransactionHex(txId: string): Promise<string> {
return this.$queryWrapper<string>(config.ESPLORA.REST_API_URL + '/tx/' + txId + '/hex'); return this.$queryWrapper<string>(config.ESPLORA.REST_API_URL + '/tx/' + txId + '/hex');
} }

View File

@ -1,5 +1,5 @@
import config from '../config'; import config from '../config';
import bitcoinApi from './bitcoin/bitcoin-api-factory'; import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
import { MempoolTransactionExtended, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; import { MempoolTransactionExtended, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces';
import logger from '../logger'; import logger from '../logger';
import { Common } from './common'; import { Common } from './common';
@ -103,6 +103,16 @@ class Mempool {
this.addToSpendMap(Object.values(this.mempoolCache)); this.addToSpendMap(Object.values(this.mempoolCache));
} }
public async $reloadMempool(expectedCount: number): Promise<void> {
const rawTransactions = await bitcoinApi.$getMempoolTransactions(expectedCount);
logger.info(`Inserting loaded mempool transactions into local cache`);
for (const transaction of rawTransactions) {
const extendedTransaction = transactionUtils.extendMempoolTransaction(transaction);
this.mempoolCache[extendedTransaction.txid] = extendedTransaction;
}
logger.info(`Done inserting loaded mempool transactions into local cache`);
}
public async $updateMemPoolInfo() { public async $updateMemPoolInfo() {
this.mempoolInfo = await this.$getMempoolInfo(); this.mempoolInfo = await this.$getMempoolInfo();
} }
@ -162,41 +172,54 @@ class Mempool {
}; };
let intervalTimer = Date.now(); let intervalTimer = Date.now();
for (const txid of transactions) {
if (!this.mempoolCache[txid]) { let loaded = false;
try { if (config.MEMPOOL.BACKEND === 'esplora' && currentMempoolSize < transactions.length * 0.5 && transactions.length > 20_000) {
const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`);
this.updateTimerProgress(timer, 'fetched new transaction'); try {
this.mempoolCache[txid] = transaction; await this.$reloadMempool(transactions.length);
if (this.inSync) { loaded = true;
this.txPerSecondArray.push(new Date().getTime()); } catch (e) {
this.vBytesPerSecondArray.push({ logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions');
unixTime: new Date().getTime(),
vSize: transaction.vsize,
});
}
hasChange = true;
newTransactions.push(transaction);
} catch (e: any) {
if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
this.missingTxCount++;
}
logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e));
}
} }
}
if (Date.now() - intervalTimer > 5_000) { if (!loaded) {
for (const txid of transactions) {
if (!this.mempoolCache[txid]) {
try {
const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false);
this.updateTimerProgress(timer, 'fetched new transaction');
this.mempoolCache[txid] = transaction;
if (this.inSync) {
this.txPerSecondArray.push(new Date().getTime());
this.vBytesPerSecondArray.push({
unixTime: new Date().getTime(),
vSize: transaction.vsize,
});
}
hasChange = true;
newTransactions.push(transaction);
} catch (e: any) {
if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
this.missingTxCount++;
}
logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e));
}
}
if (this.inSync) { if (Date.now() - intervalTimer > 5_000) {
// Break and restart mempool loop if we spend too much time processing if (this.inSync) {
// new transactions that may lead to falling behind on block height // Break and restart mempool loop if we spend too much time processing
logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); // new transactions that may lead to falling behind on block height
break; logger.debug('Breaking mempool loop because the 5s time limit exceeded.');
} else { break;
const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; } else {
logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100;
loadingIndicators.setProgress('mempool', progress); logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`);
intervalTimer = Date.now() loadingIndicators.setProgress('mempool', progress);
intervalTimer = Date.now()
}
} }
} }
} }