diff --git a/frontend/src/app/components/block-overview-graph/block-overview-graph.component.ts b/frontend/src/app/components/block-overview-graph/block-overview-graph.component.ts index 3fee3f901..57db9bfca 100644 --- a/frontend/src/app/components/block-overview-graph/block-overview-graph.component.ts +++ b/frontend/src/app/components/block-overview-graph/block-overview-graph.component.ts @@ -81,6 +81,20 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On tooltipPosition: Position; readyNextFrame = false; + lastUpdate: number = 0; + pendingUpdate: { + count: number, + add: { [txid: string]: TransactionStripped }, + remove: { [txid: string]: string }, + change: { [txid: string]: { txid: string, rate: number | undefined, acc: boolean | undefined } }, + direction?: string, + } = { + count: 0, + add: {}, + remove: {}, + change: {}, + direction: 'left', + }; searchText: string; searchSubscription: Subscription; @@ -176,6 +190,7 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On destroy(): void { if (this.scene) { this.scene.destroy(); + this.clearUpdateQueue(); this.start(); } } @@ -188,6 +203,7 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On } this.filtersAvailable = filtersAvailable; if (this.scene) { + this.clearUpdateQueue(); this.scene.setup(transactions); this.readyNextFrame = true; this.start(); @@ -197,6 +213,7 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On enter(transactions: TransactionStripped[], direction: string): void { if (this.scene) { + this.clearUpdateQueue(); this.scene.enter(transactions, direction); this.start(); this.updateSearchHighlight(); @@ -205,6 +222,7 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On exit(direction: string): void { if (this.scene) { + this.clearUpdateQueue(); this.scene.exit(direction); this.start(); this.updateSearchHighlight(); @@ -213,13 +231,61 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On replace(transactions: TransactionStripped[], direction: string, sort: boolean = true, startTime?: number): void { if (this.scene) { + this.clearUpdateQueue(); this.scene.replace(transactions || [], direction, sort, startTime); this.start(); this.updateSearchHighlight(); } } + // collates non-urgent updates into a set of consistent pending changes + queueUpdate(add: TransactionStripped[], remove: string[], change: { txid: string, rate: number | undefined, acc: boolean | undefined }[], direction: string = 'left'): void { + for (const tx of add) { + this.pendingUpdate.add[tx.txid] = tx; + delete this.pendingUpdate.remove[tx.txid]; + delete this.pendingUpdate.change[tx.txid]; + } + for (const txid of remove) { + delete this.pendingUpdate.add[txid]; + this.pendingUpdate.remove[txid] = txid; + delete this.pendingUpdate.change[txid]; + } + for (const tx of change) { + if (this.pendingUpdate.add[tx.txid]) { + this.pendingUpdate.add[tx.txid].rate = tx.rate; + this.pendingUpdate.add[tx.txid].acc = tx.acc; + } else { + this.pendingUpdate.change[tx.txid] = tx; + } + } + this.pendingUpdate.direction = direction; + this.pendingUpdate.count++; + } + + applyQueuedUpdates(): void { + if (this.pendingUpdate.count && performance.now() > (this.lastUpdate + this.animationDuration)) { + this.update([], [], [], this.pendingUpdate?.direction); + } + } + + clearUpdateQueue(): void { + this.pendingUpdate = { + count: 0, + add: {}, + remove: {}, + change: {}, + }; + this.lastUpdate = performance.now(); + } + update(add: TransactionStripped[], remove: string[], change: { txid: string, rate: number | undefined, acc: boolean | undefined }[], direction: string = 'left', resetLayout: boolean = false): void { + // merge any pending changes into this update + this.queueUpdate(add, remove, change); + this.applyUpdate(Object.values(this.pendingUpdate.add), Object.values(this.pendingUpdate.remove), Object.values(this.pendingUpdate.change), direction, resetLayout); + this.clearUpdateQueue(); + } + + applyUpdate(add: TransactionStripped[], remove: string[], change: { txid: string, rate: number | undefined, acc: boolean | undefined }[], direction: string = 'left', resetLayout: boolean = false): void { if (this.scene) { add = add.filter(tx => !this.scene.txs[tx.txid]); remove = remove.filter(txid => this.scene.txs[txid]); @@ -230,6 +296,7 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On } this.scene.update(add, remove, change, direction, resetLayout); this.start(); + this.lastUpdate = performance.now(); this.updateSearchHighlight(); } } @@ -370,6 +437,7 @@ export class BlockOverviewGraphComponent implements AfterViewInit, OnDestroy, On if (!now) { now = performance.now(); } + this.applyQueuedUpdates(); // skip re-render if there's no change to the scene if (this.scene && this.gl) { /* SET UP SHADER UNIFORMS */ diff --git a/frontend/src/app/components/block-overview-graph/block-scene.ts b/frontend/src/app/components/block-overview-graph/block-scene.ts index bef907a7a..9dd76dec9 100644 --- a/frontend/src/app/components/block-overview-graph/block-scene.ts +++ b/frontend/src/app/components/block-overview-graph/block-scene.ts @@ -13,7 +13,7 @@ export default class BlockScene { theme: ThemeService; orientation: string; flip: boolean; - animationDuration: number = 900; + animationDuration: number = 1000; configAnimationOffset: number | null; animationOffset: number; highlightingEnabled: boolean; @@ -179,7 +179,7 @@ export default class BlockScene { removed.forEach(tx => { tx.destroy(); }); - }, 1000); + }, (startTime - performance.now()) + this.animationDuration + 1000); if (resetLayout) { add.forEach(tx => { @@ -239,7 +239,7 @@ export default class BlockScene { { width: number, height: number, resolution: number, blockLimit: number, animationDuration: number, animationOffset: number, orientation: string, flip: boolean, vertexArray: FastVertexArray, theme: ThemeService, highlighting: boolean, colorFunction: ((tx: TxView) => Color) | null } ): void { - this.animationDuration = animationDuration || 1000; + this.animationDuration = animationDuration || this.animationDuration || 1000; this.configAnimationOffset = animationOffset; this.animationOffset = this.configAnimationOffset == null ? (this.width * 1.4) : this.configAnimationOffset; this.orientation = orientation; diff --git a/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts b/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts index 4d01bd9b9..3cea7e123 100644 --- a/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts +++ b/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts @@ -1,11 +1,10 @@ -import { Component, ComponentRef, ViewChild, HostListener, Input, Output, EventEmitter, +import { Component, ViewChild, Input, Output, EventEmitter, OnInit, OnDestroy, OnChanges, ChangeDetectionStrategy, ChangeDetectorRef, AfterViewInit } from '@angular/core'; import { StateService } from '../../services/state.service'; -import { MempoolBlockDelta } from '../../interfaces/websocket.interface'; +import { MempoolBlockDelta, isMempoolDelta } from '../../interfaces/websocket.interface'; import { TransactionStripped } from '../../interfaces/node-api.interface'; import { BlockOverviewGraphComponent } from '../../components/block-overview-graph/block-overview-graph.component'; -import { Subscription, BehaviorSubject, merge, of, timer } from 'rxjs'; -import { switchMap, filter, concatMap, map } from 'rxjs/operators'; +import { Subscription, BehaviorSubject } from 'rxjs'; import { WebsocketService } from '../../services/websocket.service'; import { RelativeUrlPipe } from '../../shared/pipes/relative-url/relative-url.pipe'; import { Router } from '@angular/router'; @@ -39,10 +38,6 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang poolDirection: string = 'left'; blockSub: Subscription; - rateLimit = 1000; - private lastEventTime = Date.now() - this.rateLimit; - private subId = 0; - firstLoad: boolean = true; constructor( @@ -62,39 +57,13 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang } ngAfterViewInit(): void { - this.blockSub = merge( - this.stateService.mempoolBlockTransactions$, - this.stateService.mempoolBlockDelta$, - ).pipe( - concatMap(update => { - const now = Date.now(); - const timeSinceLastEvent = now - this.lastEventTime; - this.lastEventTime = Math.max(now, this.lastEventTime + this.rateLimit); - - const subId = this.subId; - - // If time since last event is less than X seconds, delay this event - if (timeSinceLastEvent < this.rateLimit) { - return timer(this.rateLimit - timeSinceLastEvent).pipe( - // Emit the event after the timer - map(() => ({ update, subId })) - ); - } else { - // If enough time has passed, emit the event immediately - return of({ update, subId }); - } - }) - ).subscribe(({ update, subId }) => { - // discard stale updates after a block transition - if (subId !== this.subId) { - return; - } + this.blockSub = this.stateService.mempoolBlockUpdate$.subscribe((update) => { // process update - if (update['added']) { + if (isMempoolDelta(update)) { // delta - this.updateBlock(update as MempoolBlockDelta); + this.updateBlock(update); } else { - const transactionsStripped = update as TransactionStripped[]; + const transactionsStripped = update.transactions; // new transactions if (this.firstLoad) { this.replaceBlock(transactionsStripped); @@ -137,7 +106,6 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang ngOnChanges(changes): void { if (changes.index) { - this.subId++; this.firstLoad = true; if (this.blockGraph) { this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection); @@ -173,7 +141,11 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang const direction = (this.blockIndex == null || this.index < this.blockIndex) ? this.poolDirection : this.chainDirection; this.blockGraph.replace(delta.added, direction); } else { - this.blockGraph.update(delta.added, delta.removed, delta.changed || [], blockMined ? this.chainDirection : this.poolDirection, blockMined); + if (blockMined) { + this.blockGraph.update(delta.added, delta.removed, delta.changed || [], blockMined ? this.chainDirection : this.poolDirection, blockMined); + } else { + this.blockGraph.queueUpdate(delta.added, delta.removed, delta.changed || [], this.poolDirection); + } } this.lastBlockHeight = this.stateService.latestBlockHeight; diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index daf06603f..8c24979e7 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -75,6 +75,16 @@ export interface MempoolBlockDelta { removed: string[]; changed: { txid: string, rate: number, flags: number, acc: boolean }[]; } +export interface MempoolBlockState { + transactions: TransactionStripped[]; +} +export type MempoolBlockUpdate = MempoolBlockDelta | MempoolBlockState; +export function isMempoolState(update: MempoolBlockUpdate): update is MempoolBlockState { + return update['transactions'] !== undefined; +} +export function isMempoolDelta(update: MempoolBlockUpdate): update is MempoolBlockDelta { + return update['transactions'] === undefined; +} export interface MempoolBlockDeltaCompressed { added: TransactionCompressed[]; diff --git a/frontend/src/app/services/state.service.ts b/frontend/src/app/services/state.service.ts index 286ae5e48..3554e465e 100644 --- a/frontend/src/app/services/state.service.ts +++ b/frontend/src/app/services/state.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core'; import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs'; import { Transaction } from '../interfaces/electrs.interface'; -import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo } from '../interfaces/websocket.interface'; +import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface'; import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface'; import { Router, NavigationStart } from '@angular/router'; import { isPlatformBrowser } from '@angular/common'; @@ -127,8 +127,7 @@ export class StateService { bsqPrice$ = new ReplaySubject(1); mempoolInfo$ = new ReplaySubject(1); mempoolBlocks$ = new ReplaySubject(1); - mempoolBlockTransactions$ = new Subject(); - mempoolBlockDelta$ = new Subject(); + mempoolBlockUpdate$ = new Subject(); liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>; txConfirmed$ = new Subject<[string, BlockExtended]>(); txReplaced$ = new Subject(); @@ -215,25 +214,25 @@ export class StateService { this.router.navigate(['/tracker/' + window.location.pathname.slice(4)]); } - this.liveMempoolBlockTransactions$ = merge( - this.mempoolBlockTransactions$.pipe(map(transactions => { return { transactions }; })), - this.mempoolBlockDelta$.pipe(map(delta => { return { delta }; })), - ).pipe(scan((transactions: { [txid: string]: TransactionStripped }, change: any): { [txid: string]: TransactionStripped } => { - if (change.transactions) { - const txMap = {} + this.liveMempoolBlockTransactions$ = this.mempoolBlockUpdate$.pipe(scan((transactions: { [txid: string]: TransactionStripped }, change: MempoolBlockUpdate): { [txid: string]: TransactionStripped } => { + if (isMempoolState(change)) { + const txMap = {}; change.transactions.forEach(tx => { txMap[tx.txid] = tx; - }) + }); return txMap; } else { - change.delta.changed.forEach(tx => { - transactions[tx.txid].rate = tx.rate; - }) - change.delta.removed.forEach(txid => { + change.added.forEach(tx => { + transactions[tx.txid] = tx; + }); + change.removed.forEach(txid => { delete transactions[txid]; }); - change.delta.added.forEach(tx => { - transactions[tx.txid] = tx; + change.changed.forEach(tx => { + if (transactions[tx.txid]) { + transactions[tx.txid].rate = tx.rate; + transactions[tx.txid].acc = tx.acc; + } }); return transactions; } diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index 414f60bc5..e4df12aa6 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -401,14 +401,16 @@ export class WebsocketService { if (response['projected-block-transactions'].index == this.trackingMempoolBlock) { if (response['projected-block-transactions'].blockTransactions) { this.stateService.mempoolSequence = response['projected-block-transactions'].sequence; - this.stateService.mempoolBlockTransactions$.next(response['projected-block-transactions'].blockTransactions.map(uncompressTx)); + this.stateService.mempoolBlockUpdate$.next({ + transactions: response['projected-block-transactions'].blockTransactions.map(uncompressTx), + }); } else if (response['projected-block-transactions'].delta) { if (this.stateService.mempoolSequence && response['projected-block-transactions'].sequence !== this.stateService.mempoolSequence + 1) { this.stateService.mempoolSequence = 0; this.startTrackMempoolBlock(this.trackingMempoolBlock, true); } else { this.stateService.mempoolSequence = response['projected-block-transactions'].sequence; - this.stateService.mempoolBlockDelta$.next(uncompressDeltaChange(response['projected-block-transactions'].delta)); + this.stateService.mempoolBlockUpdate$.next(uncompressDeltaChange(response['projected-block-transactions'].delta)); } } }