Use minfee node to limit gbt input size

This commit is contained in:
Mononaut 2023-02-17 17:54:29 -06:00
parent 2e6a0c0967
commit 0df71123f6
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
4 changed files with 164 additions and 59 deletions

View File

@ -1,5 +1,5 @@
import logger from '../logger'; import logger from '../logger';
import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces'; import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, GbtCandidates } from '../mempool.interfaces';
import { Common } from './common'; import { Common } from './common';
import config from '../config'; import config from '../config';
import { Worker } from 'worker_threads'; import { Worker } from 'worker_threads';
@ -147,19 +147,23 @@ class MempoolBlocks {
return mempoolBlockDeltas; return mempoolBlockDeltas;
} }
public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise<MempoolBlockWithTransactions[]> { public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false, candidates?: GbtCandidates): Promise<MempoolBlockWithTransactions[]> {
// prepare a stripped down version of the mempool with only the minimum necessary data // prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread // to reduce the overhead of passing this data to the worker thread
const txids = candidates ? Object.keys(candidates.txs) : Object.keys(newMempool);
const strippedMempool: { [txid: string]: ThreadTransaction } = {}; const strippedMempool: { [txid: string]: ThreadTransaction } = {};
Object.values(newMempool).forEach(entry => { txids.forEach(txid => {
strippedMempool[entry.txid] = { const entry = newMempool[txid];
txid: entry.txid, if (entry) {
fee: entry.fee, strippedMempool[entry.txid] = {
weight: entry.weight, txid: entry.txid,
feePerVsize: entry.fee / (entry.weight / 4), fee: entry.fee,
effectiveFeePerVsize: entry.fee / (entry.weight / 4), weight: entry.weight,
vin: entry.vin.map(v => v.txid), feePerVsize: entry.fee / (entry.weight / 4),
}; effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
}
}); });
// (re)initialize tx selection worker thread // (re)initialize tx selection worker thread
@ -191,31 +195,49 @@ class MempoolBlocks {
// clean up thread error listener // clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener); this.txSelectionWorker?.removeListener('error', threadErrorListener);
return this.processBlockTemplates(newMempool, blocks, clusters, saveResults); return this.processBlockTemplates(newMempool, blocks, clusters, saveResults, candidates);
} catch (e) { } catch (e) {
logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
} }
return this.mempoolBlocks; return this.mempoolBlocks;
} }
public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false): Promise<void> { public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false, candidates?: GbtCandidates): Promise<void> {
if (!this.txSelectionWorker) { if (!this.txSelectionWorker) {
// need to reset the worker // need to reset the worker
this.makeBlockTemplates(newMempool, saveResults); this.makeBlockTemplates(newMempool, saveResults, candidates);
return; return;
} }
// prepare a stripped down version of the mempool with only the minimum necessary data // prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread // to reduce the overhead of passing this data to the worker thread
const addedStripped: ThreadTransaction[] = added.map(entry => { let addedStripped: ThreadTransaction[] = [];
return { let removedList;
txid: entry.txid, if (candidates) {
fee: entry.fee, addedStripped = candidates.added.filter(txid => newMempool[txid]).map(txid => {
weight: entry.weight, const entry = newMempool[txid];
feePerVsize: entry.fee / (entry.weight / 4), return {
effectiveFeePerVsize: entry.fee / (entry.weight / 4), txid: entry.txid,
vin: entry.vin.map(v => v.txid), fee: entry.fee,
}; weight: entry.weight,
}); feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
});
removedList = candidates.removed;
} else {
addedStripped = added.map(entry => {
return {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
});
removedList = removed;
}
// run the block construction algorithm in a separate thread, and wait for a result // run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener; let threadErrorListener;
@ -227,19 +249,19 @@ class MempoolBlocks {
}); });
this.txSelectionWorker?.once('error', reject); this.txSelectionWorker?.once('error', reject);
}); });
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed }); this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedList });
const { blocks, clusters } = await workerResultPromise; const { blocks, clusters } = await workerResultPromise;
// clean up thread error listener // clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener); this.txSelectionWorker?.removeListener('error', threadErrorListener);
this.processBlockTemplates(newMempool, blocks, clusters, saveResults); this.processBlockTemplates(newMempool, blocks, clusters, saveResults, candidates);
} catch (e) { } catch (e) {
logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e)); logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
} }
} }
private processBlockTemplates(mempool, blocks, clusters, saveResults): MempoolBlockWithTransactions[] { private processBlockTemplates(mempool: { [txid: string ]: TransactionExtended }, blocks, clusters, saveResults, candidates?: GbtCandidates): MempoolBlockWithTransactions[] {
// update this thread's mempool with the results // update this thread's mempool with the results
blocks.forEach(block => { blocks.forEach(block => {
block.forEach(tx => { block.forEach(tx => {
@ -277,6 +299,32 @@ class MempoolBlocks {
}); });
}); });
// Add purged transactions at the end, if required
if (candidates) {
const purged: string[] = [];
Object.values(mempool).forEach(tx => {
if (!candidates.txs[tx.txid]) {
purged.push(tx.txid);
}
});
if (!blocks.length) {
blocks = [[]];
}
let blockIndex = blocks.length - 1;
let weight = blocks[blockIndex].reduce((acc, tx) => acc + tx.weight, 0);
purged.sort((a,b) => { return mempool[b].effectiveFeePerVsize - mempool[a].effectiveFeePerVsize});
purged.forEach(txid => {
const tx = mempool[txid];
if ((weight + tx.weight) >= (config.MEMPOOL.BLOCK_WEIGHT_UNITS - 4000) && blockIndex < 7) {
blocks.push([]);
blockIndex++;
weight = 0;
}
blocks[blockIndex].push(tx);
weight += tx.weight;
});
}
// unpack the condensed blocks into proper mempool blocks // unpack the condensed blocks into proper mempool blocks
const mempoolBlocks = blocks.map((transactions, blockIndex) => { const mempoolBlocks = blocks.map((transactions, blockIndex) => {
return this.dataToMempoolBlocks(transactions.map(tx => { return this.dataToMempoolBlocks(transactions.map(tx => {

View File

@ -1,6 +1,6 @@
import config from '../config'; import config from '../config';
import bitcoinApi from './bitcoin/bitcoin-api-factory'; import bitcoinApi from './bitcoin/bitcoin-api-factory';
import { TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; import { GbtCandidates, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces';
import logger from '../logger'; import logger from '../logger';
import { Common } from './common'; import { Common } from './common';
import transactionUtils from './transaction-utils'; import transactionUtils from './transaction-utils';
@ -9,6 +9,7 @@ import loadingIndicators from './loading-indicators';
import bitcoinClient from './bitcoin/bitcoin-client'; import bitcoinClient from './bitcoin/bitcoin-client';
import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
import rbfCache from './rbf-cache'; import rbfCache from './rbf-cache';
import blocks from './blocks';
class Mempool { class Mempool {
private static WEBSOCKET_REFRESH_RATE_MS = 10000; private static WEBSOCKET_REFRESH_RATE_MS = 10000;
@ -16,13 +17,16 @@ class Mempool {
private inSync: boolean = false; private inSync: boolean = false;
private mempoolCacheDelta: number = -1; private mempoolCacheDelta: number = -1;
private mempoolCache: { [txId: string]: TransactionExtended } = {}; private mempoolCache: { [txId: string]: TransactionExtended } = {};
private mempoolCandidates: { [txid: string ]: boolean } = {};
private minFeeMempool: { [txId: string]: boolean } = {}; private minFeeMempool: { [txId: string]: boolean } = {};
private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0, private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0,
maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 }; maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 };
private secondMempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0,
maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 };
private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[],
deletedTransactions: TransactionExtended[]) => void) | undefined; deletedTransactions: TransactionExtended[]) => void) | undefined;
private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[],
deletedTransactions: TransactionExtended[]) => Promise<void>) | undefined; deletedTransactions: TransactionExtended[], candidates?: GbtCandidates) => Promise<void>) | undefined;
private txPerSecondArray: number[] = []; private txPerSecondArray: number[] = [];
private txPerSecond: number = 0; private txPerSecond: number = 0;
@ -72,7 +76,7 @@ class Mempool {
} }
public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; }, public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; },
newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => Promise<void>) { newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[], candidates?: GbtCandidates) => Promise<void>) {
this.asyncMempoolChangedCallback = fn; this.asyncMempoolChangedCallback = fn;
} }
@ -86,10 +90,18 @@ class Mempool {
this.mempoolChangedCallback(this.mempoolCache, [], []); this.mempoolChangedCallback(this.mempoolCache, [], []);
} }
if (this.asyncMempoolChangedCallback) { if (this.asyncMempoolChangedCallback) {
this.asyncMempoolChangedCallback(this.mempoolCache, [], []); if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
this.asyncMempoolChangedCallback(this.mempoolCache, [], [], { txs: {}, added: [], removed: [] });
} else {
this.asyncMempoolChangedCallback(this.mempoolCache, [], [], );
}
} }
} }
public getMempoolCandidates(): { [txid: string]: boolean } {
return this.mempoolCandidates;
}
public async $updateMemPoolInfo() { public async $updateMemPoolInfo() {
this.mempoolInfo = await this.$getMempoolInfo(); this.mempoolInfo = await this.$getMempoolInfo();
} }
@ -125,7 +137,7 @@ class Mempool {
let hasChange: boolean = false; let hasChange: boolean = false;
const currentMempoolSize = Object.keys(this.mempoolCache).length; const currentMempoolSize = Object.keys(this.mempoolCache).length;
const transactions = await bitcoinApi.$getRawMempool(); const transactions = await bitcoinApi.$getRawMempool();
await this.updateMinFeeMempool(); const candidates = await this.getNextCandidates();
const diff = transactions.length - currentMempoolSize; const diff = transactions.length - currentMempoolSize;
const newTransactions: TransactionExtended[] = []; const newTransactions: TransactionExtended[] = [];
@ -225,8 +237,8 @@ class Mempool {
if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) {
this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions);
} }
if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length)) { if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length || candidates?.added.length || candidates?.removed.length)) {
await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions, candidates);
} }
const end = new Date().getTime(); const end = new Date().getTime();
@ -238,27 +250,48 @@ class Mempool {
return !this.minFeeMempool[txid]; return !this.minFeeMempool[txid];
} }
private async updateMinFeeMempool() { public async getNextCandidates(): Promise<GbtCandidates | undefined> {
const minFeeTransactions = await bitcoinSecondClient.getRawMemPool(); if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
const minFeeTxMap = {}; const minFeeTransactions = await bitcoinSecondClient.getRawMemPool();
for (const txid of minFeeTransactions) { const blockHeight = await bitcoinSecondClient.getChainTips()
minFeeTxMap[txid] = true; .then((result: IBitcoinApi.ChainTips[]) => {
} return result.find(tip => tip.status === 'active')!.height;
const removed: string[] = []; });
const added: string[] = []; const newCandidateTxMap = {};
for (const txid of Object.keys(this.minFeeMempool)) { this.minFeeMempool = {};
if (!minFeeTxMap[txid]) { for (const txid of minFeeTransactions) {
removed.push(txid); if (this.mempoolCache[txid]) {
} newCandidateTxMap[txid] = true;
} }
for (const txid of minFeeTransactions) {
if (!this.minFeeMempool[txid]) {
added.push(txid);
this.minFeeMempool[txid] = true; this.minFeeMempool[txid] = true;
} }
} const removed: string[] = [];
for (const txid of removed) { const added: string[] = [];
delete this.minFeeMempool[txid]; // don't prematurely remove txs included in a new block
if (blockHeight > blocks.getCurrentBlockHeight()) {
for (const txid of Object.keys(this.mempoolCandidates)) {
newCandidateTxMap[txid] = true;
}
} else {
for (const txid of Object.keys(this.mempoolCandidates)) {
if (!newCandidateTxMap[txid]) {
removed.push(txid);
}
}
}
for (const txid of Object.keys(newCandidateTxMap)) {
if (!this.mempoolCandidates[txid]) {
added.push(txid);
}
}
this.mempoolCandidates = newCandidateTxMap;
return {
txs: this.mempoolCandidates,
added,
removed
};
} }
} }

