Transaction rebroadcaster module

This commit is contained in:
Mononaut 2024-02-13 01:19:52 +00:00
parent 512f632475
commit 3ffe4e1d3d
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
14 changed files with 261 additions and 14 deletions

View File

@ -137,6 +137,10 @@
"AUDIT_START_HEIGHT": 774000,
"SERVERS": []
},
"REBROADCAST": {
"ENABLED": false,
"FREQUENCY": 3600
},
"MEMPOOL_SERVICES": {
"API": "",
"ACCELERATIONS": false

View File

@ -140,6 +140,11 @@ describe('Mempool Backend Config', () => {
SERVERS: []
});
expect(config.REBROADCAST).toStrictEqual({
ENABLED: false,
FREQUENCY: 3600
});
expect(config.MEMPOOL_SERVICES).toStrictEqual({
API: "",
ACCELERATIONS: false,

View File

@ -29,6 +29,7 @@ import websocketHandler from './websocket-handler';
import redisCache from './redis-cache';
import rbfCache from './rbf-cache';
import { calcBitsDifference } from './difficulty-adjustment';
import rebroadcaster from './rebroadcaster';
class Blocks {
private blocks: BlockExtended[] = [];
@ -974,6 +975,7 @@ class Blocks {
await redisCache.$removeTransactions(txIds);
await rbfCache.updateCache();
}
rebroadcaster.remove(txIds);
handledBlocks++;
}

View File

@ -6,6 +6,7 @@ import config from '../config';
import { Worker } from 'worker_threads';
import path from 'path';
import mempool from './mempool';
import rebroadcaster from './rebroadcaster';
const MAX_UINT32 = Math.pow(2, 32) - 1;
@ -112,6 +113,7 @@ class MempoolBlocks {
let blockWeight = 0;
let blockVsize = 0;
let blockFees = 0;
const purgeRate = mempool.getMempoolInfo().mempoolminfee * 100000;
const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2;
let transactionIds: string[] = [];
let transactions: MempoolTransactionExtended[] = [];
@ -157,6 +159,16 @@ class MempoolBlocks {
transactionIds = [tx.txid];
transactions = [tx];
}
if (tx.purged) {
if (tx.effectiveFeePerVsize >= purgeRate) {
rebroadcaster.unpurge(tx.txid);
tx.purged = false;
}
} else if (tx.effectiveFeePerVsize < purgeRate) {
rebroadcaster.purge(tx.txid);
tx.purged = true;
}
});
if (transactions.length) {
const feeStats = onlineStats ? feeStatsCalculator.getRawFeeStats() : undefined;
@ -452,12 +464,12 @@ class MempoolBlocks {
}
}
private processBlockTemplates(mempool: { [txid: string]: MempoolTransactionExtended }, blocks: string[][], blockWeights: number[] | null, rates: [string, number][], clusters: string[][], accelerations, accelerationPool, saveResults): MempoolBlockWithTransactions[] {
private processBlockTemplates(mempoolTxs: { [txid: string]: MempoolTransactionExtended }, blocks: string[][], blockWeights: number[] | null, rates: [string, number][], clusters: string[][], accelerations, accelerationPool, saveResults): MempoolBlockWithTransactions[] {
for (const [txid, rate] of rates) {
if (txid in mempool) {
mempool[txid].cpfpDirty = (rate !== mempool[txid].effectiveFeePerVsize);
mempool[txid].effectiveFeePerVsize = rate;
mempool[txid].cpfpChecked = false;
if (txid in mempoolTxs) {
mempoolTxs[txid].cpfpDirty = (rate !== mempoolTxs[txid].effectiveFeePerVsize);
mempoolTxs[txid].effectiveFeePerVsize = rate;
mempoolTxs[txid].cpfpChecked = false;
}
}
@ -469,7 +481,7 @@ class MempoolBlocks {
if (blockWeights && blockWeights[7] !== null) {
stackWeight = blockWeights[7];
} else {
stackWeight = blocks[lastBlockIndex].reduce((total, tx) => total + (mempool[tx]?.weight || 0), 0);
stackWeight = blocks[lastBlockIndex].reduce((total, tx) => total + (mempoolTxs[tx]?.weight || 0), 0);
}
hasBlockStack = stackWeight > config.MEMPOOL.BLOCK_WEIGHT_UNITS;
feeStatsCalculator = new OnlineFeeStatsCalculator(stackWeight, 0.5, [10, 20, 30, 40, 50, 60, 70, 80, 90]);
@ -477,7 +489,7 @@ class MempoolBlocks {
for (const cluster of clusters) {
for (const memberTxid of cluster) {
const mempoolTx = mempool[memberTxid];
const mempoolTx = mempoolTxs[memberTxid];
if (mempoolTx) {
const ancestors: Ancestor[] = [];
const descendants: Ancestor[] = [];
@ -488,12 +500,12 @@ class MempoolBlocks {
} else {
const relative = {
txid: txid,
fee: mempool[txid].fee,
weight: (mempool[txid].adjustedVsize * 4),
fee: mempoolTxs[txid].fee,
weight: (mempoolTxs[txid].adjustedVsize * 4),
};
if (matched) {
descendants.push(relative);
mempoolTx.lastBoosted = Math.max(mempoolTx.lastBoosted || 0, mempool[txid].firstSeen || 0);
mempoolTx.lastBoosted = Math.max(mempoolTx.lastBoosted || 0, mempoolTxs[txid].firstSeen || 0);
} else {
ancestors.push(relative);
}
@ -508,6 +520,7 @@ class MempoolBlocks {
}
const isAccelerated : { [txid: string]: boolean } = {};
const purgeRate = mempool.getMempoolInfo().mempoolminfee * 100000;
const sizeLimit = (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) * 1.2;
// update this thread's mempool with the results
@ -520,7 +533,7 @@ class MempoolBlocks {
const transactions: MempoolTransactionExtended[] = [];
for (const txid of block) {
if (txid) {
mempoolTx = mempool[txid];
mempoolTx = mempoolTxs[txid];
// save position in projected blocks
mempoolTx.position = {
block: blockIndex,
@ -544,10 +557,10 @@ class MempoolBlocks {
}
mempoolTx.acceleration = true;
for (const ancestor of mempoolTx.ancestors || []) {
if (!mempool[ancestor.txid].acceleration) {
mempool[ancestor.txid].cpfpDirty = true;
if (!mempoolTxs[ancestor.txid].acceleration) {
mempoolTxs[ancestor.txid].cpfpDirty = true;
}
mempool[ancestor.txid].acceleration = true;
mempoolTxs[ancestor.txid].acceleration = true;
isAccelerated[ancestor.txid] = true;
}
} else {
@ -562,6 +575,17 @@ class MempoolBlocks {
feeStatsCalculator.processNext(mempoolTx);
}
// update purge status
if (mempoolTx.purged) {
if (mempoolTx.effectiveFeePerVsize >= purgeRate) {
rebroadcaster.unpurge(mempoolTx.txid);
mempoolTx.purged = false;
}
} else if (mempoolTx.effectiveFeePerVsize < purgeRate) {
rebroadcaster.purge(mempoolTx.txid);
mempoolTx.purged = true;
}
totalSize += mempoolTx.size;
totalVsize += mempoolTx.vsize;
totalWeight += mempoolTx.weight;

View File

@ -11,6 +11,7 @@ import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
import rbfCache from './rbf-cache';
import { Acceleration } from './services/acceleration';
import redisCache from './redis-cache';
import rebroadcaster from './rebroadcaster';
class Mempool {
private inSync: boolean = false;
@ -361,6 +362,7 @@ class Mempool {
await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid));
await rbfCache.updateCache();
}
rebroadcaster.remove(deletedTransactions.map(tx => tx.txid));
const end = new Date().getTime();
const time = end - start;

View File

@ -0,0 +1,180 @@
import config from '../config';
import logger from '../logger';
import { MempoolTransactionExtended } from '../mempool.interfaces';
import bitcoinApi from './bitcoin/bitcoin-api-factory';
import bitcoinClient from './bitcoin/bitcoin-client';
import mempool from './mempool';
import mempoolBlocks from './mempool-blocks';
/**
* Transaction Rebroadcaster
*
* Automatically rebroadcasts transactions from near the top of the mempool which peers may not know about.
*
* e.g:
* - transactions older than the default mempoolexpiry (336 hours)
* - transactions which previously fell below the default maxmempool purge rate
* - transactions we observed to be unexpectedly missing from recent mined blocks
*
* To avoid spamming relay peers, rebroadcasting is probabilistic, based on the target frequency
* set in config.REBROADCAST.FREQUENCY and a "priority" derived from the reason for rebroadcast.
*/
class Rebroadcaster {
private unpurged = new Set<string>();
private rebroadcasted = new Set<string>();
private missing = new Set<string>();
private lastRun = (Date.now() / 1000);
async $run(): Promise<void> {
if (!config.REBROADCAST.ENABLED) {
return;
}
const now = Date.now() / 1000;
const transactions = mempool.getMempool();
const blocks = mempoolBlocks.getMempoolBlocksWithTransactions();
const toRebroadcast: { txid: string, priority: number }[] = [];
const twoWeeksAgo = now - (14 * 24 * 60 * 60);
for (const block of blocks) {
for (const txid of block.transactionIds) {
const tx = transactions[txid];
if (tx && this.isRebroadcastable(tx, twoWeeksAgo)) {
if (this.unpurged.has(tx.txid) || this.missing.has(tx.txid)) {
toRebroadcast.push({ txid: tx.txid, priority: 1 });
} else {
const depth = (tx.position?.block || 0) * (config.MEMPOOL.BLOCK_WEIGHT_UNITS / 4) + (tx.position?.vsize || 0);
// priority approaches 0.5 as mempool depth approaches zero
// scaling factor ensures all txs in the next block have priority >= 0.4
const priority = 0.5 / (1 + (depth / (config.MEMPOOL.BLOCK_WEIGHT_UNITS * 2)));
toRebroadcast.push({ txid: tx.txid, priority });
}
}
}
}
const elapsed = now - this.lastRun;
// config.REBROADCAST.FREQUENCY is actually the target /period/
const probabilityFactor = elapsed / config.REBROADCAST.FREQUENCY;
let totalRebroadcast = 0;
let totalFailed = 0;
for (const tx of toRebroadcast) {
// rebroadcast with probability = priority * frequency / number of txs
const cluster = this.getAncestors(tx.txid, transactions, twoWeeksAgo);
if (Math.random() < (tx.priority * probabilityFactor / cluster.length)) {
for (const txid of cluster) {
if (await this.$rebroadcastTx(txid)) {
totalRebroadcast++;
} else {
totalFailed++;
}
}
}
}
this.lastRun = (Date.now() / 1000);
logger.debug(`${toRebroadcast.length - totalRebroadcast} candidates, ${totalRebroadcast + totalFailed} attempted, ${totalRebroadcast} successful`, logger.tags.rebroadcaster);
}
// allow rebroadcast of old, missing or previously purged transactions
// within the first 7 projected blocks, that haven't been rebroadcast before
private isRebroadcastable(tx: MempoolTransactionExtended, minAge: number): boolean {
return !!(tx.firstSeen
&& tx.position
&& tx.position.block < 7
&& ((tx.firstSeen < minAge) || this.unpurged.has(tx.txid) || this.missing.has(tx.txid))
&& !this.rebroadcasted.has(tx.txid)
);
}
private async $rebroadcastTx(txid: string): Promise<boolean> {
try {
const hex = await bitcoinApi.$getTransactionHex(txid);
if (hex) {
const txidResult = await bitcoinClient.sendRawTransaction(hex);
if (txidResult) {
this.rebroadcasted.add(txid);
return true;
}
}
} catch (e) {
logger.warn('Failed to rebroadcast transaction: ' + (e instanceof Error ? e.message : e));
}
return false;
}
// find and return a list of rebroadcastable ancestors of the given txid (including itself)
private getAncestors(txid: string, transactions: { [txid: string]: MempoolTransactionExtended }, minAge: number): string[] {
const ancestors = new Set<string>();
const skip = new Set<string>();
const stack: string[] = [txid];
let sanityBreak = 0;
while (stack.length && sanityBreak < 100) {
const nextTxid = stack.pop();
if (nextTxid) {
ancestors.add(nextTxid);
for (const vin of transactions[nextTxid].vin) {
if ( !skip.has(nextTxid)
&& !ancestors.has(nextTxid)
&& transactions[nextTxid]
&& this.isRebroadcastable(transactions[nextTxid], minAge)
) {
stack.push(vin.txid);
} else {
skip.add(nextTxid);
}
}
}
sanityBreak++;
}
return [...ancestors.keys()].reverse();
}
// transaction re-entered default mempools
public unpurge(txid: string): void {
if (!config.REBROADCAST.ENABLED) {
return;
}
this.unpurged.add(txid);
}
// transaction was purged from default mempools
public purge(txid: string): void {
if (!config.REBROADCAST.ENABLED) {
return;
}
this.unpurged.delete(txid);
this.missing.delete(txid);
this.rebroadcasted.delete(txid);
}
// transactions were unexpectedly missing from a block
public missed(txids: string[]): void {
if (!config.REBROADCAST.ENABLED) {
return;
}
for (const txid of txids) {
this.missing.add(txid);
}
}
// transactions were evicted or mined
public remove(txids: string[]): void {
if (!config.REBROADCAST.ENABLED) {
return;
}
for (const txid of txids) {
this.unpurged.delete(txid);
this.missing.delete(txid);
this.rebroadcasted.delete(txid);
}
}
}
export default new Rebroadcaster();

View File

@ -129,6 +129,7 @@ class TransactionUtils {
feePerVsize: feePerVbytes,
adjustedFeePerVsize: adjustedFeePerVsize,
effectiveFeePerVsize: adjustedFeePerVsize,
purged: false,
});
if (!transactionExtended?.status?.confirmed && !transactionExtended.firstSeen) {
transactionExtended.firstSeen = Math.round((Date.now() / 1000));

View File

@ -24,6 +24,7 @@ import { ApiPrice } from '../repositories/PricesRepository';
import accelerationApi from './services/acceleration';
import mempool from './mempool';
import statistics from './statistics/statistics';
import rebroadcaster from './rebroadcaster';
interface AddressTransactions {
mempool: MempoolTransactionExtended[],
@ -805,6 +806,8 @@ class WebsocketHandler {
block.extras.expectedWeight = totalWeight;
block.extras.similarity = similarity;
}
rebroadcaster.missed(censored);
}
} else if (block.extras) {
const mBlocks = mempoolBlocks.getMempoolBlocksWithTransactions();

View File

@ -147,6 +147,10 @@ interface IConfig {
AUDIT_START_HEIGHT: number;
SERVERS: string[];
},
REBROADCAST: {
ENABLED: boolean;
FREQUENCY: number;
},
MEMPOOL_SERVICES: {
API: string;
ACCELERATIONS: boolean;
@ -303,6 +307,10 @@ const defaults: IConfig = {
'AUDIT_START_HEIGHT': 774000,
'SERVERS': [],
},
'REBROADCAST': {
'ENABLED': false,
'FREQUENCY': 3600,
},
'MEMPOOL_SERVICES': {
'API': '',
'ACCELERATIONS': false,
@ -331,6 +339,7 @@ class Config implements IConfig {
EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER'];
MAXMIND: IConfig['MAXMIND'];
REPLICATION: IConfig['REPLICATION'];
REBROADCAST: IConfig['REBROADCAST'];
MEMPOOL_SERVICES: IConfig['MEMPOOL_SERVICES'];
REDIS: IConfig['REDIS'];
@ -352,6 +361,7 @@ class Config implements IConfig {
this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER;
this.MAXMIND = configs.MAXMIND;
this.REPLICATION = configs.REPLICATION;
this.REBROADCAST = configs.REBROADCAST;
this.MEMPOOL_SERVICES = configs.MEMPOOL_SERVICES;
this.REDIS = configs.REDIS;
}

View File

@ -45,6 +45,7 @@ import { formatBytes, getBytesUnit } from './utils/format';
import redisCache from './api/redis-cache';
import accelerationApi from './api/services/acceleration';
import bitcoinCoreRoutes from './api/bitcoin/bitcoin-core.routes';
import rebroadcaster from './api/rebroadcaster';
class Server {
private wss: WebSocket.Server | undefined;
@ -215,6 +216,7 @@ class Server {
}
indexer.$run();
priceUpdater.$run();
rebroadcaster.$run();
// rerun immediately if we skipped the mempool update, otherwise wait POLL_RATE_MS
const elapsed = Date.now() - start;

View File

@ -36,6 +36,7 @@ class Logger {
mining: 'Mining',
ln: 'Lightning',
goggles: 'Goggles',
rebroadcaster: 'Rebroadcaster',
};
// @ts-ignore

View File

@ -107,6 +107,7 @@ export interface MempoolTransactionExtended extends TransactionExtended {
inputs?: number[];
lastBoosted?: number;
cpfpDirty?: boolean;
purged: boolean;
}
export interface AuditTransaction {

View File

@ -143,6 +143,10 @@
"AUDIT_START_HEIGHT": __REPLICATION_AUDIT_START_HEIGHT__,
"SERVERS": __REPLICATION_SERVERS__
},
"REBROADCAST": {
"ENABLED": __REBROADCAST_ENABLED__,
"FREQUENCY": __REBROADCAST_FREQUENCY__
},
"MEMPOOL_SERVICES": {
"API": "__MEMPOOL_SERVICES_API__",
"ACCELERATIONS": __MEMPOOL_SERVICES_ACCELERATIONS__

View File

@ -144,6 +144,10 @@ __REPLICATION_AUDIT__=${REPLICATION_AUDIT:=true}
__REPLICATION_AUDIT_START_HEIGHT__=${REPLICATION_AUDIT_START_HEIGHT:=774000}
__REPLICATION_SERVERS__=${REPLICATION_SERVERS:=[]}
# REBROADCAST
__REBROADCAST_ENABLED__=${REBROADCAST_ENABLED:=false}
__REBROADCAST_FREQUENCY__=${REBROADCAST_FREQUENCY:=3600}
# MEMPOOL_SERVICES
__MEMPOOL_SERVICES_API__=${MEMPOOL_SERVICES_API:=""}
__MEMPOOL_SERVICES_ACCELERATIONS__=${MEMPOOL_SERVICES_ACCELERATIONS:=false}
@ -288,6 +292,10 @@ sed -i "s!__REPLICATION_AUDIT__!${__REPLICATION_AUDIT__}!g" mempool-config.json
sed -i "s!__REPLICATION_AUDIT_START_HEIGHT__!${__REPLICATION_AUDIT_START_HEIGHT__}!g" mempool-config.json
sed -i "s!__REPLICATION_SERVERS__!${__REPLICATION_SERVERS__}!g" mempool-config.json
# REBROADCAST
sed -i "s!__REBROADCAST_ENABLED__!${__REBROADCAST_ENABLED__}!g" mempool-config.json
sed -i "s!__REBROADCAST_FREQUENCY__!${__REBROADCAST_FREQUENCY__}!g" mempool-config.json
# MEMPOOL_SERVICES
sed -i "s!__MEMPOOL_SERVICES_API__!${__MEMPOOL_SERVICES_API__}!g" mempool-config.json
sed -i "s!__MEMPOOL_SERVICES_ACCELERATIONS__!${__MEMPOOL_SERVICES_ACCELERATIONS__}!g" mempool-config.json