Simplify websocket mempool delta handling
This commit is contained in:
parent
cd8abe5c06
commit
a8868b5f0f
@ -1,11 +1,10 @@
|
|||||||
import { Component, ComponentRef, ViewChild, HostListener, Input, Output, EventEmitter,
|
import { Component, ViewChild, Input, Output, EventEmitter,
|
||||||
OnInit, OnDestroy, OnChanges, ChangeDetectionStrategy, ChangeDetectorRef, AfterViewInit } from '@angular/core';
|
OnInit, OnDestroy, OnChanges, ChangeDetectionStrategy, ChangeDetectorRef, AfterViewInit } from '@angular/core';
|
||||||
import { StateService } from '../../services/state.service';
|
import { StateService } from '../../services/state.service';
|
||||||
import { MempoolBlockDelta } from '../../interfaces/websocket.interface';
|
import { MempoolBlockDelta, isMempoolDelta } from '../../interfaces/websocket.interface';
|
||||||
import { TransactionStripped } from '../../interfaces/node-api.interface';
|
import { TransactionStripped } from '../../interfaces/node-api.interface';
|
||||||
import { BlockOverviewGraphComponent } from '../../components/block-overview-graph/block-overview-graph.component';
|
import { BlockOverviewGraphComponent } from '../../components/block-overview-graph/block-overview-graph.component';
|
||||||
import { Subscription, BehaviorSubject, merge, of, timer } from 'rxjs';
|
import { Subscription, BehaviorSubject } from 'rxjs';
|
||||||
import { switchMap, filter, concatMap, map } from 'rxjs/operators';
|
|
||||||
import { WebsocketService } from '../../services/websocket.service';
|
import { WebsocketService } from '../../services/websocket.service';
|
||||||
import { RelativeUrlPipe } from '../../shared/pipes/relative-url/relative-url.pipe';
|
import { RelativeUrlPipe } from '../../shared/pipes/relative-url/relative-url.pipe';
|
||||||
import { Router } from '@angular/router';
|
import { Router } from '@angular/router';
|
||||||
@ -39,10 +38,6 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
|
|||||||
poolDirection: string = 'left';
|
poolDirection: string = 'left';
|
||||||
|
|
||||||
blockSub: Subscription;
|
blockSub: Subscription;
|
||||||
rateLimit = 1000;
|
|
||||||
private lastEventTime = Date.now() - this.rateLimit;
|
|
||||||
private subId = 0;
|
|
||||||
|
|
||||||
firstLoad: boolean = true;
|
firstLoad: boolean = true;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@ -62,39 +57,13 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
|
|||||||
}
|
}
|
||||||
|
|
||||||
ngAfterViewInit(): void {
|
ngAfterViewInit(): void {
|
||||||
this.blockSub = merge(
|
this.blockSub = this.stateService.mempoolBlockUpdate$.subscribe((update) => {
|
||||||
this.stateService.mempoolBlockTransactions$,
|
|
||||||
this.stateService.mempoolBlockDelta$,
|
|
||||||
).pipe(
|
|
||||||
concatMap(update => {
|
|
||||||
const now = Date.now();
|
|
||||||
const timeSinceLastEvent = now - this.lastEventTime;
|
|
||||||
this.lastEventTime = Math.max(now, this.lastEventTime + this.rateLimit);
|
|
||||||
|
|
||||||
const subId = this.subId;
|
|
||||||
|
|
||||||
// If time since last event is less than X seconds, delay this event
|
|
||||||
if (timeSinceLastEvent < this.rateLimit) {
|
|
||||||
return timer(this.rateLimit - timeSinceLastEvent).pipe(
|
|
||||||
// Emit the event after the timer
|
|
||||||
map(() => ({ update, subId }))
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// If enough time has passed, emit the event immediately
|
|
||||||
return of({ update, subId });
|
|
||||||
}
|
|
||||||
})
|
|
||||||
).subscribe(({ update, subId }) => {
|
|
||||||
// discard stale updates after a block transition
|
|
||||||
if (subId !== this.subId) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// process update
|
// process update
|
||||||
if (update['added']) {
|
if (isMempoolDelta(update)) {
|
||||||
// delta
|
// delta
|
||||||
this.updateBlock(update as MempoolBlockDelta);
|
this.updateBlock(update);
|
||||||
} else {
|
} else {
|
||||||
const transactionsStripped = update as TransactionStripped[];
|
const transactionsStripped = update.transactions;
|
||||||
// new transactions
|
// new transactions
|
||||||
if (this.firstLoad) {
|
if (this.firstLoad) {
|
||||||
this.replaceBlock(transactionsStripped);
|
this.replaceBlock(transactionsStripped);
|
||||||
@ -137,7 +106,6 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
|
|||||||
|
|
||||||
ngOnChanges(changes): void {
|
ngOnChanges(changes): void {
|
||||||
if (changes.index) {
|
if (changes.index) {
|
||||||
this.subId++;
|
|
||||||
this.firstLoad = true;
|
this.firstLoad = true;
|
||||||
if (this.blockGraph) {
|
if (this.blockGraph) {
|
||||||
this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection);
|
this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection);
|
||||||
|
@ -75,6 +75,16 @@ export interface MempoolBlockDelta {
|
|||||||
removed: string[];
|
removed: string[];
|
||||||
changed: { txid: string, rate: number, flags: number, acc: boolean }[];
|
changed: { txid: string, rate: number, flags: number, acc: boolean }[];
|
||||||
}
|
}
|
||||||
|
export interface MempoolBlockState {
|
||||||
|
transactions: TransactionStripped[];
|
||||||
|
}
|
||||||
|
export type MempoolBlockUpdate = MempoolBlockDelta | MempoolBlockState;
|
||||||
|
export function isMempoolState(update: MempoolBlockUpdate): update is MempoolBlockState {
|
||||||
|
return update['transactions'] !== undefined;
|
||||||
|
}
|
||||||
|
export function isMempoolDelta(update: MempoolBlockUpdate): update is MempoolBlockDelta {
|
||||||
|
return update['transactions'] === undefined;
|
||||||
|
}
|
||||||
|
|
||||||
export interface MempoolBlockDeltaCompressed {
|
export interface MempoolBlockDeltaCompressed {
|
||||||
added: TransactionCompressed[];
|
added: TransactionCompressed[];
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core';
|
import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core';
|
||||||
import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs';
|
import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs';
|
||||||
import { Transaction } from '../interfaces/electrs.interface';
|
import { Transaction } from '../interfaces/electrs.interface';
|
||||||
import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo } from '../interfaces/websocket.interface';
|
import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface';
|
||||||
import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface';
|
import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface';
|
||||||
import { Router, NavigationStart } from '@angular/router';
|
import { Router, NavigationStart } from '@angular/router';
|
||||||
import { isPlatformBrowser } from '@angular/common';
|
import { isPlatformBrowser } from '@angular/common';
|
||||||
@ -127,8 +127,7 @@ export class StateService {
|
|||||||
bsqPrice$ = new ReplaySubject<number>(1);
|
bsqPrice$ = new ReplaySubject<number>(1);
|
||||||
mempoolInfo$ = new ReplaySubject<MempoolInfo>(1);
|
mempoolInfo$ = new ReplaySubject<MempoolInfo>(1);
|
||||||
mempoolBlocks$ = new ReplaySubject<MempoolBlock[]>(1);
|
mempoolBlocks$ = new ReplaySubject<MempoolBlock[]>(1);
|
||||||
mempoolBlockTransactions$ = new Subject<TransactionStripped[]>();
|
mempoolBlockUpdate$ = new Subject<MempoolBlockUpdate>();
|
||||||
mempoolBlockDelta$ = new Subject<MempoolBlockDelta>();
|
|
||||||
liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>;
|
liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>;
|
||||||
txConfirmed$ = new Subject<[string, BlockExtended]>();
|
txConfirmed$ = new Subject<[string, BlockExtended]>();
|
||||||
txReplaced$ = new Subject<ReplacedTransaction>();
|
txReplaced$ = new Subject<ReplacedTransaction>();
|
||||||
@ -215,25 +214,25 @@ export class StateService {
|
|||||||
this.router.navigate(['/tracker/' + window.location.pathname.slice(4)]);
|
this.router.navigate(['/tracker/' + window.location.pathname.slice(4)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.liveMempoolBlockTransactions$ = merge(
|
this.liveMempoolBlockTransactions$ = this.mempoolBlockUpdate$.pipe(scan((transactions: { [txid: string]: TransactionStripped }, change: MempoolBlockUpdate): { [txid: string]: TransactionStripped } => {
|
||||||
this.mempoolBlockTransactions$.pipe(map(transactions => { return { transactions }; })),
|
if (isMempoolState(change)) {
|
||||||
this.mempoolBlockDelta$.pipe(map(delta => { return { delta }; })),
|
const txMap = {};
|
||||||
).pipe(scan((transactions: { [txid: string]: TransactionStripped }, change: any): { [txid: string]: TransactionStripped } => {
|
|
||||||
if (change.transactions) {
|
|
||||||
const txMap = {}
|
|
||||||
change.transactions.forEach(tx => {
|
change.transactions.forEach(tx => {
|
||||||
txMap[tx.txid] = tx;
|
txMap[tx.txid] = tx;
|
||||||
})
|
});
|
||||||
return txMap;
|
return txMap;
|
||||||
} else {
|
} else {
|
||||||
change.delta.changed.forEach(tx => {
|
change.added.forEach(tx => {
|
||||||
transactions[tx.txid].rate = tx.rate;
|
transactions[tx.txid] = tx;
|
||||||
})
|
});
|
||||||
change.delta.removed.forEach(txid => {
|
change.removed.forEach(txid => {
|
||||||
delete transactions[txid];
|
delete transactions[txid];
|
||||||
});
|
});
|
||||||
change.delta.added.forEach(tx => {
|
change.changed.forEach(tx => {
|
||||||
transactions[tx.txid] = tx;
|
if (transactions[tx.txid]) {
|
||||||
|
transactions[tx.txid].rate = tx.rate;
|
||||||
|
transactions[tx.txid].acc = tx.acc;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
@ -401,14 +401,16 @@ export class WebsocketService {
|
|||||||
if (response['projected-block-transactions'].index == this.trackingMempoolBlock) {
|
if (response['projected-block-transactions'].index == this.trackingMempoolBlock) {
|
||||||
if (response['projected-block-transactions'].blockTransactions) {
|
if (response['projected-block-transactions'].blockTransactions) {
|
||||||
this.stateService.mempoolSequence = response['projected-block-transactions'].sequence;
|
this.stateService.mempoolSequence = response['projected-block-transactions'].sequence;
|
||||||
this.stateService.mempoolBlockTransactions$.next(response['projected-block-transactions'].blockTransactions.map(uncompressTx));
|
this.stateService.mempoolBlockUpdate$.next({
|
||||||
|
transactions: response['projected-block-transactions'].blockTransactions.map(uncompressTx),
|
||||||
|
});
|
||||||
} else if (response['projected-block-transactions'].delta) {
|
} else if (response['projected-block-transactions'].delta) {
|
||||||
if (this.stateService.mempoolSequence && response['projected-block-transactions'].sequence !== this.stateService.mempoolSequence + 1) {
|
if (this.stateService.mempoolSequence && response['projected-block-transactions'].sequence !== this.stateService.mempoolSequence + 1) {
|
||||||
this.stateService.mempoolSequence = 0;
|
this.stateService.mempoolSequence = 0;
|
||||||
this.startTrackMempoolBlock(this.trackingMempoolBlock, true);
|
this.startTrackMempoolBlock(this.trackingMempoolBlock, true);
|
||||||
} else {
|
} else {
|
||||||
this.stateService.mempoolSequence = response['projected-block-transactions'].sequence;
|
this.stateService.mempoolSequence = response['projected-block-transactions'].sequence;
|
||||||
this.stateService.mempoolBlockDelta$.next(uncompressDeltaChange(response['projected-block-transactions'].delta));
|
this.stateService.mempoolBlockUpdate$.next(uncompressDeltaChange(response['projected-block-transactions'].delta));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user