[accelerator] get acceleration updates over websocket

This commit is contained in:
Mononaut
2024-08-01 18:19:13 +00:00
parent 05e88a25be
commit be49f70b09
5 changed files with 187 additions and 65 deletions

View File

@@ -10,6 +10,7 @@ import bitcoinClient from './bitcoin/bitcoin-client';
import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
import rbfCache from './rbf-cache';
import { Acceleration } from './services/acceleration';
import accelerationApi from './services/acceleration';
import redisCache from './redis-cache';
import blocks from './blocks';
@@ -207,7 +208,7 @@ class Mempool {
return txTimes;
}
public async $updateMempool(transactions: string[], accelerations: Acceleration[] | null, minFeeMempool: string[], minFeeTip: number, pollRate: number): Promise<void> {
public async $updateMempool(transactions: string[], accelerations: Record<string, Acceleration> | null, minFeeMempool: string[], minFeeTip: number, pollRate: number): Promise<void> {
logger.debug(`Updating mempool...`);
// warn if this run stalls the main loop for more than 2 minutes
@@ -354,7 +355,7 @@ class Mempool {
const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx));
this.latestTransactions = newTransactionsStripped.concat(this.latestTransactions).slice(0, 6);
const accelerationDelta = accelerations != null ? await this.$updateAccelerations(accelerations) : [];
const accelerationDelta = accelerations != null ? await this.updateAccelerations(accelerations) : [];
if (accelerationDelta.length) {
hasChange = true;
}
@@ -399,58 +400,11 @@ class Mempool {
return this.accelerations;
}
public $updateAccelerations(newAccelerations: Acceleration[]): string[] {
public updateAccelerations(newAccelerationMap: Record<string, Acceleration>): string[] {
try {
const changed: string[] = [];
const newAccelerationMap: { [txid: string]: Acceleration } = {};
for (const acceleration of newAccelerations) {
// skip transactions we don't know about
if (!this.mempoolCache[acceleration.txid]) {
continue;
}
newAccelerationMap[acceleration.txid] = acceleration;
if (this.accelerations[acceleration.txid] == null) {
// new acceleration
changed.push(acceleration.txid);
} else {
if (this.accelerations[acceleration.txid].feeDelta !== acceleration.feeDelta) {
// feeDelta changed
changed.push(acceleration.txid);
} else if (this.accelerations[acceleration.txid].pools?.length) {
let poolsChanged = false;
const pools = new Set();
this.accelerations[acceleration.txid].pools.forEach(pool => {
pools.add(pool);
});
acceleration.pools.forEach(pool => {
if (!pools.has(pool)) {
poolsChanged = true;
} else {
pools.delete(pool);
}
});
if (pools.size > 0) {
poolsChanged = true;
}
if (poolsChanged) {
// pools changed
changed.push(acceleration.txid);
}
}
}
}
for (const oldTxid of Object.keys(this.accelerations)) {
if (!newAccelerationMap[oldTxid]) {
// removed
changed.push(oldTxid);
}
}
const accelerationDelta = accelerationApi.getAccelerationDelta(this.accelerations, newAccelerationMap);
this.accelerations = newAccelerationMap;
return changed;
return accelerationDelta;
} catch (e: any) {
logger.debug(`Failed to update accelerations: ` + (e instanceof Error ? e.message : e));
return [];