From 3ffe4e1d3d99bb0d5040d962911ac6d80c2348aa Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 13 Feb 2024 01:19:52 +0000 Subject: [PATCH] Transaction rebroadcaster module --- .../__fixtures__/mempool-config.template.json | 4 + backend/src/__tests__/config.test.ts | 5 + backend/src/api/blocks.ts | 2 + backend/src/api/mempool-blocks.ts | 52 +++-- backend/src/api/mempool.ts | 2 + backend/src/api/rebroadcaster.ts | 180 ++++++++++++++++++ backend/src/api/transaction-utils.ts | 1 + backend/src/api/websocket-handler.ts | 3 + backend/src/config.ts | 10 + backend/src/index.ts | 2 + backend/src/logger.ts | 1 + backend/src/mempool.interfaces.ts | 1 + docker/backend/mempool-config.json | 4 + docker/backend/start.sh | 8 + 14 files changed, 261 insertions(+), 14 deletions(-) create mode 100644 backend/src/api/rebroadcaster.ts diff --git a/backend/src/__fixtures__/mempool-config.template.json b/backend/src/__fixtures__/mempool-config.template.json index 9ee2bd0bc..711667d5c 100644 --- a/backend/src/__fixtures__/mempool-config.template.json +++ b/backend/src/__fixtures__/mempool-config.template.json @@ -137,6 +137,10 @@ "AUDIT_START_HEIGHT": 774000, "SERVERS": [] }, + "REBROADCAST": { + "ENABLED": false, + "FREQUENCY": 3600 + }, "MEMPOOL_SERVICES": { "API": "", "ACCELERATIONS": false diff --git a/backend/src/__tests__/config.test.ts b/backend/src/__tests__/config.test.ts index 6af0ce32f..39a6d9e00 100644 --- a/backend/src/__tests__/config.test.ts +++ b/backend/src/__tests__/config.test.ts @@ -140,6 +140,11 @@ describe('Mempool Backend Config', () => { SERVERS: [] }); + expect(config.REBROADCAST).toStrictEqual({ + ENABLED: false, + FREQUENCY: 3600 + }); + expect(config.MEMPOOL_SERVICES).toStrictEqual({ API: "", ACCELERATIONS: false, diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index 837bc0ee9..7dd65ce94 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -29,6 +29,7 @@ import websocketHandler from './websocket-handler'; import redisCache from './redis-cache'; import rbfCache from './rbf-cache'; import { calcBitsDifference } from './difficulty-adjustment'; +import rebroadcaster from './rebroadcaster'; class Blocks { private blocks: BlockExtended[] = []; @@ -974,6 +975,7 @@ class Blocks { await redisCache.$removeTransactions(txIds); await rbfCache.updateCache(); } + rebroadcaster.remove(txIds); handledBlocks++; } diff --git a/backend/src/api/mempool-blocks.ts b/backend/src/api/mempool-blocks.ts index b9da7d4e8..fb10c3314 100644 --- a/backend/src/api/mempool-blocks.ts +++ b/backend/src/api/mempool-blocks.ts @@ -6,6 +6,7 @@ import config from '../config'; import { Worker } from 'worker_threads'; import path from 'path'; import mempool from './mempool'; +import rebroadcaster from './rebroadcaster'; const MAX_UINT32 = Math.pow(2, 32) - 1; @@ -112,6 +113,7 @@ class MempoolBlocks { let blockWeight = 0; let blockVsize = 0; let blockFees = 0; + const purgeRate = mempool.getMempoolInfo().mempoolminfee * 100000; const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2; let transactionIds: string[] = []; let transactions: MempoolTransactionExtended[] = []; @@ -157,6 +159,16 @@ class MempoolBlocks { transactionIds = [tx.txid]; transactions = [tx]; } + + if (tx.purged) { + if (tx.effectiveFeePerVsize >= purgeRate) { + rebroadcaster.unpurge(tx.txid); + tx.purged = false; + } + } else if (tx.effectiveFeePerVsize < purgeRate) { + rebroadcaster.purge(tx.txid); + tx.purged = true; + } }); if (transactions.length) { const feeStats = onlineStats ? feeStatsCalculator.getRawFeeStats() : undefined; @@ -452,12 +464,12 @@ class MempoolBlocks { } } - private processBlockTemplates(mempool: { [txid: string]: MempoolTransactionExtended }, blocks: string[][], blockWeights: number[] | null, rates: [string, number][], clusters: string[][], accelerations, accelerationPool, saveResults): MempoolBlockWithTransactions[] { + private processBlockTemplates(mempoolTxs: { [txid: string]: MempoolTransactionExtended }, blocks: string[][], blockWeights: number[] | null, rates: [string, number][], clusters: string[][], accelerations, accelerationPool, saveResults): MempoolBlockWithTransactions[] { for (const [txid, rate] of rates) { - if (txid in mempool) { - mempool[txid].cpfpDirty = (rate !== mempool[txid].effectiveFeePerVsize); - mempool[txid].effectiveFeePerVsize = rate; - mempool[txid].cpfpChecked = false; + if (txid in mempoolTxs) { + mempoolTxs[txid].cpfpDirty = (rate !== mempoolTxs[txid].effectiveFeePerVsize); + mempoolTxs[txid].effectiveFeePerVsize = rate; + mempoolTxs[txid].cpfpChecked = false; } } @@ -469,7 +481,7 @@ class MempoolBlocks { if (blockWeights && blockWeights[7] !== null) { stackWeight = blockWeights[7]; } else { - stackWeight = blocks[lastBlockIndex].reduce((total, tx) => total + (mempool[tx]?.weight || 0), 0); + stackWeight = blocks[lastBlockIndex].reduce((total, tx) => total + (mempoolTxs[tx]?.weight || 0), 0); } hasBlockStack = stackWeight > config.MEMPOOL.BLOCK_WEIGHT_UNITS; feeStatsCalculator = new OnlineFeeStatsCalculator(stackWeight, 0.5, [10, 20, 30, 40, 50, 60, 70, 80, 90]); @@ -477,7 +489,7 @@ class MempoolBlocks { for (const cluster of clusters) { for (const memberTxid of cluster) { - const mempoolTx = mempool[memberTxid]; + const mempoolTx = mempoolTxs[memberTxid]; if (mempoolTx) { const ancestors: Ancestor[] = []; const descendants: Ancestor[] = []; @@ -488,12 +500,12 @@ class MempoolBlocks { } else { const relative = { txid: txid, - fee: mempool[txid].fee, - weight: (mempool[txid].adjustedVsize * 4), + fee: mempoolTxs[txid].fee, + weight: (mempoolTxs[txid].adjustedVsize * 4), }; if (matched) { descendants.push(relative); - mempoolTx.lastBoosted = Math.max(mempoolTx.lastBoosted || 0, mempool[txid].firstSeen || 0); + mempoolTx.lastBoosted = Math.max(mempoolTx.lastBoosted || 0, mempoolTxs[txid].firstSeen || 0); } else { ancestors.push(relative); } @@ -508,6 +520,7 @@ class MempoolBlocks { } const isAccelerated : { [txid: string]: boolean } = {}; + const purgeRate = mempool.getMempoolInfo().mempoolminfee * 100000; const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2; // update this thread's mempool with the results @@ -520,7 +533,7 @@ class MempoolBlocks { const transactions: MempoolTransactionExtended[] = []; for (const txid of block) { if (txid) { - mempoolTx = mempool[txid]; + mempoolTx = mempoolTxs[txid]; // save position in projected blocks mempoolTx.position = { block: blockIndex, @@ -544,10 +557,10 @@ class MempoolBlocks { } mempoolTx.acceleration = true; for (const ancestor of mempoolTx.ancestors || []) { - if (!mempool[ancestor.txid].acceleration) { - mempool[ancestor.txid].cpfpDirty = true; + if (!mempoolTxs[ancestor.txid].acceleration) { + mempoolTxs[ancestor.txid].cpfpDirty = true; } - mempool[ancestor.txid].acceleration = true; + mempoolTxs[ancestor.txid].acceleration = true; isAccelerated[ancestor.txid] = true; } } else { @@ -562,6 +575,17 @@ class MempoolBlocks { feeStatsCalculator.processNext(mempoolTx); } + // update purge status + if (mempoolTx.purged) { + if (mempoolTx.effectiveFeePerVsize >= purgeRate) { + rebroadcaster.unpurge(mempoolTx.txid); + mempoolTx.purged = false; + } + } else if (mempoolTx.effectiveFeePerVsize < purgeRate) { + rebroadcaster.purge(mempoolTx.txid); + mempoolTx.purged = true; + } + totalSize += mempoolTx.size; totalVsize += mempoolTx.vsize; totalWeight += mempoolTx.weight; diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index a5bc8407a..de64c4913 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -11,6 +11,7 @@ import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import rbfCache from './rbf-cache'; import { Acceleration } from './services/acceleration'; import redisCache from './redis-cache'; +import rebroadcaster from './rebroadcaster'; class Mempool { private inSync: boolean = false; @@ -361,6 +362,7 @@ class Mempool { await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid)); await rbfCache.updateCache(); } + rebroadcaster.remove(deletedTransactions.map(tx => tx.txid)); const end = new Date().getTime(); const time = end - start; diff --git a/backend/src/api/rebroadcaster.ts b/backend/src/api/rebroadcaster.ts new file mode 100644 index 000000000..5a79563bd --- /dev/null +++ b/backend/src/api/rebroadcaster.ts @@ -0,0 +1,180 @@ +import config from '../config'; +import logger from '../logger'; +import { MempoolTransactionExtended } from '../mempool.interfaces'; +import bitcoinApi from './bitcoin/bitcoin-api-factory'; +import bitcoinClient from './bitcoin/bitcoin-client'; +import mempool from './mempool'; +import mempoolBlocks from './mempool-blocks'; + +/** + * Transaction Rebroadcaster + * + * Automatically rebroadcasts transactions from near the top of the mempool which peers may not know about. + * + * e.g: + * - transactions older than the default mempoolexpiry (336 hours) + * - transactions which previously fell below the default maxmempool purge rate + * - transactions we observed to be unexpectedly missing from recent mined blocks + * + * To avoid spamming relay peers, rebroadcasting is probabilistic, based on the target frequency + * set in config.REBROADCAST.FREQUENCY and a "priority" derived from the reason for rebroadcast. + */ + +class Rebroadcaster { + private unpurged = new Set(); + private rebroadcasted = new Set(); + private missing = new Set(); + private lastRun = (Date.now() / 1000); + + async $run(): Promise { + if (!config.REBROADCAST.ENABLED) { + return; + } + + const now = Date.now() / 1000; + const transactions = mempool.getMempool(); + const blocks = mempoolBlocks.getMempoolBlocksWithTransactions(); + const toRebroadcast: { txid: string, priority: number }[] = []; + + const twoWeeksAgo = now - (14 * 24 * 60 * 60); + for (const block of blocks) { + for (const txid of block.transactionIds) { + const tx = transactions[txid]; + if (tx && this.isRebroadcastable(tx, twoWeeksAgo)) { + if (this.unpurged.has(tx.txid) || this.missing.has(tx.txid)) { + toRebroadcast.push({ txid: tx.txid, priority: 1 }); + } else { + const depth = (tx.position?.block || 0) * (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) + (tx.position?.vsize || 0); + // priority approaches 0.5 as mempool depth approaches zero + // scaling factor ensures all txs in the next block have priority >= 0.4 + const priority = 0.5 / (1 + (depth / (config.MEMPOOL.BLOCK_WEIGHT_UNITS * 2))); + toRebroadcast.push({ txid: tx.txid, priority }); + } + } + } + } + + const elapsed = now - this.lastRun; + // config.REBROADCAST.FREQUENCY is actually the target /period/ + const probabilityFactor = elapsed / config.REBROADCAST.FREQUENCY; + + let totalRebroadcast = 0; + let totalFailed = 0; + for (const tx of toRebroadcast) { + // rebroadcast with probability = priority * frequency / number of txs + const cluster = this.getAncestors(tx.txid, transactions, twoWeeksAgo); + if (Math.random() < (tx.priority * probabilityFactor / cluster.length)) { + for (const txid of cluster) { + if (await this.$rebroadcastTx(txid)) { + totalRebroadcast++; + } else { + totalFailed++; + } + } + } + } + + this.lastRun = (Date.now() / 1000); + logger.debug(`${toRebroadcast.length - totalRebroadcast} candidates, ${totalRebroadcast + totalFailed} attempted, ${totalRebroadcast} successful`, logger.tags.rebroadcaster); + } + + // allow rebroadcast of old, missing or previously purged transactions + // within the first 7 projected blocks, that haven't been rebroadcast before + private isRebroadcastable(tx: MempoolTransactionExtended, minAge: number): boolean { + return !!(tx.firstSeen + && tx.position + && tx.position.block < 7 + && ((tx.firstSeen < minAge) || this.unpurged.has(tx.txid) || this.missing.has(tx.txid)) + && !this.rebroadcasted.has(tx.txid) + ); + } + + private async $rebroadcastTx(txid: string): Promise { + try { + const hex = await bitcoinApi.$getTransactionHex(txid); + if (hex) { + const txidResult = await bitcoinClient.sendRawTransaction(hex); + if (txidResult) { + this.rebroadcasted.add(txid); + return true; + } + } + } catch (e) { + logger.warn('Failed to rebroadcast transaction: ' + (e instanceof Error ? e.message : e)); + } + return false; + } + + // find and return a list of rebroadcastable ancestors of the given txid (including itself) + private getAncestors(txid: string, transactions: { [txid: string]: MempoolTransactionExtended }, minAge: number): string[] { + const ancestors = new Set(); + const skip = new Set(); + const stack: string[] = [txid]; + let sanityBreak = 0; + while (stack.length && sanityBreak < 100) { + const nextTxid = stack.pop(); + if (nextTxid) { + ancestors.add(nextTxid); + for (const vin of transactions[nextTxid].vin) { + if ( !skip.has(nextTxid) + && !ancestors.has(nextTxid) + && transactions[nextTxid] + && this.isRebroadcastable(transactions[nextTxid], minAge) + ) { + stack.push(vin.txid); + } else { + skip.add(nextTxid); + } + } + } + sanityBreak++; + } + return [...ancestors.keys()].reverse(); + } + + // transaction re-entered default mempools + public unpurge(txid: string): void { + if (!config.REBROADCAST.ENABLED) { + return; + } + + this.unpurged.add(txid); + } + + // transaction was purged from default mempools + public purge(txid: string): void { + if (!config.REBROADCAST.ENABLED) { + return; + } + + this.unpurged.delete(txid); + this.missing.delete(txid); + this.rebroadcasted.delete(txid); + } + + // transactions were unexpectedly missing from a block + public missed(txids: string[]): void { + if (!config.REBROADCAST.ENABLED) { + return; + } + + for (const txid of txids) { + this.missing.add(txid); + } + } + + // transactions were evicted or mined + public remove(txids: string[]): void { + if (!config.REBROADCAST.ENABLED) { + return; + } + + for (const txid of txids) { + this.unpurged.delete(txid); + this.missing.delete(txid); + this.rebroadcasted.delete(txid); + } + } +} + +export default new Rebroadcaster(); \ No newline at end of file diff --git a/backend/src/api/transaction-utils.ts b/backend/src/api/transaction-utils.ts index 6ff1c10b7..13044aa07 100644 --- a/backend/src/api/transaction-utils.ts +++ b/backend/src/api/transaction-utils.ts @@ -129,6 +129,7 @@ class TransactionUtils { feePerVsize: feePerVbytes, adjustedFeePerVsize: adjustedFeePerVsize, effectiveFeePerVsize: adjustedFeePerVsize, + purged: false, }); if (!transactionExtended?.status?.confirmed && !transactionExtended.firstSeen) { transactionExtended.firstSeen = Math.round((Date.now() / 1000)); diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index b78389b64..6336ca6c2 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -24,6 +24,7 @@ import { ApiPrice } from '../repositories/PricesRepository'; import accelerationApi from './services/acceleration'; import mempool from './mempool'; import statistics from './statistics/statistics'; +import rebroadcaster from './rebroadcaster'; interface AddressTransactions { mempool: MempoolTransactionExtended[], @@ -805,6 +806,8 @@ class WebsocketHandler { block.extras.expectedWeight = totalWeight; block.extras.similarity = similarity; } + + rebroadcaster.missed(censored); } } else if (block.extras) { const mBlocks = mempoolBlocks.getMempoolBlocksWithTransactions(); diff --git a/backend/src/config.ts b/backend/src/config.ts index 32a7af3df..5ca0cf25d 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -147,6 +147,10 @@ interface IConfig { AUDIT_START_HEIGHT: number; SERVERS: string[]; }, + REBROADCAST: { + ENABLED: boolean; + FREQUENCY: number; + }, MEMPOOL_SERVICES: { API: string; ACCELERATIONS: boolean; @@ -303,6 +307,10 @@ const defaults: IConfig = { 'AUDIT_START_HEIGHT': 774000, 'SERVERS': [], }, + 'REBROADCAST': { + 'ENABLED': false, + 'FREQUENCY': 3600, + }, 'MEMPOOL_SERVICES': { 'API': '', 'ACCELERATIONS': false, @@ -331,6 +339,7 @@ class Config implements IConfig { EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER']; MAXMIND: IConfig['MAXMIND']; REPLICATION: IConfig['REPLICATION']; + REBROADCAST: IConfig['REBROADCAST']; MEMPOOL_SERVICES: IConfig['MEMPOOL_SERVICES']; REDIS: IConfig['REDIS']; @@ -352,6 +361,7 @@ class Config implements IConfig { this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER; this.MAXMIND = configs.MAXMIND; this.REPLICATION = configs.REPLICATION; + this.REBROADCAST = configs.REBROADCAST; this.MEMPOOL_SERVICES = configs.MEMPOOL_SERVICES; this.REDIS = configs.REDIS; } diff --git a/backend/src/index.ts b/backend/src/index.ts index 3a8449131..e64da91c7 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -45,6 +45,7 @@ import { formatBytes, getBytesUnit } from './utils/format'; import redisCache from './api/redis-cache'; import accelerationApi from './api/services/acceleration'; import bitcoinCoreRoutes from './api/bitcoin/bitcoin-core.routes'; +import rebroadcaster from './api/rebroadcaster'; class Server { private wss: WebSocket.Server | undefined; @@ -215,6 +216,7 @@ class Server { } indexer.$run(); priceUpdater.$run(); + rebroadcaster.$run(); // rerun immediately if we skipped the mempool update, otherwise wait POLL_RATE_MS const elapsed = Date.now() - start; diff --git a/backend/src/logger.ts b/backend/src/logger.ts index bbd781df6..e616fe351 100644 --- a/backend/src/logger.ts +++ b/backend/src/logger.ts @@ -36,6 +36,7 @@ class Logger { mining: 'Mining', ln: 'Lightning', goggles: 'Goggles', + rebroadcaster: 'Rebroadcaster', }; // @ts-ignore diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 71612f25f..01109c05a 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -107,6 +107,7 @@ export interface MempoolTransactionExtended extends TransactionExtended { inputs?: number[]; lastBoosted?: number; cpfpDirty?: boolean; + purged: boolean; } export interface AuditTransaction { diff --git a/docker/backend/mempool-config.json b/docker/backend/mempool-config.json index 8f69fd0c1..f71293700 100644 --- a/docker/backend/mempool-config.json +++ b/docker/backend/mempool-config.json @@ -143,6 +143,10 @@ "AUDIT_START_HEIGHT": __REPLICATION_AUDIT_START_HEIGHT__, "SERVERS": __REPLICATION_SERVERS__ }, + "REBROADCAST": { + "ENABLED": __REBROADCAST_ENABLED__, + "FREQUENCY": __REBROADCAST_FREQUENCY__ + }, "MEMPOOL_SERVICES": { "API": "__MEMPOOL_SERVICES_API__", "ACCELERATIONS": __MEMPOOL_SERVICES_ACCELERATIONS__ diff --git a/docker/backend/start.sh b/docker/backend/start.sh index ba9b99233..29e0f6549 100755 --- a/docker/backend/start.sh +++ b/docker/backend/start.sh @@ -144,6 +144,10 @@ __REPLICATION_AUDIT__=${REPLICATION_AUDIT:=true} __REPLICATION_AUDIT_START_HEIGHT__=${REPLICATION_AUDIT_START_HEIGHT:=774000} __REPLICATION_SERVERS__=${REPLICATION_SERVERS:=[]} +# REBROADCAST +__REBROADCAST_ENABLED__=${REBROADCAST_ENABLED:=false} +__REBROADCAST_FREQUENCY__=${REBROADCAST_FREQUENCY:=3600} + # MEMPOOL_SERVICES __MEMPOOL_SERVICES_API__=${MEMPOOL_SERVICES_API:=""} __MEMPOOL_SERVICES_ACCELERATIONS__=${MEMPOOL_SERVICES_ACCELERATIONS:=false} @@ -288,6 +292,10 @@ sed -i "s!__REPLICATION_AUDIT__!${__REPLICATION_AUDIT__}!g" mempool-config.json sed -i "s!__REPLICATION_AUDIT_START_HEIGHT__!${__REPLICATION_AUDIT_START_HEIGHT__}!g" mempool-config.json sed -i "s!__REPLICATION_SERVERS__!${__REPLICATION_SERVERS__}!g" mempool-config.json +# REBROADCAST +sed -i "s!__REBROADCAST_ENABLED__!${__REBROADCAST_ENABLED__}!g" mempool-config.json +sed -i "s!__REBROADCAST_FREQUENCY__!${__REBROADCAST_FREQUENCY__}!g" mempool-config.json + # MEMPOOL_SERVICES sed -i "s!__MEMPOOL_SERVICES_API__!${__MEMPOOL_SERVICES_API__}!g" mempool-config.json sed -i "s!__MEMPOOL_SERVICES_ACCELERATIONS__!${__MEMPOOL_SERVICES_ACCELERATIONS__}!g" mempool-config.json