diff --git a/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts b/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts index c233ed5d7..f610ed883 100644 --- a/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts +++ b/backend/src/api/bitcoin/bitcoin-api-abstract-factory.ts @@ -3,6 +3,7 @@ import { IEsploraApi } from './esplora-api.interface'; export interface AbstractBitcoinApi { $getRawMempool(): Promise; $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise; + $getMempoolTransactions(lastTxid: string); $getTransactionHex(txId: string): Promise; $getBlockHeightTip(): Promise; $getBlockHashTip(): Promise; diff --git a/backend/src/api/bitcoin/bitcoin-api.ts b/backend/src/api/bitcoin/bitcoin-api.ts index c045d8664..3ccea01ef 100644 --- a/backend/src/api/bitcoin/bitcoin-api.ts +++ b/backend/src/api/bitcoin/bitcoin-api.ts @@ -59,6 +59,10 @@ class BitcoinApi implements AbstractBitcoinApi { }); } + $getMempoolTransactions(lastTxid: string): Promise { + return Promise.resolve([]); + } + $getTransactionHex(txId: string): Promise { return this.$getRawTransaction(txId, true) .then((tx) => tx.hex || ''); diff --git a/backend/src/api/bitcoin/esplora-api.ts b/backend/src/api/bitcoin/esplora-api.ts index 5bfff5730..73a44a845 100644 --- a/backend/src/api/bitcoin/esplora-api.ts +++ b/backend/src/api/bitcoin/esplora-api.ts @@ -69,6 +69,10 @@ class ElectrsApi implements AbstractBitcoinApi { return this.$queryWrapper(config.ESPLORA.REST_API_URL + '/tx/' + txId); } + async $getMempoolTransactions(lastSeenTxid?: string): Promise { + return this.$queryWrapper(config.ESPLORA.REST_API_URL + '/mempool/txs' + (lastSeenTxid ? '/' + lastSeenTxid : '')); + } + $getTransactionHex(txId: string): Promise { return this.$queryWrapper(config.ESPLORA.REST_API_URL + '/tx/' + txId + '/hex'); } diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index d988ea47a..945b78738 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -1,5 +1,5 @@ import config from '../config'; -import bitcoinApi from './bitcoin/bitcoin-api-factory'; +import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; import { MempoolTransactionExtended, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; import logger from '../logger'; import { Common } from './common'; @@ -9,6 +9,7 @@ import loadingIndicators from './loading-indicators'; import bitcoinClient from './bitcoin/bitcoin-client'; import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import rbfCache from './rbf-cache'; +import { IEsploraApi } from './bitcoin/esplora-api.interface'; class Mempool { private inSync: boolean = false; @@ -103,6 +104,44 @@ class Mempool { this.addToSpendMap(Object.values(this.mempoolCache)); } + public async $reloadMempool(expectedCount: number): Promise { + let count = 0; + let done = false; + let last_txid; + const newTransactions: MempoolTransactionExtended[] = []; + loadingIndicators.setProgress('mempool', count / expectedCount * 100); + while (!done) { + try { + const result = await bitcoinApi.$getMempoolTransactions(last_txid); + if (result) { + for (const tx of result) { + const extendedTransaction = transactionUtils.extendMempoolTransaction(tx); + if (!this.mempoolCache[extendedTransaction.txid]) { + newTransactions.push(extendedTransaction); + this.mempoolCache[extendedTransaction.txid] = extendedTransaction; + } + count++; + } + logger.info(`Fetched ${count} of ${expectedCount} mempool transactions from esplora`); + if (result.length > 0) { + last_txid = result[result.length - 1].txid; + } else { + done = true; + } + if (Math.floor((count / expectedCount) * 100) < 100) { + loadingIndicators.setProgress('mempool', count / expectedCount * 100); + } + } else { + done = true; + } + } catch(err) { + logger.err('failed to fetch bulk mempool transactions from esplora'); + } + } + return newTransactions; + logger.info(`Done inserting loaded mempool transactions into local cache`); + } + public async $updateMemPoolInfo() { this.mempoolInfo = await this.$getMempoolInfo(); } @@ -143,7 +182,7 @@ class Mempool { const currentMempoolSize = Object.keys(this.mempoolCache).length; this.updateTimerProgress(timer, 'got raw mempool'); const diff = transactions.length - currentMempoolSize; - const newTransactions: MempoolTransactionExtended[] = []; + let newTransactions: MempoolTransactionExtended[] = []; this.mempoolCacheDelta = Math.abs(diff); @@ -162,41 +201,57 @@ class Mempool { }; let intervalTimer = Date.now(); - for (const txid of transactions) { - if (!this.mempoolCache[txid]) { - try { - const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); - this.updateTimerProgress(timer, 'fetched new transaction'); - this.mempoolCache[txid] = transaction; - if (this.inSync) { - this.txPerSecondArray.push(new Date().getTime()); - this.vBytesPerSecondArray.push({ - unixTime: new Date().getTime(), - vSize: transaction.vsize, - }); - } - hasChange = true; - newTransactions.push(transaction); - } catch (e: any) { - if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { - this.missingTxCount++; - } - logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); - } - } - if (Date.now() - intervalTimer > 5_000) { - - if (this.inSync) { - // Break and restart mempool loop if we spend too much time processing - // new transactions that may lead to falling behind on block height - logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); - break; - } else { - const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; - logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); - loadingIndicators.setProgress('mempool', progress); - intervalTimer = Date.now() + let loaded = false; + if (config.MEMPOOL.BACKEND === 'esplora' && currentMempoolSize < transactions.length * 0.5 && transactions.length > 20_000) { + this.inSync = false; + logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`); + try { + newTransactions = await this.$reloadMempool(transactions.length); + loaded = true; + } catch (e) { + logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions'); + } + } + + if (!loaded) { + for (const txid of transactions) { + if (!this.mempoolCache[txid]) { + try { + const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); + this.updateTimerProgress(timer, 'fetched new transaction'); + this.mempoolCache[txid] = transaction; + if (this.inSync) { + this.txPerSecondArray.push(new Date().getTime()); + this.vBytesPerSecondArray.push({ + unixTime: new Date().getTime(), + vSize: transaction.vsize, + }); + } + hasChange = true; + newTransactions.push(transaction); + } catch (e: any) { + if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { + this.missingTxCount++; + } + logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); + } + } + + if (Date.now() - intervalTimer > 5_000) { + if (this.inSync) { + // Break and restart mempool loop if we spend too much time processing + // new transactions that may lead to falling behind on block height + logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); + break; + } else { + const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; + logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); + if (Math.floor(progress) < 100) { + loadingIndicators.setProgress('mempool', progress); + } + intervalTimer = Date.now() + } } } } @@ -246,12 +301,6 @@ class Mempool { const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx)); this.latestTransactions = newTransactionsStripped.concat(this.latestTransactions).slice(0, 6); - if (!this.inSync && transactions.length === newMempoolSize) { - this.inSync = true; - logger.notice('The mempool is now in sync!'); - loadingIndicators.setProgress('mempool', 100); - } - this.mempoolCacheDelta = Math.abs(transactions.length - newMempoolSize); if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { @@ -263,6 +312,12 @@ class Mempool { this.updateTimerProgress(timer, 'completed async mempool callback'); } + if (!this.inSync && transactions.length === newMempoolSize) { + this.inSync = true; + logger.notice('The mempool is now in sync!'); + loadingIndicators.setProgress('mempool', 100); + } + const end = new Date().getTime(); const time = end - start; logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`); diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index a0c031175..56c8513cd 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -604,7 +604,7 @@ class WebsocketHandler { } } - if (client['track-mempool-block'] >= 0) { + if (client['track-mempool-block'] >= 0 && memPool.isInSync()) { const index = client['track-mempool-block']; if (mBlockDeltas[index]) { response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, { @@ -644,7 +644,7 @@ class WebsocketHandler { memPool.handleMinedRbfTransactions(rbfTransactions); memPool.removeFromSpendMap(transactions); - if (config.MEMPOOL.AUDIT) { + if (config.MEMPOOL.AUDIT && memPool.isInSync()) { let projectedBlocks; let auditMempool = _memPool; // template calculation functions have mempool side effects, so calculate audits using @@ -665,7 +665,7 @@ class WebsocketHandler { projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions(); } - if (Common.indexingEnabled() && memPool.isInSync()) { + if (Common.indexingEnabled()) { const { censored, added, fresh, sigop, fullrbf, score, similarity } = Audit.auditBlock(transactions, projectedBlocks, auditMempool); const matchRate = Math.round(score * 100 * 100) / 100; @@ -858,7 +858,7 @@ class WebsocketHandler { } } - if (client['track-mempool-block'] >= 0) { + if (client['track-mempool-block'] >= 0 && memPool.isInSync()) { const index = client['track-mempool-block']; if (mBlockDeltas && mBlockDeltas[index]) { response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, { diff --git a/frontend/src/app/components/mempool-blocks/mempool-blocks.component.ts b/frontend/src/app/components/mempool-blocks/mempool-blocks.component.ts index e6d5a4bf6..71075b261 100644 --- a/frontend/src/app/components/mempool-blocks/mempool-blocks.component.ts +++ b/frontend/src/app/components/mempool-blocks/mempool-blocks.component.ts @@ -117,7 +117,14 @@ export class MempoolBlocksComponent implements OnInit, OnChanges, OnDestroy { }); this.reduceMempoolBlocksToFitScreen(this.mempoolBlocks); this.stateService.isTabHidden$.subscribe((tabHidden) => this.tabHidden = tabHidden); - this.loadingBlocks$ = this.stateService.isLoadingWebSocket$; + this.loadingBlocks$ = combineLatest([ + this.stateService.isLoadingWebSocket$, + this.stateService.isLoadingMempool$ + ]).pipe( + switchMap(([loadingBlocks, loadingMempool]) => { + return of(loadingBlocks || loadingMempool); + }) + ); this.mempoolBlocks$ = merge( of(true), diff --git a/frontend/src/app/services/state.service.ts b/frontend/src/app/services/state.service.ts index 5ebca9ba1..ea00f12ab 100644 --- a/frontend/src/app/services/state.service.ts +++ b/frontend/src/app/services/state.service.ts @@ -113,6 +113,7 @@ export class StateService { mempoolTxPosition$ = new Subject<{ txid: string, position: MempoolPosition}>(); blockTransactions$ = new Subject(); isLoadingWebSocket$ = new ReplaySubject(1); + isLoadingMempool$ = new BehaviorSubject(true); vbytesPerSecond$ = new ReplaySubject(1); previousRetarget$ = new ReplaySubject(1); backendInfo$ = new ReplaySubject(1); diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index f32f772ac..e70424cdc 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -368,6 +368,11 @@ export class WebsocketService { if (response.loadingIndicators) { this.stateService.loadingIndicators$.next(response.loadingIndicators); + if (response.loadingIndicators.mempool != null && response.loadingIndicators.mempool < 100) { + this.stateService.isLoadingMempool$.next(true); + } else { + this.stateService.isLoadingMempool$.next(false); + } } if (response.mempoolInfo) {