Improvments to the mempool transaction subscription.
This commit is contained in:
		
							parent
							
								
									d432b3ce4a
								
							
						
					
					
						commit
						bb90a622d2
					
				@ -2,7 +2,7 @@ const config = require('../../mempool-config.json');
 | 
			
		||||
 | 
			
		||||
import * as WebSocket from 'ws';
 | 
			
		||||
import * as fs from 'fs';
 | 
			
		||||
import { Block, TransactionExtended, Statistic } from '../interfaces';
 | 
			
		||||
import { Block, TransactionExtended, Statistic, WebsocketResponse } from '../interfaces';
 | 
			
		||||
import blocks from './blocks';
 | 
			
		||||
import memPool from './mempool';
 | 
			
		||||
import mempoolBlocks from './mempool-blocks';
 | 
			
		||||
@ -36,7 +36,8 @@ class WebsocketHandler {
 | 
			
		||||
    this.wss.on('connection', (client: WebSocket) => {
 | 
			
		||||
      client.on('message', (message: any) => {
 | 
			
		||||
        try {
 | 
			
		||||
          const parsedMessage = JSON.parse(message);
 | 
			
		||||
          const parsedMessage: WebsocketResponse = JSON.parse(message);
 | 
			
		||||
          const response = {};
 | 
			
		||||
 | 
			
		||||
          if (parsedMessage.action === 'want') {
 | 
			
		||||
            client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1;
 | 
			
		||||
@ -48,6 +49,15 @@ class WebsocketHandler {
 | 
			
		||||
          if (parsedMessage && parsedMessage['track-tx']) {
 | 
			
		||||
            if (/^[a-fA-F0-9]{64}$/.test(parsedMessage['track-tx'])) {
 | 
			
		||||
              client['track-tx'] = parsedMessage['track-tx'];
 | 
			
		||||
              // Client is telling the transaction wasn't found but it might have appeared before we had the time to start watching for it
 | 
			
		||||
              if (parsedMessage['watch-mempool']) {
 | 
			
		||||
                const tx = memPool.getMempool()[client['track-tx']];
 | 
			
		||||
                if (tx) {
 | 
			
		||||
                  response['tx'] = tx;
 | 
			
		||||
                } else {
 | 
			
		||||
                  client['track-mempool-tx'] = parsedMessage['track-tx'];
 | 
			
		||||
                }
 | 
			
		||||
              }
 | 
			
		||||
            } else {
 | 
			
		||||
              client['track-tx'] = null;
 | 
			
		||||
            }
 | 
			
		||||
@ -78,7 +88,11 @@ class WebsocketHandler {
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          if (parsedMessage.action === 'ping') {
 | 
			
		||||
            client.send(JSON.stringify({'pong': true}));
 | 
			
		||||
            response['pong'] = true;
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          if (Object.keys(response).length) {
 | 
			
		||||
            client.send(JSON.stringify(response));
 | 
			
		||||
          }
 | 
			
		||||
        } catch (e) {
 | 
			
		||||
          console.log(e);
 | 
			
		||||
@ -133,10 +147,11 @@ class WebsocketHandler {
 | 
			
		||||
        response['mempool-blocks'] = mBlocks;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (client['track-tx']) {
 | 
			
		||||
        const tx = newTransactions.find((t) => t.txid === client['track-tx']);
 | 
			
		||||
      if (client['track-mempool-tx']) {
 | 
			
		||||
        const tx = newTransactions.find((t) => t.txid === client['track-mempool-tx']);
 | 
			
		||||
        if (tx) {
 | 
			
		||||
          response['tx'] = tx;
 | 
			
		||||
          client['track-mempool-tx'] = null;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -180,4 +180,10 @@ export interface Outspend {
 | 
			
		||||
  vin: number;
 | 
			
		||||
  status: Status;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface WebsocketResponse {
 | 
			
		||||
  action: string;
 | 
			
		||||
  data: string[];
 | 
			
		||||
  'track-tx': string;
 | 
			
		||||
  'track-address': string;
 | 
			
		||||
  'watch-mempool': boolean;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,9 +1,9 @@
 | 
			
		||||
import { Component, OnInit, OnDestroy } from '@angular/core';
 | 
			
		||||
import { ElectrsApiService } from '../../services/electrs-api.service';
 | 
			
		||||
import { ActivatedRoute, ParamMap } from '@angular/router';
 | 
			
		||||
import { switchMap, filter, take, catchError, mergeMap, flatMap, mergeAll, tap, map } from 'rxjs/operators';
 | 
			
		||||
import { switchMap, filter, take, catchError, flatMap } from 'rxjs/operators';
 | 
			
		||||
import { Transaction, Block } from '../../interfaces/electrs.interface';
 | 
			
		||||
import { of, merge, Subscription, Observable, scheduled } from 'rxjs';
 | 
			
		||||
import { of, merge, Subscription, Observable } from 'rxjs';
 | 
			
		||||
import { StateService } from '../../services/state.service';
 | 
			
		||||
import { WebsocketService } from '../../services/websocket.service';
 | 
			
		||||
import { AudioService } from 'src/app/services/audio.service';
 | 
			
		||||
@ -122,7 +122,7 @@ export class TransactionComponent implements OnInit, OnDestroy {
 | 
			
		||||
 | 
			
		||||
  handleLoadElectrsTransactionError(error: any): Observable<any> {
 | 
			
		||||
    if (error.status === 404 && /^[a-fA-F0-9]{64}$/.test(this.txId)) {
 | 
			
		||||
      this.websocketService.startTrackTransaction(this.txId);
 | 
			
		||||
      this.websocketService.startMultiTrackTransaction(this.txId);
 | 
			
		||||
      this.waitingForTransaction = true;
 | 
			
		||||
    }
 | 
			
		||||
    this.error = error;
 | 
			
		||||
 | 
			
		||||
@ -13,6 +13,7 @@ export interface WebsocketResponse {
 | 
			
		||||
  tx?: Transaction;
 | 
			
		||||
  'track-tx'?: string;
 | 
			
		||||
  'track-address'?: string;
 | 
			
		||||
  'watch-mempool'?: boolean;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface MempoolBlock {
 | 
			
		||||
 | 
			
		||||
@ -133,6 +133,11 @@ export class WebsocketService {
 | 
			
		||||
    this.isTrackingTx = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  startMultiTrackTransaction(txId: string) {
 | 
			
		||||
    this.websocketSubject.next({ 'track-tx': txId, 'watch-mempool': true });
 | 
			
		||||
    this.isTrackingTx = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  stopTrackingTransaction() {
 | 
			
		||||
    if (this.isTrackingTx === false) {
 | 
			
		||||
      return;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user