Optimize websocket updates on new block
This commit is contained in:
		
							parent
							
								
									fb61fddcc4
								
							
						
					
					
						commit
						23ececca95
					
				@ -282,8 +282,7 @@ class WebsocketHandler {
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          if (Object.keys(response).length) {
 | 
			
		||||
            const serializedResponse = this.serializeResponse(response);
 | 
			
		||||
            client.send(serializedResponse);
 | 
			
		||||
            client.send(this.serializeResponse(response));
 | 
			
		||||
          }
 | 
			
		||||
        } catch (e) {
 | 
			
		||||
          logger.debug(`Error parsing websocket message from ${client['remoteAddress']}: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
@ -392,8 +391,7 @@ class WebsocketHandler {
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (Object.keys(response).length) {
 | 
			
		||||
        const serializedResponse = this.serializeResponse(response);
 | 
			
		||||
        client.send(serializedResponse);
 | 
			
		||||
        client.send(this.serializeResponse(response));
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
@ -639,8 +637,7 @@ class WebsocketHandler {
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (Object.keys(response).length) {
 | 
			
		||||
        const serializedResponse = this.serializeResponse(response);
 | 
			
		||||
        client.send(serializedResponse);
 | 
			
		||||
        client.send(this.serializeResponse(response));
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
@ -738,10 +735,13 @@ class WebsocketHandler {
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const confirmedTxids: { [txid: string]: boolean } = {};
 | 
			
		||||
 | 
			
		||||
    // Update mempool to remove transactions included in the new block
 | 
			
		||||
    for (const txId of txIds) {
 | 
			
		||||
      delete _memPool[txId];
 | 
			
		||||
      rbfCache.mined(txId);
 | 
			
		||||
      confirmedTxids[txId] = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
 | 
			
		||||
@ -773,6 +773,8 @@ class WebsocketHandler {
 | 
			
		||||
      'fees': fees,
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions();
 | 
			
		||||
 | 
			
		||||
    const responseCache = { ...this.socketData };
 | 
			
		||||
    function getCachedResponse(key, data): string {
 | 
			
		||||
      if (!responseCache[key]) {
 | 
			
		||||
@ -808,7 +810,7 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
      if (client['track-tx']) {
 | 
			
		||||
        const trackTxid = client['track-tx'];
 | 
			
		||||
        if (trackTxid && txIds.indexOf(trackTxid) > -1) {
 | 
			
		||||
        if (trackTxid && confirmedTxids[trackTxid]) {
 | 
			
		||||
          response['txConfirmed'] = JSON.stringify(trackTxid);
 | 
			
		||||
        } else {
 | 
			
		||||
          const mempoolTx = _memPool[trackTxid];
 | 
			
		||||
@ -880,17 +882,24 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
      if (client['track-mempool-block'] >= 0 && memPool.isInSync()) {
 | 
			
		||||
        const index = client['track-mempool-block'];
 | 
			
		||||
        if (mBlockDeltas && mBlockDeltas[index]) {
 | 
			
		||||
          response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
 | 
			
		||||
            index: index,
 | 
			
		||||
            delta: mBlockDeltas[index],
 | 
			
		||||
          });
 | 
			
		||||
 | 
			
		||||
        if (mBlockDeltas && mBlockDeltas[index] && mBlocksWithTransactions[index]?.transactions?.length) {
 | 
			
		||||
          if (mBlockDeltas[index].added.length > (mBlocksWithTransactions[index]?.transactions.length / 2)) {
 | 
			
		||||
            response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-full-${index}`, {
 | 
			
		||||
              index: index,
 | 
			
		||||
              blockTransactions: mBlocksWithTransactions[index].transactions,
 | 
			
		||||
            });
 | 
			
		||||
          } else {
 | 
			
		||||
            response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-delta-${index}`, {
 | 
			
		||||
              index: index,
 | 
			
		||||
              delta: mBlockDeltas[index],
 | 
			
		||||
            });
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (Object.keys(response).length) {
 | 
			
		||||
        const serializedResponse = this.serializeResponse(response);
 | 
			
		||||
        client.send(serializedResponse);
 | 
			
		||||
        client.send(this.serializeResponse(response));
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
@ -951,10 +960,27 @@ class WebsocketHandler {
 | 
			
		||||
 | 
			
		||||
  private printLogs(): void {
 | 
			
		||||
    if (this.wss) {
 | 
			
		||||
      let numTxSubs = 0;
 | 
			
		||||
      let numProjectedSubs = 0;
 | 
			
		||||
      let numRbfSubs = 0;
 | 
			
		||||
 | 
			
		||||
      this.wss.clients.forEach((client) => {
 | 
			
		||||
        if (client['track-tx']) {
 | 
			
		||||
          numTxSubs++;
 | 
			
		||||
        }
 | 
			
		||||
        if (client['track-mempool-block'] >= 0) {
 | 
			
		||||
          numProjectedSubs++;
 | 
			
		||||
        }
 | 
			
		||||
        if (client['track-rbf']) {
 | 
			
		||||
          numRbfSubs++;
 | 
			
		||||
        }
 | 
			
		||||
      })
 | 
			
		||||
 | 
			
		||||
      const count = this.wss?.clients?.size || 0;
 | 
			
		||||
      const diff = count - this.numClients;
 | 
			
		||||
      this.numClients = count;
 | 
			
		||||
      logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`);
 | 
			
		||||
      logger.debug(`websocket subscriptions: track-tx: ${numTxSubs}, track-mempool-block: ${numProjectedSubs} track-rbf: ${numRbfSubs}`);
 | 
			
		||||
      this.numConnected = 0;
 | 
			
		||||
      this.numDisconnected = 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -35,6 +35,8 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
 | 
			
		||||
  blockSub: Subscription;
 | 
			
		||||
  deltaSub: Subscription;
 | 
			
		||||
 | 
			
		||||
  firstLoad: boolean = true;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    public stateService: StateService,
 | 
			
		||||
    private websocketService: WebsocketService,
 | 
			
		||||
