Merge branch 'master' into simon/remove-bisq

This commit is contained in:
softsimon
2024-04-01 18:52:56 +09:00
committed by GitHub
74 changed files with 8049 additions and 7560 deletions

View File

@@ -2,7 +2,7 @@ import logger from '../logger';
import * as WebSocket from 'ws';
import {
BlockExtended, TransactionExtended, MempoolTransactionExtended, WebsocketResponse,
OptimizedStatistic, ILoadingIndicators
OptimizedStatistic, ILoadingIndicators, GbtCandidates, TxTrackingInfo,
} from '../mempool.interfaces';
import blocks from './blocks';
import memPool from './mempool';
@@ -18,7 +18,6 @@ import feeApi from './fee-api';
import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
import Audit from './audit';
import { deepClone } from '../utils/clone';
import priceUpdater from '../tasks/price-updater';
import { ApiPrice } from '../repositories/PricesRepository';
import accelerationApi from './services/acceleration';
@@ -32,6 +31,8 @@ interface AddressTransactions {
confirmed: MempoolTransactionExtended[],
removed: MempoolTransactionExtended[],
}
import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
import { calculateCpfp } from './cpfp';
// valid 'want' subscriptions
const wantable = [
@@ -208,6 +209,52 @@ class WebsocketHandler {
}
}
if (parsedMessage && parsedMessage['track-txs']) {
const txids: string[] = [];
if (Array.isArray(parsedMessage['track-txs'])) {
for (const txid of parsedMessage['track-txs']) {
if (/^[a-fA-F0-9]{64}$/.test(txid)) {
txids.push(txid);
}
}
}
const txs: { [txid: string]: TxTrackingInfo } = {};
for (const txid of txids) {
const txInfo: TxTrackingInfo = {
confirmed: true,
};
const rbfCacheTxid = rbfCache.getReplacedBy(txid);
if (rbfCacheTxid) {
txInfo.replacedBy = rbfCacheTxid;
txInfo.confirmed = false;
}
const tx = memPool.getMempool()[txid];
if (tx && tx.position) {
txInfo.position = {
...tx.position
};
if (tx.acceleration) {
txInfo.accelerated = tx.acceleration;
}
}
if (tx) {
txInfo.confirmed = false;
}
txs[txid] = txInfo;
}
if (txids.length) {
client['track-txs'] = txids;
} else {
client['track-txs'] = null;
}
if (Object.keys(txs).length) {
response['tracked-txs'] = JSON.stringify(txs);
}
}
if (parsedMessage && parsedMessage['track-address']) {
const validAddress = this.testAddress(parsedMessage['track-address']);
if (validAddress) {
@@ -428,21 +475,26 @@ class WebsocketHandler {
}
async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]): Promise<void> {
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
candidates?: GbtCandidates): Promise<void> {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
}
this.printLogs();
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
if (config.MEMPOOL.RUST_GBT) {
await mempoolBlocks.$rustUpdateBlockTemplates(newMempool, mempoolSize, newTransactions, deletedTransactions, config.MEMPOOL_SERVICES.ACCELERATIONS);
} else {
await mempoolBlocks.$updateBlockTemplates(newMempool, newTransactions, deletedTransactions, accelerationDelta, true, config.MEMPOOL_SERVICES.ACCELERATIONS);
}
const transactionIds = (memPool.limitGBT && candidates) ? Object.keys(candidates?.txs || {}) : Object.keys(newMempool);
let added = newTransactions;
let removed = deletedTransactions;
if (memPool.limitGBT) {
added = candidates?.added || [];
removed = candidates?.removed || [];
}
if (config.MEMPOOL.RUST_GBT) {
await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, newMempool, added, removed, candidates, config.MEMPOOL_SERVICES.ACCELERATIONS);
} else {
mempoolBlocks.updateMempoolBlocks(newMempool, true);
await mempoolBlocks.$updateBlockTemplates(transactionIds, newMempool, added, removed, candidates, accelerationDelta, true, config.MEMPOOL_SERVICES.ACCELERATIONS);
}
const mBlocks = mempoolBlocks.getMempoolBlocks();
@@ -503,6 +555,11 @@ class WebsocketHandler {
if (client['track-tx']) {
trackedTxs.add(client['track-tx']);
}
if (client['track-txs']) {
for (const txid of client['track-txs']) {
trackedTxs.add(txid);
}
}
});
if (trackedTxs.size > 0) {
for (const tx of newTransactions) {
@@ -681,6 +738,9 @@ class WebsocketHandler {
accelerated: mempoolTx.acceleration || undefined,
}
};
if (!mempoolTx.cpfpChecked) {
calculateCpfp(mempoolTx, newMempool);
}
if (mempoolTx.cpfpDirty) {
positionData['cpfp'] = {
ancestors: mempoolTx.ancestors,
@@ -696,6 +756,46 @@ class WebsocketHandler {
}
}
if (client['track-txs']) {
const txids = client['track-txs'];
const txs: { [txid: string]: TxTrackingInfo } = {};
for (const txid of txids) {
const txInfo: TxTrackingInfo = {};
const outspends = outspendCache[txid];
if (outspends && Object.keys(outspends).length) {
txInfo.utxoSpent = outspends;
}
const replacedBy = rbfChanges.map[txid] ? rbfCache.getReplacedBy(txid) : false;
if (replacedBy) {
txInfo.replacedBy = replacedBy;
}
const mempoolTx = newMempool[txid];
if (mempoolTx && mempoolTx.position) {
txInfo.position = {
...mempoolTx.position,
accelerated: mempoolTx.acceleration || undefined,
};
if (!mempoolTx.cpfpChecked) {
calculateCpfp(mempoolTx, newMempool);
}
if (mempoolTx.cpfpDirty) {
txInfo.cpfp = {
ancestors: mempoolTx.ancestors,
bestDescendant: mempoolTx.bestDescendant || null,
descendants: mempoolTx.descendants || null,
effectiveFeePerVsize: mempoolTx.effectiveFeePerVsize || null,
sigops: mempoolTx.sigops,
adjustedVsize: mempoolTx.adjustedVsize,
};
}
}
txs[txid] = txInfo;
}
if (Object.keys(txs).length) {
response['tracked-txs'] = JSON.stringify(txs);
}
}
if (client['track-mempool-block'] >= 0 && memPool.isInSync()) {
const index = client['track-mempool-block'];
if (mBlockDeltas[index]) {
@@ -731,8 +831,9 @@ class WebsocketHandler {
await statistics.runStatistics();
const _memPool = memPool.getMempool();
const isAccelerated = config.MEMPOOL_SERVICES.ACCELERATIONS && accelerationApi.isAcceleratedBlock(block, Object.values(mempool.getAccelerations()));
const candidateTxs = await memPool.getMempoolCandidates();
let candidates: GbtCandidates | undefined = (memPool.limitGBT && candidateTxs) ? { txs: candidateTxs, added: [], removed: [] } : undefined;
let transactionIds: string[] = (memPool.limitGBT) ? Object.keys(candidates?.txs || {}) : Object.keys(_memPool);
const accelerations = Object.values(mempool.getAccelerations());
await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, transactions);
@@ -743,31 +844,19 @@ class WebsocketHandler {
if (config.MEMPOOL.AUDIT && memPool.isInSync()) {
let projectedBlocks;
let auditMempool = _memPool;
// template calculation functions have mempool side effects, so calculate audits using
// a cloned copy of the mempool if we're running a different algorithm for mempool updates
const separateAudit = config.MEMPOOL.ADVANCED_GBT_AUDIT !== config.MEMPOOL.ADVANCED_GBT_MEMPOOL;
if (separateAudit) {
auditMempool = deepClone(_memPool);
if (config.MEMPOOL.ADVANCED_GBT_AUDIT) {
if (config.MEMPOOL.RUST_GBT) {
projectedBlocks = await mempoolBlocks.$oneOffRustBlockTemplates(auditMempool, isAccelerated, block.extras.pool.id);
} else {
projectedBlocks = await mempoolBlocks.$makeBlockTemplates(auditMempool, false, isAccelerated, block.extras.pool.id);
}
const auditMempool = _memPool;
const isAccelerated = config.MEMPOOL_SERVICES.ACCELERATIONS && accelerationApi.isAcceleratedBlock(block, Object.values(mempool.getAccelerations()));
if ((config.MEMPOOL_SERVICES.ACCELERATIONS)) {
if (config.MEMPOOL.RUST_GBT) {
const added = memPool.limitGBT ? (candidates?.added || []) : [];
const removed = memPool.limitGBT ? (candidates?.removed || []) : [];
projectedBlocks = await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, auditMempool, added, removed, candidates, isAccelerated, block.extras.pool.id);
} else {
projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false);
projectedBlocks = await mempoolBlocks.$makeBlockTemplates(transactionIds, auditMempool, candidates, false, isAccelerated, block.extras.pool.id);
}
} else {
if ((config.MEMPOOL_SERVICES.ACCELERATIONS)) {
if (config.MEMPOOL.RUST_GBT) {
projectedBlocks = await mempoolBlocks.$rustUpdateBlockTemplates(auditMempool, Object.keys(auditMempool).length, [], [], isAccelerated, block.extras.pool.id);
} else {
projectedBlocks = await mempoolBlocks.$makeBlockTemplates(auditMempool, false, isAccelerated, block.extras.pool.id);
}
} else {
projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions();
}
projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions();
}
if (Common.indexingEnabled()) {
@@ -830,14 +919,23 @@ class WebsocketHandler {
confirmedTxids[txId] = true;
}
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
if (config.MEMPOOL.RUST_GBT) {
await mempoolBlocks.$rustUpdateBlockTemplates(_memPool, Object.keys(_memPool).length, [], transactions, true);
} else {
await mempoolBlocks.$makeBlockTemplates(_memPool, true, config.MEMPOOL_SERVICES.ACCELERATIONS);
}
if (memPool.limitGBT) {
const minFeeMempool = memPool.limitGBT ? await bitcoinSecondClient.getRawMemPool() : null;
const minFeeTip = memPool.limitGBT ? await bitcoinSecondClient.getBlockCount() : -1;
candidates = await memPool.getNextCandidates(minFeeMempool, minFeeTip, transactions);
transactionIds = Object.keys(candidates?.txs || {});
} else {
mempoolBlocks.updateMempoolBlocks(_memPool, true);
candidates = undefined;
transactionIds = Object.keys(memPool.getMempool());
}
if (config.MEMPOOL.RUST_GBT) {
const added = memPool.limitGBT ? (candidates?.added || []) : [];
const removed = memPool.limitGBT ? (candidates?.removed || []) : transactions;
await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, _memPool, added, removed, candidates, true);
} else {
await mempoolBlocks.$makeBlockTemplates(transactionIds, _memPool, candidates, true, config.MEMPOOL_SERVICES.ACCELERATIONS);
}
const mBlocks = mempoolBlocks.getMempoolBlocks();
const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas();
@@ -916,6 +1014,28 @@ class WebsocketHandler {
}
}
if (client['track-txs']) {
const txs: { [txid: string]: TxTrackingInfo } = {};
for (const txid of client['track-txs']) {
if (confirmedTxids[txid]) {
txs[txid] = { confirmed: true };
} else {
const mempoolTx = _memPool[txid];
if (mempoolTx && mempoolTx.position) {
txs[txid] = {
position: {
...mempoolTx.position,
},
accelerated: mempoolTx.acceleration || undefined,
};
}
}
}
if (Object.keys(txs).length) {
response['tracked-txs'] = JSON.stringify(txs);
}
}
if (client['track-address']) {
const foundTransactions: TransactionExtended[] = Array.from(addressCache[client['track-address']]?.values() || []);
@@ -1111,6 +1231,7 @@ class WebsocketHandler {
private printLogs(): void {
if (this.wss) {
let numTxSubs = 0;
let numTxsSubs = 0;
let numProjectedSubs = 0;
let numRbfSubs = 0;
@@ -1118,6 +1239,9 @@ class WebsocketHandler {
if (client['track-tx']) {
numTxSubs++;
}
if (client['track-txs']) {
numTxsSubs++;
}
if (client['track-mempool-block'] != null && client['track-mempool-block'] >= 0) {
numProjectedSubs++;
}
@@ -1130,7 +1254,7 @@ class WebsocketHandler {
const diff = count - this.numClients;
this.numClients = count;
logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`);
logger.debug(`websocket subscriptions: track-tx: ${numTxSubs}, track-mempool-block: ${numProjectedSubs} track-rbf: ${numRbfSubs}`);
logger.debug(`websocket subscriptions: track-tx: ${numTxSubs}, track-txs: ${numTxsSubs}, track-mempool-block: ${numProjectedSubs} track-rbf: ${numRbfSubs}`);
this.numConnected = 0;
this.numDisconnected = 0;
}