From f8d30bf528100e7e11b19b852482a357ae10b44f Mon Sep 17 00:00:00 2001 From: Mononaut Date: Sun, 21 Jan 2024 22:47:41 +0000 Subject: [PATCH 1/2] Add 'mempool delta' websocket subscriptions --- backend/src/api/websocket-handler.ts | 69 ++++++++++++++++++++++++++++ backend/src/mempool.interfaces.ts | 14 ++++++ 2 files changed, 83 insertions(+) diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index d4ff7efe3..9e8a4653a 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -3,6 +3,7 @@ import * as WebSocket from 'ws'; import { BlockExtended, TransactionExtended, MempoolTransactionExtended, WebsocketResponse, OptimizedStatistic, ILoadingIndicators, GbtCandidates, TxTrackingInfo, + MempoolBlockDelta, MempoolDelta, MempoolDeltaTxids } from '../mempool.interfaces'; import blocks from './blocks'; import memPool from './mempool'; @@ -364,6 +365,18 @@ class WebsocketHandler { client['track-donation'] = parsedMessage['track-donation']; } + if (parsedMessage['track-mempool-txids'] === true) { + client['track-mempool-txids'] = true; + } else if (parsedMessage['track-mempool-txids'] === false) { + delete client['track-mempool-txids']; + } + + if (parsedMessage['track-mempool'] === true) { + client['track-mempool'] = true; + } else if (parsedMessage['track-mempool'] === false) { + delete client['track-mempool']; + } + if (Object.keys(response).length) { client.send(this.serializeResponse(response)); } @@ -545,6 +558,27 @@ class WebsocketHandler { const latestTransactions = memPool.getLatestTransactions(); + const replacedTransactions: { replaced: string, by: TransactionExtended }[] = []; + for (const tx of newTransactions) { + if (rbfTransactions[tx.txid]) { + for (const replaced of rbfTransactions[tx.txid]) { + replacedTransactions.push({ replaced: replaced.txid, by: tx }); + } + } + } + const mempoolDeltaTxids: MempoolDeltaTxids = { + added: newTransactions.map(tx => tx.txid), + removed: deletedTransactions.map(tx => tx.txid), + mined: [], + replaced: replacedTransactions.map(replacement => ({ replaced: replacement.replaced, by: replacement.by.txid })), + }; + const mempoolDelta: MempoolDelta = { + added: newTransactions, + removed: deletedTransactions.map(tx => tx.txid), + mined: [], + replaced: replacedTransactions, + }; + // update init data const socketDataFields = { 'mempoolInfo': mempoolInfo, @@ -847,6 +881,14 @@ class WebsocketHandler { response['rbfLatestSummary'] = getCachedResponse('rbfLatestSummary', rbfSummary); } + if (client['track-mempool-txids']) { + response['mempool-txids'] = getCachedResponse('mempool-txids', mempoolDeltaTxids); + } + + if (client['track-mempool']) { + response['mempool-transactions'] = getCachedResponse('mempool-transactions', mempoolDelta); + } + if (Object.keys(response).length) { client.send(this.serializeResponse(response)); } @@ -992,6 +1034,25 @@ class WebsocketHandler { const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions(); + const replacedTransactions: { replaced: string, by: TransactionExtended }[] = []; + for (const txid of Object.keys(rbfTransactions)) { + for (const replaced of rbfTransactions[txid].replaced) { + replacedTransactions.push({ replaced: replaced.txid, by: rbfTransactions[txid].replacedBy }); + } + } + const mempoolDeltaTxids: MempoolDeltaTxids = { + added: [], + removed: [], + mined: transactions.map(tx => tx.txid), + replaced: replacedTransactions.map(replacement => ({ replaced: replacement.replaced, by: replacement.by.txid })), + }; + const mempoolDelta: MempoolDelta = { + added: [], + removed: [], + mined: transactions.map(tx => tx.txid), + replaced: replacedTransactions, + }; + const responseCache = { ...this.socketData }; function getCachedResponse(key, data): string { if (!responseCache[key]) { @@ -1185,6 +1246,14 @@ class WebsocketHandler { } } + if (client['track-mempool-txids']) { + response['mempool-txids'] = getCachedResponse('mempool-txids', mempoolDeltaTxids); + } + + if (client['track-mempool']) { + response['mempool-transactions'] = getCachedResponse('mempool-transactions', mempoolDelta); + } + if (Object.keys(response).length) { client.send(this.serializeResponse(response)); } diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 0b4b20e02..516748e9c 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -71,6 +71,20 @@ export interface MempoolBlockDelta { changed: MempoolDeltaChange[]; } +export interface MempoolDeltaTxids { + added: string[]; + removed: string[]; + mined: string[]; + replaced: { replaced: string, by: string }[]; +} + +export interface MempoolDelta { + added: MempoolTransactionExtended[]; + removed: string[]; + mined: string[]; + replaced: { replaced: string, by: TransactionExtended }[]; +} + interface VinStrippedToScriptsig { scriptsig: string; } From 5172f032e709e4e6cf7afa7f6e50131dbae2d809 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 3 May 2024 16:31:56 +0000 Subject: [PATCH 2/2] Add sequence number to track-mempool subscription messages --- backend/src/api/websocket-handler.ts | 20 ++++++++++++-------- backend/src/mempool.interfaces.ts | 2 ++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 9e8a4653a..fdda3df88 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -558,6 +558,10 @@ class WebsocketHandler { const latestTransactions = memPool.getLatestTransactions(); + if (memPool.isInSync()) { + this.mempoolSequence++; + } + const replacedTransactions: { replaced: string, by: TransactionExtended }[] = []; for (const tx of newTransactions) { if (rbfTransactions[tx.txid]) { @@ -567,12 +571,14 @@ class WebsocketHandler { } } const mempoolDeltaTxids: MempoolDeltaTxids = { + sequence: this.mempoolSequence, added: newTransactions.map(tx => tx.txid), removed: deletedTransactions.map(tx => tx.txid), mined: [], replaced: replacedTransactions.map(replacement => ({ replaced: replacement.replaced, by: replacement.by.txid })), }; const mempoolDelta: MempoolDelta = { + sequence: this.mempoolSequence, added: newTransactions, removed: deletedTransactions.map(tx => tx.txid), mined: [], @@ -638,10 +644,6 @@ class WebsocketHandler { const addressCache = this.makeAddressCache(newTransactions); const removedAddressCache = this.makeAddressCache(deletedTransactions); - if (memPool.isInSync()) { - this.mempoolSequence++; - } - // TODO - Fix indentation after PR is merged for (const server of this.webSocketServers) { server.clients.forEach(async (client) => { @@ -1034,6 +1036,10 @@ class WebsocketHandler { const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions(); + if (memPool.isInSync()) { + this.mempoolSequence++; + } + const replacedTransactions: { replaced: string, by: TransactionExtended }[] = []; for (const txid of Object.keys(rbfTransactions)) { for (const replaced of rbfTransactions[txid].replaced) { @@ -1041,12 +1047,14 @@ class WebsocketHandler { } } const mempoolDeltaTxids: MempoolDeltaTxids = { + sequence: this.mempoolSequence, added: [], removed: [], mined: transactions.map(tx => tx.txid), replaced: replacedTransactions.map(replacement => ({ replaced: replacement.replaced, by: replacement.by.txid })), }; const mempoolDelta: MempoolDelta = { + sequence: this.mempoolSequence, added: [], removed: [], mined: transactions.map(tx => tx.txid), @@ -1061,10 +1069,6 @@ class WebsocketHandler { return responseCache[key]; } - if (memPool.isInSync()) { - this.mempoolSequence++; - } - // TODO - Fix indentation after PR is merged for (const server of this.webSocketServers) { server.clients.forEach((client) => { diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 516748e9c..0fcddc45a 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -72,6 +72,7 @@ export interface MempoolBlockDelta { } export interface MempoolDeltaTxids { + sequence: number, added: string[]; removed: string[]; mined: string[]; @@ -79,6 +80,7 @@ export interface MempoolDeltaTxids { } export interface MempoolDelta { + sequence: number, added: MempoolTransactionExtended[]; removed: string[]; mined: string[];