@ -58,7 +60,40 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
 | 
			
		||||
      )
 | 
			
		||||
      .pipe(switchMap(() => this.stateService.mempoolBlockTransactions$))
 | 
			
		||||
      .subscribe((transactionsStripped) => {
 | 
			
		||||
        this.replaceBlock(transactionsStripped);
 | 
			
		||||
        if (this.firstLoad) {
 | 
			
		||||
          this.replaceBlock(transactionsStripped);
 | 
			
		||||
        } else {
 | 
			
		||||
          const inOldBlock = {};
 | 
			
		||||
          const inNewBlock = {};
 | 
			
		||||
          const added: TransactionStripped[] = [];
 | 
			
		||||
          const changed: { txid: string, rate: number | undefined, acc: boolean | undefined }[] = [];
 | 
			
		||||
          const removed: string[] = [];
 | 
			
		||||
          for (const tx of transactionsStripped) {
 | 
			
		||||
            inNewBlock[tx.txid] = true;
 | 
			
		||||
          }
 | 
			
		||||
          for (const txid of Object.keys(this.blockGraph?.scene?.txs || {})) {
 | 
			
		||||
            inOldBlock[txid] = true;
 | 
			
		||||
            if (!inNewBlock[txid]) {
 | 
			
		||||
              removed.push(txid);
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
          for (const tx of transactionsStripped) {
 | 
			
		||||
            if (!inOldBlock[tx.txid]) {
 | 
			
		||||
              added.push(tx);
 | 
			
		||||
            } else {
 | 
			
		||||
              changed.push({
 | 
			
		||||
                txid: tx.txid,
 | 
			
		||||
                rate: tx.rate,
 | 
			
		||||
                acc: tx.acc
 | 
			
		||||
              });
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
          this.updateBlock({
 | 
			
		||||
            removed,
 | 
			
		||||
            changed,
 | 
			
		||||
            added
 | 
			
		||||
          });
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    this.deltaSub = this.stateService.mempoolBlockDelta$.subscribe((delta) => {
 | 
			
		||||
      this.updateBlock(delta);
 | 
			
		||||
@ -67,6 +102,7 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
 | 
			
		||||
 | 
			
		||||
  ngOnChanges(changes): void {
 | 
			
		||||
    if (changes.index) {
 | 
			
		||||
      this.firstLoad = true;
 | 
			
		||||
      if (this.blockGraph) {
 | 
			
		||||
        this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user