From 0df71123f62968176846493c563e10fff825f968 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 17 Feb 2023 17:54:29 -0600 Subject: [PATCH] Use minfee node to limit gbt input size --- backend/src/api/mempool-blocks.ts | 102 ++++++++++++++++++++------- backend/src/api/mempool.ts | 85 +++++++++++++++------- backend/src/api/websocket-handler.ts | 30 ++++++-- backend/src/mempool.interfaces.ts | 6 ++ 4 files changed, 164 insertions(+), 59 deletions(-) diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index 3c2feb0e2..09f8cc631 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -1,5 +1,5 @@ import logger from '../logger'; -import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces'; +import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, GbtCandidates } from '../mempool.interfaces'; import { Common } from './common'; import config from '../config'; import { Worker } from 'worker_threads'; @@ -147,19 +147,23 @@ class MempoolBlocks { return mempoolBlockDeltas; } - public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise { + public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false, candidates?: GbtCandidates): Promise { // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread + const txids = candidates ? Object.keys(candidates.txs) : Object.keys(newMempool); const strippedMempool: { [txid: string]: ThreadTransaction } = {}; - Object.values(newMempool).forEach(entry => { - strippedMempool[entry.txid] = { - txid: entry.txid, - fee: entry.fee, - weight: entry.weight, - feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), - vin: entry.vin.map(v => v.txid), - }; + txids.forEach(txid => { + const entry = newMempool[txid]; + if (entry) { + strippedMempool[entry.txid] = { + txid: entry.txid, + fee: entry.fee, + weight: entry.weight, + feePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.fee / (entry.weight / 4), + vin: entry.vin.map(v => v.txid), + }; + } }); // (re)initialize tx selection worker thread @@ -191,31 +195,49 @@ class MempoolBlocks { // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); - return this.processBlockTemplates(newMempool, blocks, clusters, saveResults); + return this.processBlockTemplates(newMempool, blocks, clusters, saveResults, candidates); } catch (e) { logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } return this.mempoolBlocks; } - public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false): Promise { + public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false, candidates?: GbtCandidates): Promise { if (!this.txSelectionWorker) { // need to reset the worker - this.makeBlockTemplates(newMempool, saveResults); + this.makeBlockTemplates(newMempool, saveResults, candidates); return; } // prepare a stripped down version of the mempool with only the minimum necessary data // to reduce the overhead of passing this data to the worker thread - const addedStripped: ThreadTransaction[] = added.map(entry => { - return { - txid: entry.txid, - fee: entry.fee, - weight: entry.weight, - feePerVsize: entry.fee / (entry.weight / 4), - effectiveFeePerVsize: entry.fee / (entry.weight / 4), - vin: entry.vin.map(v => v.txid), - }; - }); + let addedStripped: ThreadTransaction[] = []; + let removedList; + if (candidates) { + addedStripped = candidates.added.filter(txid => newMempool[txid]).map(txid => { + const entry = newMempool[txid]; + return { + txid: entry.txid, + fee: entry.fee, + weight: entry.weight, + feePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.fee / (entry.weight / 4), + vin: entry.vin.map(v => v.txid), + }; + }); + removedList = candidates.removed; + } else { + addedStripped = added.map(entry => { + return { + txid: entry.txid, + fee: entry.fee, + weight: entry.weight, + feePerVsize: entry.fee / (entry.weight / 4), + effectiveFeePerVsize: entry.fee / (entry.weight / 4), + vin: entry.vin.map(v => v.txid), + }; + }); + removedList = removed; + } // run the block construction algorithm in a separate thread, and wait for a result let threadErrorListener; @@ -227,19 +249,19 @@ class MempoolBlocks { }); this.txSelectionWorker?.once('error', reject); }); - this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); + this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedList }); const { blocks, clusters } = await workerResultPromise; // clean up thread error listener this.txSelectionWorker?.removeListener('error', threadErrorListener); - this.processBlockTemplates(newMempool, blocks, clusters, saveResults); + this.processBlockTemplates(newMempool, blocks, clusters, saveResults, candidates); } catch (e) { logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); } } - private processBlockTemplates(mempool, blocks, clusters, saveResults): MempoolBlockWithTransactions[] { + private processBlockTemplates(mempool: { [txid: string ]: TransactionExtended }, blocks, clusters, saveResults, candidates?: GbtCandidates): MempoolBlockWithTransactions[] { // update this thread's mempool with the results blocks.forEach(block => { block.forEach(tx => { @@ -277,6 +299,32 @@ class MempoolBlocks { }); }); + // Add purged transactions at the end, if required + if (candidates) { + const purged: string[] = []; + Object.values(mempool).forEach(tx => { + if (!candidates.txs[tx.txid]) { + purged.push(tx.txid); + } + }); + if (!blocks.length) { + blocks = [[]]; + } + let blockIndex = blocks.length - 1; + let weight = blocks[blockIndex].reduce((acc, tx) => acc + tx.weight, 0); + purged.sort((a,b) => { return mempool[b].effectiveFeePerVsize - mempool[a].effectiveFeePerVsize}); + purged.forEach(txid => { + const tx = mempool[txid]; + if ((weight + tx.weight) >= (config.MEMPOOL.BLOCK_WEIGHT_UNITS - 4000) && blockIndex < 7) { + blocks.push([]); + blockIndex++; + weight = 0; + } + blocks[blockIndex].push(tx); + weight += tx.weight; + }); + } + // unpack the condensed blocks into proper mempool blocks const mempoolBlocks = blocks.map((transactions, blockIndex) => { return this.dataToMempoolBlocks(transactions.map(tx => { diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 6f8011a12..2a888af25 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -1,6 +1,6 @@ import config from '../config'; import bitcoinApi from './bitcoin/bitcoin-api-factory'; -import { TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; +import { GbtCandidates, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; import logger from '../logger'; import { Common } from './common'; import transactionUtils from './transaction-utils'; @@ -9,6 +9,7 @@ import loadingIndicators from './loading-indicators'; import bitcoinClient from './bitcoin/bitcoin-client'; import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import rbfCache from './rbf-cache'; +import blocks from './blocks'; class Mempool { private static WEBSOCKET_REFRESH_RATE_MS = 10000; @@ -16,13 +17,16 @@ class Mempool { private inSync: boolean = false; private mempoolCacheDelta: number = -1; private mempoolCache: { [txId: string]: TransactionExtended } = {}; + private mempoolCandidates: { [txid: string ]: boolean } = {}; private minFeeMempool: { [txId: string]: boolean } = {}; private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0, maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 }; + private secondMempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0, + maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 }; private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => void) | undefined; private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], - deletedTransactions: TransactionExtended[]) => Promise) | undefined; + deletedTransactions: TransactionExtended[], candidates?: GbtCandidates) => Promise) | undefined; private txPerSecondArray: number[] = []; private txPerSecond: number = 0; @@ -72,7 +76,7 @@ class Mempool { } public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; }, - newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => Promise) { + newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[], candidates?: GbtCandidates) => Promise) { this.asyncMempoolChangedCallback = fn; } @@ -86,10 +90,18 @@ class Mempool { this.mempoolChangedCallback(this.mempoolCache, [], []); } if (this.asyncMempoolChangedCallback) { - this.asyncMempoolChangedCallback(this.mempoolCache, [], []); + if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) { + this.asyncMempoolChangedCallback(this.mempoolCache, [], [], { txs: {}, added: [], removed: [] }); + } else { + this.asyncMempoolChangedCallback(this.mempoolCache, [], [], ); + } } } + public getMempoolCandidates(): { [txid: string]: boolean } { + return this.mempoolCandidates; + } + public async $updateMemPoolInfo() { this.mempoolInfo = await this.$getMempoolInfo(); } @@ -125,7 +137,7 @@ class Mempool { let hasChange: boolean = false; const currentMempoolSize = Object.keys(this.mempoolCache).length; const transactions = await bitcoinApi.$getRawMempool(); - await this.updateMinFeeMempool(); + const candidates = await this.getNextCandidates(); const diff = transactions.length - currentMempoolSize; const newTransactions: TransactionExtended[] = []; @@ -225,8 +237,8 @@ class Mempool { if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); } - if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length)) { - await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); + if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length || candidates?.added.length || candidates?.removed.length)) { + await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions, candidates); } const end = new Date().getTime(); @@ -238,27 +250,48 @@ class Mempool { return !this.minFeeMempool[txid]; } - private async updateMinFeeMempool() { - const minFeeTransactions = await bitcoinSecondClient.getRawMemPool(); - const minFeeTxMap = {}; - for (const txid of minFeeTransactions) { - minFeeTxMap[txid] = true; - } - const removed: string[] = []; - const added: string[] = []; - for (const txid of Object.keys(this.minFeeMempool)) { - if (!minFeeTxMap[txid]) { - removed.push(txid); - } - } - for (const txid of minFeeTransactions) { - if (!this.minFeeMempool[txid]) { - added.push(txid); + public async getNextCandidates(): Promise { + if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) { + const minFeeTransactions = await bitcoinSecondClient.getRawMemPool(); + const blockHeight = await bitcoinSecondClient.getChainTips() + .then((result: IBitcoinApi.ChainTips[]) => { + return result.find(tip => tip.status === 'active')!.height; + }); + const newCandidateTxMap = {}; + this.minFeeMempool = {}; + for (const txid of minFeeTransactions) { + if (this.mempoolCache[txid]) { + newCandidateTxMap[txid] = true; + } this.minFeeMempool[txid] = true; } - } - for (const txid of removed) { - delete this.minFeeMempool[txid]; + const removed: string[] = []; + const added: string[] = []; + // don't prematurely remove txs included in a new block + if (blockHeight > blocks.getCurrentBlockHeight()) { + for (const txid of Object.keys(this.mempoolCandidates)) { + newCandidateTxMap[txid] = true; + } + } else { + for (const txid of Object.keys(this.mempoolCandidates)) { + if (!newCandidateTxMap[txid]) { + removed.push(txid); + } + } + } + + for (const txid of Object.keys(newCandidateTxMap)) { + if (!this.mempoolCandidates[txid]) { + added.push(txid); + } + } + + this.mempoolCandidates = newCandidateTxMap; + return { + txs: this.mempoolCandidates, + added, + removed + }; } } diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index c122fb052..c690b89aa 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -2,7 +2,7 @@ import logger from '../logger'; import * as WebSocket from 'ws'; import { BlockExtended, TransactionExtended, WebsocketResponse, - OptimizedStatistic, ILoadingIndicators + OptimizedStatistic, ILoadingIndicators, GbtCandidates } from '../mempool.interfaces'; import blocks from './blocks'; import memPool from './mempool'; @@ -19,7 +19,6 @@ import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository'; import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository'; import Audit from './audit'; import { deepClone } from '../utils/clone'; -import mempool from './mempool'; import priceUpdater from '../tasks/price-updater'; import { ApiPrice } from '../repositories/PricesRepository'; @@ -251,13 +250,14 @@ class WebsocketHandler { } async handleMempoolChange(newMempool: { [txid: string]: TransactionExtended }, - newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]): Promise { + newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[], + candidates?: GbtCandidates): Promise { if (!this.wss) { throw new Error('WebSocket.Server is not set'); } if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { - await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true); + await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true, candidates); } else { mempoolBlocks.updateMempoolBlocks(newMempool, true); } @@ -428,6 +428,7 @@ class WebsocketHandler { } const _memPool = memPool.getMempool(); + const candidateTxs = await memPool.getMempoolCandidates(); if (config.MEMPOOL.AUDIT) { let projectedBlocks; @@ -435,7 +436,15 @@ class WebsocketHandler { // a cloned copy of the mempool if we're running a different algorithm for mempool updates const auditMempool = (config.MEMPOOL.ADVANCED_GBT_AUDIT === config.MEMPOOL.ADVANCED_GBT_MEMPOOL) ? _memPool : deepClone(_memPool); if (config.MEMPOOL.ADVANCED_GBT_AUDIT) { - projectedBlocks = await mempoolBlocks.makeBlockTemplates(auditMempool, false); + let candidates; + if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) { + candidates = { + txs: candidateTxs, + added: [], + removed: [], + }; + } + projectedBlocks = await mempoolBlocks.makeBlockTemplates(auditMempool, false, candidates); } else { projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false); } @@ -487,12 +496,21 @@ class WebsocketHandler { // Update mempool to remove transactions included in the new block for (const txId of txIds) { delete _memPool[txId]; + delete candidateTxs[txId]; removed.push(txId); rbfCache.evict(txId); } if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { - await mempoolBlocks.updateBlockTemplates(_memPool, [], removed, true); + let candidates; + if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) { + candidates = { + txs: candidateTxs, + added: [], + removed: removed, + }; + } + await mempoolBlocks.updateBlockTemplates(_memPool, [], removed, true, candidates); } else { mempoolBlocks.updateMempoolBlocks(_memPool, true); } diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 9961632c3..b16122a7c 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -100,6 +100,12 @@ export interface AuditTransaction { modifiedNode: HeapNode; } +export interface GbtCandidates { + txs: { [txid: string ]: boolean }, + added: string[]; + removed: string[]; +} + export interface ThreadTransaction { txid: string; fee: number;