Refactor accelerated audits
This commit is contained in:
parent
20b3ceab1e
commit
083bfdba06
@ -6,7 +6,7 @@ import rbfCache from './rbf-cache';
|
|||||||
const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners
|
const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners
|
||||||
|
|
||||||
class Audit {
|
class Audit {
|
||||||
auditBlock(transactions: MempoolTransactionExtended[], projectedBlocks: MempoolBlockWithTransactions[], mempool: { [txId: string]: MempoolTransactionExtended })
|
auditBlock(transactions: MempoolTransactionExtended[], projectedBlocks: MempoolBlockWithTransactions[], mempool: { [txId: string]: MempoolTransactionExtended }, useAccelerations: boolean = false)
|
||||||
: { censored: string[], added: string[], fresh: string[], sigop: string[], fullrbf: string[], accelerated: string[], score: number, similarity: number } {
|
: { censored: string[], added: string[], fresh: string[], sigop: string[], fullrbf: string[], accelerated: string[], score: number, similarity: number } {
|
||||||
if (!projectedBlocks?.[0]?.transactionIds || !mempool) {
|
if (!projectedBlocks?.[0]?.transactionIds || !mempool) {
|
||||||
return { censored: [], added: [], fresh: [], sigop: [], fullrbf: [], accelerated: [], score: 0, similarity: 1 };
|
return { censored: [], added: [], fresh: [], sigop: [], fullrbf: [], accelerated: [], score: 0, similarity: 1 };
|
||||||
@ -29,7 +29,7 @@ class Audit {
|
|||||||
const now = Math.round((Date.now() / 1000));
|
const now = Math.round((Date.now() / 1000));
|
||||||
for (const tx of transactions) {
|
for (const tx of transactions) {
|
||||||
inBlock[tx.txid] = tx;
|
inBlock[tx.txid] = tx;
|
||||||
if (tx.acceleration) {
|
if (mempool[tx.txid] && mempool[tx.txid].acceleration) {
|
||||||
accelerated.push(tx.txid);
|
accelerated.push(tx.txid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -207,7 +207,7 @@ class MempoolBlocks {
|
|||||||
return mempoolBlockDeltas;
|
return mempoolBlockDeltas;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $makeBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, saveResults: boolean = false): Promise<MempoolBlockWithTransactions[]> {
|
public async $makeBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, saveResults: boolean = false, useAccelerations: boolean = false): Promise<MempoolBlockWithTransactions[]> {
|
||||||
const start = Date.now();
|
const start = Date.now();
|
||||||
|
|
||||||
// reset mempool short ids
|
// reset mempool short ids
|
||||||
@ -216,7 +216,7 @@ class MempoolBlocks {
|
|||||||
this.setUid(tx, true);
|
this.setUid(tx, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
const accelerations = mempool.getAccelerations();
|
const accelerations = useAccelerations ? mempool.getAccelerations() : {};
|
||||||
|
|
||||||
// prepare a stripped down version of the mempool with only the minimum necessary data
|
// prepare a stripped down version of the mempool with only the minimum necessary data
|
||||||
// to reduce the overhead of passing this data to the worker thread
|
// to reduce the overhead of passing this data to the worker thread
|
||||||
@ -225,7 +225,7 @@ class MempoolBlocks {
|
|||||||
if (entry.uid !== null && entry.uid !== undefined) {
|
if (entry.uid !== null && entry.uid !== undefined) {
|
||||||
const stripped = {
|
const stripped = {
|
||||||
uid: entry.uid,
|
uid: entry.uid,
|
||||||
fee: entry.fee + (accelerations[entry.txid] || 0),
|
fee: entry.fee + (useAccelerations ? (accelerations[entry.txid] || 0) : 0),
|
||||||
weight: (entry.adjustedVsize * 4),
|
weight: (entry.adjustedVsize * 4),
|
||||||
sigops: entry.sigops,
|
sigops: entry.sigops,
|
||||||
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
|
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
|
||||||
@ -265,7 +265,7 @@ class MempoolBlocks {
|
|||||||
// clean up thread error listener
|
// clean up thread error listener
|
||||||
this.txSelectionWorker?.removeListener('error', threadErrorListener);
|
this.txSelectionWorker?.removeListener('error', threadErrorListener);
|
||||||
|
|
||||||
const processed = this.processBlockTemplates(newMempool, blocks, null, Object.entries(rates), Object.values(clusters), saveResults);
|
const processed = this.processBlockTemplates(newMempool, blocks, null, Object.entries(rates), Object.values(clusters), accelerations, saveResults);
|
||||||
|
|
||||||
logger.debug(`makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
|
logger.debug(`makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
|
||||||
|
|
||||||
@ -276,17 +276,17 @@ class MempoolBlocks {
|
|||||||
return this.mempoolBlocks;
|
return this.mempoolBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $updateBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, added: MempoolTransactionExtended[], removed: MempoolTransactionExtended[], accelerationDelta: string[] = [], saveResults: boolean = false): Promise<void> {
|
public async $updateBlockTemplates(newMempool: { [txid: string]: MempoolTransactionExtended }, added: MempoolTransactionExtended[], removed: MempoolTransactionExtended[], accelerationDelta: string[] = [], saveResults: boolean = false, useAccelerations: boolean = false): Promise<void> {
|
||||||
if (!this.txSelectionWorker) {
|
if (!this.txSelectionWorker) {
|
||||||
// need to reset the worker
|
// need to reset the worker
|
||||||
await this.$makeBlockTemplates(newMempool, saveResults);
|
await this.$makeBlockTemplates(newMempool, saveResults, useAccelerations);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const start = Date.now();
|
const start = Date.now();
|
||||||
|
|
||||||
const accelerations = mempool.getAccelerations();
|
const accelerations = useAccelerations ? mempool.getAccelerations() : {};
|
||||||
const addedAndChanged: MempoolTransactionExtended[] = accelerationDelta.map(txid => newMempool[txid]).filter(tx => tx != null).concat(added);
|
const addedAndChanged: MempoolTransactionExtended[] = useAccelerations ? accelerationDelta.map(txid => newMempool[txid]).filter(tx => tx != null).concat(added) : added;
|
||||||
|
|
||||||
for (const tx of addedAndChanged) {
|
for (const tx of addedAndChanged) {
|
||||||
this.setUid(tx, true);
|
this.setUid(tx, true);
|
||||||
@ -298,7 +298,7 @@ class MempoolBlocks {
|
|||||||
const addedStripped: CompactThreadTransaction[] = addedAndChanged.filter(entry => entry.uid != null).map(entry => {
|
const addedStripped: CompactThreadTransaction[] = addedAndChanged.filter(entry => entry.uid != null).map(entry => {
|
||||||
return {
|
return {
|
||||||
uid: entry.uid || 0,
|
uid: entry.uid || 0,
|
||||||
fee: entry.fee + (accelerations[entry.txid] || 0),
|
fee: entry.fee + (useAccelerations ? (accelerations[entry.txid] || 0) : 0),
|
||||||
weight: (entry.adjustedVsize * 4),
|
weight: (entry.adjustedVsize * 4),
|
||||||
sigops: entry.sigops,
|
sigops: entry.sigops,
|
||||||
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
|
feePerVsize: entry.adjustedFeePerVsize || entry.feePerVsize,
|
||||||
@ -325,7 +325,7 @@ class MempoolBlocks {
|
|||||||
// clean up thread error listener
|
// clean up thread error listener
|
||||||
this.txSelectionWorker?.removeListener('error', threadErrorListener);
|
this.txSelectionWorker?.removeListener('error', threadErrorListener);
|
||||||
|
|
||||||
this.processBlockTemplates(newMempool, blocks, null, Object.entries(rates), Object.values(clusters), saveResults);
|
this.processBlockTemplates(newMempool, blocks, null, Object.entries(rates), Object.values(clusters), accelerations, saveResults);
|
||||||
logger.debug(`updateBlockTemplates completed in ${(Date.now() - start) / 1000} seconds`);
|
logger.debug(`updateBlockTemplates completed in ${(Date.now() - start) / 1000} seconds`);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
|
logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
|
||||||
@ -362,7 +362,7 @@ class MempoolBlocks {
|
|||||||
if (saveResults) {
|
if (saveResults) {
|
||||||
this.rustInitialized = true;
|
this.rustInitialized = true;
|
||||||
}
|
}
|
||||||
const processed = this.processBlockTemplates(newMempool, blocks, blockWeights, rates, clusters, saveResults);
|
const processed = this.processBlockTemplates(newMempool, blocks, blockWeights, rates, clusters, {}, saveResults);
|
||||||
logger.debug(`RUST makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
|
logger.debug(`RUST makeBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
|
||||||
return processed;
|
return processed;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -414,7 +414,7 @@ class MempoolBlocks {
|
|||||||
if (mempoolSize !== resultMempoolSize) {
|
if (mempoolSize !== resultMempoolSize) {
|
||||||
throw new Error('GBT returned wrong number of transactions, cache is probably out of sync');
|
throw new Error('GBT returned wrong number of transactions, cache is probably out of sync');
|
||||||
} else {
|
} else {
|
||||||
this.processBlockTemplates(newMempool, blocks, blockWeights, rates, clusters, true);
|
this.processBlockTemplates(newMempool, blocks, blockWeights, rates, clusters, {}, true);
|
||||||
}
|
}
|
||||||
this.removeUids(removedUids);
|
this.removeUids(removedUids);
|
||||||
logger.debug(`RUST updateBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
|
logger.debug(`RUST updateBlockTemplates completed in ${(Date.now() - start)/1000} seconds`);
|
||||||
@ -424,7 +424,7 @@ class MempoolBlocks {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private processBlockTemplates(mempool: { [txid: string]: MempoolTransactionExtended }, blocks: string[][], blockWeights: number[] | null, rates: [string, number][], clusters: string[][], saveResults): MempoolBlockWithTransactions[] {
|
private processBlockTemplates(mempool: { [txid: string]: MempoolTransactionExtended }, blocks: string[][], blockWeights: number[] | null, rates: [string, number][], clusters: string[][], accelerations, saveResults): MempoolBlockWithTransactions[] {
|
||||||
for (const [txid, rate] of rates) {
|
for (const [txid, rate] of rates) {
|
||||||
if (txid in mempool) {
|
if (txid in mempool) {
|
||||||
mempool[txid].effectiveFeePerVsize = rate;
|
mempool[txid].effectiveFeePerVsize = rate;
|
||||||
@ -503,6 +503,8 @@ class MempoolBlocks {
|
|||||||
mempoolTx.cpfpChecked = true;
|
mempoolTx.cpfpChecked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mempoolTx.acceleration = accelerations[txid];
|
||||||
|
|
||||||
// online calculation of stack-of-blocks fee stats
|
// online calculation of stack-of-blocks fee stats
|
||||||
if (hasBlockStack && blockIndex === lastBlockIndex && feeStatsCalculator) {
|
if (hasBlockStack && blockIndex === lastBlockIndex && feeStatsCalculator) {
|
||||||
feeStatsCalculator.processNext(mempoolTx);
|
feeStatsCalculator.processNext(mempoolTx);
|
||||||
|
@ -342,7 +342,7 @@ class Mempool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const newAccelerations = await accelerationApi.fetchAccelerations$();
|
const newAccelerations = await accelerationApi.$fetchAccelerations();
|
||||||
|
|
||||||
const changed: string[] = [];
|
const changed: string[] = [];
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import { query } from '../../utils/axios-query';
|
import { query } from '../../utils/axios-query';
|
||||||
import config from '../../config';
|
import config from '../../config';
|
||||||
|
import { BlockExtended, PoolTag } from '../../mempool.interfaces';
|
||||||
|
|
||||||
export interface Acceleration {
|
export interface Acceleration {
|
||||||
txid: string,
|
txid: string,
|
||||||
@ -7,7 +8,7 @@ export interface Acceleration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class AccelerationApi {
|
class AccelerationApi {
|
||||||
public async fetchAccelerations$(): Promise<Acceleration[]> {
|
public async $fetchAccelerations(): Promise<Acceleration[]> {
|
||||||
if (config.MEMPOOL_SERVICES.ACCELERATIONS) {
|
if (config.MEMPOOL_SERVICES.ACCELERATIONS) {
|
||||||
const response = await query(`${config.MEMPOOL_SERVICES.API}/accelerations`);
|
const response = await query(`${config.MEMPOOL_SERVICES.API}/accelerations`);
|
||||||
return (response as Acceleration[]) || [];
|
return (response as Acceleration[]) || [];
|
||||||
@ -15,6 +16,23 @@ class AccelerationApi {
|
|||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async $fetchPools(): Promise<PoolTag[]> {
|
||||||
|
if (config.MEMPOOL_SERVICES.ACCELERATIONS) {
|
||||||
|
const response = await query(`${config.MEMPOOL_SERVICES.API}/partners`);
|
||||||
|
return (response as PoolTag[]) || [];
|
||||||
|
} else {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async $isAcceleratedBlock(block: BlockExtended): Promise<boolean> {
|
||||||
|
const pools = await this.$fetchPools();
|
||||||
|
if (block?.extras?.pool?.id == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return pools.reduce((match, tag) => match || tag.uniqueId === block.extras.pool.id, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default new AccelerationApi();
|
export default new AccelerationApi();
|
@ -21,7 +21,7 @@ import Audit from './audit';
|
|||||||
import { deepClone } from '../utils/clone';
|
import { deepClone } from '../utils/clone';
|
||||||
import priceUpdater from '../tasks/price-updater';
|
import priceUpdater from '../tasks/price-updater';
|
||||||
import { ApiPrice } from '../repositories/PricesRepository';
|
import { ApiPrice } from '../repositories/PricesRepository';
|
||||||
import mempool from './mempool';
|
import accelerationApi from './services/acceleration';
|
||||||
|
|
||||||
// valid 'want' subscriptions
|
// valid 'want' subscriptions
|
||||||
const wantable = [
|
const wantable = [
|
||||||
@ -392,7 +392,7 @@ class WebsocketHandler {
|
|||||||
if (config.MEMPOOL.RUST_GBT) {
|
if (config.MEMPOOL.RUST_GBT) {
|
||||||
await mempoolBlocks.$rustUpdateBlockTemplates(newMempool, mempoolSize, newTransactions, deletedTransactions);
|
await mempoolBlocks.$rustUpdateBlockTemplates(newMempool, mempoolSize, newTransactions, deletedTransactions);
|
||||||
} else {
|
} else {
|
||||||
await mempoolBlocks.$updateBlockTemplates(newMempool, newTransactions, deletedTransactions, accelerationDelta, true);
|
await mempoolBlocks.$updateBlockTemplates(newMempool, newTransactions, deletedTransactions, accelerationDelta, true, config.MEMPOOL_SERVICES.ACCELERATIONS);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mempoolBlocks.updateMempoolBlocks(newMempool, true);
|
mempoolBlocks.updateMempoolBlocks(newMempool, true);
|
||||||
@ -648,6 +648,7 @@ class WebsocketHandler {
|
|||||||
if (config.MEMPOOL.AUDIT && memPool.isInSync()) {
|
if (config.MEMPOOL.AUDIT && memPool.isInSync()) {
|
||||||
let projectedBlocks;
|
let projectedBlocks;
|
||||||
let auditMempool = _memPool;
|
let auditMempool = _memPool;
|
||||||
|
const isAccelerated = config.MEMPOOL_SERVICES.ACCELERATIONS && await accelerationApi.$isAcceleratedBlock(block);
|
||||||
// template calculation functions have mempool side effects, so calculate audits using
|
// template calculation functions have mempool side effects, so calculate audits using
|
||||||
// a cloned copy of the mempool if we're running a different algorithm for mempool updates
|
// a cloned copy of the mempool if we're running a different algorithm for mempool updates
|
||||||
const separateAudit = config.MEMPOOL.ADVANCED_GBT_AUDIT !== config.MEMPOOL.ADVANCED_GBT_MEMPOOL;
|
const separateAudit = config.MEMPOOL.ADVANCED_GBT_AUDIT !== config.MEMPOOL.ADVANCED_GBT_MEMPOOL;
|
||||||
@ -657,14 +658,18 @@ class WebsocketHandler {
|
|||||||
if (config.MEMPOOL.RUST_GBT) {
|
if (config.MEMPOOL.RUST_GBT) {
|
||||||
projectedBlocks = await mempoolBlocks.$oneOffRustBlockTemplates(auditMempool);
|
projectedBlocks = await mempoolBlocks.$oneOffRustBlockTemplates(auditMempool);
|
||||||
} else {
|
} else {
|
||||||
projectedBlocks = await mempoolBlocks.$makeBlockTemplates(auditMempool, false);
|
projectedBlocks = await mempoolBlocks.$makeBlockTemplates(auditMempool, false, isAccelerated);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false);
|
projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if ((config.MEMPOOL_SERVICES.ACCELERATIONS && !isAccelerated)) {
|
||||||
|
projectedBlocks = await mempoolBlocks.$makeBlockTemplates(auditMempool, false, isAccelerated);
|
||||||
} else {
|
} else {
|
||||||
projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions();
|
projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (Common.indexingEnabled()) {
|
if (Common.indexingEnabled()) {
|
||||||
const { censored, added, fresh, sigop, fullrbf, accelerated, score, similarity } = Audit.auditBlock(transactions, projectedBlocks, auditMempool);
|
const { censored, added, fresh, sigop, fullrbf, accelerated, score, similarity } = Audit.auditBlock(transactions, projectedBlocks, auditMempool);
|
||||||
@ -723,11 +728,15 @@ class WebsocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
|
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
|
||||||
|
<<<<<<< HEAD
|
||||||
if (config.MEMPOOL.RUST_GBT) {
|
if (config.MEMPOOL.RUST_GBT) {
|
||||||
await mempoolBlocks.$rustUpdateBlockTemplates(_memPool, Object.keys(_memPool).length, [], transactions);
|
await mempoolBlocks.$rustUpdateBlockTemplates(_memPool, Object.keys(_memPool).length, [], transactions);
|
||||||
} else {
|
} else {
|
||||||
await mempoolBlocks.$makeBlockTemplates(_memPool, true);
|
await mempoolBlocks.$makeBlockTemplates(_memPool, true);
|
||||||
}
|
}
|
||||||
|
=======
|
||||||
|
await mempoolBlocks.$makeBlockTemplates(_memPool, true, config.MEMPOOL_SERVICES.ACCELERATIONS);
|
||||||
|
>>>>>>> 77b0a8ecc (Refactor accelerated audits)
|
||||||
} else {
|
} else {
|
||||||
mempoolBlocks.updateMempoolBlocks(_memPool, true);
|
mempoolBlocks.updateMempoolBlocks(_memPool, true);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user