From 3ffc4956f4a0f781c661dd5e8b798b2e6bcc3f91 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 31 May 2022 21:36:42 +0000 Subject: [PATCH] Stream projected block deltas instead of full data --- backend/src/api/mempool-blocks.ts | 48 ++++++++++- backend/src/api/websocket-handler.ts | 21 +++-- backend/src/mempool.interfaces.ts | 5 ++ .../mempool-block-overview/block-scene.ts | 76 ++++++++++------- .../mempool-block-overview.component.ts | 81 +++++++------------ .../src/app/interfaces/websocket.interface.ts | 5 ++ frontend/src/app/services/state.service.ts | 3 +- .../src/app/services/websocket.service.ts | 6 +- 8 files changed, 156 insertions(+), 89 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index ce85be436..d202c3a44 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -1,10 +1,11 @@ import logger from '../logger'; -import { MempoolBlock, TransactionExtended, MempoolBlockWithTransactions } from '../mempool.interfaces'; +import { MempoolBlock, TransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta } from '../mempool.interfaces'; import { Common } from './common'; import config from '../config'; class MempoolBlocks { private mempoolBlocks: MempoolBlockWithTransactions[] = []; + private mempoolBlockDeltas: MempoolBlockDelta[] = []; constructor() {} @@ -25,6 +26,10 @@ class MempoolBlocks { return this.mempoolBlocks; } + public getMempoolBlockDeltas(): MempoolBlockDelta[] { + return this.mempoolBlockDeltas + } + public updateMempoolBlocks(memPool: { [txid: string]: TransactionExtended }): void { const latestMempool = memPool; const memPoolArray: TransactionExtended[] = []; @@ -66,11 +71,14 @@ class MempoolBlocks { const time = end - start; logger.debug('Mempool blocks calculated in ' + time / 1000 + ' seconds'); - this.mempoolBlocks = this.calculateMempoolBlocks(memPoolArray); + const { blocks, deltas } = this.calculateMempoolBlocks(memPoolArray, this.mempoolBlocks); + this.mempoolBlocks = blocks + this.mempoolBlockDeltas = deltas } - private calculateMempoolBlocks(transactionsSorted: TransactionExtended[]): MempoolBlockWithTransactions[] { + private calculateMempoolBlocks(transactionsSorted: TransactionExtended[], prevBlocks: MempoolBlockWithTransactions[]): { blocks: MempoolBlockWithTransactions[], deltas: MempoolBlockDelta[] } { const mempoolBlocks: MempoolBlockWithTransactions[] = []; + const mempoolBlockDeltas: MempoolBlockDelta[] = []; let blockWeight = 0; let blockSize = 0; let transactions: TransactionExtended[] = []; @@ -90,7 +98,39 @@ class MempoolBlocks { if (transactions.length) { mempoolBlocks.push(this.dataToMempoolBlocks(transactions, blockSize, blockWeight, mempoolBlocks.length)); } - return mempoolBlocks; + // Calculate change from previous block states + for (let i = 0; i < Math.max(mempoolBlocks.length, prevBlocks.length); i++) { + let added: TransactionStripped[] = [] + let removed: string[] = [] + if (mempoolBlocks[i] && !prevBlocks[i]) { + added = mempoolBlocks[i].transactions + } else if (!mempoolBlocks[i] && prevBlocks[i]) { + removed = prevBlocks[i].transactions.map(tx => tx.txid) + } else if (mempoolBlocks[i] && prevBlocks[i]) { + const prevIds = {} + const newIds = {} + prevBlocks[i].transactions.forEach(tx => { + prevIds[tx.txid] = true + }) + mempoolBlocks[i].transactions.forEach(tx => { + newIds[tx.txid] = true + }) + prevBlocks[i].transactions.forEach(tx => { + if (!newIds[tx.txid]) removed.push(tx.txid) + }) + mempoolBlocks[i].transactions.forEach(tx => { + if (!prevIds[tx.txid]) added.push(tx) + }) + } + mempoolBlockDeltas.push({ + added, + removed + }) + } + return { + blocks: mempoolBlocks, + deltas: mempoolBlockDeltas + } } private dataToMempoolBlocks(transactions: TransactionExtended[], diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index b85197230..341b09e7f 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -1,7 +1,6 @@ import logger from '../logger'; import * as WebSocket from 'ws'; -import { BlockExtended, TransactionExtended, WebsocketResponse, MempoolBlock, - OptimizedStatistic, ILoadingIndicators, IConversionRates } from '../mempool.interfaces'; +import { BlockExtended, TransactionExtended, WebsocketResponse, MempoolBlock, MempoolBlockDelta, OptimizedStatistic, ILoadingIndicators, IConversionRates } from '../mempool.interfaces'; import blocks from './blocks'; import memPool from './mempool'; import backendInfo from './backend-info'; @@ -249,7 +248,7 @@ class WebsocketHandler { mempoolBlocks.updateMempoolBlocks(newMempool); const mBlocks = mempoolBlocks.getMempoolBlocks(); - const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions(); + const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); const mempoolInfo = memPool.getMempoolInfo(); const vBytesPerSecond = memPool.getVBytesPerSecond(); const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); @@ -389,10 +388,10 @@ class WebsocketHandler { if (client['track-mempool-block'] >= 0) { const index = client['track-mempool-block']; - if (mBlocksWithTransactions[index]) { + if (mBlockDeltas[index]) { response['projected-mempool-block'] = { index: index, - block: mBlocksWithTransactions[index], + delta: mBlockDeltas[index], }; } } @@ -409,6 +408,7 @@ class WebsocketHandler { } let mBlocks: undefined | MempoolBlock[]; + let mBlockDeltas: undefined | MempoolBlockDelta[]; let matchRate = 0; const _memPool = memPool.getMempool(); const _mempoolBlocks = mempoolBlocks.getMempoolBlocksWithTransactions(); @@ -425,6 +425,7 @@ class WebsocketHandler { matchRate = Math.round((matches.length / (txIds.length - 1)) * 100); mempoolBlocks.updateMempoolBlocks(_memPool); mBlocks = mempoolBlocks.getMempoolBlocks(); + mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); } if (block.extras) { @@ -522,6 +523,16 @@ class WebsocketHandler { } } + if (client['track-mempool-block'] >= 0) { + const index = client['track-mempool-block']; + if (mBlockDeltas && mBlockDeltas[index]) { + response['projected-mempool-block'] = { + index: index, + delta: mBlockDeltas[index], + }; + } + } + client.send(JSON.stringify(response)); }); } diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index f35aba16c..8d0fa6972 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -36,6 +36,11 @@ export interface MempoolBlockWithTransactions extends MempoolBlock { transactions: TransactionStripped[]; } +export interface MempoolBlockDelta { + added: TransactionStripped[]; + removed: string[]; +} + interface VinStrippedToScriptsig { scriptsig: string; } diff --git a/frontend/src/app/components/mempool-block-overview/block-scene.ts b/frontend/src/app/components/mempool-block-overview/block-scene.ts index fcff47840..825569100 100644 --- a/frontend/src/app/components/mempool-block-overview/block-scene.ts +++ b/frontend/src/app/components/mempool-block-overview/block-scene.ts @@ -1,9 +1,12 @@ +import { FastVertexArray } from './fast-vertex-array' import TxSprite from './tx-sprite' import TxView from './tx-view' +import { TransactionStripped } from 'src/app/interfaces/websocket.interface'; import { Position, Square } from './sprite-types' export default class BlockScene { scene: { count: number, offset: { x: number, y: number}}; + vertexArray: FastVertexArray; txs: { [key: string]: TxView }; width: number; height: number; @@ -17,8 +20,8 @@ export default class BlockScene { layout: BlockLayout; dirty: boolean; - constructor ({ width, height, resolution, blockLimit }: { width: number, height: number, resolution: number, blockLimit: number}) { - this.init({ width, height, resolution, blockLimit }) + constructor ({ width, height, resolution, blockLimit, vertexArray }: { width: number, height: number, resolution: number, blockLimit: number, vertexArray: FastVertexArray }) { + this.init({ width, height, resolution, blockLimit, vertexArray }) } destroy (): void { @@ -37,55 +40,70 @@ export default class BlockScene { } // Animate new block entering scene - enter (txs: TxView[], direction) { - this.replace(txs, [], direction) + enter (txs: TransactionStripped[], direction) { + this.replace(txs, direction) } // Animate block leaving scene - exit (direction: string): TxView[] { - const removed = [] + exit (direction: string): void { const startTime = performance.now() - Object.values(this.txs).forEach(tx => { - this.remove(tx.txid, startTime, direction) - removed.push(tx) - }) - return removed - } - - // Reset layout and replace with new set of transactions - replace (txs: TxView[], remove: TxView[], direction: string = 'left'): void { - const startTime = performance.now() - this.removeBatch(remove.map(tx => tx.txid), startTime, direction) + const removed = this.removeBatch(Object.keys(this.txs), startTime, direction) // clean up sprites setTimeout(() => { - remove.forEach(tx => { + removed.forEach(tx => { + tx.destroy() + }) + }, 2000) + } + + // Reset layout and replace with new set of transactions + replace (txs: TransactionStripped[], direction: string = 'left'): void { + const startTime = performance.now() + const nextIds = {} + const remove = [] + txs.forEach(tx => { + nextIds[tx.txid] = true + }) + Object.keys(this.txs).forEach(txid => { + if (!nextIds[txid]) remove.push(txid) + }) + txs.forEach(tx => { + if (!this.txs[tx.txid]) this.txs[tx.txid] = new TxView(tx, this.vertexArray) + }) + + const removed = this.removeBatch(remove, startTime, direction) + + // clean up sprites + setTimeout(() => { + removed.forEach(tx => { tx.destroy() }) }, 1000) this.layout = new BlockLayout({ width: this.gridWidth, height: this.gridHeight }) - txs.sort((a,b) => { return b.feerate - a.feerate }).forEach(tx => { - this.insert(tx, startTime, direction) + Object.values(this.txs).sort((a,b) => { return b.feerate - a.feerate }).forEach(tx => { + this.place(tx) }) + + this.updateAll(startTime, direction) } - update (add: TxView[], remove: TxView[], direction: string = 'left'): void { + update (add: TransactionStripped[], remove: string[], direction: string = 'left'): void { const startTime = performance.now() - this.removeBatch(remove.map(tx => tx.txid), startTime, direction) + const removed = this.removeBatch(remove, startTime, direction) // clean up sprites setTimeout(() => { - remove.forEach(tx => { + removed.forEach(tx => { tx.destroy() }) }, 1000) // try to insert new txs directly const remaining = [] - add = add.sort((a,b) => { return b.feerate - a.feerate }) - add.forEach(tx => { + add.map(tx => new TxView(tx, this.vertexArray)).sort((a,b) => { return b.feerate - a.feerate }).forEach(tx => { if (!this.tryInsertByFee(tx)) { remaining.push(tx) } @@ -106,7 +124,9 @@ export default class BlockScene { } else return null } - private init ({ width, height, resolution, blockLimit }: { width: number, height: number, resolution: number, blockLimit: number}): void { + private init ({ width, height, resolution, blockLimit, vertexArray }: { width: number, height: number, resolution: number, blockLimit: number, vertexArray: FastVertexArray }): void { + this.vertexArray = vertexArray + this.scene = { count: 0, offset: { @@ -300,11 +320,11 @@ export default class BlockScene { } } - private removeBatch (ids: string[], startTime: number, direction: string = 'left'): (TxView | void)[] { + private removeBatch (ids: string[], startTime: number, direction: string = 'left'): TxView[] { if (!startTime) startTime = performance.now() return ids.map(id => { return this.remove(id, startTime, direction) - }).filter(tx => !!tx) + }).filter(tx => tx != null) as TxView[] } } diff --git a/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts b/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts index 9c1a4d7cf..4edbbb49e 100644 --- a/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts +++ b/frontend/src/app/components/mempool-block-overview/mempool-block-overview.component.ts @@ -1,6 +1,6 @@ import { Component, ElementRef, ViewChild, HostListener, Input, Output, EventEmitter, OnInit, OnDestroy, OnChanges, ChangeDetectionStrategy, NgZone } from '@angular/core'; import { StateService } from 'src/app/services/state.service'; -import { MempoolBlockWithTransactions, TransactionStripped } from 'src/app/interfaces/websocket.interface'; +import { MempoolBlockWithTransactions, MempoolBlockDelta, TransactionStripped } from 'src/app/interfaces/websocket.interface'; import { Observable, Subscription } from 'rxjs'; import { WebsocketService } from 'src/app/services/websocket.service'; import { FastVertexArray } from './fast-vertex-array'; @@ -29,14 +29,13 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang vertexArray: FastVertexArray; running: boolean; scene: BlockScene; - txViews: { [key: string]: TxView }; hoverTx: TxView | void; selectedTx: TxView | void; lastBlockHeight: number; blockIndex: number; - sub: Subscription; - mempoolBlock$: Observable; + blockSub: Subscription; + deltaSub: Subscription; constructor( public stateService: StateService, @@ -44,14 +43,14 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang readonly _ngZone: NgZone, ) { this.vertexArray = new FastVertexArray(512, TxSprite.dataSize) - this.txViews = {} } ngOnInit(): void { - this.websocketService.startTrackMempoolBlock(this.index); - this.mempoolBlock$ = this.stateService.mempoolBlock$ - this.sub = this.mempoolBlock$.subscribe((block) => { - this.updateBlock(block) + this.blockSub = this.stateService.mempoolBlock$.subscribe((block) => { + this.replaceBlock(block) + }) + this.deltaSub = this.stateService.mempoolBlockDelta$.subscribe((delta) => { + this.updateBlock(delta) }) } @@ -66,65 +65,47 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang ngOnChanges(changes): void { if (changes.index) { - this.clearBlock(changes.index.currentValue) this.websocketService.startTrackMempoolBlock(changes.index.currentValue); } } ngOnDestroy(): void { - this.sub.unsubscribe(); + this.blockSub.unsubscribe(); + this.deltaSub.unsubscribe(); this.websocketService.stopTrackMempoolBlock(); } - clearBlock(index: number): void { - if (this.scene && index != this.blockIndex) { - const direction = (this.blockIndex == null || this.index < this.blockIndex) ? 'left' : 'right' - const removed = this.scene.exit(direction) - setTimeout(() => { - removed.forEach(tx => tx.destroy()) - }, 1000) - this.txViews = {} - this.scene = null + replaceBlock(block: MempoolBlockWithTransactions): void { + if (!this.scene) { + this.scene = new BlockScene({ width: this.displayWidth, height: this.displayHeight, resolution: 75, blockLimit: this.stateService.blockVSize, vertexArray: this.vertexArray }) } + const blockMined = (this.stateService.latestBlockHeight > this.lastBlockHeight) + if (this.blockIndex != this.index) { + const direction = (this.blockIndex == null || this.index < this.blockIndex) ? 'left' : 'right' + this.scene.exit(direction) + this.scene = new BlockScene({ width: this.displayWidth, height: this.displayHeight, resolution: 75, blockLimit: this.stateService.blockVSize, vertexArray: this.vertexArray }) + this.scene.enter(block.transactions, direction) + } else { + this.scene.replace(block.transactions, blockMined ? 'right' : 'left') + } + + this.lastBlockHeight = this.stateService.latestBlockHeight + this.blockIndex = this.index } - updateBlock(block: MempoolBlockWithTransactions): void { + updateBlock(delta: MempoolBlockDelta): void { if (!this.scene) { - this.scene = new BlockScene({ width: this.displayWidth, height: this.displayHeight, resolution: 75, blockLimit: this.stateService.blockVSize }) + this.scene = new BlockScene({ width: this.displayWidth, height: this.displayHeight, resolution: 75, blockLimit: this.stateService.blockVSize, vertexArray: this.vertexArray }) } const blockMined = (this.stateService.latestBlockHeight > this.lastBlockHeight) - const nextIds = {} - let remove = [] - let add = [] - block.transactions.forEach(tx => { - nextIds[tx.txid] = true - }) - - // List old transactions to remove - Object.keys(this.txViews).forEach(txid => { - if (!nextIds[txid]) { - remove.push(this.txViews[txid]) - delete this.txViews[txid] - } - }) - - // List new transactions to add - block.transactions.forEach(tx => { - if (!this.txViews[tx.txid]) { - const txView = new TxView(tx, this.vertexArray) - this.txViews[tx.txid] = txView - add.push(txView) - } - }) - if (this.blockIndex != this.index) { const direction = (this.blockIndex == null || this.index < this.blockIndex) ? 'left' : 'right' - this.scene.enter(Object.values(this.txViews), direction) - } else if (blockMined) { - this.scene.replace(Object.values(this.txViews), remove, 'right') + this.scene.exit(direction) + this.scene = new BlockScene({ width: this.displayWidth, height: this.displayHeight, resolution: 75, blockLimit: this.stateService.blockVSize, vertexArray: this.vertexArray }) + this.scene.enter(delta.added, direction) } else { - this.scene.update(add, remove, 'left') + this.scene.update(delta.added, delta.removed, blockMined ? 'right' : 'left') } this.lastBlockHeight = this.stateService.latestBlockHeight diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index 751f60777..d7f0addea 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -50,6 +50,11 @@ export interface MempoolBlockWithTransactions extends MempoolBlock { transactions: TransactionStripped[]; } +export interface MempoolBlockDelta { + added: TransactionStripped[], + removed: string[], +} + export interface MempoolInfo { loaded: boolean; // (boolean) True if the mempool is fully loaded size: number; // (numeric) Current tx count diff --git a/frontend/src/app/services/state.service.ts b/frontend/src/app/services/state.service.ts index f280cf67f..0397b53ee 100644 --- a/frontend/src/app/services/state.service.ts +++ b/frontend/src/app/services/state.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, PLATFORM_ID } from '@angular/core'; import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable } from 'rxjs'; import { Transaction } from '../interfaces/electrs.interface'; -import { IBackendInfo, MempoolBlock, MempoolBlockWithTransactions, MempoolInfo, Recommendedfees, ReplacedTransaction, TransactionStripped } from '../interfaces/websocket.interface'; +import { IBackendInfo, MempoolBlock, MempoolBlockWithTransactions, MempoolBlockDelta, MempoolInfo, Recommendedfees, ReplacedTransaction, TransactionStripped } from '../interfaces/websocket.interface'; import { BlockExtended, DifficultyAdjustment, OptimizedMempoolStats } from '../interfaces/node-api.interface'; import { Router, NavigationStart } from '@angular/router'; import { isPlatformBrowser } from '@angular/common'; @@ -81,6 +81,7 @@ export class StateService { mempoolInfo$ = new ReplaySubject(1); mempoolBlocks$ = new ReplaySubject(1); mempoolBlock$ = new Subject(); + mempoolBlockDelta$ = new Subject(); txReplaced$ = new Subject(); utxoSpent$ = new Subject(); difficultyAdjustment$ = new ReplaySubject(1); diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index b40e43530..9507ea9d5 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -311,7 +311,11 @@ export class WebsocketService { if (response['projected-mempool-block']) { if (response['projected-mempool-block'].index == this.trackingMempoolBlock) { - this.stateService.mempoolBlock$.next(response['projected-mempool-block'].block); + if (response['projected-mempool-block'].block) { + this.stateService.mempoolBlock$.next(response['projected-mempool-block'].block); + } else if (response['projected-mempool-block'].delta) { + this.stateService.mempoolBlockDelta$.next(response['projected-mempool-block'].delta); + } } }