Merge pull request #4085 from mempool/mononaut/fast-mempool-sync

use bulk mempool post api to batch mempool update requests
This commit is contained in:
softsimon 2023-08-04 17:26:01 +09:00 committed by GitHub
commit d2641cc927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 28 deletions

View File

@ -3,7 +3,8 @@ import { IEsploraApi } from './esplora-api.interface';
export interface AbstractBitcoinApi { export interface AbstractBitcoinApi {
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>; $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>;
$getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>; $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
$getMempoolTransactions(lastTxid: string); $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]>;
$getAllMempoolTransactions(lastTxid: string);
$getTransactionHex(txId: string): Promise<string>; $getTransactionHex(txId: string): Promise<string>;
$getBlockHeightTip(): Promise<number>; $getBlockHeightTip(): Promise<number>;
$getBlockHashTip(): Promise<string>; $getBlockHashTip(): Promise<string>;

View File

@ -60,8 +60,13 @@ class BitcoinApi implements AbstractBitcoinApi {
}); });
} }
$getMempoolTransactions(lastTxid: string): Promise<IEsploraApi.Transaction[]> { $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> {
return Promise.resolve([]); throw new Error('Method getMempoolTransactions not supported by the Bitcoin RPC API.');
}
$getAllMempoolTransactions(lastTxid: string): Promise<IEsploraApi.Transaction[]> {
throw new Error('Method getAllMempoolTransactions not supported by the Bitcoin RPC API.');
} }
async $getTransactionHex(txId: string): Promise<string> { async $getTransactionHex(txId: string): Promise<string> {

View File

@ -61,6 +61,25 @@ class ElectrsApi implements AbstractBitcoinApi {
}); });
} }
$postWrapper<T>(url, body, responseType = 'json', params: any = undefined): Promise<T> {
return axiosConnection.post<T>(url, body, { ...this.activeAxiosConfig, responseType: responseType, params })
.then((response) => response.data)
.catch((e) => {
if (e?.code === 'ECONNREFUSED') {
this.fallbackToTcpSocket();
// Retry immediately
return axiosConnection.post<T>(url, body, this.activeAxiosConfig)
.then((response) => response.data)
.catch((e) => {
logger.warn(`Cannot query esplora through the unix socket nor the tcp socket. Exception ${e}`);
throw e;
});
} else {
throw e;
}
});
}
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> { $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> {
return this.$queryWrapper<IEsploraApi.Transaction['txid'][]>(config.ESPLORA.REST_API_URL + '/mempool/txids'); return this.$queryWrapper<IEsploraApi.Transaction['txid'][]>(config.ESPLORA.REST_API_URL + '/mempool/txids');
} }
@ -69,7 +88,11 @@ class ElectrsApi implements AbstractBitcoinApi {
return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId); return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId);
} }
async $getMempoolTransactions(lastSeenTxid?: string): Promise<IEsploraApi.Transaction[]> { async $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> {
return this.$postWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs', txids, 'json');
}
async $getAllMempoolTransactions(lastSeenTxid?: string): Promise<IEsploraApi.Transaction[]> {
return this.$queryWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs' + (lastSeenTxid ? '/' + lastSeenTxid : '')); return this.$queryWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs' + (lastSeenTxid ? '/' + lastSeenTxid : ''));
} }

View File

@ -126,7 +126,7 @@ class Mempool {
loadingIndicators.setProgress('mempool', count / expectedCount * 100); loadingIndicators.setProgress('mempool', count / expectedCount * 100);
while (!done) { while (!done) {
try { try {
const result = await bitcoinApi.$getMempoolTransactions(last_txid); const result = await bitcoinApi.$getAllMempoolTransactions(last_txid);
if (result) { if (result) {
for (const tx of result) { for (const tx of result) {
const extendedTransaction = transactionUtils.extendMempoolTransaction(tx); const extendedTransaction = transactionUtils.extendMempoolTransaction(tx);
@ -234,31 +234,37 @@ class Mempool {
} }
if (!loaded) { if (!loaded) {
for (const txid of transactions) { const remainingTxids = transactions.filter(txid => !this.mempoolCache[txid]);
if (!this.mempoolCache[txid]) { const sliceLength = 10000;
try { for (let i = 0; i < Math.ceil(remainingTxids.length / sliceLength); i++) {
const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); const slice = remainingTxids.slice(i * sliceLength, (i + 1) * sliceLength);
this.updateTimerProgress(timer, 'fetched new transaction'); const txs = await transactionUtils.$getMempoolTransactionsExtended(slice, false, false, false);
this.mempoolCache[txid] = transaction; logger.debug(`fetched ${txs.length} transactions`);
if (this.inSync) { this.updateTimerProgress(timer, 'fetched new transactions');
this.txPerSecondArray.push(new Date().getTime());
this.vBytesPerSecondArray.push({
unixTime: new Date().getTime(),
vSize: transaction.vsize,
});
}
hasChange = true;
newTransactions.push(transaction);
if (config.REDIS.ENABLED) { for (const transaction of txs) {
await redisCache.$addTransaction(transaction); this.mempoolCache[transaction.txid] = transaction;
} if (this.inSync) {
} catch (e: any) { this.txPerSecondArray.push(new Date().getTime());
if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { this.vBytesPerSecondArray.push({
this.missingTxCount++; unixTime: new Date().getTime(),
} vSize: transaction.vsize,
logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); });
} }
hasChange = true;
newTransactions.push(transaction);
if (config.REDIS.ENABLED) {
await redisCache.$addTransaction(transaction);
}
}
if (txs.length < slice.length) {
const missing = slice.length - txs.length;
if (config.MEMPOOL.BACKEND === 'esplora') {
this.missingTxCount += missing;
}
logger.debug(`Error finding ${missing} transactions in the mempool: `);
} }
if (Date.now() - intervalTimer > Math.max(pollRate * 2, 5_000)) { if (Date.now() - intervalTimer > Math.max(pollRate * 2, 5_000)) {

View File

@ -4,6 +4,7 @@ import { Common } from './common';
import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
import * as bitcoinjs from 'bitcoinjs-lib'; import * as bitcoinjs from 'bitcoinjs-lib';
import logger from '../logger'; import logger from '../logger';
import config from '../config';
class TransactionUtils { class TransactionUtils {
constructor() { } constructor() { }
@ -71,6 +72,24 @@ class TransactionUtils {
return (await this.$getTransactionExtended(txId, addPrevouts, lazyPrevouts, forceCore, true)) as MempoolTransactionExtended; return (await this.$getTransactionExtended(txId, addPrevouts, lazyPrevouts, forceCore, true)) as MempoolTransactionExtended;
} }
public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> {
if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') {
const results = await Promise.allSettled(txids.map(txid => this.$getTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore, true)));
return (results.filter(r => r.status === 'fulfilled') as PromiseFulfilledResult<MempoolTransactionExtended>[]).map(r => r.value);
} else {
const transactions = await bitcoinApi.$getMempoolTransactions(txids);
return transactions.map(transaction => {
if (Common.isLiquid()) {
if (!isFinite(Number(transaction.fee))) {
transaction.fee = Object.values(transaction.fee || {}).reduce((total, output) => total + output, 0);
}
}
return this.extendMempoolTransaction(transaction);
});
}
}
public extendTransaction(transaction: IEsploraApi.Transaction): TransactionExtended { public extendTransaction(transaction: IEsploraApi.Transaction): TransactionExtended {
// @ts-ignore // @ts-ignore
if (transaction.vsize) { if (transaction.vsize) {