diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 176bedddb..c93d51005 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -404,6 +404,10 @@ class Mempool { const newAccelerationMap: { [txid: string]: Acceleration } = {}; for (const acceleration of newAccelerations) { + // skip transactions we don't know about + if (!this.mempoolCache[acceleration.txid]) { + continue; + } newAccelerationMap[acceleration.txid] = acceleration; if (this.accelerations[acceleration.txid] == null) { // new acceleration diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index fdda3df88..f92e6cdfe 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -347,6 +347,17 @@ class WebsocketHandler { } } + if (parsedMessage && parsedMessage['track-accelerations'] != null) { + if (parsedMessage['track-accelerations']) { + client['track-accelerations'] = true; + response['accelerations'] = JSON.stringify({ + accelerations: Object.values(memPool.getAccelerations()), + }); + } else { + client['track-accelerations'] = false; + } + } + if (parsedMessage.action === 'init') { if (!this.socketData['blocks']?.length || !this.socketData['da'] || !this.socketData['backendInfo'] || !this.socketData['conversions']) { this.updateSocketData(); @@ -537,6 +548,7 @@ class WebsocketHandler { const vBytesPerSecond = memPool.getVBytesPerSecond(); const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); const da = difficultyAdjustment.getDifficultyAdjustment(); + const accelerations = memPool.getAccelerations(); memPool.handleRbfTransactions(rbfTransactions); const rbfChanges = rbfCache.getRbfChanges(); let rbfReplacements; @@ -644,6 +656,12 @@ class WebsocketHandler { const addressCache = this.makeAddressCache(newTransactions); const removedAddressCache = this.makeAddressCache(deletedTransactions); + // pre-compute acceleration delta + const accelerationUpdate = { + added: accelerationDelta.map(txid => accelerations[txid]).filter(acc => acc != null), + removed: accelerationDelta.filter(txid => !accelerations[txid]), + }; + // TODO - Fix indentation after PR is merged for (const server of this.webSocketServers) { server.clients.forEach(async (client) => { @@ -891,6 +909,10 @@ class WebsocketHandler { response['mempool-transactions'] = getCachedResponse('mempool-transactions', mempoolDelta); } + if (client['track-accelerations'] && (accelerationUpdate.added.length || accelerationUpdate.removed.length)) { + response['accelerations'] = getCachedResponse('accelerations', accelerationUpdate); + } + if (Object.keys(response).length) { client.send(this.serializeResponse(response)); } diff --git a/frontend/src/app/components/acceleration/accelerations-list/accelerations-list.component.ts b/frontend/src/app/components/acceleration/accelerations-list/accelerations-list.component.ts index f2c082fc8..237b14317 100644 --- a/frontend/src/app/components/acceleration/accelerations-list/accelerations-list.component.ts +++ b/frontend/src/app/components/acceleration/accelerations-list/accelerations-list.component.ts @@ -1,5 +1,5 @@ -import { Component, OnInit, ChangeDetectionStrategy, Input, ChangeDetectorRef } from '@angular/core'; -import { combineLatest, BehaviorSubject, Observable, catchError, of, switchMap, tap } from 'rxjs'; +import { Component, OnInit, ChangeDetectionStrategy, Input, ChangeDetectorRef, OnDestroy } from '@angular/core'; +import { BehaviorSubject, Observable, catchError, of, switchMap, tap } from 'rxjs'; import { Acceleration, BlockExtended } from '../../../interfaces/node-api.interface'; import { StateService } from '../../../services/state.service'; import { WebsocketService } from '../../../services/websocket.service'; @@ -11,7 +11,7 @@ import { ServicesApiServices } from '../../../services/services-api.service'; styleUrls: ['./accelerations-list.component.scss'], changeDetection: ChangeDetectionStrategy.OnPush, }) -export class AccelerationsListComponent implements OnInit { +export class AccelerationsListComponent implements OnInit, OnDestroy { @Input() widget: boolean = false; @Input() pending: boolean = false; @Input() accelerations$: Observable; @@ -44,7 +44,10 @@ export class AccelerationsListComponent implements OnInit { this.accelerationList$ = this.pageSubject.pipe( switchMap((page) => { - const accelerationObservable$ = this.accelerations$ || (this.pending ? this.servicesApiService.getAccelerations$() : this.servicesApiService.getAccelerationHistoryObserveResponse$({ page: page })); + const accelerationObservable$ = this.accelerations$ || (this.pending ? this.stateService.liveAccelerations$ : this.servicesApiService.getAccelerationHistoryObserveResponse$({ page: page })); + if (!this.accelerations$ && this.pending) { + this.websocketService.ensureTrackAccelerations(); + } return accelerationObservable$.pipe( switchMap(response => { let accelerations = response; @@ -85,4 +88,8 @@ export class AccelerationsListComponent implements OnInit { trackByBlock(index: number, block: BlockExtended): number { return block.height; } + + ngOnDestroy(): void { + this.websocketService.stopTrackAccelerations(); + } } \ No newline at end of file diff --git a/frontend/src/app/components/acceleration/accelerator-dashboard/accelerator-dashboard.component.ts b/frontend/src/app/components/acceleration/accelerator-dashboard/accelerator-dashboard.component.ts index dc53d8f95..282927b4a 100644 --- a/frontend/src/app/components/acceleration/accelerator-dashboard/accelerator-dashboard.component.ts +++ b/frontend/src/app/components/acceleration/accelerator-dashboard/accelerator-dashboard.component.ts @@ -1,10 +1,10 @@ -import { ChangeDetectionStrategy, Component, HostListener, Inject, OnInit, PLATFORM_ID } from '@angular/core'; +import { ChangeDetectionStrategy, Component, HostListener, Inject, OnDestroy, OnInit, PLATFORM_ID } from '@angular/core'; import { SeoService } from '../../../services/seo.service'; import { OpenGraphService } from '../../../services/opengraph.service'; import { WebsocketService } from '../../../services/websocket.service'; import { Acceleration, BlockExtended } from '../../../interfaces/node-api.interface'; import { StateService } from '../../../services/state.service'; -import { Observable, catchError, combineLatest, distinctUntilChanged, interval, map, of, share, startWith, switchMap, tap } from 'rxjs'; +import { Observable, Subscription, catchError, combineLatest, distinctUntilChanged, map, of, share, switchMap, tap } from 'rxjs'; import { Color } from '../../block-overview-graph/sprite-types'; import { hexToColor } from '../../block-overview-graph/utils'; import TxView from '../../block-overview-graph/tx-view'; @@ -28,7 +28,7 @@ interface AccelerationBlock extends BlockExtended { styleUrls: ['./accelerator-dashboard.component.scss'], changeDetection: ChangeDetectionStrategy.OnPush, }) -export class AcceleratorDashboardComponent implements OnInit { +export class AcceleratorDashboardComponent implements OnInit, OnDestroy { blocks$: Observable; accelerations$: Observable; pendingAccelerations$: Observable; @@ -39,6 +39,8 @@ export class AcceleratorDashboardComponent implements OnInit { firstLoad = true; timespan: '3d' | '1w' | '1m' = '1w'; + accelerationDeltaSubscription: Subscription; + graphHeight: number = 300; theme: ThemeService; @@ -59,27 +61,28 @@ export class AcceleratorDashboardComponent implements OnInit { ngOnInit(): void { this.onResize(); this.websocketService.want(['blocks', 'mempool-blocks', 'stats']); + this.websocketService.startTrackAccelerations(); - this.pendingAccelerations$ = (this.stateService.isBrowser ? interval(30000) : of(null)).pipe( - startWith(true), - switchMap(() => { - return this.serviceApiServices.getAccelerations$().pipe( - catchError(() => { - return of([]); - }), - ); - }), - tap(accelerations => { - if (!this.firstLoad && accelerations.some(acc => !this.seen.has(acc.txid))) { - this.audioService.playSound('bright-harmony'); - } - for(const acc of accelerations) { - this.seen.add(acc.txid); - } - this.firstLoad = false; - }), + this.pendingAccelerations$ = this.stateService.liveAccelerations$.pipe( share(), ); + this.accelerationDeltaSubscription = this.stateService.accelerations$.subscribe((delta) => { + if (!delta.reset) { + let hasNewAcceleration = false; + for (const acc of delta.added) { + if (!this.seen.has(acc.txid)) { + hasNewAcceleration = true; + } + this.seen.add(acc.txid); + } + for (const txid of delta.removed) { + this.seen.delete(txid); + } + if (hasNewAcceleration) { + this.audioService.playSound('bright-harmony'); + } + } + }); this.accelerations$ = this.stateService.chainTip$.pipe( distinctUntilChanged(), @@ -154,6 +157,11 @@ export class AcceleratorDashboardComponent implements OnInit { return false; } + ngOnDestroy(): void { + this.accelerationDeltaSubscription.unsubscribe(); + this.websocketService.stopTrackAccelerations(); + } + @HostListener('window:resize', ['$event']) onResize(): void { if (window.innerWidth >= 992) { diff --git a/frontend/src/app/components/acceleration/pending-stats/pending-stats.component.ts b/frontend/src/app/components/acceleration/pending-stats/pending-stats.component.ts index ed7061156..568e60d7e 100644 --- a/frontend/src/app/components/acceleration/pending-stats/pending-stats.component.ts +++ b/frontend/src/app/components/acceleration/pending-stats/pending-stats.component.ts @@ -2,7 +2,8 @@ import { ChangeDetectionStrategy, Component, Input, OnInit } from '@angular/core import { Observable, of } from 'rxjs'; import { switchMap } from 'rxjs/operators'; import { Acceleration } from '../../../interfaces/node-api.interface'; -import { ServicesApiServices } from '../../../services/services-api.service'; +import { StateService } from '../../../services/state.service'; +import { WebsocketService } from '../../../services/websocket.service'; @Component({ selector: 'app-pending-stats', @@ -15,11 +16,12 @@ export class PendingStatsComponent implements OnInit { public accelerationStats$: Observable; constructor( - private servicesApiService: ServicesApiServices, + private stateService: StateService, + private websocketService: WebsocketService, ) { } ngOnInit(): void { - this.accelerationStats$ = (this.accelerations$ || this.servicesApiService.getAccelerations$()).pipe( + this.accelerationStats$ = (this.accelerations$ || this.stateService.liveAccelerations$).pipe( switchMap(accelerations => { let totalAccelerations = 0; let totalFeeDelta = 0; diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index 8c24979e7..22789986e 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -1,7 +1,7 @@ import { SafeResourceUrl } from '@angular/platform-browser'; import { ILoadingIndicators } from '../services/state.service'; import { Transaction } from './electrs.interface'; -import { BlockExtended, DifficultyAdjustment, RbfTree, TransactionStripped } from './node-api.interface'; +import { Acceleration, BlockExtended, DifficultyAdjustment, RbfTree, TransactionStripped } from './node-api.interface'; export interface WebsocketResponse { backend?: 'esplora' | 'electrum' | 'none'; @@ -35,6 +35,7 @@ export interface WebsocketResponse { 'track-mempool-block'?: number; 'track-rbf'?: string; 'track-rbf-summary'?: boolean; + 'track-accelerations'?: boolean; 'watch-mempool'?: boolean; 'refresh-blocks'?: boolean; } @@ -92,6 +93,12 @@ export interface MempoolBlockDeltaCompressed { changed: MempoolDeltaChange[]; } +export interface AccelerationDelta { + added: Acceleration[]; + removed: string[]; + reset?: boolean; +} + export interface MempoolInfo { loaded: boolean; // (boolean) True if the mempool is fully loaded size: number; // (numeric) Current tx count diff --git a/frontend/src/app/services/state.service.ts b/frontend/src/app/services/state.service.ts index b939574b5..34197a3d1 100644 --- a/frontend/src/app/services/state.service.ts +++ b/frontend/src/app/services/state.service.ts @@ -1,8 +1,8 @@ import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core'; -import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs'; +import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable } from 'rxjs'; import { Transaction } from '../interfaces/electrs.interface'; -import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface'; -import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface'; +import { AccelerationDelta, HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface'; +import { Acceleration, BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface'; import { Router, NavigationStart } from '@angular/router'; import { isPlatformBrowser } from '@angular/common'; import { filter, map, scan, shareReplay } from 'rxjs/operators'; @@ -129,6 +129,8 @@ export class StateService { mempoolBlocks$ = new ReplaySubject(1); mempoolBlockUpdate$ = new Subject(); liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>; + accelerations$ = new Subject(); + liveAccelerations$: Observable; txConfirmed$ = new Subject<[string, BlockExtended]>(); txReplaced$ = new Subject(); txRbfInfo$ = new Subject(); @@ -238,6 +240,24 @@ export class StateService { } }, {})); + // Emits the full list of pending accelerations each time it changes + this.liveAccelerations$ = this.accelerations$.pipe( + scan((accelerations: { [txid: string]: Acceleration }, delta: AccelerationDelta) => { + if (delta.reset) { + accelerations = {}; + } else { + for (const txid of delta.removed) { + delete accelerations[txid]; + } + } + for (const acc of delta.added) { + accelerations[acc.txid] = acc; + } + return accelerations; + }, {}), + map((accMap) => Object.values(accMap).sort((a,b) => b.added - a.added)) + ); + this.networkChanged$.subscribe((network) => { this.transactions$ = new BehaviorSubject(null); this.blocksSubject$.next([]); diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index e4df12aa6..fbadf0de3 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -33,6 +33,7 @@ export class WebsocketService { private isTrackingRbfSummary = false; private isTrackingAddress: string | false = false; private isTrackingAddresses: string[] | false = false; + private isTrackingAccelerations: boolean = false; private trackingMempoolBlock: number; private latestGitCommit = ''; private onlineCheckTimeout: number; @@ -132,6 +133,9 @@ export class WebsocketService { if (this.isTrackingAddresses) { this.startTrackAddresses(this.isTrackingAddresses); } + if (this.isTrackingAccelerations) { + this.startTrackAccelerations(); + } this.stateService.connectionState$.next(2); } @@ -235,6 +239,24 @@ export class WebsocketService { this.isTrackingRbfSummary = false; } + startTrackAccelerations() { + this.websocketSubject.next({ 'track-accelerations': true }); + this.isTrackingAccelerations = true; + } + + stopTrackAccelerations() { + if (this.isTrackingAccelerations) { + this.websocketSubject.next({ 'track-accelerations': false }); + this.isTrackingAccelerations = false; + } + } + + ensureTrackAccelerations() { + if (!this.isTrackingAccelerations) { + this.startTrackAccelerations(); + } + } + fetchStatistics(historicalDate: string) { this.websocketSubject.next({ historicalDate }); } @@ -416,6 +438,18 @@ export class WebsocketService { } } + if (response['accelerations']) { + if (response['accelerations'].accelerations) { + this.stateService.accelerations$.next({ + added: response['accelerations'].accelerations, + removed: [], + reset: true, + }); + } else { + this.stateService.accelerations$.next(response['accelerations']); + } + } + if (response['live-2h-chart']) { this.stateService.live2Chart$.next(response['live-2h-chart']); }