use bulk mempool/txs post api to batch mempool update requests
This commit is contained in:
		
							parent
							
								
									63993b01aa
								
							
						
					
					
						commit
						ca0c6b5e6e
					
				@ -3,7 +3,8 @@ import { IEsploraApi } from './esplora-api.interface';
 | 
			
		||||
export interface AbstractBitcoinApi {
 | 
			
		||||
  $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>;
 | 
			
		||||
  $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
 | 
			
		||||
  $getMempoolTransactions(lastTxid: string);
 | 
			
		||||
  $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]>;
 | 
			
		||||
  $getAllMempoolTransactions(lastTxid: string);
 | 
			
		||||
  $getTransactionHex(txId: string): Promise<string>;
 | 
			
		||||
  $getBlockHeightTip(): Promise<number>;
 | 
			
		||||
  $getBlockHashTip(): Promise<string>;
 | 
			
		||||
 | 
			
		||||
@ -60,8 +60,13 @@ class BitcoinApi implements AbstractBitcoinApi {
 | 
			
		||||
      });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getMempoolTransactions(lastTxid: string): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    return Promise.resolve([]);
 | 
			
		||||
  $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    throw new Error('Method getMempoolTransactions not supported by the Bitcoin RPC API.');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getAllMempoolTransactions(lastTxid: string): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    throw new Error('Method getAllMempoolTransactions not supported by the Bitcoin RPC API.');
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async $getTransactionHex(txId: string): Promise<string> {
 | 
			
		||||
 | 
			
		||||
@ -61,6 +61,25 @@ class ElectrsApi implements AbstractBitcoinApi {
 | 
			
		||||
      });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $postWrapper<T>(url, body, responseType = 'json', params: any = undefined): Promise<T> {
 | 
			
		||||
    return axiosConnection.post<T>(url, body, { ...this.activeAxiosConfig, responseType: responseType, params })
 | 
			
		||||
      .then((response) => response.data)
 | 
			
		||||
      .catch((e) => {
 | 
			
		||||
        if (e?.code === 'ECONNREFUSED') {
 | 
			
		||||
          this.fallbackToTcpSocket();
 | 
			
		||||
          // Retry immediately
 | 
			
		||||
          return axiosConnection.post<T>(url, body, this.activeAxiosConfig)
 | 
			
		||||
            .then((response) => response.data)
 | 
			
		||||
            .catch((e) => {
 | 
			
		||||
              logger.warn(`Cannot query esplora through the unix socket nor the tcp socket. Exception ${e}`);
 | 
			
		||||
              throw e;
 | 
			
		||||
            });
 | 
			
		||||
        } else {
 | 
			
		||||
          throw e;
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> {
 | 
			
		||||
    return this.$queryWrapper<IEsploraApi.Transaction['txid'][]>(config.ESPLORA.REST_API_URL + '/mempool/txids');
 | 
			
		||||
  }
 | 
			
		||||
@ -69,7 +88,11 @@ class ElectrsApi implements AbstractBitcoinApi {
 | 
			
		||||
    return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async $getMempoolTransactions(lastSeenTxid?: string): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
  async $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    return this.$postWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs', txids, 'json');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async $getAllMempoolTransactions(lastSeenTxid?: string): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    return this.$queryWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs' + (lastSeenTxid ? '/' + lastSeenTxid : ''));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -123,7 +123,7 @@ class Mempool {
 | 
			
		||||
    loadingIndicators.setProgress('mempool', count / expectedCount * 100);
 | 
			
		||||
    while (!done) {
 | 
			
		||||
      try {
 | 
			
		||||
        const result = await bitcoinApi.$getMempoolTransactions(last_txid);
 | 
			
		||||
        const result = await bitcoinApi.$getAllMempoolTransactions(last_txid);
 | 
			
		||||
        if (result) {
 | 
			
		||||
          for (const tx of result) {
 | 
			
		||||
            const extendedTransaction = transactionUtils.extendMempoolTransaction(tx);
 | 
			
		||||
@ -231,31 +231,37 @@ class Mempool {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!loaded) {
 | 
			
		||||
      for (const txid of transactions) {
 | 
			
		||||
        if (!this.mempoolCache[txid]) {
 | 
			
		||||
          try {
 | 
			
		||||
            const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false);
 | 
			
		||||
            this.updateTimerProgress(timer, 'fetched new transaction');
 | 
			
		||||
            this.mempoolCache[txid] = transaction;
 | 
			
		||||
            if (this.inSync) {
 | 
			
		||||
              this.txPerSecondArray.push(new Date().getTime());
 | 
			
		||||
              this.vBytesPerSecondArray.push({
 | 
			
		||||
                unixTime: new Date().getTime(),
 | 
			
		||||
                vSize: transaction.vsize,
 | 
			
		||||
              });
 | 
			
		||||
            }
 | 
			
		||||
            hasChange = true;
 | 
			
		||||
            newTransactions.push(transaction);
 | 
			
		||||
      const remainingTxids = transactions.filter(txid => !this.mempoolCache[txid]);
 | 
			
		||||
      const sliceLength = 10000;
 | 
			
		||||
      for (let i = 0; i < Math.ceil(remainingTxids.length / sliceLength); i++) {
 | 
			
		||||
        const slice = remainingTxids.slice(i * sliceLength, (i + 1) * sliceLength);
 | 
			
		||||
        const txs = await transactionUtils.$getMempoolTransactionsExtended(slice, false, false, false);
 | 
			
		||||
        logger.debug(`fetched ${txs.length} transactions`);
 | 
			
		||||
        this.updateTimerProgress(timer, 'fetched new transactions');
 | 
			
		||||
 | 
			
		||||
            if (config.REDIS.ENABLED) {
 | 
			
		||||
              await redisCache.$addTransaction(transaction);
 | 
			
		||||
            }
 | 
			
		||||
          } catch (e: any) {
 | 
			
		||||
            if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
 | 
			
		||||
              this.missingTxCount++;
 | 
			
		||||
            }
 | 
			
		||||
            logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
        for (const transaction of txs) {
 | 
			
		||||
          this.mempoolCache[transaction.txid] = transaction;
 | 
			
		||||
          if (this.inSync) {
 | 
			
		||||
            this.txPerSecondArray.push(new Date().getTime());
 | 
			
		||||
            this.vBytesPerSecondArray.push({
 | 
			
		||||
              unixTime: new Date().getTime(),
 | 
			
		||||
              vSize: transaction.vsize,
 | 
			
		||||
            });
 | 
			
		||||
          }
 | 
			
		||||
          hasChange = true;
 | 
			
		||||
          newTransactions.push(transaction);
 | 
			
		||||
 | 
			
		||||
          if (config.REDIS.ENABLED) {
 | 
			
		||||
            await redisCache.$addTransaction(transaction);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (txs.length < slice.length) {
 | 
			
		||||
          const missing = slice.length - txs.length;
 | 
			
		||||
          if (config.MEMPOOL.BACKEND === 'esplora') {
 | 
			
		||||
            this.missingTxCount += missing;
 | 
			
		||||
          }
 | 
			
		||||
          logger.debug(`Error finding ${missing} transactions in the mempool: `);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (Date.now() - intervalTimer > Math.max(pollRate * 2, 5_000)) {
 | 
			
		||||
 | 
			
		||||
@ -4,6 +4,7 @@ import { Common } from './common';
 | 
			
		||||
import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
 | 
			
		||||
import * as bitcoinjs from 'bitcoinjs-lib';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import config from '../config';
 | 
			
		||||
 | 
			
		||||
class TransactionUtils {
 | 
			
		||||
  constructor() { }
 | 
			
		||||
@ -71,6 +72,24 @@ class TransactionUtils {
 | 
			
		||||
    return (await this.$getTransactionExtended(txId, addPrevouts, lazyPrevouts, forceCore, true)) as MempoolTransactionExtended;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> {
 | 
			
		||||
    if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') {
 | 
			
		||||
      const results = await Promise.allSettled(txids.map(txid => this.$getTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore, true)));
 | 
			
		||||
      return (results.filter(r => r.status === 'fulfilled') as PromiseFulfilledResult<MempoolTransactionExtended>[]).map(r => r.value);
 | 
			
		||||
    } else {
 | 
			
		||||
      const transactions = await bitcoinApi.$getMempoolTransactions(txids);
 | 
			
		||||
      return transactions.map(transaction => {
 | 
			
		||||
        if (Common.isLiquid()) {
 | 
			
		||||
          if (!isFinite(Number(transaction.fee))) {
 | 
			
		||||
            transaction.fee = Object.values(transaction.fee || {}).reduce((total, output) => total + output, 0);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return this.extendMempoolTransaction(transaction);
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public extendTransaction(transaction: IEsploraApi.Transaction): TransactionExtended {
 | 
			
		||||
    // @ts-ignore
 | 
			
		||||
    if (transaction.vsize) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user