Merge pull request #3736 from mempool/mononaut/optimize-websocket-updates
Optimize websocket updates
This commit is contained in:
		
						commit
						785d1b35b2
					
				@ -130,8 +130,9 @@ class BitcoinRoutes {
 | 
			
		||||
 | 
			
		||||
  private getInitData(req: Request, res: Response) {
 | 
			
		||||
    try {
 | 
			
		||||
      const result = websocketHandler.getInitData();
 | 
			
		||||
      res.json(result);
 | 
			
		||||
      const result = websocketHandler.getSerializedInitData();
 | 
			
		||||
      res.set('Content-Type', 'application/json');
 | 
			
		||||
      res.send(result);
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      res.status(500).send(e instanceof Error ? e.message : e);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -30,6 +30,9 @@ class WebsocketHandler {
 | 
			
		||||
  private numConnected = 0;
 | 
			
		||||
  private numDisconnected = 0;
 | 
			
		||||
 | 
			
		||||
  private initData: { [key: string]: string } = {};
 | 
			
		||||
  private serializedInitData: string = '{}';
 | 
			
		||||
 | 
			
		||||
  constructor() { }
 | 
			
		||||
 | 
			
		||||
  setWebsocketServer(wss: WebSocket.Server) {
 | 
			
		||||
@ -38,6 +41,41 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
  setExtraInitProperties(property: string, value: any) {
 | 
			
		||||
    this.extraInitProperties[property] = value;
 | 
			
		||||
    this.setInitDataFields(this.extraInitProperties);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private setInitDataFields(data: { [property: string]: any }): void {
 | 
			
		||||
    for (const property of Object.keys(data)) {
 | 
			
		||||
      if (data[property] != null) {
 | 
			
		||||
        this.initData[property] = JSON.stringify(data[property]);
 | 
			
		||||
      } else {
 | 
			
		||||
        delete this.initData[property];
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    this.serializedInitData = '{'
 | 
			
		||||
      + Object.keys(this.initData).map(key => `"${key}": ${this.initData[key]}`).join(', ')
 | 
			
		||||
      + '}';
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private updateInitData(): void {
 | 
			
		||||
    const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
 | 
			
		||||
    const da = difficultyAdjustment.getDifficultyAdjustment();
 | 
			
		||||
    this.setInitDataFields({
 | 
			
		||||
      'mempoolInfo': memPool.getMempoolInfo(),
 | 
			
		||||
      'vBytesPerSecond': memPool.getVBytesPerSecond(),
 | 
			
		||||
      'blocks': _blocks,
 | 
			
		||||
      'conversions': priceUpdater.getLatestPrices(),
 | 
			
		||||
      'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
 | 
			
		||||
      'transactions': memPool.getLatestTransactions(),
 | 
			
		||||
      'backendInfo': backendInfo.getBackendInfo(),
 | 
			
		||||
      'loadingIndicators': loadingIndicators.getLoadingIndicators(),
 | 
			
		||||
      'da': da?.previousTime ? da : undefined,
 | 
			
		||||
      'fees': feeApi.getRecommendedFee(),
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public getSerializedInitData(): string {
 | 
			
		||||
    return this.serializedInitData;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  setupConnectionHandling() {
 | 
			
		||||
@ -157,11 +195,13 @@ class WebsocketHandler {
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          if (parsedMessage.action === 'init') {
 | 
			
		||||
            const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
 | 
			
		||||
            if (!_blocks) {
 | 
			
		||||
            if (!this.initData['blocks']?.length || !this.initData['da']) {
 | 
			
		||||
              this.updateInitData();
 | 
			
		||||
            }
 | 
			
		||||
            if (!this.initData['blocks']?.length) {
 | 
			
		||||
              return;
 | 
			
		||||
            }
 | 
			
		||||
            client.send(JSON.stringify(this.getInitData(_blocks)));
 | 
			
		||||
            client.send(this.serializedInitData);
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          if (parsedMessage.action === 'ping') {
 | 
			
		||||
@ -210,11 +250,14 @@ class WebsocketHandler {
 | 
			
		||||
      throw new Error('WebSocket.Server is not set');
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this.setInitDataFields({ 'loadingIndicators': indicators });
 | 
			
		||||
 | 
			
		||||
    const response = JSON.stringify({ loadingIndicators: indicators });
 | 
			
		||||
    this.wss.clients.forEach((client) => {
 | 
			
		||||
      if (client.readyState !== WebSocket.OPEN) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      client.send(JSON.stringify({ loadingIndicators: indicators }));
 | 
			
		||||
      client.send(response);
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -223,34 +266,17 @@ class WebsocketHandler {
 | 
			
		||||
      throw new Error('WebSocket.Server is not set');
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this.setInitDataFields({ 'conversions': conversionRates });
 | 
			
		||||
 | 
			
		||||
    const response = JSON.stringify({ conversions: conversionRates });
 | 
			
		||||
    this.wss.clients.forEach((client) => {
 | 
			
		||||
      if (client.readyState !== WebSocket.OPEN) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      client.send(JSON.stringify({ conversions: conversionRates }));
 | 
			
		||||
      client.send(response);
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  getInitData(_blocks?: BlockExtended[]) {
 | 
			
		||||
    if (!_blocks) {
 | 
			
		||||
      _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
 | 
			
		||||
    }
 | 
			
		||||
    const da = difficultyAdjustment.getDifficultyAdjustment();
 | 
			
		||||
    return {
 | 
			
		||||
      'mempoolInfo': memPool.getMempoolInfo(),
 | 
			
		||||
      'vBytesPerSecond': memPool.getVBytesPerSecond(),
 | 
			
		||||
      'blocks': _blocks,
 | 
			
		||||
      'conversions': priceUpdater.getLatestPrices(),
 | 
			
		||||
      'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
 | 
			
		||||
      'transactions': memPool.getLatestTransactions(),
 | 
			
		||||
      'backendInfo': backendInfo.getBackendInfo(),
 | 
			
		||||
      'loadingIndicators': loadingIndicators.getLoadingIndicators(),
 | 
			
		||||
      'da': da?.previousTime ? da : undefined,
 | 
			
		||||
      'fees': feeApi.getRecommendedFee(),
 | 
			
		||||
      ...this.extraInitProperties
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  handleNewStatistic(stats: OptimizedStatistic) {
 | 
			
		||||
    if (!this.wss) {
 | 
			
		||||
      throw new Error('WebSocket.Server is not set');
 | 
			
		||||
@ -258,6 +284,10 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
    this.printLogs();
 | 
			
		||||
 | 
			
		||||
    const response = JSON.stringify({
 | 
			
		||||
      'live-2h-chart': stats
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    this.wss.clients.forEach((client) => {
 | 
			
		||||
      if (client.readyState !== WebSocket.OPEN) {
 | 
			
		||||
        return;
 | 
			
		||||
@ -267,9 +297,7 @@ class WebsocketHandler {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      client.send(JSON.stringify({
 | 
			
		||||
        'live-2h-chart': stats
 | 
			
		||||
      }));
 | 
			
		||||
      client.send(response);
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -306,6 +334,43 @@ class WebsocketHandler {
 | 
			
		||||
    }
 | 
			
		||||
    const recommendedFees = feeApi.getRecommendedFee();
 | 
			
		||||
 | 
			
		||||
    // update init data
 | 
			
		||||
    this.updateInitData();
 | 
			
		||||
 | 
			
		||||
    // cache serialized objects to avoid stringify-ing the same thing for every client
 | 
			
		||||
    const responseCache = { ...this.initData };
 | 
			
		||||
    function getCachedResponse(key: string,  data): string {
 | 
			
		||||
      if (!responseCache[key]) {
 | 
			
		||||
        responseCache[key] = JSON.stringify(data);
 | 
			
		||||
      }
 | 
			
		||||
      return responseCache[key];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // pre-compute new tracked outspends
 | 
			
		||||
    const outspendCache: { [txid: string]: { [vout: number]: { vin: number, txid: string } } } = {};
 | 
			
		||||
    const trackedTxs = new Set<string>();
 | 
			
		||||
    this.wss.clients.forEach((client) => {
 | 
			
		||||
      if (client['track-tx']) {
 | 
			
		||||
        trackedTxs.add(client['track-tx']);
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
    if (trackedTxs.size > 0) {
 | 
			
		||||
      for (const tx of newTransactions) {
 | 
			
		||||
        for (let i = 0; i < tx.vin.length; i++) {
 | 
			
		||||
          const vin = tx.vin[i];
 | 
			
		||||
          if (trackedTxs.has(vin.txid)) {
 | 
			
		||||
            if (!outspendCache[vin.txid]) {
 | 
			
		||||
              outspendCache[vin.txid] = { [vin.vout]: { vin: i, txid: tx.txid }};
 | 
			
		||||
            } else {
 | 
			
		||||
              outspendCache[vin.txid][vin.vout] = { vin: i, txid: tx.txid };
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const latestTransactions = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx));
 | 
			
		||||
 | 
			
		||||
    this.wss.clients.forEach(async (client) => {
 | 
			
		||||
      if (client.readyState !== WebSocket.OPEN) {
 | 
			
		||||
        return;
 | 
			
		||||
@ -314,17 +379,17 @@ class WebsocketHandler {
 | 
			
		||||
      const response = {};
 | 
			
		||||
 | 
			
		||||
      if (client['want-stats']) {
 | 
			
		||||
        response['mempoolInfo'] = mempoolInfo;
 | 
			
		||||
        response['vBytesPerSecond'] = vBytesPerSecond;
 | 
			
		||||
        response['transactions'] = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx));
 | 
			
		||||
        response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
 | 
			
		||||
        response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
 | 
			
		||||
        response['transactions'] = getCachedResponse('transactions', latestTransactions);
 | 
			
		||||
        if (da?.previousTime) {
 | 
			
		||||
          response['da'] = da;
 | 
			
		||||
          response['da'] = getCachedResponse('da', da);
 | 
			
		||||
        }
 | 
			
		||||
        response['fees'] = recommendedFees;
 | 
			
		||||
        response['fees'] = getCachedResponse('fees', recommendedFees);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['want-mempool-blocks']) {
 | 
			
		||||
        response['mempool-blocks'] = mBlocks;
 | 
			
		||||
        response['mempool-blocks'] = getCachedResponse('mempool-blocks', mBlocks);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-mempool-tx']) {
 | 
			
		||||
@ -333,12 +398,12 @@ class WebsocketHandler {
 | 
			
		||||
          if (config.MEMPOOL.BACKEND !== 'esplora') {
 | 
			
		||||
            try {
 | 
			
		||||
              const fullTx = await transactionUtils.$getTransactionExtended(tx.txid, true);
 | 
			
		||||
              response['tx'] = fullTx;
 | 
			
		||||
              response['tx'] = JSON.stringify(fullTx);
 | 
			
		||||
            } catch (e) {
 | 
			
		||||
              logger.debug('Error finding transaction in mempool: ' + (e instanceof Error ? e.message : e));
 | 
			
		||||
            }
 | 
			
		||||
          } else {
 | 
			
		||||
            response['tx'] = tx;
 | 
			
		||||
            response['tx'] = JSON.stringify(tx);
 | 
			
		||||
          }
 | 
			
		||||
          client['track-mempool-tx'] = null;
 | 
			
		||||
        }
 | 
			
		||||
@ -378,7 +443,7 @@ class WebsocketHandler {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (foundTransactions.length) {
 | 
			
		||||
          response['address-transactions'] = foundTransactions;
 | 
			
		||||
          response['address-transactions'] = JSON.stringify(foundTransactions);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -407,65 +472,60 @@ class WebsocketHandler {
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        if (foundTransactions.length) {
 | 
			
		||||
          response['address-transactions'] = foundTransactions;
 | 
			
		||||
          response['address-transactions'] = JSON.stringify(foundTransactions);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-tx']) {
 | 
			
		||||
        const trackTxid = client['track-tx'];
 | 
			
		||||
        const outspends: object = {};
 | 
			
		||||
        newTransactions.forEach((tx) => tx.vin.forEach((vin, i) => {
 | 
			
		||||
          if (vin.txid === trackTxid) {
 | 
			
		||||
            outspends[vin.vout] = {
 | 
			
		||||
              vin: i,
 | 
			
		||||
              txid: tx.txid,
 | 
			
		||||
            };
 | 
			
		||||
          }
 | 
			
		||||
        }));
 | 
			
		||||
        const outspends = outspendCache[trackTxid];
 | 
			
		||||
 | 
			
		||||
        if (Object.keys(outspends).length) {
 | 
			
		||||
          response['utxoSpent'] = outspends;
 | 
			
		||||
        if (outspends && Object.keys(outspends).length) {
 | 
			
		||||
          response['utxoSpent'] = JSON.stringify(outspends);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        const rbfReplacedBy = rbfCache.getReplacedBy(client['track-tx']);
 | 
			
		||||
        if (rbfReplacedBy) {
 | 
			
		||||
          response['rbfTransaction'] = {
 | 
			
		||||
          response['rbfTransaction'] = JSON.stringify({
 | 
			
		||||
            txid: rbfReplacedBy,
 | 
			
		||||
          }
 | 
			
		||||
          })
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        const rbfChange = rbfChanges.map[client['track-tx']];
 | 
			
		||||
        if (rbfChange) {
 | 
			
		||||
          response['rbfInfo'] = rbfChanges.trees[rbfChange];
 | 
			
		||||
          response['rbfInfo'] = JSON.stringify(rbfChanges.trees[rbfChange]);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        const mempoolTx = newMempool[trackTxid];
 | 
			
		||||
        if (mempoolTx && mempoolTx.position) {
 | 
			
		||||
          response['txPosition'] = {
 | 
			
		||||
          response['txPosition'] = JSON.stringify({
 | 
			
		||||
            txid: trackTxid,
 | 
			
		||||
            position: mempoolTx.position,
 | 
			
		||||
          };
 | 
			
		||||
          });
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-mempool-block'] >= 0) {
 | 
			
		||||
        const index = client['track-mempool-block'];
 | 
			
		||||
        if (mBlockDeltas[index]) {
 | 
			
		||||
          response['projected-block-transactions'] = {
 | 
			
		||||
          response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
 | 
			
		||||
            index: index,
 | 
			
		||||
            delta: mBlockDeltas[index],
 | 
			
		||||
          };
 | 
			
		||||
          });
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-rbf'] === 'all' && rbfReplacements) {
 | 
			
		||||
        response['rbfLatest'] = rbfReplacements;
 | 
			
		||||
        response['rbfLatest'] = getCachedResponse('rbfLatest', rbfReplacements);
 | 
			
		||||
      } else if (client['track-rbf'] === 'fullRbf' && fullRbfReplacements) {
 | 
			
		||||
        response['rbfLatest'] = fullRbfReplacements;
 | 
			
		||||
        response['rbfLatest'] = getCachedResponse('fullrbfLatest', fullRbfReplacements);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (Object.keys(response).length) {
 | 
			
		||||
        client.send(JSON.stringify(response));
 | 
			
		||||
        const serializedResponse = '{'
 | 
			
		||||
          + Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ')
 | 
			
		||||
          + '}';
 | 
			
		||||
        client.send(serializedResponse);
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
@ -556,6 +616,19 @@ class WebsocketHandler {
 | 
			
		||||
    const da = difficultyAdjustment.getDifficultyAdjustment();
 | 
			
		||||
    const fees = feeApi.getRecommendedFee();
 | 
			
		||||
 | 
			
		||||
    // update init data
 | 
			
		||||
    this.updateInitData();
 | 
			
		||||
 | 
			
		||||
    const responseCache = { ...this.initData };
 | 
			
		||||
    function getCachedResponse(key, data): string {
 | 
			
		||||
      if (!responseCache[key]) {
 | 
			
		||||
        responseCache[key] = JSON.stringify(data);
 | 
			
		||||
      }
 | 
			
		||||
      return responseCache[key];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const mempoolInfo = memPool.getMempoolInfo();
 | 
			
		||||
 | 
			
		||||
    this.wss.clients.forEach((client) => {
 | 
			
		||||
      if (client.readyState !== WebSocket.OPEN) {
 | 
			
		||||
        return;
 | 
			
		||||
@ -565,28 +638,27 @@ class WebsocketHandler {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const response = {
 | 
			
		||||
        'block': block,
 | 
			
		||||
        'mempoolInfo': memPool.getMempoolInfo(),
 | 
			
		||||
        'da': da?.previousTime ? da : undefined,
 | 
			
		||||
        'fees': fees,
 | 
			
		||||
      };
 | 
			
		||||
      const response = {};
 | 
			
		||||
      response['block'] = getCachedResponse('block', block);
 | 
			
		||||
      response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
 | 
			
		||||
      response['da'] = getCachedResponse('da', da?.previousTime ? da : undefined);
 | 
			
		||||
      response['fees'] = getCachedResponse('fees', fees);
 | 
			
		||||
 | 
			
		||||
      if (mBlocks && client['want-mempool-blocks']) {
 | 
			
		||||
        response['mempool-blocks'] = mBlocks;
 | 
			
		||||
        response['mempool-blocks'] = getCachedResponse('mempool-blocks', mBlocks);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-tx']) {
 | 
			
		||||
        const trackTxid = client['track-tx'];
 | 
			
		||||
        if (txIds.indexOf(trackTxid) > -1) {
 | 
			
		||||
          response['txConfirmed'] = true;
 | 
			
		||||
          response['txConfirmed'] = 'true';
 | 
			
		||||
        } else {
 | 
			
		||||
          const mempoolTx = _memPool[trackTxid];
 | 
			
		||||
          if (mempoolTx && mempoolTx.position) {
 | 
			
		||||
            response['txPosition'] = {
 | 
			
		||||
            response['txPosition'] = JSON.stringify({
 | 
			
		||||
              txid: trackTxid,
 | 
			
		||||
              position: mempoolTx.position,
 | 
			
		||||
            };
 | 
			
		||||
            });
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
@ -614,7 +686,7 @@ class WebsocketHandler {
 | 
			
		||||
            };
 | 
			
		||||
          });
 | 
			
		||||
 | 
			
		||||
          response['block-transactions'] = foundTransactions;
 | 
			
		||||
          response['block-transactions'] = JSON.stringify(foundTransactions);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -651,21 +723,24 @@ class WebsocketHandler {
 | 
			
		||||
            };
 | 
			
		||||
          });
 | 
			
		||||
 | 
			
		||||
          response['block-transactions'] = foundTransactions;
 | 
			
		||||
          response['block-transactions'] = JSON.stringify(foundTransactions);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-mempool-block'] >= 0) {
 | 
			
		||||
        const index = client['track-mempool-block'];
 | 
			
		||||
        if (mBlockDeltas && mBlockDeltas[index]) {
 | 
			
		||||
          response['projected-block-transactions'] = {
 | 
			
		||||
          response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
 | 
			
		||||
            index: index,
 | 
			
		||||
            delta: mBlockDeltas[index],
 | 
			
		||||
          };
 | 
			
		||||
          });
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      client.send(JSON.stringify(response));
 | 
			
		||||
      const serializedResponse = '{'
 | 
			
		||||
        + Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ')
 | 
			
		||||
        + '}';
 | 
			
		||||
      client.send(serializedResponse);
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user