Replace acceleration API polling with websocket
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core';
|
||||
import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable, merge } from 'rxjs';
|
||||
import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable } from 'rxjs';
|
||||
import { Transaction } from '../interfaces/electrs.interface';
|
||||
import { HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockDelta, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface';
|
||||
import { BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface';
|
||||
import { AccelerationDelta, HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface';
|
||||
import { Acceleration, BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface';
|
||||
import { Router, NavigationStart } from '@angular/router';
|
||||
import { isPlatformBrowser } from '@angular/common';
|
||||
import { filter, map, scan, shareReplay } from 'rxjs/operators';
|
||||
@@ -129,6 +129,8 @@ export class StateService {
|
||||
mempoolBlocks$ = new ReplaySubject<MempoolBlock[]>(1);
|
||||
mempoolBlockUpdate$ = new Subject<MempoolBlockUpdate>();
|
||||
liveMempoolBlockTransactions$: Observable<{ [txid: string]: TransactionStripped}>;
|
||||
accelerations$ = new Subject<AccelerationDelta>();
|
||||
liveAccelerations$: Observable<Acceleration[]>;
|
||||
txConfirmed$ = new Subject<[string, BlockExtended]>();
|
||||
txReplaced$ = new Subject<ReplacedTransaction>();
|
||||
txRbfInfo$ = new Subject<RbfTree>();
|
||||
@@ -238,6 +240,24 @@ export class StateService {
|
||||
}
|
||||
}, {}));
|
||||
|
||||
// Emits the full list of pending accelerations each time it changes
|
||||
this.liveAccelerations$ = this.accelerations$.pipe(
|
||||
scan((accelerations: { [txid: string]: Acceleration }, delta: AccelerationDelta) => {
|
||||
if (delta.reset) {
|
||||
accelerations = {};
|
||||
} else {
|
||||
for (const txid of delta.removed) {
|
||||
delete accelerations[txid];
|
||||
}
|
||||
}
|
||||
for (const acc of delta.added) {
|
||||
accelerations[acc.txid] = acc;
|
||||
}
|
||||
return accelerations;
|
||||
}, {}),
|
||||
map((accMap) => Object.values(accMap).sort((a,b) => b.added - a.added))
|
||||
);
|
||||
|
||||
this.networkChanged$.subscribe((network) => {
|
||||
this.transactions$ = new BehaviorSubject<TransactionStripped[]>(null);
|
||||
this.blocksSubject$.next([]);
|
||||
|
||||
@@ -33,6 +33,7 @@ export class WebsocketService {
|
||||
private isTrackingRbfSummary = false;
|
||||
private isTrackingAddress: string | false = false;
|
||||
private isTrackingAddresses: string[] | false = false;
|
||||
private isTrackingAccelerations: boolean = false;
|
||||
private trackingMempoolBlock: number;
|
||||
private latestGitCommit = '';
|
||||
private onlineCheckTimeout: number;
|
||||
@@ -132,6 +133,9 @@ export class WebsocketService {
|
||||
if (this.isTrackingAddresses) {
|
||||
this.startTrackAddresses(this.isTrackingAddresses);
|
||||
}
|
||||
if (this.isTrackingAccelerations) {
|
||||
this.startTrackAccelerations();
|
||||
}
|
||||
this.stateService.connectionState$.next(2);
|
||||
}
|
||||
|
||||
@@ -235,6 +239,24 @@ export class WebsocketService {
|
||||
this.isTrackingRbfSummary = false;
|
||||
}
|
||||
|
||||
startTrackAccelerations() {
|
||||
this.websocketSubject.next({ 'track-accelerations': true });
|
||||
this.isTrackingAccelerations = true;
|
||||
}
|
||||
|
||||
stopTrackAccelerations() {
|
||||
if (this.isTrackingAccelerations) {
|
||||
this.websocketSubject.next({ 'track-accelerations': false });
|
||||
this.isTrackingAccelerations = false;
|
||||
}
|
||||
}
|
||||
|
||||
ensureTrackAccelerations() {
|
||||
if (!this.isTrackingAccelerations) {
|
||||
this.startTrackAccelerations();
|
||||
}
|
||||
}
|
||||
|
||||
fetchStatistics(historicalDate: string) {
|
||||
this.websocketSubject.next({ historicalDate });
|
||||
}
|
||||
@@ -416,6 +438,18 @@ export class WebsocketService {
|
||||
}
|
||||
}
|
||||
|
||||
if (response['accelerations']) {
|
||||
if (response['accelerations'].accelerations) {
|
||||
this.stateService.accelerations$.next({
|
||||
added: response['accelerations'].accelerations,
|
||||
removed: [],
|
||||
reset: true,
|
||||
});
|
||||
} else {
|
||||
this.stateService.accelerations$.next(response['accelerations']);
|
||||
}
|
||||
}
|
||||
|
||||
if (response['live-2h-chart']) {
|
||||
this.stateService.live2Chart$.next(response['live-2h-chart']);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user