diff --git a/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts b/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts index 4d01bd9b9..9971e92ab 100644 --- a/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts +++ b/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts @@ -1,11 +1,10 @@ -import { Component, ComponentRef, ViewChild, HostListener, Input, Output, EventEmitter, +import { Component, ViewChild, Input, Output, EventEmitter, OnInit, OnDestroy, OnChanges, ChangeDetectionStrategy, ChangeDetectorRef, AfterViewInit } from '@angular/core'; import { StateService } from '../../services/state.service'; -import { MempoolBlockDelta } from '../../interfaces/websocket.interface'; +import { MempoolBlockDelta, isMempoolDelta } from '../../interfaces/websocket.interface'; import { TransactionStripped } from '../../interfaces/node-api.interface'; import { BlockOverviewGraphComponent } from '../../components/block-overview-graph/block-overview-graph.component'; -import { Subscription, BehaviorSubject, merge, of, timer } from 'rxjs'; -import { switchMap, filter, concatMap, map } from 'rxjs/operators'; +import { Subscription, BehaviorSubject } from 'rxjs'; import { WebsocketService } from '../../services/websocket.service'; import { RelativeUrlPipe } from '../../shared/pipes/relative-url/relative-url.pipe'; import { Router } from '@angular/router'; @@ -39,10 +38,6 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang poolDirection: string = 'left'; blockSub: Subscription; - rateLimit = 1000; - private lastEventTime = Date.now() - this.rateLimit; - private subId = 0; - firstLoad: boolean = true; constructor( @@ -62,39 +57,13 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang } ngAfterViewInit(): void { - this.blockSub = merge( - this.stateService.mempoolBlockTransactions$, - this.stateService.mempoolBlockDelta$, - ).pipe( - concatMap(update => { - const now = Date.now(); - const timeSinceLastEvent = now - this.lastEventTime; - this.lastEventTime = Math.max(now, this.lastEventTime + this.rateLimit); - - const subId = this.subId; - - // If time since last event is less than X seconds, delay this event - if (timeSinceLastEvent < this.rateLimit) { - return timer(this.rateLimit - timeSinceLastEvent).pipe( - // Emit the event after the timer - map(() => ({ update, subId })) - ); - } else { - // If enough time has passed, emit the event immediately - return of({ update, subId }); - } - }) - ).subscribe(({ update, subId }) => { - // discard stale updates after a block transition - if (subId !== this.subId) { - return; - } + this.blockSub = this.stateService.mempoolBlockUpdate$.subscribe((update) => { // process update - if (update['added']) { + if (isMempoolDelta(update)) { // delta - this.updateBlock(update as MempoolBlockDelta); + this.updateBlock(update); } else { - const transactionsStripped = update as TransactionStripped[]; + const transactionsStripped = update.transactions; // new transactions if (this.firstLoad) { this.replaceBlock(transactionsStripped); @@ -137,7 +106,6 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang ngOnChanges(changes): void { if (changes.index) { - this.subId++; this.firstLoad = true; if (this.blockGraph) { this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection); diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index daf06603f..8c24979e7 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -75,6 +75,16 @@ export interface MempoolBlockDelta { removed: string[]; changed: { txid: string, rate: number, flags: number, acc: boolean }[]; } +export interface MempoolBlockState { + transactions: TransactionStripped[]; +} +export type MempoolBlockUpdate = MempoolBlockDelta | MempoolBlockState; +export function isMempoolState(update: MempoolBlockUpdate): update is MempoolBlockState { + return update['transactions'] !== undefined; +} +export function isMempoolDelta(update: MempoolBlockUpdate): update is MempoolBlockDelta { + return update['transactions'] === undefined; +} export interface MempoolBlockDeltaCompressed { added: TransactionCompressed[]; diff --git a/frontend/src/app/services/state.service.ts b/frontend/src/app/services/state.service.ts index 286ae5e48..3554e465e 100644 --- a/frontend/src/app/services/state.service.ts +++ b/frontend/src/app/services/state.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core'; import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs'; import { Transaction } from '../interfaces/electrs.interface'; -import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo } from '../interfaces/websocket.interface'; +import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface'; import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface'; import { Router, NavigationStart } from '@angular/router'; import { isPlatformBrowser } from '@angular/common'; @@ -127,8 +127,7 @@ export class StateService { bsqPrice$ = new ReplaySubject(1); mempoolInfo$ = new ReplaySubject(1); mempoolBlocks$ = new ReplaySubject(1); - mempoolBlockTransactions$ = new Subject(); - mempoolBlockDelta$ = new Subject(); + mempoolBlockUpdate$ = new Subject(); liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>; txConfirmed$ = new Subject<[string, BlockExtended]>(); txReplaced$ = new Subject(); @@ -215,25 +214,25 @@ export class StateService { this.router.navigate(['/tracker/' + window.location.pathname.slice(4)]); } - this.liveMempoolBlockTransactions$ = merge( - this.mempoolBlockTransactions$.pipe(map(transactions => { return { transactions }; })), - this.mempoolBlockDelta$.pipe(map(delta => { return { delta }; })), - ).pipe(scan((transactions: { [txid: string]: TransactionStripped }, change: any): { [txid: string]: TransactionStripped } => { - if (change.transactions) { - const txMap = {} + this.liveMempoolBlockTransactions$ = this.mempoolBlockUpdate$.pipe(scan((transactions: { [txid: string]: TransactionStripped }, change: MempoolBlockUpdate): { [txid: string]: TransactionStripped } => { + if (isMempoolState(change)) { + const txMap = {}; change.transactions.forEach(tx => { txMap[tx.txid] = tx; - }) + }); return txMap; } else { - change.delta.changed.forEach(tx => { - transactions[tx.txid].rate = tx.rate; - }) - change.delta.removed.forEach(txid => { + change.added.forEach(tx => { + transactions[tx.txid] = tx; + }); + change.removed.forEach(txid => { delete transactions[txid]; }); - change.delta.added.forEach(tx => { - transactions[tx.txid] = tx; + change.changed.forEach(tx => { + if (transactions[tx.txid]) { + transactions[tx.txid].rate = tx.rate; + transactions[tx.txid].acc = tx.acc; + } }); return transactions; } diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index 414f60bc5..e4df12aa6 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -401,14 +401,16 @@ export class WebsocketService { if (response['projected-block-transactions'].index == this.trackingMempoolBlock) { if (response['projected-block-transactions'].blockTransactions) { this.stateService.mempoolSequence = response['projected-block-transactions'].sequence; - this.stateService.mempoolBlockTransactions$.next(response['projected-block-transactions'].blockTransactions.map(uncompressTx)); + this.stateService.mempoolBlockUpdate$.next({ + transactions: response['projected-block-transactions'].blockTransactions.map(uncompressTx), + }); } else if (response['projected-block-transactions'].delta) { if (this.stateService.mempoolSequence && response['projected-block-transactions'].sequence !== this.stateService.mempoolSequence + 1) { this.stateService.mempoolSequence = 0; this.startTrackMempoolBlock(this.trackingMempoolBlock, true); } else { this.stateService.mempoolSequence = response['projected-block-transactions'].sequence; - this.stateService.mempoolBlockDelta$.next(uncompressDeltaChange(response['projected-block-transactions'].delta)); + this.stateService.mempoolBlockUpdate$.next(uncompressDeltaChange(response['projected-block-transactions'].delta)); } } }