Merge pull request #4043 from mempool/mononaut/mempool-sync-status
Mempool inSync status
This commit is contained in:
		
						commit
						ec6355dc75
					
				@ -3,6 +3,7 @@ import { IEsploraApi } from './esplora-api.interface';
 | 
			
		||||
export interface AbstractBitcoinApi {
 | 
			
		||||
  $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>;
 | 
			
		||||
  $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
 | 
			
		||||
  $getMempoolTransactions(lastTxid: string);
 | 
			
		||||
  $getTransactionHex(txId: string): Promise<string>;
 | 
			
		||||
  $getBlockHeightTip(): Promise<number>;
 | 
			
		||||
  $getBlockHashTip(): Promise<string>;
 | 
			
		||||
 | 
			
		||||
@ -59,6 +59,10 @@ class BitcoinApi implements AbstractBitcoinApi {
 | 
			
		||||
      });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getMempoolTransactions(lastTxid: string): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    return Promise.resolve([]);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getTransactionHex(txId: string): Promise<string> {
 | 
			
		||||
    return this.$getRawTransaction(txId, true)
 | 
			
		||||
      .then((tx) => tx.hex || '');
 | 
			
		||||
 | 
			
		||||
@ -69,6 +69,10 @@ class ElectrsApi implements AbstractBitcoinApi {
 | 
			
		||||
    return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async $getMempoolTransactions(lastSeenTxid?: string): Promise<IEsploraApi.Transaction[]> {
 | 
			
		||||
    return this.$queryWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs' + (lastSeenTxid ? '/' + lastSeenTxid : ''));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getTransactionHex(txId: string): Promise<string> {
 | 
			
		||||
    return this.$queryWrapper<string>(config.ESPLORA.REST_API_URL + '/tx/' + txId + '/hex');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,5 @@
 | 
			
		||||
import config from '../config';
 | 
			
		||||
import bitcoinApi from './bitcoin/bitcoin-api-factory';
 | 
			
		||||
import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
 | 
			
		||||
import { MempoolTransactionExtended, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import { Common } from './common';
 | 
			
		||||
@ -9,6 +9,7 @@ import loadingIndicators from './loading-indicators';
 | 
			
		||||
import bitcoinClient from './bitcoin/bitcoin-client';
 | 
			
		||||
import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
 | 
			
		||||
import rbfCache from './rbf-cache';
 | 
			
		||||
import { IEsploraApi } from './bitcoin/esplora-api.interface';
 | 
			
		||||
 | 
			
		||||
class Mempool {
 | 
			
		||||
  private inSync: boolean = false;
 | 
			
		||||
@ -103,6 +104,44 @@ class Mempool {
 | 
			
		||||
    this.addToSpendMap(Object.values(this.mempoolCache));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $reloadMempool(expectedCount: number): Promise<MempoolTransactionExtended[]> {
 | 
			
		||||
    let count = 0;
 | 
			
		||||
    let done = false;
 | 
			
		||||
    let last_txid;
 | 
			
		||||
    const newTransactions: MempoolTransactionExtended[] = [];
 | 
			
		||||
    loadingIndicators.setProgress('mempool', count / expectedCount * 100);
 | 
			
		||||
    while (!done) {
 | 
			
		||||
      try {
 | 
			
		||||
        const result = await bitcoinApi.$getMempoolTransactions(last_txid);
 | 
			
		||||
        if (result) {
 | 
			
		||||
          for (const tx of result) {
 | 
			
		||||
            const extendedTransaction = transactionUtils.extendMempoolTransaction(tx);
 | 
			
		||||
            if (!this.mempoolCache[extendedTransaction.txid]) {
 | 
			
		||||
              newTransactions.push(extendedTransaction);
 | 
			
		||||
              this.mempoolCache[extendedTransaction.txid] = extendedTransaction;
 | 
			
		||||
            }
 | 
			
		||||
            count++;
 | 
			
		||||
          }
 | 
			
		||||
          logger.info(`Fetched ${count} of ${expectedCount} mempool transactions from esplora`);
 | 
			
		||||
          if (result.length > 0) {
 | 
			
		||||
            last_txid = result[result.length - 1].txid;
 | 
			
		||||
          } else {
 | 
			
		||||
            done = true;
 | 
			
		||||
          }
 | 
			
		||||
          if (Math.floor((count / expectedCount) * 100) < 100) {
 | 
			
		||||
            loadingIndicators.setProgress('mempool', count / expectedCount * 100);
 | 
			
		||||
          }
 | 
			
		||||
        } else {
 | 
			
		||||
          done = true;
 | 
			
		||||
        }
 | 
			
		||||
      } catch(err) {
 | 
			
		||||
        logger.err('failed to fetch bulk mempool transactions from esplora');
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    return newTransactions;
 | 
			
		||||
    logger.info(`Done inserting loaded mempool transactions into local cache`);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $updateMemPoolInfo() {
 | 
			
		||||
    this.mempoolInfo = await this.$getMempoolInfo();
 | 
			
		||||
  }
 | 
			
		||||
@ -143,7 +182,7 @@ class Mempool {
 | 
			
		||||
    const currentMempoolSize = Object.keys(this.mempoolCache).length;
 | 
			
		||||
    this.updateTimerProgress(timer, 'got raw mempool');
 | 
			
		||||
    const diff = transactions.length - currentMempoolSize;
 | 
			
		||||
    const newTransactions: MempoolTransactionExtended[] = [];
 | 
			
		||||
    let newTransactions: MempoolTransactionExtended[] = [];
 | 
			
		||||
 | 
			
		||||
    this.mempoolCacheDelta = Math.abs(diff);
 | 
			
		||||
 | 
			
		||||
@ -162,41 +201,57 @@ class Mempool {
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let intervalTimer = Date.now();
 | 
			
		||||
    for (const txid of transactions) {
 | 
			
		||||
      if (!this.mempoolCache[txid]) {
 | 
			
		||||
        try {
 | 
			
		||||
          const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false);
 | 
			
		||||
          this.updateTimerProgress(timer, 'fetched new transaction');
 | 
			
		||||
          this.mempoolCache[txid] = transaction;
 | 
			
		||||
          if (this.inSync) {
 | 
			
		||||
            this.txPerSecondArray.push(new Date().getTime());
 | 
			
		||||
            this.vBytesPerSecondArray.push({
 | 
			
		||||
              unixTime: new Date().getTime(),
 | 
			
		||||
              vSize: transaction.vsize,
 | 
			
		||||
            });
 | 
			
		||||
          }
 | 
			
		||||
          hasChange = true;
 | 
			
		||||
          newTransactions.push(transaction);
 | 
			
		||||
        } catch (e: any) {
 | 
			
		||||
          if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
 | 
			
		||||
            this.missingTxCount++;
 | 
			
		||||
          }
 | 
			
		||||
          logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (Date.now() - intervalTimer > 5_000) {
 | 
			
		||||
        
 | 
			
		||||
        if (this.inSync) {
 | 
			
		||||
          // Break and restart mempool loop if we spend too much time processing
 | 
			
		||||
          // new transactions that may lead to falling behind on block height
 | 
			
		||||
          logger.debug('Breaking mempool loop because the 5s time limit exceeded.');
 | 
			
		||||
          break;
 | 
			
		||||
        } else {
 | 
			
		||||
          const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100;
 | 
			
		||||
          logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`);
 | 
			
		||||
          loadingIndicators.setProgress('mempool', progress);
 | 
			
		||||
          intervalTimer = Date.now()
 | 
			
		||||
    let loaded = false;
 | 
			
		||||
    if (config.MEMPOOL.BACKEND === 'esplora' && currentMempoolSize < transactions.length * 0.5 && transactions.length > 20_000) {
 | 
			
		||||
      this.inSync = false;
 | 
			
		||||
      logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`);
 | 
			
		||||
      try {
 | 
			
		||||
        newTransactions = await this.$reloadMempool(transactions.length);
 | 
			
		||||
        loaded = true;
 | 
			
		||||
      } catch (e) {
 | 
			
		||||
        logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions');
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!loaded) {
 | 
			
		||||
      for (const txid of transactions) {
 | 
			
		||||
        if (!this.mempoolCache[txid]) {
 | 
			
		||||
          try {
 | 
			
		||||
            const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false);
 | 
			
		||||
            this.updateTimerProgress(timer, 'fetched new transaction');
 | 
			
		||||
            this.mempoolCache[txid] = transaction;
 | 
			
		||||
            if (this.inSync) {
 | 
			
		||||
              this.txPerSecondArray.push(new Date().getTime());
 | 
			
		||||
              this.vBytesPerSecondArray.push({
 | 
			
		||||
                unixTime: new Date().getTime(),
 | 
			
		||||
                vSize: transaction.vsize,
 | 
			
		||||
              });
 | 
			
		||||
            }
 | 
			
		||||
            hasChange = true;
 | 
			
		||||
            newTransactions.push(transaction);
 | 
			
		||||
          } catch (e: any) {
 | 
			
		||||
            if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
 | 
			
		||||
              this.missingTxCount++;
 | 
			
		||||
            }
 | 
			
		||||
            logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (Date.now() - intervalTimer > 5_000) {
 | 
			
		||||
          if (this.inSync) {
 | 
			
		||||
            // Break and restart mempool loop if we spend too much time processing
 | 
			
		||||
            // new transactions that may lead to falling behind on block height
 | 
			
		||||
            logger.debug('Breaking mempool loop because the 5s time limit exceeded.');
 | 
			
		||||
            break;
 | 
			
		||||
          } else {
 | 
			
		||||
            const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100;
 | 
			
		||||
            logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`);
 | 
			
		||||
            if (Math.floor(progress) < 100) {
 | 
			
		||||
              loadingIndicators.setProgress('mempool', progress);
 | 
			
		||||
            }
 | 
			
		||||
            intervalTimer = Date.now()
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
@ -246,12 +301,6 @@ class Mempool {
 | 
			
		||||
    const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
 | 
			
		||||
    this.latestTransactions = newTransactionsStripped.concat(this.latestTransactions).slice(0, 6);
 | 
			
		||||
 | 
			
		||||
    if (!this.inSync && transactions.length === newMempoolSize) {
 | 
			
		||||
      this.inSync = true;
 | 
			
		||||
      logger.notice('The mempool is now in sync!');
 | 
			
		||||
      loadingIndicators.setProgress('mempool', 100);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this.mempoolCacheDelta = Math.abs(transactions.length - newMempoolSize);
 | 
			
		||||
 | 
			
		||||
    if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) {
 | 
			
		||||
@ -263,6 +312,12 @@ class Mempool {
 | 
			
		||||
      this.updateTimerProgress(timer, 'completed async mempool callback');
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!this.inSync && transactions.length === newMempoolSize) {
 | 
			
		||||
      this.inSync = true;
 | 
			
		||||
      logger.notice('The mempool is now in sync!');
 | 
			
		||||
      loadingIndicators.setProgress('mempool', 100);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const end = new Date().getTime();
 | 
			
		||||
    const time = end - start;
 | 
			
		||||
    logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`);
 | 
			
		||||
 | 
			
		||||
@ -604,7 +604,7 @@ class WebsocketHandler {
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-mempool-block'] >= 0) {
 | 
			
		||||
      if (client['track-mempool-block'] >= 0 && memPool.isInSync()) {
 | 
			
		||||
        const index = client['track-mempool-block'];
 | 
			
		||||
        if (mBlockDeltas[index]) {
 | 
			
		||||
          response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
 | 
			
		||||
@ -644,7 +644,7 @@ class WebsocketHandler {
 | 
			
		||||
    memPool.handleMinedRbfTransactions(rbfTransactions);
 | 
			
		||||
    memPool.removeFromSpendMap(transactions);
 | 
			
		||||
 | 
			
		||||
    if (config.MEMPOOL.AUDIT) {
 | 
			
		||||
    if (config.MEMPOOL.AUDIT && memPool.isInSync()) {
 | 
			
		||||
      let projectedBlocks;
 | 
			
		||||
      let auditMempool = _memPool;
 | 
			
		||||
      // template calculation functions have mempool side effects, so calculate audits using
 | 
			
		||||
@ -665,7 +665,7 @@ class WebsocketHandler {
 | 
			
		||||
        projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions();
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (Common.indexingEnabled() && memPool.isInSync()) {
 | 
			
		||||
      if (Common.indexingEnabled()) {
 | 
			
		||||
        const { censored, added, fresh, sigop, fullrbf, score, similarity } = Audit.auditBlock(transactions, projectedBlocks, auditMempool);
 | 
			
		||||
        const matchRate = Math.round(score * 100 * 100) / 100;
 | 
			
		||||
 | 
			
		||||
@ -858,7 +858,7 @@ class WebsocketHandler {
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-mempool-block'] >= 0) {
 | 
			
		||||
      if (client['track-mempool-block'] >= 0 && memPool.isInSync()) {
 | 
			
		||||
        const index = client['track-mempool-block'];
 | 
			
		||||
        if (mBlockDeltas && mBlockDeltas[index]) {
 | 
			
		||||
          response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
 | 
			
		||||
 | 
			
		||||
@ -117,7 +117,14 @@ export class MempoolBlocksComponent implements OnInit, OnChanges, OnDestroy {
 | 
			
		||||
    });
 | 
			
		||||
    this.reduceMempoolBlocksToFitScreen(this.mempoolBlocks);
 | 
			
		||||
    this.stateService.isTabHidden$.subscribe((tabHidden) => this.tabHidden = tabHidden);
 | 
			
		||||
    this.loadingBlocks$ = this.stateService.isLoadingWebSocket$;
 | 
			
		||||
    this.loadingBlocks$ = combineLatest([
 | 
			
		||||
      this.stateService.isLoadingWebSocket$,
 | 
			
		||||
      this.stateService.isLoadingMempool$
 | 
			
		||||
    ]).pipe(
 | 
			
		||||
      switchMap(([loadingBlocks, loadingMempool]) => {
 | 
			
		||||
        return of(loadingBlocks || loadingMempool);
 | 
			
		||||
      })
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    this.mempoolBlocks$ = merge(
 | 
			
		||||
      of(true),
 | 
			
		||||
 | 
			
		||||
@ -113,6 +113,7 @@ export class StateService {
 | 
			
		||||
  mempoolTxPosition$ = new Subject<{ txid: string, position: MempoolPosition}>();
 | 
			
		||||
  blockTransactions$ = new Subject<Transaction>();
 | 
			
		||||
  isLoadingWebSocket$ = new ReplaySubject<boolean>(1);
 | 
			
		||||
  isLoadingMempool$ = new BehaviorSubject<boolean>(true);
 | 
			
		||||
  vbytesPerSecond$ = new ReplaySubject<number>(1);
 | 
			
		||||
  previousRetarget$ = new ReplaySubject<number>(1);
 | 
			
		||||
  backendInfo$ = new ReplaySubject<IBackendInfo>(1);
 | 
			
		||||
 | 
			
		||||
@ -368,6 +368,11 @@ export class WebsocketService {
 | 
			
		||||
 | 
			
		||||
    if (response.loadingIndicators) {
 | 
			
		||||
      this.stateService.loadingIndicators$.next(response.loadingIndicators);
 | 
			
		||||
      if (response.loadingIndicators.mempool != null && response.loadingIndicators.mempool < 100) {
 | 
			
		||||
        this.stateService.isLoadingMempool$.next(true);
 | 
			
		||||
      } else {
 | 
			
		||||
        this.stateService.isLoadingMempool$.next(false);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (response.mempoolInfo) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user