Merge pull request #5035 from mempool/mononaut/accelerator-websocket
Replace acceleration API polling with websocket
This commit is contained in:
		
						commit
						feab4d2a51
					
				| @ -404,6 +404,10 @@ class Mempool { | ||||
| 
 | ||||
|       const newAccelerationMap: { [txid: string]: Acceleration } = {}; | ||||
|       for (const acceleration of newAccelerations) { | ||||
|         // skip transactions we don't know about
 | ||||
|         if (!this.mempoolCache[acceleration.txid]) { | ||||
|           continue; | ||||
|         } | ||||
|         newAccelerationMap[acceleration.txid] = acceleration; | ||||
|         if (this.accelerations[acceleration.txid] == null) { | ||||
|           // new acceleration
 | ||||
|  | ||||
| @ -347,6 +347,17 @@ class WebsocketHandler { | ||||
|             } | ||||
|           } | ||||
| 
 | ||||
|           if (parsedMessage && parsedMessage['track-accelerations'] != null) { | ||||
|             if (parsedMessage['track-accelerations']) { | ||||
|               client['track-accelerations'] = true; | ||||
|               response['accelerations'] = JSON.stringify({ | ||||
|                 accelerations: Object.values(memPool.getAccelerations()), | ||||
|               }); | ||||
|             } else { | ||||
|               client['track-accelerations'] = false; | ||||
|             } | ||||
|           } | ||||
| 
 | ||||
|           if (parsedMessage.action === 'init') { | ||||
|             if (!this.socketData['blocks']?.length || !this.socketData['da'] || !this.socketData['backendInfo'] || !this.socketData['conversions']) { | ||||
|               this.updateSocketData(); | ||||
| @ -537,6 +548,7 @@ class WebsocketHandler { | ||||
|     const vBytesPerSecond = memPool.getVBytesPerSecond(); | ||||
|     const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); | ||||
|     const da = difficultyAdjustment.getDifficultyAdjustment(); | ||||
|     const accelerations = memPool.getAccelerations(); | ||||
|     memPool.handleRbfTransactions(rbfTransactions); | ||||
|     const rbfChanges = rbfCache.getRbfChanges(); | ||||
|     let rbfReplacements; | ||||
| @ -644,6 +656,12 @@ class WebsocketHandler { | ||||
|     const addressCache = this.makeAddressCache(newTransactions); | ||||
|     const removedAddressCache = this.makeAddressCache(deletedTransactions); | ||||
| 
 | ||||
|     // pre-compute acceleration delta
 | ||||
|     const accelerationUpdate = { | ||||
|       added: accelerationDelta.map(txid => accelerations[txid]).filter(acc => acc != null), | ||||
|       removed: accelerationDelta.filter(txid => !accelerations[txid]), | ||||
|     }; | ||||
| 
 | ||||
|     // TODO - Fix indentation after PR is merged
 | ||||
|     for (const server of this.webSocketServers) { | ||||
|     server.clients.forEach(async (client) => { | ||||
| @ -891,6 +909,10 @@ class WebsocketHandler { | ||||
|         response['mempool-transactions'] = getCachedResponse('mempool-transactions', mempoolDelta); | ||||
|       } | ||||
| 
 | ||||
|       if (client['track-accelerations'] && (accelerationUpdate.added.length || accelerationUpdate.removed.length)) { | ||||
|         response['accelerations'] = getCachedResponse('accelerations', accelerationUpdate); | ||||
|       } | ||||
| 
 | ||||
|       if (Object.keys(response).length) { | ||||
|         client.send(this.serializeResponse(response)); | ||||
|       } | ||||
|  | ||||
| @ -1,5 +1,5 @@ | ||||
| import { Component, OnInit, ChangeDetectionStrategy, Input, ChangeDetectorRef } from '@angular/core'; | ||||
| import { combineLatest, BehaviorSubject, Observable, catchError, of, switchMap, tap } from 'rxjs'; | ||||
| import { Component, OnInit, ChangeDetectionStrategy, Input, ChangeDetectorRef, OnDestroy } from '@angular/core'; | ||||
| import { BehaviorSubject, Observable, catchError, of, switchMap, tap } from 'rxjs'; | ||||
| import { Acceleration, BlockExtended } from '../../../interfaces/node-api.interface'; | ||||
| import { StateService } from '../../../services/state.service'; | ||||
| import { WebsocketService } from '../../../services/websocket.service'; | ||||
| @ -11,7 +11,7 @@ import { ServicesApiServices } from '../../../services/services-api.service'; | ||||
|   styleUrls: ['./accelerations-list.component.scss'], | ||||
|   changeDetection: ChangeDetectionStrategy.OnPush, | ||||
| }) | ||||
| export class AccelerationsListComponent implements OnInit { | ||||
| export class AccelerationsListComponent implements OnInit, OnDestroy { | ||||
|   @Input() widget: boolean = false; | ||||
|   @Input() pending: boolean = false; | ||||
|   @Input() accelerations$: Observable<Acceleration[]>; | ||||
| @ -44,7 +44,10 @@ export class AccelerationsListComponent implements OnInit { | ||||
|      | ||||
|     this.accelerationList$ = this.pageSubject.pipe( | ||||
|       switchMap((page) => { | ||||
|         const accelerationObservable$ = this.accelerations$ || (this.pending ? this.servicesApiService.getAccelerations$() : this.servicesApiService.getAccelerationHistoryObserveResponse$({ page: page })); | ||||
|         const accelerationObservable$ = this.accelerations$ || (this.pending ? this.stateService.liveAccelerations$ : this.servicesApiService.getAccelerationHistoryObserveResponse$({ page: page })); | ||||
|         if (!this.accelerations$ && this.pending) { | ||||
|           this.websocketService.ensureTrackAccelerations(); | ||||
|         } | ||||
|         return accelerationObservable$.pipe( | ||||
|           switchMap(response => { | ||||
|             let accelerations = response; | ||||
| @ -85,4 +88,8 @@ export class AccelerationsListComponent implements OnInit { | ||||
|   trackByBlock(index: number, block: BlockExtended): number { | ||||
|     return block.height; | ||||
|   } | ||||
| 
 | ||||
|   ngOnDestroy(): void { | ||||
|     this.websocketService.stopTrackAccelerations(); | ||||
|   } | ||||
| } | ||||
| @ -1,10 +1,10 @@ | ||||
| import { ChangeDetectionStrategy, Component, HostListener, Inject, OnInit, PLATFORM_ID } from '@angular/core'; | ||||
| import { ChangeDetectionStrategy, Component, HostListener, Inject, OnDestroy, OnInit, PLATFORM_ID } from '@angular/core'; | ||||
| import { SeoService } from '../../../services/seo.service'; | ||||
| import { OpenGraphService } from '../../../services/opengraph.service'; | ||||
| import { WebsocketService } from '../../../services/websocket.service'; | ||||
| import { Acceleration, BlockExtended } from '../../../interfaces/node-api.interface'; | ||||
| import { StateService } from '../../../services/state.service'; | ||||
| import { Observable, catchError, combineLatest, distinctUntilChanged, interval, map, of, share, startWith, switchMap, tap } from 'rxjs'; | ||||
| import { Observable, Subscription, catchError, combineLatest, distinctUntilChanged, map, of, share, switchMap, tap } from 'rxjs'; | ||||
| import { Color } from '../../block-overview-graph/sprite-types'; | ||||
| import { hexToColor } from '../../block-overview-graph/utils'; | ||||
| import TxView from '../../block-overview-graph/tx-view'; | ||||
| @ -28,7 +28,7 @@ interface AccelerationBlock extends BlockExtended { | ||||
|   styleUrls: ['./accelerator-dashboard.component.scss'], | ||||
|   changeDetection: ChangeDetectionStrategy.OnPush, | ||||
| }) | ||||
| export class AcceleratorDashboardComponent implements OnInit { | ||||
| export class AcceleratorDashboardComponent implements OnInit, OnDestroy { | ||||
|   blocks$: Observable<AccelerationBlock[]>; | ||||
|   accelerations$: Observable<Acceleration[]>; | ||||
|   pendingAccelerations$: Observable<Acceleration[]>; | ||||
| @ -39,6 +39,8 @@ export class AcceleratorDashboardComponent implements OnInit { | ||||
|   firstLoad = true; | ||||
|   timespan: '3d' | '1w' | '1m' = '1w'; | ||||
| 
 | ||||
|   accelerationDeltaSubscription: Subscription; | ||||
| 
 | ||||
|   graphHeight: number = 300; | ||||
|   theme: ThemeService; | ||||
| 
 | ||||
| @ -59,27 +61,28 @@ export class AcceleratorDashboardComponent implements OnInit { | ||||
|   ngOnInit(): void { | ||||
|     this.onResize(); | ||||
|     this.websocketService.want(['blocks', 'mempool-blocks', 'stats']); | ||||
|     this.websocketService.startTrackAccelerations(); | ||||
| 
 | ||||
|     this.pendingAccelerations$ = (this.stateService.isBrowser ? interval(30000) : of(null)).pipe( | ||||
|       startWith(true), | ||||
|       switchMap(() => { | ||||
|         return this.serviceApiServices.getAccelerations$().pipe( | ||||
|           catchError(() => { | ||||
|             return of([]); | ||||
|           }), | ||||
|         ); | ||||
|       }), | ||||
|       tap(accelerations => { | ||||
|         if (!this.firstLoad && accelerations.some(acc => !this.seen.has(acc.txid))) { | ||||
|           this.audioService.playSound('bright-harmony'); | ||||
|         } | ||||
|         for(const acc of accelerations) { | ||||
|           this.seen.add(acc.txid); | ||||
|         } | ||||
|         this.firstLoad = false; | ||||
|       }), | ||||
|     this.pendingAccelerations$ = this.stateService.liveAccelerations$.pipe( | ||||
|       share(), | ||||
|     ); | ||||
|     this.accelerationDeltaSubscription = this.stateService.accelerations$.subscribe((delta) => { | ||||
|       if (!delta.reset) { | ||||
|         let hasNewAcceleration = false; | ||||
|         for (const acc of delta.added) { | ||||
|           if (!this.seen.has(acc.txid)) { | ||||
|             hasNewAcceleration = true; | ||||
|           } | ||||
|           this.seen.add(acc.txid); | ||||
|         } | ||||
|         for (const txid of delta.removed) { | ||||
|           this.seen.delete(txid); | ||||
|         } | ||||
|         if (hasNewAcceleration) { | ||||
|           this.audioService.playSound('bright-harmony'); | ||||
|         } | ||||
|       } | ||||
|     }); | ||||
| 
 | ||||
|     this.accelerations$ = this.stateService.chainTip$.pipe( | ||||
|       distinctUntilChanged(), | ||||
| @ -154,6 +157,11 @@ export class AcceleratorDashboardComponent implements OnInit { | ||||
|     return false; | ||||
|   } | ||||
| 
 | ||||
|   ngOnDestroy(): void { | ||||
|     this.accelerationDeltaSubscription.unsubscribe(); | ||||
|     this.websocketService.stopTrackAccelerations(); | ||||
|   } | ||||
| 
 | ||||
|   @HostListener('window:resize', ['$event']) | ||||
|   onResize(): void { | ||||
|     if (window.innerWidth >= 992) { | ||||
|  | ||||
| @ -2,7 +2,8 @@ import { ChangeDetectionStrategy, Component, Input, OnInit } from '@angular/core | ||||
| import { Observable, of } from 'rxjs'; | ||||
| import { switchMap } from 'rxjs/operators'; | ||||
| import { Acceleration } from '../../../interfaces/node-api.interface'; | ||||
| import { ServicesApiServices } from '../../../services/services-api.service'; | ||||
| import { StateService } from '../../../services/state.service'; | ||||
| import { WebsocketService } from '../../../services/websocket.service'; | ||||
| 
 | ||||
| @Component({ | ||||
|   selector: 'app-pending-stats', | ||||
| @ -15,11 +16,12 @@ export class PendingStatsComponent implements OnInit { | ||||
|   public accelerationStats$: Observable<any>; | ||||
| 
 | ||||
|   constructor( | ||||
|     private servicesApiService: ServicesApiServices, | ||||
|     private stateService: StateService, | ||||
|     private websocketService: WebsocketService, | ||||
|   ) { } | ||||
| 
 | ||||
|   ngOnInit(): void { | ||||
|     this.accelerationStats$ = (this.accelerations$ || this.servicesApiService.getAccelerations$()).pipe( | ||||
|     this.accelerationStats$ = (this.accelerations$ || this.stateService.liveAccelerations$).pipe( | ||||
|       switchMap(accelerations => { | ||||
|         let totalAccelerations = 0; | ||||
|         let totalFeeDelta = 0; | ||||
|  | ||||
| @ -1,7 +1,7 @@ | ||||
| import { SafeResourceUrl } from '@angular/platform-browser'; | ||||
| import { ILoadingIndicators } from '../services/state.service'; | ||||
| import { Transaction } from './electrs.interface'; | ||||
| import { BlockExtended, DifficultyAdjustment, RbfTree, TransactionStripped } from './node-api.interface'; | ||||
| import { Acceleration, BlockExtended, DifficultyAdjustment, RbfTree, TransactionStripped } from './node-api.interface'; | ||||
| 
 | ||||
| export interface WebsocketResponse { | ||||
|   backend?: 'esplora' | 'electrum' | 'none'; | ||||
| @ -35,6 +35,7 @@ export interface WebsocketResponse { | ||||
|   'track-mempool-block'?: number; | ||||
|   'track-rbf'?: string; | ||||
|   'track-rbf-summary'?: boolean; | ||||
|   'track-accelerations'?: boolean; | ||||
|   'watch-mempool'?: boolean; | ||||
|   'refresh-blocks'?: boolean; | ||||
| } | ||||
| @ -92,6 +93,12 @@ export interface MempoolBlockDeltaCompressed { | ||||
|   changed: MempoolDeltaChange[]; | ||||
| } | ||||
| 
 | ||||
| export interface AccelerationDelta { | ||||
|   added: Acceleration[]; | ||||
|   removed: string[]; | ||||
|   reset?: boolean; | ||||
| } | ||||
| 
 | ||||
| export interface MempoolInfo { | ||||
|   loaded: boolean;                 //  (boolean) True if the mempool is fully loaded
 | ||||
|   size: number;                    //  (numeric) Current tx count
 | ||||
|  | ||||
| @ -1,8 +1,8 @@ | ||||
| import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core'; | ||||
| import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs'; | ||||
| import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable } from 'rxjs'; | ||||
| import { Transaction } from '../interfaces/electrs.interface'; | ||||
| import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface'; | ||||
| import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface'; | ||||
| import { AccelerationDelta, HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface'; | ||||
| import { Acceleration, BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface'; | ||||
| import { Router, NavigationStart } from '@angular/router'; | ||||
| import { isPlatformBrowser } from '@angular/common'; | ||||
| import { filter, map, scan, shareReplay } from 'rxjs/operators'; | ||||
| @ -129,6 +129,8 @@ export class StateService { | ||||
|   mempoolBlocks$ = new ReplaySubject<MempoolBlock[]>(1); | ||||
|   mempoolBlockUpdate$ = new Subject<MempoolBlockUpdate>(); | ||||
|   liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>; | ||||
|   accelerations$ = new Subject<AccelerationDelta>(); | ||||
|   liveAccelerations$: Observable<Acceleration[]>; | ||||
|   txConfirmed$ = new Subject<[string, BlockExtended]>(); | ||||
|   txReplaced$ = new Subject<ReplacedTransaction>(); | ||||
|   txRbfInfo$ = new Subject<RbfTree>(); | ||||
| @ -238,6 +240,24 @@ export class StateService { | ||||
|       } | ||||
|     }, {})); | ||||
| 
 | ||||
|     // Emits the full list of pending accelerations each time it changes
 | ||||
|     this.liveAccelerations$ = this.accelerations$.pipe( | ||||
|       scan((accelerations: { [txid: string]: Acceleration }, delta: AccelerationDelta) => { | ||||
|         if (delta.reset) { | ||||
|           accelerations = {}; | ||||
|         } else { | ||||
|           for (const txid of delta.removed) { | ||||
|             delete accelerations[txid]; | ||||
|           } | ||||
|         } | ||||
|         for (const acc of delta.added) { | ||||
|           accelerations[acc.txid] = acc; | ||||
|         } | ||||
|         return accelerations; | ||||
|       }, {}), | ||||
|       map((accMap) => Object.values(accMap).sort((a,b) => b.added - a.added)) | ||||
|     ); | ||||
| 
 | ||||
|     this.networkChanged$.subscribe((network) => { | ||||
|       this.transactions$ = new BehaviorSubject<TransactionStripped[]>(null); | ||||
|       this.blocksSubject$.next([]); | ||||
|  | ||||
| @ -33,6 +33,7 @@ export class WebsocketService { | ||||
|   private isTrackingRbfSummary = false; | ||||
|   private isTrackingAddress: string | false = false; | ||||
|   private isTrackingAddresses: string[] | false = false; | ||||
|   private isTrackingAccelerations: boolean = false; | ||||
|   private trackingMempoolBlock: number; | ||||
|   private latestGitCommit = ''; | ||||
|   private onlineCheckTimeout: number; | ||||
| @ -132,6 +133,9 @@ export class WebsocketService { | ||||
|           if (this.isTrackingAddresses) { | ||||
|             this.startTrackAddresses(this.isTrackingAddresses); | ||||
|           } | ||||
|           if (this.isTrackingAccelerations) { | ||||
|             this.startTrackAccelerations(); | ||||
|           } | ||||
|           this.stateService.connectionState$.next(2); | ||||
|         } | ||||
| 
 | ||||
| @ -235,6 +239,24 @@ export class WebsocketService { | ||||
|     this.isTrackingRbfSummary = false; | ||||
|   } | ||||
| 
 | ||||
|   startTrackAccelerations() { | ||||
|     this.websocketSubject.next({ 'track-accelerations': true }); | ||||
|     this.isTrackingAccelerations = true; | ||||
|   } | ||||
| 
 | ||||
|   stopTrackAccelerations() { | ||||
|     if (this.isTrackingAccelerations) { | ||||
|       this.websocketSubject.next({ 'track-accelerations': false }); | ||||
|       this.isTrackingAccelerations = false; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   ensureTrackAccelerations() { | ||||
|     if (!this.isTrackingAccelerations) { | ||||
|       this.startTrackAccelerations(); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   fetchStatistics(historicalDate: string) { | ||||
|     this.websocketSubject.next({ historicalDate }); | ||||
|   } | ||||
| @ -416,6 +438,18 @@ export class WebsocketService { | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (response['accelerations']) { | ||||
|       if (response['accelerations'].accelerations) { | ||||
|         this.stateService.accelerations$.next({ | ||||
|           added: response['accelerations'].accelerations, | ||||
|           removed: [], | ||||
|           reset: true, | ||||
|         }); | ||||
|       } else { | ||||
|         this.stateService.accelerations$.next(response['accelerations']); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (response['live-2h-chart']) { | ||||
|       this.stateService.live2Chart$.next(response['live-2h-chart']); | ||||
|     } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user