Extend esplora api to support returning origin, improve stats pause logic

This commit is contained in:
Mononaut 2023-09-14 22:44:24 +00:00
parent 9a8e5b7896
commit 6bb9ffd21a
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
7 changed files with 61 additions and 21 deletions

View File

@ -1,7 +1,7 @@
import { IEsploraApi } from './esplora-api.interface'; import { IEsploraApi } from './esplora-api.interface';
export interface AbstractBitcoinApi { export interface AbstractBitcoinApi {
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>; $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}>;
$getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>; $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
$getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]>; $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]>;
$getAllMempoolTransactions(lastTxid: string); $getAllMempoolTransactions(lastTxid: string);

View File

@ -137,8 +137,12 @@ class BitcoinApi implements AbstractBitcoinApi {
throw new Error('Method getScriptHashTransactions not supported by the Bitcoin RPC API.'); throw new Error('Method getScriptHashTransactions not supported by the Bitcoin RPC API.');
} }
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> { async $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}> {
return this.bitcoindClient.getRawMemPool(); const txids = await this.bitcoindClient.getRawMemPool();
return {
txids,
local: true,
};
} }
$getAddressPrefix(prefix: string): string[] { $getAddressPrefix(prefix: string): string[] {

View File

@ -638,8 +638,8 @@ class BitcoinRoutes {
private async getMempoolTxIds(req: Request, res: Response) { private async getMempoolTxIds(req: Request, res: Response) {
try { try {
const rawMempool = await bitcoinApi.$getRawMempool(); const { txids } = await bitcoinApi.$getRawMempool();
res.send(rawMempool); res.send(txids);
} catch (e) { } catch (e) {
res.status(500).send(e instanceof Error ? e.message : e); res.status(500).send(e instanceof Error ? e.message : e);
} }

View File

@ -4,6 +4,7 @@ import http from 'http';
import { AbstractBitcoinApi } from './bitcoin-api-abstract-factory'; import { AbstractBitcoinApi } from './bitcoin-api-abstract-factory';
import { IEsploraApi } from './esplora-api.interface'; import { IEsploraApi } from './esplora-api.interface';
import logger from '../../logger'; import logger from '../../logger';
import mempool from '../mempool';
interface FailoverHost { interface FailoverHost {
host: string, host: string,
@ -168,7 +169,7 @@ class FailoverRouter {
} }
} }
private async $query<T>(method: 'get'| 'post', path, data: any, responseType = 'json', host = this.activeHost, retry: boolean = true): Promise<T> { private async $query<T>(method: 'get'| 'post', path, data: any, responseType = 'json', host = this.activeHost, retry: boolean = true, withSource = false): Promise<T | { data: T, host: FailoverHost }> {
let axiosConfig; let axiosConfig;
let url; let url;
if (host.socket) { if (host.socket) {
@ -181,8 +182,17 @@ class FailoverRouter {
return (method === 'post' return (method === 'post'
? this.requestConnection.post<T>(url, data, axiosConfig) ? this.requestConnection.post<T>(url, data, axiosConfig)
: this.requestConnection.get<T>(url, axiosConfig) : this.requestConnection.get<T>(url, axiosConfig)
).then((response) => { host.failures = Math.max(0, host.failures - 1); return response.data; }) ).then((response) => {
.catch((e) => { host.failures = Math.max(0, host.failures - 1);
if (withSource) {
return {
data: response.data,
host,
};
} else {
return response.data;
}
}).catch((e) => {
let fallbackHost = this.fallbackHost; let fallbackHost = this.fallbackHost;
if (e?.response?.status !== 404) { if (e?.response?.status !== 404) {
logger.warn(`esplora request failed ${e?.response?.status || 500} ${host.host}${path}`); logger.warn(`esplora request failed ${e?.response?.status || 500} ${host.host}${path}`);
@ -190,7 +200,7 @@ class FailoverRouter {
} }
if (retry && e?.code === 'ECONNREFUSED' && this.multihost) { if (retry && e?.code === 'ECONNREFUSED' && this.multihost) {
// Retry immediately // Retry immediately
return this.$query(method, path, data, responseType, fallbackHost, false); return this.$query(method, path, data, responseType, fallbackHost, false, withSource);
} else { } else {
throw e; throw e;
} }
@ -198,19 +208,27 @@ class FailoverRouter {
} }
public async $get<T>(path, responseType = 'json'): Promise<T> { public async $get<T>(path, responseType = 'json'): Promise<T> {
return this.$query<T>('get', path, null, responseType); return this.$query<T>('get', path, null, responseType, this.activeHost, true) as Promise<T>;
} }
public async $post<T>(path, data: any, responseType = 'json'): Promise<T> { public async $post<T>(path, data: any, responseType = 'json'): Promise<T> {
return this.$query<T>('post', path, data, responseType); return this.$query<T>('post', path, data, responseType) as Promise<T>;
}
public async $getWithSource<T>(path, responseType = 'json'): Promise<{ data: T, host: FailoverHost }> {
return this.$query<T>('get', path, null, responseType, this.activeHost, true, true) as Promise<{ data: T, host: FailoverHost }>;
} }
} }
class ElectrsApi implements AbstractBitcoinApi { class ElectrsApi implements AbstractBitcoinApi {
private failoverRouter = new FailoverRouter(); private failoverRouter = new FailoverRouter();
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> { async $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}> {
return this.failoverRouter.$get<IEsploraApi.Transaction['txid'][]>('/mempool/txids'); const result = await this.failoverRouter.$getWithSource<IEsploraApi.Transaction['txid'][]>('/mempool/txids', 'json');
return {
txids: result.data,
local: result.host === this.failoverRouter.preferredHost,
};
} }
$getRawTransaction(txId: string): Promise<IEsploraApi.Transaction> { $getRawTransaction(txId: string): Promise<IEsploraApi.Transaction> {

View File

@ -171,6 +171,11 @@ class Mempool {
return this.statisticsPaused; return this.statisticsPaused;
} }
public logFailover(): void {
this.failoverTimes.push(Date.now());
this.statisticsPaused = true;
}
public getTxPerSecond(): number { public getTxPerSecond(): number {
return this.txPerSecond; return this.txPerSecond;
} }

View File

@ -71,9 +71,8 @@ class WebsocketHandler {
private updateSocketData(): void { private updateSocketData(): void {
const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT); const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
const da = difficultyAdjustment.getDifficultyAdjustment(); const da = difficultyAdjustment.getDifficultyAdjustment();
this.updateSocketDataFields({ const socketData = {
'mempoolInfo': memPool.getMempoolInfo(), 'mempoolInfo': memPool.getMempoolInfo(),
'vBytesPerSecond': memPool.getStatisticsIsPaused() ? null : memPool.getVBytesPerSecond(),
'blocks': _blocks, 'blocks': _blocks,
'conversions': priceUpdater.getLatestPrices(), 'conversions': priceUpdater.getLatestPrices(),
'mempool-blocks': mempoolBlocks.getMempoolBlocks(), 'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
@ -82,7 +81,11 @@ class WebsocketHandler {
'loadingIndicators': loadingIndicators.getLoadingIndicators(), 'loadingIndicators': loadingIndicators.getLoadingIndicators(),
'da': da?.previousTime ? da : undefined, 'da': da?.previousTime ? da : undefined,
'fees': feeApi.getRecommendedFee(), 'fees': feeApi.getRecommendedFee(),
}); };
if (!memPool.getStatisticsIsPaused()) {
socketData['vBytesPerSecond'] = memPool.getVBytesPerSecond();
}
this.updateSocketDataFields(socketData);
} }
public getSerializedInitData(): string { public getSerializedInitData(): string {
@ -414,7 +417,7 @@ class WebsocketHandler {
const mBlocks = mempoolBlocks.getMempoolBlocks(); const mBlocks = mempoolBlocks.getMempoolBlocks();
const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas();
const mempoolInfo = memPool.getMempoolInfo(); const mempoolInfo = memPool.getMempoolInfo();
const vBytesPerSecond = memPool.getVBytesPerSecond(); const vBytesPerSecond = memPool.getStatisticsIsPaused() ? null : memPool.getVBytesPerSecond();
const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions);
const da = difficultyAdjustment.getDifficultyAdjustment(); const da = difficultyAdjustment.getDifficultyAdjustment();
memPool.handleRbfTransactions(rbfTransactions); memPool.handleRbfTransactions(rbfTransactions);
@ -440,13 +443,15 @@ class WebsocketHandler {
// update init data // update init data
const socketDataFields = { const socketDataFields = {
'mempoolInfo': mempoolInfo, 'mempoolInfo': mempoolInfo,
'vBytesPerSecond': vBytesPerSecond,
'mempool-blocks': mBlocks, 'mempool-blocks': mBlocks,
'transactions': latestTransactions, 'transactions': latestTransactions,
'loadingIndicators': loadingIndicators.getLoadingIndicators(), 'loadingIndicators': loadingIndicators.getLoadingIndicators(),
'da': da?.previousTime ? da : undefined, 'da': da?.previousTime ? da : undefined,
'fees': recommendedFees, 'fees': recommendedFees,
}; };
if (vBytesPerSecond != null) {
socketDataFields['vBytesPerSecond'] = vBytesPerSecond;
}
if (rbfSummary) { if (rbfSummary) {
socketDataFields['rbfSummary'] = rbfSummary; socketDataFields['rbfSummary'] = rbfSummary;
} }
@ -496,7 +501,9 @@ class WebsocketHandler {
if (client['want-stats']) { if (client['want-stats']) {
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo); response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond); if (vBytesPerSecond != null) {
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
}
response['transactions'] = getCachedResponse('transactions', latestTransactions); response['transactions'] = getCachedResponse('transactions', latestTransactions);
if (da?.previousTime) { if (da?.previousTime) {
response['da'] = getCachedResponse('da', da); response['da'] = getCachedResponse('da', da);
@ -784,7 +791,9 @@ class WebsocketHandler {
if (client['want-stats']) { if (client['want-stats']) {
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo); response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond()); if (!memPool.getStatisticsIsPaused()) {
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond());
}
response['fees'] = getCachedResponse('fees', fees); response['fees'] = getCachedResponse('fees', fees);
if (da?.previousTime) { if (da?.previousTime) {

View File

@ -191,10 +191,14 @@ class Server {
logger.debug(msg); logger.debug(msg);
} }
} }
const newMempool = await bitcoinApi.$getRawMempool();
const { txids: newMempool, local: fromLocalNode } = await bitcoinApi.$getRawMempool();
const numHandledBlocks = await blocks.$updateBlocks(); const numHandledBlocks = await blocks.$updateBlocks();
const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1); const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1);
if (numHandledBlocks === 0) { if (numHandledBlocks === 0) {
if (!fromLocalNode) {
memPool.logFailover();
}
await memPool.$updateMempool(newMempool, pollRate); await memPool.$updateMempool(newMempool, pollRate);
} }
indexer.$run(); indexer.$run();