From b1d490972bc321181ad6f9b6d3a443ac8c48a870 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Sun, 20 Nov 2022 16:12:39 +0900 Subject: [PATCH] refactor async mempool/block update callbacks --- backend/src/api/mempool.ts | 10 +- backend/src/api/tx-selection-worker.ts | 4 +- backend/src/api/websocket-handler.ts | 126 +++++-------------------- backend/src/index.ts | 6 +- 4 files changed, 34 insertions(+), 112 deletions(-) diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 86538f51d..584ddf816 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -20,7 +20,8 @@ class Mempool { maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 }; private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => void) | undefined; - private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }) => void) | undefined; + private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], + deletedTransactions: TransactionExtended[]) => void) | undefined; private txPerSecondArray: number[] = []; private txPerSecond: number = 0; @@ -64,7 +65,8 @@ class Mempool { this.mempoolChangedCallback = fn; } - public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; }) => void) { + public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; }, + newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => Promise) { this.asyncMempoolChangedCallback = fn; } @@ -78,7 +80,7 @@ class Mempool { this.mempoolChangedCallback(this.mempoolCache, [], []); } if (this.asyncMempoolChangedCallback) { - this.asyncMempoolChangedCallback(this.mempoolCache); + this.asyncMempoolChangedCallback(this.mempoolCache, [], []); } } @@ -196,7 +198,7 @@ class Mempool { this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); } if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length)) { - await this.asyncMempoolChangedCallback(this.mempoolCache); + await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); } const end = new Date().getTime(); diff --git a/backend/src/api/tx-selection-worker.ts b/backend/src/api/tx-selection-worker.ts index 09d9b9102..10f65000b 100644 --- a/backend/src/api/tx-selection-worker.ts +++ b/backend/src/api/tx-selection-worker.ts @@ -156,7 +156,8 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: // this block is full const exceededPackageTries = failures > 1000 && blockWeight > (config.MEMPOOL.BLOCK_WEIGHT_UNITS - 4000); - if (exceededPackageTries && (!condenseRest || blocks.length < blockLimit - 1)) { + const queueEmpty = top >= mempoolArray.length && modified.isEmpty(); + if ((exceededPackageTries || queueEmpty) && (!condenseRest || blocks.length < blockLimit - 1)) { // construct this block if (transactions.length) { blocks.push(dataToMempoolBlocks(transactions.map(t => mempool[t.txid]), blockSize, blockWeight, blocks.length)); @@ -209,7 +210,6 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: tx.cpfpChecked = false; tx.ancestors = []; tx.bestDescendant = null; - tx.ancestors blockTransactions.push(tx); }); if (blockTransactions.length) { diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 73db85fe6..375869902 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -244,59 +244,20 @@ class WebsocketHandler { }); } - async handleAsyncMempoolChange(newMempool: { [txid: string]: TransactionExtended }): Promise { + async handleMempoolChange(newMempool: { [txid: string]: TransactionExtended }, + newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]): Promise { if (!this.wss) { throw new Error('WebSocket.Server is not set'); } - if (!config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { - return; + if (config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + await mempoolBlocks.makeBlockTemplates(newMempool, 8, null, true); + } + else { + mempoolBlocks.updateMempoolBlocks(newMempool); } - - await mempoolBlocks.makeBlockTemplates(newMempool, 8, null, true); const mBlocks = mempoolBlocks.getMempoolBlocks(); const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); - - this.wss.clients.forEach(async (client) => { - if (client.readyState !== WebSocket.OPEN) { - return; - } - - const response = {}; - - if (client['want-mempool-blocks']) { - response['mempool-blocks'] = mBlocks; - } - - if (client['track-mempool-block'] >= 0) { - const index = client['track-mempool-block']; - if (mBlockDeltas[index]) { - response['projected-block-transactions'] = { - index: index, - delta: mBlockDeltas[index], - }; - } - } - - if (Object.keys(response).length) { - client.send(JSON.stringify(response)); - } - }); - } - - handleMempoolChange(newMempool: { [txid: string]: TransactionExtended }, - newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]): void { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); - } - - let mBlocks; - let mBlockDeltas; - if (!config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { - mempoolBlocks.updateMempoolBlocks(newMempool); - mBlocks = mempoolBlocks.getMempoolBlocks(); - mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); - } const mempoolInfo = memPool.getMempoolInfo(); const vBytesPerSecond = memPool.getVBytesPerSecond(); const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); @@ -319,7 +280,7 @@ class WebsocketHandler { response['fees'] = recommendedFees; } - if (client['want-mempool-blocks'] && !config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + if (client['want-mempool-blocks']) { response['mempool-blocks'] = mBlocks; } @@ -434,7 +395,7 @@ class WebsocketHandler { } } - if (client['track-mempool-block'] >= 0 && !config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + if (client['track-mempool-block'] >= 0) { const index = client['track-mempool-block']; if (mBlockDeltas[index]) { response['projected-block-transactions'] = { @@ -449,61 +410,20 @@ class WebsocketHandler { } }); } - - async handleNewAsyncBlock(block: BlockExtended, txIds: string[], transactions: TransactionExtended[]): Promise { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); - } - - if (!config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { - return; - } - - const _memPool = memPool.getMempool(); - - await mempoolBlocks.makeBlockTemplates(_memPool, 2); - const mBlocks = mempoolBlocks.getMempoolBlocks(); - const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); - - this.wss.clients.forEach((client) => { - if (client.readyState !== WebSocket.OPEN) { - return; - } - - if (!client['want-blocks']) { - return; - } - - const response = {}; - - if (mBlocks && client['want-mempool-blocks']) { - response['mempool-blocks'] = mBlocks; - } - - if (client['track-mempool-block'] >= 0) { - const index = client['track-mempool-block']; - if (mBlockDeltas && mBlockDeltas[index]) { - response['projected-block-transactions'] = { - index: index, - delta: mBlockDeltas[index], - }; - } - } - - client.send(JSON.stringify(response)); - }); - } - - handleNewBlock(block: BlockExtended, txIds: string[], transactions: TransactionExtended[]): void { + async handleNewBlock(block: BlockExtended, txIds: string[], transactions: TransactionExtended[]): Promise { if (!this.wss) { throw new Error('WebSocket.Server is not set'); } - let mBlocks: undefined | MempoolBlock[]; - let mBlockDeltas: undefined | MempoolBlockDelta[]; - let matchRate; const _memPool = memPool.getMempool(); + let matchRate; + + if (config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + await mempoolBlocks.makeBlockTemplates(_memPool, 2); + } else { + mempoolBlocks.updateMempoolBlocks(_memPool); + } if (Common.indexingEnabled() && memPool.isInSync()) { const projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions(); @@ -547,11 +467,13 @@ class WebsocketHandler { delete _memPool[txId]; } - if (!config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + if (config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + await mempoolBlocks.makeBlockTemplates(_memPool, 2); + } else { mempoolBlocks.updateMempoolBlocks(_memPool); - mBlocks = mempoolBlocks.getMempoolBlocks(); - mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); } + const mBlocks = mempoolBlocks.getMempoolBlocks(); + const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); const da = difficultyAdjustment.getDifficultyAdjustment(); const fees = feeApi.getRecommendedFee(); @@ -572,7 +494,7 @@ class WebsocketHandler { 'fees': fees, }; - if (mBlocks && client['want-mempool-blocks'] && !config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + if (mBlocks && client['want-mempool-blocks']) { response['mempool-blocks'] = mBlocks; } @@ -644,7 +566,7 @@ class WebsocketHandler { } } - if (client['track-mempool-block'] >= 0 && !config.MEMPOOL.ADVANCED_TRANSACTION_SELECTION) { + if (client['track-mempool-block'] >= 0) { const index = client['track-mempool-block']; if (mBlockDeltas && mBlockDeltas[index]) { response['projected-block-transactions'] = { diff --git a/backend/src/index.ts b/backend/src/index.ts index 2c15aa81a..09a12e200 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -216,10 +216,8 @@ class Server { websocketHandler.setupConnectionHandling(); if (config.MEMPOOL.ENABLED) { statistics.setNewStatisticsEntryCallback(websocketHandler.handleNewStatistic.bind(websocketHandler)); - blocks.setNewBlockCallback(websocketHandler.handleNewBlock.bind(websocketHandler)); - blocks.setNewAsyncBlockCallback(websocketHandler.handleNewAsyncBlock.bind(websocketHandler)); - memPool.setMempoolChangedCallback(websocketHandler.handleMempoolChange.bind(websocketHandler)); - memPool.setAsyncMempoolChangedCallback(websocketHandler.handleAsyncMempoolChange.bind(websocketHandler)); + memPool.setAsyncMempoolChangedCallback(websocketHandler.handleMempoolChange.bind(websocketHandler)); + blocks.setNewAsyncBlockCallback(websocketHandler.handleNewBlock.bind(websocketHandler)); } fiatConversion.setProgressChangedCallback(websocketHandler.handleNewConversionRates.bind(websocketHandler)); loadingIndicators.setProgressChangedCallback(websocketHandler.handleLoadingChanged.bind(websocketHandler));