Compare commits
2 Commits
master
...
mononaut/f
Author | SHA1 | Date | |
---|---|---|---|
|
6bb9ffd21a | ||
|
9a8e5b7896 |
@ -1,7 +1,7 @@
|
||||
import { IEsploraApi } from './esplora-api.interface';
|
||||
|
||||
export interface AbstractBitcoinApi {
|
||||
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>;
|
||||
$getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}>;
|
||||
$getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>;
|
||||
$getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]>;
|
||||
$getAllMempoolTransactions(lastTxid: string);
|
||||
@ -25,6 +25,7 @@ export interface AbstractBitcoinApi {
|
||||
$getBatchedOutspends(txId: string[]): Promise<IEsploraApi.Outspend[][]>;
|
||||
|
||||
startHealthChecks(): void;
|
||||
isFailedOver(): boolean;
|
||||
}
|
||||
export interface BitcoinRpcCredentials {
|
||||
host: string;
|
||||
|
@ -137,8 +137,12 @@ class BitcoinApi implements AbstractBitcoinApi {
|
||||
throw new Error('Method getScriptHashTransactions not supported by the Bitcoin RPC API.');
|
||||
}
|
||||
|
||||
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> {
|
||||
return this.bitcoindClient.getRawMemPool();
|
||||
async $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}> {
|
||||
const txids = await this.bitcoindClient.getRawMemPool();
|
||||
return {
|
||||
txids,
|
||||
local: true,
|
||||
};
|
||||
}
|
||||
|
||||
$getAddressPrefix(prefix: string): string[] {
|
||||
@ -356,6 +360,9 @@ class BitcoinApi implements AbstractBitcoinApi {
|
||||
}
|
||||
|
||||
public startHealthChecks(): void {};
|
||||
public isFailedOver(): boolean {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export default BitcoinApi;
|
||||
|
@ -638,8 +638,8 @@ class BitcoinRoutes {
|
||||
|
||||
private async getMempoolTxIds(req: Request, res: Response) {
|
||||
try {
|
||||
const rawMempool = await bitcoinApi.$getRawMempool();
|
||||
res.send(rawMempool);
|
||||
const { txids } = await bitcoinApi.$getRawMempool();
|
||||
res.send(txids);
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import http from 'http';
|
||||
import { AbstractBitcoinApi } from './bitcoin-api-abstract-factory';
|
||||
import { IEsploraApi } from './esplora-api.interface';
|
||||
import logger from '../../logger';
|
||||
import mempool from '../mempool';
|
||||
|
||||
interface FailoverHost {
|
||||
host: string,
|
||||
@ -17,6 +18,8 @@ interface FailoverHost {
|
||||
}
|
||||
|
||||
class FailoverRouter {
|
||||
isFailedOver: boolean = false;
|
||||
preferredHost: FailoverHost;
|
||||
activeHost: FailoverHost;
|
||||
fallbackHost: FailoverHost;
|
||||
hosts: FailoverHost[];
|
||||
@ -46,6 +49,7 @@ class FailoverRouter {
|
||||
socket: !!config.ESPLORA.UNIX_SOCKET_PATH,
|
||||
preferred: true,
|
||||
};
|
||||
this.preferredHost = this.activeHost;
|
||||
this.fallbackHost = this.activeHost;
|
||||
this.hosts.unshift(this.activeHost);
|
||||
this.multihost = this.hosts.length > 1;
|
||||
@ -151,6 +155,7 @@ class FailoverRouter {
|
||||
this.sortHosts();
|
||||
this.activeHost = this.hosts[0];
|
||||
logger.warn(`Switching esplora host to ${this.activeHost.host}`);
|
||||
this.isFailedOver = this.activeHost !== this.preferredHost;
|
||||
}
|
||||
|
||||
private addFailure(host: FailoverHost): FailoverHost {
|
||||
@ -164,7 +169,7 @@ class FailoverRouter {
|
||||
}
|
||||
}
|
||||
|
||||
private async $query<T>(method: 'get'| 'post', path, data: any, responseType = 'json', host = this.activeHost, retry: boolean = true): Promise<T> {
|
||||
private async $query<T>(method: 'get'| 'post', path, data: any, responseType = 'json', host = this.activeHost, retry: boolean = true, withSource = false): Promise<T | { data: T, host: FailoverHost }> {
|
||||
let axiosConfig;
|
||||
let url;
|
||||
if (host.socket) {
|
||||
@ -177,8 +182,17 @@ class FailoverRouter {
|
||||
return (method === 'post'
|
||||
? this.requestConnection.post<T>(url, data, axiosConfig)
|
||||
: this.requestConnection.get<T>(url, axiosConfig)
|
||||
).then((response) => { host.failures = Math.max(0, host.failures - 1); return response.data; })
|
||||
.catch((e) => {
|
||||
).then((response) => {
|
||||
host.failures = Math.max(0, host.failures - 1);
|
||||
if (withSource) {
|
||||
return {
|
||||
data: response.data,
|
||||
host,
|
||||
};
|
||||
} else {
|
||||
return response.data;
|
||||
}
|
||||
}).catch((e) => {
|
||||
let fallbackHost = this.fallbackHost;
|
||||
if (e?.response?.status !== 404) {
|
||||
logger.warn(`esplora request failed ${e?.response?.status || 500} ${host.host}${path}`);
|
||||
@ -186,7 +200,7 @@ class FailoverRouter {
|
||||
}
|
||||
if (retry && e?.code === 'ECONNREFUSED' && this.multihost) {
|
||||
// Retry immediately
|
||||
return this.$query(method, path, data, responseType, fallbackHost, false);
|
||||
return this.$query(method, path, data, responseType, fallbackHost, false, withSource);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
@ -194,19 +208,27 @@ class FailoverRouter {
|
||||
}
|
||||
|
||||
public async $get<T>(path, responseType = 'json'): Promise<T> {
|
||||
return this.$query<T>('get', path, null, responseType);
|
||||
return this.$query<T>('get', path, null, responseType, this.activeHost, true) as Promise<T>;
|
||||
}
|
||||
|
||||
public async $post<T>(path, data: any, responseType = 'json'): Promise<T> {
|
||||
return this.$query<T>('post', path, data, responseType);
|
||||
return this.$query<T>('post', path, data, responseType) as Promise<T>;
|
||||
}
|
||||
|
||||
public async $getWithSource<T>(path, responseType = 'json'): Promise<{ data: T, host: FailoverHost }> {
|
||||
return this.$query<T>('get', path, null, responseType, this.activeHost, true, true) as Promise<{ data: T, host: FailoverHost }>;
|
||||
}
|
||||
}
|
||||
|
||||
class ElectrsApi implements AbstractBitcoinApi {
|
||||
private failoverRouter = new FailoverRouter();
|
||||
|
||||
$getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]> {
|
||||
return this.failoverRouter.$get<IEsploraApi.Transaction['txid'][]>('/mempool/txids');
|
||||
async $getRawMempool(): Promise<{ txids: IEsploraApi.Transaction['txid'][], local: boolean}> {
|
||||
const result = await this.failoverRouter.$getWithSource<IEsploraApi.Transaction['txid'][]>('/mempool/txids', 'json');
|
||||
return {
|
||||
txids: result.data,
|
||||
local: result.host === this.failoverRouter.preferredHost,
|
||||
};
|
||||
}
|
||||
|
||||
$getRawTransaction(txId: string): Promise<IEsploraApi.Transaction> {
|
||||
@ -302,6 +324,10 @@ class ElectrsApi implements AbstractBitcoinApi {
|
||||
public startHealthChecks(): void {
|
||||
this.failoverRouter.startHealthChecks();
|
||||
}
|
||||
|
||||
public isFailedOver(): boolean {
|
||||
return this.failoverRouter.isFailedOver;
|
||||
}
|
||||
}
|
||||
|
||||
export default ElectrsApi;
|
||||
|
@ -26,6 +26,9 @@ class Mempool {
|
||||
|
||||
private accelerations: { [txId: string]: Acceleration } = {};
|
||||
|
||||
private failoverTimes: number[] = [];
|
||||
private statisticsPaused: boolean = false;
|
||||
|
||||
private txPerSecondArray: number[] = [];
|
||||
private txPerSecond: number = 0;
|
||||
|
||||
@ -164,6 +167,15 @@ class Mempool {
|
||||
return this.mempoolInfo;
|
||||
}
|
||||
|
||||
public getStatisticsIsPaused(): boolean {
|
||||
return this.statisticsPaused;
|
||||
}
|
||||
|
||||
public logFailover(): void {
|
||||
this.failoverTimes.push(Date.now());
|
||||
this.statisticsPaused = true;
|
||||
}
|
||||
|
||||
public getTxPerSecond(): number {
|
||||
return this.txPerSecond;
|
||||
}
|
||||
@ -242,6 +254,10 @@ class Mempool {
|
||||
logger.debug(`fetched ${txs.length} transactions`);
|
||||
this.updateTimerProgress(timer, 'fetched new transactions');
|
||||
|
||||
if (bitcoinApi.isFailedOver()) {
|
||||
this.failoverTimes.push(Date.now());
|
||||
}
|
||||
|
||||
for (const transaction of txs) {
|
||||
this.mempoolCache[transaction.txid] = transaction;
|
||||
if (this.inSync) {
|
||||
@ -259,6 +275,10 @@ class Mempool {
|
||||
}
|
||||
}
|
||||
|
||||
if (bitcoinApi.isFailedOver()) {
|
||||
this.failoverTimes.push(Date.now());
|
||||
}
|
||||
|
||||
if (txs.length < slice.length) {
|
||||
const missing = slice.length - txs.length;
|
||||
if (config.MEMPOOL.BACKEND === 'esplora') {
|
||||
@ -491,6 +511,10 @@ class Mempool {
|
||||
|
||||
private updateTxPerSecond() {
|
||||
const nowMinusTimeSpan = new Date().getTime() - (1000 * config.STATISTICS.TX_PER_SECOND_SAMPLE_PERIOD);
|
||||
|
||||
this.failoverTimes = this.failoverTimes.filter((unixTime) => unixTime > nowMinusTimeSpan);
|
||||
this.statisticsPaused = this.failoverTimes.length > 0;
|
||||
|
||||
this.txPerSecondArray = this.txPerSecondArray.filter((unixTime) => unixTime > nowMinusTimeSpan);
|
||||
this.txPerSecond = this.txPerSecondArray.length / config.STATISTICS.TX_PER_SECOND_SAMPLE_PERIOD || 0;
|
||||
|
||||
|
@ -29,9 +29,10 @@ class Statistics {
|
||||
}
|
||||
|
||||
private async runStatistics(): Promise<void> {
|
||||
if (!memPool.isInSync()) {
|
||||
if (!memPool.isInSync() || memPool.getStatisticsIsPaused()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const currentMempool = memPool.getMempool();
|
||||
const txPerSecond = memPool.getTxPerSecond();
|
||||
const vBytesPerSecond = memPool.getVBytesPerSecond();
|
||||
|
@ -71,9 +71,8 @@ class WebsocketHandler {
|
||||
private updateSocketData(): void {
|
||||
const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
|
||||
const da = difficultyAdjustment.getDifficultyAdjustment();
|
||||
this.updateSocketDataFields({
|
||||
const socketData = {
|
||||
'mempoolInfo': memPool.getMempoolInfo(),
|
||||
'vBytesPerSecond': memPool.getVBytesPerSecond(),
|
||||
'blocks': _blocks,
|
||||
'conversions': priceUpdater.getLatestPrices(),
|
||||
'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
|
||||
@ -82,7 +81,11 @@ class WebsocketHandler {
|
||||
'loadingIndicators': loadingIndicators.getLoadingIndicators(),
|
||||
'da': da?.previousTime ? da : undefined,
|
||||
'fees': feeApi.getRecommendedFee(),
|
||||
});
|
||||
};
|
||||
if (!memPool.getStatisticsIsPaused()) {
|
||||
socketData['vBytesPerSecond'] = memPool.getVBytesPerSecond();
|
||||
}
|
||||
this.updateSocketDataFields(socketData);
|
||||
}
|
||||
|
||||
public getSerializedInitData(): string {
|
||||
@ -414,7 +417,7 @@ class WebsocketHandler {
|
||||
const mBlocks = mempoolBlocks.getMempoolBlocks();
|
||||
const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas();
|
||||
const mempoolInfo = memPool.getMempoolInfo();
|
||||
const vBytesPerSecond = memPool.getVBytesPerSecond();
|
||||
const vBytesPerSecond = memPool.getStatisticsIsPaused() ? null : memPool.getVBytesPerSecond();
|
||||
const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions);
|
||||
const da = difficultyAdjustment.getDifficultyAdjustment();
|
||||
memPool.handleRbfTransactions(rbfTransactions);
|
||||
@ -440,13 +443,15 @@ class WebsocketHandler {
|
||||
// update init data
|
||||
const socketDataFields = {
|
||||
'mempoolInfo': mempoolInfo,
|
||||
'vBytesPerSecond': vBytesPerSecond,
|
||||
'mempool-blocks': mBlocks,
|
||||
'transactions': latestTransactions,
|
||||
'loadingIndicators': loadingIndicators.getLoadingIndicators(),
|
||||
'da': da?.previousTime ? da : undefined,
|
||||
'fees': recommendedFees,
|
||||
};
|
||||
if (vBytesPerSecond != null) {
|
||||
socketDataFields['vBytesPerSecond'] = vBytesPerSecond;
|
||||
}
|
||||
if (rbfSummary) {
|
||||
socketDataFields['rbfSummary'] = rbfSummary;
|
||||
}
|
||||
@ -496,7 +501,9 @@ class WebsocketHandler {
|
||||
|
||||
if (client['want-stats']) {
|
||||
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
|
||||
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
|
||||
if (vBytesPerSecond != null) {
|
||||
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
|
||||
}
|
||||
response['transactions'] = getCachedResponse('transactions', latestTransactions);
|
||||
if (da?.previousTime) {
|
||||
response['da'] = getCachedResponse('da', da);
|
||||
@ -784,7 +791,9 @@ class WebsocketHandler {
|
||||
|
||||
if (client['want-stats']) {
|
||||
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
|
||||
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond());
|
||||
if (!memPool.getStatisticsIsPaused()) {
|
||||
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond());
|
||||
}
|
||||
response['fees'] = getCachedResponse('fees', fees);
|
||||
|
||||
if (da?.previousTime) {
|
||||
|
@ -191,10 +191,14 @@ class Server {
|
||||
logger.debug(msg);
|
||||
}
|
||||
}
|
||||
const newMempool = await bitcoinApi.$getRawMempool();
|
||||
|
||||
const { txids: newMempool, local: fromLocalNode } = await bitcoinApi.$getRawMempool();
|
||||
const numHandledBlocks = await blocks.$updateBlocks();
|
||||
const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1);
|
||||
if (numHandledBlocks === 0) {
|
||||
if (!fromLocalNode) {
|
||||
memPool.logFailover();
|
||||
}
|
||||
await memPool.$updateMempool(newMempool, pollRate);
|
||||
}
|
||||
indexer.$run();
|
||||
|
Loading…
x
Reference in New Issue
Block a user