Compare commits

...

3 Commits

Author SHA1 Message Date
Mononaut
a62bc22549
reduce number of GBT candidates 2023-05-05 15:38:00 -07:00
Mononaut
8bbe8a976b
add missing rbf eviction 2023-05-05 15:38:00 -07:00
Mononaut
1c40c64de1
Fix new block mempool deletion race condition 2023-05-05 15:38:00 -07:00
8 changed files with 143 additions and 40 deletions

View File

@ -529,13 +529,14 @@ class Blocks {
return await BlocksRepository.$validateChain();
}
public async $updateBlocks() {
public async $updateBlocks(): Promise<number> {
// warn if this run stalls the main loop for more than 2 minutes
const timer = this.startTimer();
diskCache.lock();
let fastForwarded = false;
let handledBlocks = 0;
const blockHeightTip = await bitcoinApi.$getBlockHeightTip();
this.updateTimerProgress(timer, 'got block height tip');
@ -697,11 +698,15 @@ class Blocks {
this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
await Promise.all(callbackPromises);
this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
handledBlocks++;
}
diskCache.unlock();
this.clearTimer(timer);
return handledBlocks;
}
private startTimer() {

View File

@ -52,7 +52,7 @@ class DiskCache {
const mempool = memPool.getMempool();
const mempoolArray: TransactionExtended[] = [];
for (const tx in mempool) {
if (mempool[tx] && !mempool[tx].deleteAfter) {
if (mempool[tx]) {
mempoolArray.push(mempool[tx]);
}
}

View File

@ -1,6 +1,7 @@
import logger from '../logger';
import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces';
import { Common } from './common';
import Mempool from './mempool';
import config from '../config';
import { Worker } from 'worker_threads';
import path from 'path';
@ -10,6 +11,9 @@ class MempoolBlocks {
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
private txSelectionWorker: Worker | null = null;
private filteredTxs: Map<string, TransactionExtended> = new Map();
private minFee: number = 0;
constructor() {}
public getMempoolBlocks(): MempoolBlock[] {
@ -94,6 +98,7 @@ class MempoolBlocks {
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, blocks);
this.mempoolBlocks = blocks;
this.mempoolBlockDeltas = deltas;
this.updateMinFee(this.mempoolBlocks);
}
return blocks;
@ -175,18 +180,29 @@ class MempoolBlocks {
}
public async $makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise<MempoolBlockWithTransactions[]> {
// Identify txs that can't be CPFP'd
this.markRelatives(newMempool);
// Track filtered transactions
this.filteredTxs.clear();
const filterFee = this.getFilterFee();
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const strippedMempool: { [txid: string]: ThreadTransaction } = {};
Object.values(newMempool).filter(tx => !tx.deleteAfter).forEach(entry => {
strippedMempool[entry.txid] = {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
Object.values(newMempool).forEach(entry => {
if (entry.hasRelatives || entry.feePerVsize >= filterFee) {
strippedMempool[entry.txid] = {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
} else {
this.filteredTxs.set(entry.txid, entry);
}
});
// (re)initialize tx selection worker thread
@ -238,9 +254,14 @@ class MempoolBlocks {
await this.$makeBlockTemplates(newMempool, saveResults);
return;
}
this.updateMarkedRelatives(newMempool, added);
const filterDiff = this.updateFilteredTxs(newMempool, this.getFilterFee());
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const addedStripped: ThreadTransaction[] = added.map(entry => {
const addedStripped: ThreadTransaction[] = added.concat(filterDiff.included).map(entry => {
return {
txid: entry.txid,
fee: entry.fee,
@ -250,6 +271,7 @@ class MempoolBlocks {
vin: entry.vin.map(v => v.txid),
};
});
const removedIds = removed.concat(filterDiff.filtered);
// run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener;
@ -261,7 +283,7 @@ class MempoolBlocks {
});
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed });
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedIds });
let { blocks, clusters } = await workerResultPromise;
// filter out stale transactions
const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
@ -332,6 +354,11 @@ class MempoolBlocks {
});
});
if (blocks.length) {
const sortedFiltered = Array.from(this.filteredTxs.values()).sort((a, b) => b.feePerVsize - a.feePerVsize);
blocks[blocks.length - 1] = blocks[blocks.length - 1].concat(sortedFiltered);
}
// unpack the condensed blocks into proper mempool blocks
const mempoolBlocks = blocks.map((transactions) => {
return this.dataToMempoolBlocks(transactions.map(tx => {
@ -343,6 +370,7 @@ class MempoolBlocks {
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks);
this.mempoolBlocks = mempoolBlocks;
this.mempoolBlockDeltas = deltas;
this.updateMinFee(this.mempoolBlocks);
}
return mempoolBlocks;
@ -371,6 +399,84 @@ class MempoolBlocks {
transactions: fitTransactions.map((tx) => Common.stripTransaction(tx)),
};
}
// Mark all transactions with in-mempool relatives
private markRelatives(mempool: { [txid: string]: TransactionExtended }): void {
for (const tx of Object.values(mempool)) {
if (!tx.hasRelatives) {
let hasRelatives = false;
tx.vin.forEach(parent => {
if (mempool[parent.txid] != null) {
hasRelatives = true;
mempool[parent.txid].hasRelatives = true;
}
});
tx.hasRelatives = hasRelatives;
}
}
}
private updateMarkedRelatives(mempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[]): void {
const newFiltered: TransactionExtended[] = [];
for (const tx of added) {
if (!tx.hasRelatives) {
let hasRelatives = false;
tx.vin.forEach(parent => {
if (mempool[parent.txid] != null) {
hasRelatives = true;
mempool[parent.txid].hasRelatives = true;
}
});
tx.hasRelatives = hasRelatives;
}
}
}
private updateFilteredTxs(mempool: { [txid: string]: TransactionExtended }, filterFee: number): { filtered: string[], included: TransactionExtended[] } {
const filtered: string[] = [];
const included: TransactionExtended[] = [];
for (const tx of Object.values(mempool)) {
if (!tx.hasRelatives) {
if (tx.feePerVsize < filterFee) {
// filter out tx
if (!this.filteredTxs.has(tx.txid)) {
this.filteredTxs.set(tx.txid, tx);
filtered.push(tx.txid);
}
} else {
// include tx
if (this.filteredTxs.has(tx.txid)) {
this.filteredTxs.delete(tx.txid);
included.push(tx);
}
}
}
}
return { filtered, included };
}
private updateMinFee(mempoolBlocks: MempoolBlockWithTransactions[]): void {
let totalFeeRate = 0;
let totalSize = 0;
let count = 0;
if (mempoolBlocks.length === 8) {
const lastBlock = mempoolBlocks[mempoolBlocks.length - 1];
for (let i = 0; i < lastBlock.transactions.length && totalSize < 16_000_000; i++) {
totalFeeRate += lastBlock.transactions[i].rate || (lastBlock.transactions[i].fee / lastBlock.transactions[i].vsize);
totalSize += lastBlock.transactions[i].vsize;
count++;
}
this.minFee = count ? (totalFeeRate / count) : 1;
} else {
this.minFee = 1;
}
}
private getFilterFee(): number {
const purgingBelow = Mempool.getMempoolInfo().mempoolminfee * 100000;
const filterFee = Math.max(purgingBelow, this.minFee);
return filterFee;
}
}
export default new MempoolBlocks();

View File

@ -12,7 +12,6 @@ import rbfCache from './rbf-cache';
class Mempool {
private static WEBSOCKET_REFRESH_RATE_MS = 10000;
private static LAZY_DELETE_AFTER_SECONDS = 30;
private inSync: boolean = false;
private mempoolCacheDelta: number = -1;
private mempoolCache: { [txId: string]: TransactionExtended } = {};
@ -119,7 +118,7 @@ class Mempool {
return txTimes;
}
public async $updateMempool(): Promise<void> {
public async $updateMempool(transactions: string[]): Promise<void> {
logger.debug(`Updating mempool...`);
// warn if this run stalls the main loop for more than 2 minutes
@ -128,7 +127,6 @@ class Mempool {
const start = new Date().getTime();
let hasChange: boolean = false;
const currentMempoolSize = Object.keys(this.mempoolCache).length;
const transactions = await bitcoinApi.$getRawMempool();
this.updateTimerProgress(timer, 'got raw mempool');
const diff = transactions.length - currentMempoolSize;
const newTransactions: TransactionExtended[] = [];
@ -207,13 +205,15 @@ class Mempool {
const transactionsObject = {};
transactions.forEach((txId) => transactionsObject[txId] = true);
// Flag transactions for lazy deletion
// Delete evicted transactions from mempool
for (const tx in this.mempoolCache) {
if (!transactionsObject[tx] && !this.mempoolCache[tx].deleteAfter) {
if (!transactionsObject[tx]) {
deletedTransactions.push(this.mempoolCache[tx]);
this.mempoolCache[tx].deleteAfter = new Date().getTime() + Mempool.LAZY_DELETE_AFTER_SECONDS * 1000;
}
}
for (const tx of deletedTransactions) {
delete this.mempoolCache[tx.txid];
}
}
const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
@ -270,10 +270,6 @@ class Mempool {
if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
// Store replaced transactions
rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
// Erase the replaced transactions from the local mempool
for (const replaced of rbfTransactions[rbfTransaction]) {
delete this.mempoolCache[replaced.txid];
}
}
}
}
@ -291,17 +287,6 @@ class Mempool {
}
}
public deleteExpiredTransactions() {
const now = new Date().getTime();
for (const tx in this.mempoolCache) {
const lazyDeleteAt = this.mempoolCache[tx].deleteAfter;
if (lazyDeleteAt && lazyDeleteAt < now) {
delete this.mempoolCache[tx];
rbfCache.evict(tx);
}
}
}
private $getMempoolInfo() {
if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
return Promise.all([

View File

@ -163,7 +163,7 @@ class RbfCache {
}
// flag a transaction as removed from the mempool
public evict(txid, fast: boolean = false): void {
public evict(txid: string, fast: boolean = false): void {
if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) {
this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours
}

View File

@ -301,6 +301,9 @@ class WebsocketHandler {
rbfReplacements = rbfCache.getRbfTrees(false);
fullRbfReplacements = rbfCache.getRbfTrees(true);
}
for (const deletedTx of deletedTransactions) {
rbfCache.evict(deletedTx.txid);
}
const recommendedFees = feeApi.getRecommendedFee();
this.wss.clients.forEach(async (client) => {

View File

@ -2,6 +2,7 @@ import express from 'express';
import { Application, Request, Response, NextFunction } from 'express';
import * as http from 'http';
import * as WebSocket from 'ws';
import bitcoinApi from './api/bitcoin/bitcoin-api-factory';
import cluster from 'cluster';
import DB from './database';
import config from './config';
@ -179,12 +180,15 @@ class Server {
logger.debug(msg);
}
}
await blocks.$updateBlocks();
memPool.deleteExpiredTransactions();
await memPool.$updateMempool();
const newMempool = await bitcoinApi.$getRawMempool();
const numHandledBlocks = await blocks.$updateBlocks();
if (numHandledBlocks === 0) {
await memPool.$updateMempool(newMempool);
}
indexer.$run();
setTimeout(this.runMainUpdateLoop.bind(this), config.MEMPOOL.POLL_RATE_MS);
// rerun immediately if we skipped the mempool update, otherwise wait POLL_RATE_MS
setTimeout(this.runMainUpdateLoop.bind(this), numHandledBlocks > 0 ? 1 : config.MEMPOOL.POLL_RATE_MS);
this.backendRetryCount = 0;
} catch (e: any) {
this.backendRetryCount++;

View File

@ -80,7 +80,7 @@ export interface TransactionExtended extends IEsploraApi.Transaction {
descendants?: Ancestor[];
bestDescendant?: BestDescendant | null;
cpfpChecked?: boolean;
deleteAfter?: number;
hasRelatives?: boolean;
position?: {
block: number,
vsize: number,