View File

@ -2,7 +2,7 @@ import logger from '../logger';
import * as WebSocket from 'ws'; import * as WebSocket from 'ws';
import { import {
BlockExtended, TransactionExtended, WebsocketResponse, BlockExtended, TransactionExtended, WebsocketResponse,
OptimizedStatistic, ILoadingIndicators OptimizedStatistic, ILoadingIndicators, GbtCandidates
} from '../mempool.interfaces'; } from '../mempool.interfaces';
import blocks from './blocks'; import blocks from './blocks';
import memPool from './mempool'; import memPool from './mempool';
@ -19,7 +19,6 @@ import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository'; import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
import Audit from './audit'; import Audit from './audit';
import { deepClone } from '../utils/clone'; import { deepClone } from '../utils/clone';
import mempool from './mempool';
import priceUpdater from '../tasks/price-updater'; import priceUpdater from '../tasks/price-updater';
import { ApiPrice } from '../repositories/PricesRepository'; import { ApiPrice } from '../repositories/PricesRepository';
@ -251,13 +250,14 @@ class WebsocketHandler {
} }
async handleMempoolChange(newMempool: { [txid: string]: TransactionExtended }, async handleMempoolChange(newMempool: { [txid: string]: TransactionExtended },
newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]): Promise<void> { newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[],
candidates?: GbtCandidates): Promise<void> {
if (!this.wss) { if (!this.wss) {
throw new Error('WebSocket.Server is not set'); throw new Error('WebSocket.Server is not set');
} }
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true); await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true, candidates);
} else { } else {
mempoolBlocks.updateMempoolBlocks(newMempool, true); mempoolBlocks.updateMempoolBlocks(newMempool, true);
} }
@ -428,6 +428,7 @@ class WebsocketHandler {
} }
const _memPool = memPool.getMempool(); const _memPool = memPool.getMempool();
const candidateTxs = await memPool.getMempoolCandidates();
if (config.MEMPOOL.AUDIT) { if (config.MEMPOOL.AUDIT) {
let projectedBlocks; let projectedBlocks;
@ -435,7 +436,15 @@ class WebsocketHandler {
// a cloned copy of the mempool if we're running a different algorithm for mempool updates // a cloned copy of the mempool if we're running a different algorithm for mempool updates
const auditMempool = (config.MEMPOOL.ADVANCED_GBT_AUDIT === config.MEMPOOL.ADVANCED_GBT_MEMPOOL) ? _memPool : deepClone(_memPool); const auditMempool = (config.MEMPOOL.ADVANCED_GBT_AUDIT === config.MEMPOOL.ADVANCED_GBT_MEMPOOL) ? _memPool : deepClone(_memPool);
if (config.MEMPOOL.ADVANCED_GBT_AUDIT) { if (config.MEMPOOL.ADVANCED_GBT_AUDIT) {
projectedBlocks = await mempoolBlocks.makeBlockTemplates(auditMempool, false); let candidates;
if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
candidates = {
txs: candidateTxs,
added: [],
removed: [],
};
}
projectedBlocks = await mempoolBlocks.makeBlockTemplates(auditMempool, false, candidates);
} else { } else {
projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false); projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false);
} }
@ -487,12 +496,21 @@ class WebsocketHandler {
// Update mempool to remove transactions included in the new block // Update mempool to remove transactions included in the new block
for (const txId of txIds) { for (const txId of txIds) {
delete _memPool[txId]; delete _memPool[txId];
delete candidateTxs[txId];
removed.push(txId); removed.push(txId);
rbfCache.evict(txId); rbfCache.evict(txId);
} }
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
await mempoolBlocks.updateBlockTemplates(_memPool, [], removed, true); let candidates;
if (config.MEMPOOL.USE_SECOND_NODE_FOR_MINFEE) {
candidates = {
txs: candidateTxs,
added: [],
removed: removed,
};
}
await mempoolBlocks.updateBlockTemplates(_memPool, [], removed, true, candidates);
} else { } else {
mempoolBlocks.updateMempoolBlocks(_memPool, true); mempoolBlocks.updateMempoolBlocks(_memPool, true);
} }

View File

@ -100,6 +100,12 @@ export interface AuditTransaction {
modifiedNode: HeapNode<AuditTransaction>; modifiedNode: HeapNode<AuditTransaction>;
} }
export interface GbtCandidates {
txs: { [txid: string ]: boolean },
added: string[];
removed: string[];
}
export interface ThreadTransaction { export interface ThreadTransaction {
txid: string; txid: string;
fee: number; fee: number;