Merge pull request #5394 from mempool/mononaut/use-acceleration-websocket
[accelerator] get acceleration updates over websocket
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
import { WebSocket } from 'ws';
|
||||
import config from '../../config';
|
||||
import logger from '../../logger';
|
||||
import { BlockExtended } from '../../mempool.interfaces';
|
||||
import axios from 'axios';
|
||||
import mempool from '../mempool';
|
||||
import websocketHandler from '../websocket-handler';
|
||||
|
||||
type MyAccelerationStatus = 'requested' | 'accelerating' | 'done';
|
||||
|
||||
@@ -37,14 +40,20 @@ export interface AccelerationHistory {
|
||||
};
|
||||
|
||||
class AccelerationApi {
|
||||
private ws: WebSocket | null = null;
|
||||
private useWebsocket: boolean = config.MEMPOOL.OFFICIAL && config.MEMPOOL_SERVICES.ACCELERATIONS;
|
||||
private startedWebsocketLoop: boolean = false;
|
||||
private websocketConnected: boolean = false;
|
||||
private onDemandPollingEnabled = !config.MEMPOOL_SERVICES.ACCELERATIONS;
|
||||
private apiPath = config.MEMPOOL.OFFICIAL ? (config.MEMPOOL_SERVICES.API + '/accelerator/accelerations') : (config.EXTERNAL_DATA_SERVER.MEMPOOL_API + '/accelerations');
|
||||
private _accelerations: Acceleration[] | null = null;
|
||||
private _accelerations: Record<string, Acceleration> = {};
|
||||
private lastPoll = 0;
|
||||
private forcePoll = false;
|
||||
private myAccelerations: Record<string, { status: MyAccelerationStatus, added: number, acceleration?: Acceleration }> = {};
|
||||
|
||||
public get accelerations(): Acceleration[] | null {
|
||||
public constructor() {}
|
||||
|
||||
public getAccelerations(): Record<string, Acceleration> {
|
||||
return this._accelerations;
|
||||
}
|
||||
|
||||
@@ -72,11 +81,18 @@ class AccelerationApi {
|
||||
}
|
||||
}
|
||||
|
||||
public async $updateAccelerations(): Promise<Acceleration[] | null> {
|
||||
public async $updateAccelerations(): Promise<Record<string, Acceleration> | null> {
|
||||
if (this.useWebsocket && this.websocketConnected) {
|
||||
return this._accelerations;
|
||||
}
|
||||
if (!this.onDemandPollingEnabled) {
|
||||
const accelerations = await this.$fetchAccelerations();
|
||||
if (accelerations) {
|
||||
this._accelerations = accelerations;
|
||||
const latestAccelerations = {};
|
||||
for (const acc of accelerations) {
|
||||
latestAccelerations[acc.txid] = acc;
|
||||
}
|
||||
this._accelerations = latestAccelerations;
|
||||
return this._accelerations;
|
||||
}
|
||||
} else {
|
||||
@@ -85,7 +101,7 @@ class AccelerationApi {
|
||||
return null;
|
||||
}
|
||||
|
||||
private async $updateAccelerationsOnDemand(): Promise<Acceleration[] | null> {
|
||||
private async $updateAccelerationsOnDemand(): Promise<Record<string, Acceleration> | null> {
|
||||
const shouldUpdate = this.forcePoll
|
||||
|| this.countMyAccelerationsWithStatus('requested') > 0
|
||||
|| (this.countMyAccelerationsWithStatus('accelerating') > 0 && this.lastPoll < (Date.now() - (10 * 60 * 1000)));
|
||||
@@ -120,7 +136,11 @@ class AccelerationApi {
|
||||
}
|
||||
}
|
||||
|
||||
this._accelerations = Object.values(this.myAccelerations).map(({ acceleration }) => acceleration).filter(acc => acc) as Acceleration[];
|
||||
const latestAccelerations = {};
|
||||
for (const acc of Object.values(this.myAccelerations).map(({ acceleration }) => acceleration).filter(acc => acc) as Acceleration[]) {
|
||||
latestAccelerations[acc.txid] = acc;
|
||||
}
|
||||
this._accelerations = latestAccelerations;
|
||||
return this._accelerations;
|
||||
}
|
||||
|
||||
@@ -152,6 +172,110 @@ class AccelerationApi {
|
||||
}
|
||||
return anyAccelerated;
|
||||
}
|
||||
|
||||
// get a list of accelerations that have changed between two sets of accelerations
|
||||
public getAccelerationDelta(oldAccelerationMap: Record<string, Acceleration>, newAccelerationMap: Record<string, Acceleration>): string[] {
|
||||
const changed: string[] = [];
|
||||
const mempoolCache = mempool.getMempool();
|
||||
|
||||
for (const acceleration of Object.values(newAccelerationMap)) {
|
||||
// skip transactions we don't know about
|
||||
if (!mempoolCache[acceleration.txid]) {
|
||||
continue;
|
||||
}
|
||||
if (oldAccelerationMap[acceleration.txid] == null) {
|
||||
// new acceleration
|
||||
changed.push(acceleration.txid);
|
||||
} else {
|
||||
if (oldAccelerationMap[acceleration.txid].feeDelta !== acceleration.feeDelta) {
|
||||
// feeDelta changed
|
||||
changed.push(acceleration.txid);
|
||||
} else if (oldAccelerationMap[acceleration.txid].pools?.length) {
|
||||
let poolsChanged = false;
|
||||
const pools = new Set();
|
||||
oldAccelerationMap[acceleration.txid].pools.forEach(pool => {
|
||||
pools.add(pool);
|
||||
});
|
||||
acceleration.pools.forEach(pool => {
|
||||
if (!pools.has(pool)) {
|
||||
poolsChanged = true;
|
||||
} else {
|
||||
pools.delete(pool);
|
||||
}
|
||||
});
|
||||
if (pools.size > 0) {
|
||||
poolsChanged = true;
|
||||
}
|
||||
if (poolsChanged) {
|
||||
// pools changed
|
||||
changed.push(acceleration.txid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const oldTxid of Object.keys(oldAccelerationMap)) {
|
||||
if (!newAccelerationMap[oldTxid]) {
|
||||
// removed
|
||||
changed.push(oldTxid);
|
||||
}
|
||||
}
|
||||
|
||||
return changed;
|
||||
}
|
||||
|
||||
private handleWebsocketMessage(msg: any): void {
|
||||
if (msg?.accelerations !== null) {
|
||||
const latestAccelerations = {};
|
||||
for (const acc of msg?.accelerations || []) {
|
||||
latestAccelerations[acc.txid] = acc;
|
||||
}
|
||||
this._accelerations = latestAccelerations;
|
||||
websocketHandler.handleAccelerationsChanged(this._accelerations);
|
||||
}
|
||||
}
|
||||
|
||||
public async connectWebsocket(): Promise<void> {
|
||||
if (this.startedWebsocketLoop) {
|
||||
return;
|
||||
}
|
||||
while (this.useWebsocket) {
|
||||
this.startedWebsocketLoop = true;
|
||||
if (!this.ws) {
|
||||
this.ws = new WebSocket(`${config.MEMPOOL_SERVICES.API.replace('https://', 'ws://').replace('http://', 'ws://')}/accelerator/ws`);
|
||||
this.websocketConnected = true;
|
||||
|
||||
this.ws.on('open', () => {
|
||||
logger.info('Acceleration websocket opened');
|
||||
this.ws?.send(JSON.stringify({
|
||||
'watch-accelerations': true
|
||||
}));
|
||||
});
|
||||
|
||||
this.ws.on('error', (error) => {
|
||||
logger.err('Acceleration websocket error: ' + error);
|
||||
this.ws = null;
|
||||
this.websocketConnected = false;
|
||||
});
|
||||
|
||||
this.ws.on('close', () => {
|
||||
logger.info('Acceleration websocket closed');
|
||||
this.ws = null;
|
||||
this.websocketConnected = false;
|
||||
});
|
||||
|
||||
this.ws.on('message', (data, isBinary) => {
|
||||
try {
|
||||
const parsedMsg = JSON.parse((isBinary ? data : data.toString()) as string);
|
||||
this.handleWebsocketMessage(parsedMsg);
|
||||
} catch (e) {
|
||||
logger.warn('Failed to parse acceleration websocket message: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
});
|
||||
}
|
||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new AccelerationApi();
|
||||
Reference in New Issue
Block a user