Compare commits
	
		
			2 Commits
		
	
	
		
			master
			...
			mononaut/f
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					6bb9ffd21a | ||
| 
						 | 
					9a8e5b7896 | 
@ -1,7 +1,7 @@
 | 
			
		||||
import { IEsploraApi } from './esplora-api.interface';
 | 
			
		||||
 | 
			
		||||
export interface AbstractBitcoinApi {
 | 
			
		||||
  $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>;
 | 
			
		||||
  $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}>;
 | 
			
		||||
  $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
 | 
			
		||||
  $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]>;
 | 
			
		||||
  $getAllMempoolTransactions(lastTxid: string);
 | 
			
		||||
@ -25,6 +25,7 @@ export interface AbstractBitcoinApi {
 | 
			
		||||
  $getBatchedOutspends(txId: string[]): Promise<IEsploraApi.Outspend[][]>;
 | 
			
		||||
 | 
			
		||||
  startHealthChecks(): void;
 | 
			
		||||
  isFailedOver(): boolean;
 | 
			
		||||
}
 | 
			
		||||
export interface BitcoinRpcCredentials {
 | 
			
		||||
  host: string;
 | 
			
		||||
 | 
			
		||||
@ -137,8 +137,12 @@ class BitcoinApi implements AbstractBitcoinApi {
 | 
			
		||||
    throw new Error('Method getScriptHashTransactions not supported by the Bitcoin RPC API.');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> {
 | 
			
		||||
    return this.bitcoindClient.getRawMemPool();
 | 
			
		||||
  async $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}> {
 | 
			
		||||
    const txids = await this.bitcoindClient.getRawMemPool();
 | 
			
		||||
    return {
 | 
			
		||||
      txids,
 | 
			
		||||
      local: true,
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getAddressPrefix(prefix: string): string[] {
 | 
			
		||||
@ -356,6 +360,9 @@ class BitcoinApi implements AbstractBitcoinApi {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public startHealthChecks(): void {};
 | 
			
		||||
  public isFailedOver(): boolean {
 | 
			
		||||
    return false;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default BitcoinApi;
 | 
			
		||||
 | 
			
		||||
@ -638,8 +638,8 @@ class BitcoinRoutes {
 | 
			
		||||
 | 
			
		||||
  private async getMempoolTxIds(req: Request, res: Response) {
 | 
			
		||||
    try {
 | 
			
		||||
      const rawMempool = await bitcoinApi.$getRawMempool();
 | 
			
		||||
      res.send(rawMempool);
 | 
			
		||||
      const { txids } = await bitcoinApi.$getRawMempool();
 | 
			
		||||
      res.send(txids);
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      res.status(500).send(e instanceof Error ? e.message : e);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -4,6 +4,7 @@ import http from 'http';
 | 
			
		||||
import { AbstractBitcoinApi } from './bitcoin-api-abstract-factory';
 | 
			
		||||
import { IEsploraApi } from './esplora-api.interface';
 | 
			
		||||
import logger from '../../logger';
 | 
			
		||||
import mempool from '../mempool';
 | 
			
		||||
 | 
			
		||||
interface FailoverHost {
 | 
			
		||||
  host: string,
 | 
			
		||||
@ -17,6 +18,8 @@ interface FailoverHost {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class FailoverRouter {
 | 
			
		||||
  isFailedOver: boolean = false;
 | 
			
		||||
  preferredHost: FailoverHost;
 | 
			
		||||
  activeHost: FailoverHost;
 | 
			
		||||
  fallbackHost: FailoverHost;
 | 
			
		||||
  hosts: FailoverHost[];
 | 
			
		||||
@ -46,6 +49,7 @@ class FailoverRouter {
 | 
			
		||||
      socket: !!config.ESPLORA.UNIX_SOCKET_PATH,
 | 
			
		||||
      preferred: true,
 | 
			
		||||
    };
 | 
			
		||||
    this.preferredHost = this.activeHost;
 | 
			
		||||
    this.fallbackHost = this.activeHost;
 | 
			
		||||
    this.hosts.unshift(this.activeHost);
 | 
			
		||||
    this.multihost = this.hosts.length > 1;
 | 
			
		||||
@ -151,6 +155,7 @@ class FailoverRouter {
 | 
			
		||||
    this.sortHosts();
 | 
			
		||||
    this.activeHost = this.hosts[0];
 | 
			
		||||
    logger.warn(`Switching esplora host to ${this.activeHost.host}`);
 | 
			
		||||
    this.isFailedOver = this.activeHost !== this.preferredHost;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private addFailure(host: FailoverHost): FailoverHost {
 | 
			
		||||
@ -164,7 +169,7 @@ class FailoverRouter {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async $query<T>(method: 'get'| 'post', path, data: any, responseType = 'json', host = this.activeHost, retry: boolean = true): Promise<T> {
 | 
			
		||||
  private async $query<T>(method: 'get'| 'post', path, data: any, responseType = 'json', host = this.activeHost, retry: boolean = true, withSource = false): Promise<T | { data: T, host: FailoverHost }> {
 | 
			
		||||
    let axiosConfig;
 | 
			
		||||
    let url;
 | 
			
		||||
    if (host.socket) {
 | 
			
		||||
@ -177,8 +182,17 @@ class FailoverRouter {
 | 
			
		||||
    return (method === 'post'
 | 
			
		||||
        ? this.requestConnection.post<T>(url, data, axiosConfig)
 | 
			
		||||
        : this.requestConnection.get<T>(url, axiosConfig)
 | 
			
		||||
    ).then((response) => { host.failures = Math.max(0, host.failures - 1); return response.data; })
 | 
			
		||||
      .catch((e) => {
 | 
			
		||||
    ).then((response) => {
 | 
			
		||||
      host.failures = Math.max(0, host.failures - 1);
 | 
			
		||||
      if (withSource) {
 | 
			
		||||
        return {
 | 
			
		||||
          data: response.data,
 | 
			
		||||
          host,
 | 
			
		||||
        };
 | 
			
		||||
      } else {
 | 
			
		||||
        return response.data;
 | 
			
		||||
      }
 | 
			
		||||
    }).catch((e) => {
 | 
			
		||||
        let fallbackHost = this.fallbackHost;
 | 
			
		||||
        if (e?.response?.status !== 404) {
 | 
			
		||||
          logger.warn(`esplora request failed ${e?.response?.status || 500} ${host.host}${path}`);
 | 
			
		||||
@ -186,7 +200,7 @@ class FailoverRouter {
 | 
			
		||||
        }
 | 
			
		||||
        if (retry && e?.code === 'ECONNREFUSED' && this.multihost) {
 | 
			
		||||
          // Retry immediately
 | 
			
		||||
          return this.$query(method, path, data, responseType, fallbackHost, false);
 | 
			
		||||
          return this.$query(method, path, data, responseType, fallbackHost, false, withSource);
 | 
			
		||||
        } else {
 | 
			
		||||
          throw e;
 | 
			
		||||
        }
 | 
			
		||||
@ -194,19 +208,27 @@ class FailoverRouter {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $get<T>(path, responseType = 'json'): Promise<T> {
 | 
			
		||||
    return this.$query<T>('get', path, null, responseType);
 | 
			
		||||
    return this.$query<T>('get', path, null, responseType, this.activeHost, true) as Promise<T>;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $post<T>(path, data: any, responseType = 'json'): Promise<T> {
 | 
			
		||||
    return this.$query<T>('post', path, data, responseType);
 | 
			
		||||
    return this.$query<T>('post', path, data, responseType) as Promise<T>;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $getWithSource<T>(path, responseType = 'json'): Promise<{ data: T, host: FailoverHost }> {
 | 
			
		||||
    return this.$query<T>('get', path, null, responseType, this.activeHost, true, true) as Promise<{ data: T, host: FailoverHost }>;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class ElectrsApi implements AbstractBitcoinApi {
 | 
			
		||||
  private failoverRouter = new FailoverRouter();
 | 
			
		||||
 | 
			
		||||
  $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> {
 | 
			
		||||
    return this.failoverRouter.$get<IEsploraApi.Transaction['txid'][]>('/mempool/txids');
 | 
			
		||||
  async $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}> {
 | 
			
		||||
    const result = await this.failoverRouter.$getWithSource<IEsploraApi.Transaction['txid'][]>('/mempool/txids', 'json');
 | 
			
		||||
    return {
 | 
			
		||||
      txids: result.data,
 | 
			
		||||
      local: result.host === this.failoverRouter.preferredHost,
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  $getRawTransaction(txId: string): Promise<IEsploraApi.Transaction> {
 | 
			
		||||
@ -302,6 +324,10 @@ class ElectrsApi implements AbstractBitcoinApi {
 | 
			
		||||
  public startHealthChecks(): void {
 | 
			
		||||
    this.failoverRouter.startHealthChecks();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public isFailedOver(): boolean {
 | 
			
		||||
    return this.failoverRouter.isFailedOver;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default ElectrsApi;
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,9 @@ class Mempool {
 | 
			
		||||
 | 
			
		||||
  private accelerations: { [txId: string]: Acceleration } = {};
 | 
			
		||||
 | 
			
		||||
  private failoverTimes: number[] = [];
 | 
			
		||||
  private statisticsPaused: boolean = false;
 | 
			
		||||
 | 
			
		||||
  private txPerSecondArray: number[] = [];
 | 
			
		||||
  private txPerSecond: number = 0;
 | 
			
		||||
 | 
			
		||||
@ -164,6 +167,15 @@ class Mempool {
 | 
			
		||||
    return this.mempoolInfo;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public getStatisticsIsPaused(): boolean {
 | 
			
		||||
    return this.statisticsPaused;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public logFailover(): void {
 | 
			
		||||
    this.failoverTimes.push(Date.now());
 | 
			
		||||
    this.statisticsPaused = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public getTxPerSecond(): number {
 | 
			
		||||
    return this.txPerSecond;
 | 
			
		||||
  }
 | 
			
		||||
@ -242,6 +254,10 @@ class Mempool {
 | 
			
		||||
        logger.debug(`fetched ${txs.length} transactions`);
 | 
			
		||||
        this.updateTimerProgress(timer, 'fetched new transactions');
 | 
			
		||||
 | 
			
		||||
        if (bitcoinApi.isFailedOver()) {
 | 
			
		||||
          this.failoverTimes.push(Date.now());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (const transaction of txs) {
 | 
			
		||||
          this.mempoolCache[transaction.txid] = transaction;
 | 
			
		||||
          if (this.inSync) {
 | 
			
		||||
@ -259,6 +275,10 @@ class Mempool {
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (bitcoinApi.isFailedOver()) {
 | 
			
		||||
          this.failoverTimes.push(Date.now());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (txs.length < slice.length) {
 | 
			
		||||
          const missing = slice.length - txs.length;
 | 
			
		||||
          if (config.MEMPOOL.BACKEND === 'esplora') {
 | 
			
		||||
@ -491,6 +511,10 @@ class Mempool {
 | 
			
		||||
 | 
			
		||||
  private updateTxPerSecond() {
 | 
			
		||||
    const nowMinusTimeSpan = new Date().getTime() - (1000 * config.STATISTICS.TX_PER_SECOND_SAMPLE_PERIOD);
 | 
			
		||||
 | 
			
		||||
    this.failoverTimes = this.failoverTimes.filter((unixTime) => unixTime > nowMinusTimeSpan);
 | 
			
		||||
    this.statisticsPaused = this.failoverTimes.length > 0;
 | 
			
		||||
 | 
			
		||||
    this.txPerSecondArray = this.txPerSecondArray.filter((unixTime) => unixTime > nowMinusTimeSpan);
 | 
			
		||||
    this.txPerSecond = this.txPerSecondArray.length / config.STATISTICS.TX_PER_SECOND_SAMPLE_PERIOD || 0;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -29,9 +29,10 @@ class Statistics {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async runStatistics(): Promise<void> {
 | 
			
		||||
    if (!memPool.isInSync()) {
 | 
			
		||||
    if (!memPool.isInSync() || memPool.getStatisticsIsPaused()) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const currentMempool = memPool.getMempool();
 | 
			
		||||
    const txPerSecond = memPool.getTxPerSecond();
 | 
			
		||||
    const vBytesPerSecond = memPool.getVBytesPerSecond();
 | 
			
		||||
 | 
			
		||||
@ -71,9 +71,8 @@ class WebsocketHandler {
 | 
			
		||||
  private updateSocketData(): void {
 | 
			
		||||
    const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
 | 
			
		||||
    const da = difficultyAdjustment.getDifficultyAdjustment();
 | 
			
		||||
    this.updateSocketDataFields({
 | 
			
		||||
    const socketData = {
 | 
			
		||||
      'mempoolInfo': memPool.getMempoolInfo(),
 | 
			
		||||
      'vBytesPerSecond': memPool.getVBytesPerSecond(),
 | 
			
		||||
      'blocks': _blocks,
 | 
			
		||||
      'conversions': priceUpdater.getLatestPrices(),
 | 
			
		||||
      'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
 | 
			
		||||
@ -82,7 +81,11 @@ class WebsocketHandler {
 | 
			
		||||
      'loadingIndicators': loadingIndicators.getLoadingIndicators(),
 | 
			
		||||
      'da': da?.previousTime ? da : undefined,
 | 
			
		||||
      'fees': feeApi.getRecommendedFee(),
 | 
			
		||||
    });
 | 
			
		||||
    };
 | 
			
		||||
    if (!memPool.getStatisticsIsPaused()) {
 | 
			
		||||
      socketData['vBytesPerSecond'] = memPool.getVBytesPerSecond();
 | 
			
		||||
    }
 | 
			
		||||
    this.updateSocketDataFields(socketData);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public getSerializedInitData(): string {
 | 
			
		||||
@ -414,7 +417,7 @@ class WebsocketHandler {
 | 
			
		||||
    const mBlocks = mempoolBlocks.getMempoolBlocks();
 | 
			
		||||
    const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas();
 | 
			
		||||
    const mempoolInfo = memPool.getMempoolInfo();
 | 
			
		||||
    const vBytesPerSecond = memPool.getVBytesPerSecond();
 | 
			
		||||
    const vBytesPerSecond = memPool.getStatisticsIsPaused() ? null : memPool.getVBytesPerSecond();
 | 
			
		||||
    const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions);
 | 
			
		||||
    const da = difficultyAdjustment.getDifficultyAdjustment();
 | 
			
		||||
    memPool.handleRbfTransactions(rbfTransactions);
 | 
			
		||||
@ -440,13 +443,15 @@ class WebsocketHandler {
 | 
			
		||||
    // update init data
 | 
			
		||||
    const socketDataFields = {
 | 
			
		||||
      'mempoolInfo': mempoolInfo,
 | 
			
		||||
      'vBytesPerSecond': vBytesPerSecond,
 | 
			
		||||
      'mempool-blocks': mBlocks,
 | 
			
		||||
      'transactions': latestTransactions,
 | 
			
		||||
      'loadingIndicators': loadingIndicators.getLoadingIndicators(),
 | 
			
		||||
      'da': da?.previousTime ? da : undefined,
 | 
			
		||||
      'fees': recommendedFees,
 | 
			
		||||
    };
 | 
			
		||||
    if (vBytesPerSecond != null) {
 | 
			
		||||
      socketDataFields['vBytesPerSecond'] = vBytesPerSecond;
 | 
			
		||||
    }
 | 
			
		||||
    if (rbfSummary) {
 | 
			
		||||
      socketDataFields['rbfSummary'] = rbfSummary;
 | 
			
		||||
    }
 | 
			
		||||
@ -496,7 +501,9 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
      if (client['want-stats']) {
 | 
			
		||||
        response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
 | 
			
		||||
        response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
 | 
			
		||||
        if (vBytesPerSecond != null) {
 | 
			
		||||
          response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
 | 
			
		||||
        }
 | 
			
		||||
        response['transactions'] = getCachedResponse('transactions', latestTransactions);
 | 
			
		||||
        if (da?.previousTime) {
 | 
			
		||||
          response['da'] = getCachedResponse('da', da);
 | 
			
		||||
@ -784,7 +791,9 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
      if (client['want-stats']) {
 | 
			
		||||
        response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
 | 
			
		||||
        response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond());
 | 
			
		||||
        if (!memPool.getStatisticsIsPaused()) {
 | 
			
		||||
          response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond());
 | 
			
		||||
        }
 | 
			
		||||
        response['fees'] = getCachedResponse('fees', fees);
 | 
			
		||||
 | 
			
		||||
        if (da?.previousTime) {
 | 
			
		||||
 | 
			
		||||
@ -191,10 +191,14 @@ class Server {
 | 
			
		||||
          logger.debug(msg);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      const newMempool = await bitcoinApi.$getRawMempool();
 | 
			
		||||
 | 
			
		||||
      const { txids: newMempool, local: fromLocalNode } = await bitcoinApi.$getRawMempool();
 | 
			
		||||
      const numHandledBlocks = await blocks.$updateBlocks();
 | 
			
		||||
      const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1);
 | 
			
		||||
      if (numHandledBlocks === 0) {
 | 
			
		||||
        if (!fromLocalNode) {
 | 
			
		||||
          memPool.logFailover();
 | 
			
		||||
        }
 | 
			
		||||
        await memPool.$updateMempool(newMempool, pollRate);
 | 
			
		||||
      }
 | 
			
		||||
      indexer.$run();
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user