Merge pull request #4043 from mempool/mononaut/mempool-sync-status
Mempool inSync status
This commit is contained in:
		
						commit
						81d1c0a4d5
					
				| @ -3,6 +3,7 @@ import { IEsploraApi } from './esplora-api.interface'; | ||||
| export interface AbstractBitcoinApi { | ||||
|   $getRawMempool(): Promise<IEsploraApi.Transaction['txid'][]>; | ||||
|   $getRawTransaction(txId: string, skipConversion?: boolean, addPrevout?: boolean, lazyPrevouts?: boolean): Promise<IEsploraApi.Transaction>; | ||||
|   $getMempoolTransactions(lastTxid: string); | ||||
|   $getTransactionHex(txId: string): Promise<string>; | ||||
|   $getBlockHeightTip(): Promise<number>; | ||||
|   $getBlockHashTip(): Promise<string>; | ||||
|  | ||||
| @ -59,6 +59,10 @@ class BitcoinApi implements AbstractBitcoinApi { | ||||
|       }); | ||||
|   } | ||||
| 
 | ||||
|   $getMempoolTransactions(lastTxid: string): Promise<IEsploraApi.Transaction[]> { | ||||
|     return Promise.resolve([]); | ||||
|   } | ||||
| 
 | ||||
|   $getTransactionHex(txId: string): Promise<string> { | ||||
|     return this.$getRawTransaction(txId, true) | ||||
|       .then((tx) => tx.hex || ''); | ||||
|  | ||||
| @ -69,6 +69,10 @@ class ElectrsApi implements AbstractBitcoinApi { | ||||
|     return this.$queryWrapper<IEsploraApi.Transaction>(config.ESPLORA.REST_API_URL + '/tx/' + txId); | ||||
|   } | ||||
| 
 | ||||
|   async $getMempoolTransactions(lastSeenTxid?: string): Promise<IEsploraApi.Transaction[]> { | ||||
|     return this.$queryWrapper<IEsploraApi.Transaction[]>(config.ESPLORA.REST_API_URL + '/mempool/txs' + (lastSeenTxid ? '/' + lastSeenTxid : '')); | ||||
|   } | ||||
| 
 | ||||
|   $getTransactionHex(txId: string): Promise<string> { | ||||
|     return this.$queryWrapper<string>(config.ESPLORA.REST_API_URL + '/tx/' + txId + '/hex'); | ||||
|   } | ||||
|  | ||||
| @ -1,5 +1,5 @@ | ||||
| import config from '../config'; | ||||
| import bitcoinApi from './bitcoin/bitcoin-api-factory'; | ||||
| import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; | ||||
| import { MempoolTransactionExtended, TransactionExtended, VbytesPerSecond } from '../mempool.interfaces'; | ||||
| import logger from '../logger'; | ||||
| import { Common } from './common'; | ||||
| @ -9,6 +9,7 @@ import loadingIndicators from './loading-indicators'; | ||||
| import bitcoinClient from './bitcoin/bitcoin-client'; | ||||
| import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; | ||||
| import rbfCache from './rbf-cache'; | ||||
| import { IEsploraApi } from './bitcoin/esplora-api.interface'; | ||||
| 
 | ||||
| class Mempool { | ||||
|   private inSync: boolean = false; | ||||
| @ -103,6 +104,44 @@ class Mempool { | ||||
|     this.addToSpendMap(Object.values(this.mempoolCache)); | ||||
|   } | ||||
| 
 | ||||
|   public async $reloadMempool(expectedCount: number): Promise<MempoolTransactionExtended[]> { | ||||
|     let count = 0; | ||||
|     let done = false; | ||||
|     let last_txid; | ||||
|     const newTransactions: MempoolTransactionExtended[] = []; | ||||
|     loadingIndicators.setProgress('mempool', count / expectedCount * 100); | ||||
|     while (!done) { | ||||
|       try { | ||||
|         const result = await bitcoinApi.$getMempoolTransactions(last_txid); | ||||
|         if (result) { | ||||
|           for (const tx of result) { | ||||
|             const extendedTransaction = transactionUtils.extendMempoolTransaction(tx); | ||||
|             if (!this.mempoolCache[extendedTransaction.txid]) { | ||||
|               newTransactions.push(extendedTransaction); | ||||
|               this.mempoolCache[extendedTransaction.txid] = extendedTransaction; | ||||
|             } | ||||
|             count++; | ||||
|           } | ||||
|           logger.info(`Fetched ${count} of ${expectedCount} mempool transactions from esplora`); | ||||
|           if (result.length > 0) { | ||||
|             last_txid = result[result.length - 1].txid; | ||||
|           } else { | ||||
|             done = true; | ||||
|           } | ||||
|           if (Math.floor((count / expectedCount) * 100) < 100) { | ||||
|             loadingIndicators.setProgress('mempool', count / expectedCount * 100); | ||||
|           } | ||||
|         } else { | ||||
|           done = true; | ||||
|         } | ||||
|       } catch(err) { | ||||
|         logger.err('failed to fetch bulk mempool transactions from esplora'); | ||||
|       } | ||||
|     } | ||||
|     return newTransactions; | ||||
|     logger.info(`Done inserting loaded mempool transactions into local cache`); | ||||
|   } | ||||
| 
 | ||||
|   public async $updateMemPoolInfo() { | ||||
|     this.mempoolInfo = await this.$getMempoolInfo(); | ||||
|   } | ||||
| @ -143,7 +182,7 @@ class Mempool { | ||||
|     const currentMempoolSize = Object.keys(this.mempoolCache).length; | ||||
|     this.updateTimerProgress(timer, 'got raw mempool'); | ||||
|     const diff = transactions.length - currentMempoolSize; | ||||
|     const newTransactions: MempoolTransactionExtended[] = []; | ||||
|     let newTransactions: MempoolTransactionExtended[] = []; | ||||
| 
 | ||||
|     this.mempoolCacheDelta = Math.abs(diff); | ||||
| 
 | ||||
| @ -162,41 +201,57 @@ class Mempool { | ||||
|     }; | ||||
| 
 | ||||
|     let intervalTimer = Date.now(); | ||||
|     for (const txid of transactions) { | ||||
|       if (!this.mempoolCache[txid]) { | ||||
|         try { | ||||
|           const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); | ||||
|           this.updateTimerProgress(timer, 'fetched new transaction'); | ||||
|           this.mempoolCache[txid] = transaction; | ||||
|           if (this.inSync) { | ||||
|             this.txPerSecondArray.push(new Date().getTime()); | ||||
|             this.vBytesPerSecondArray.push({ | ||||
|               unixTime: new Date().getTime(), | ||||
|               vSize: transaction.vsize, | ||||
|             }); | ||||
|           } | ||||
|           hasChange = true; | ||||
|           newTransactions.push(transaction); | ||||
|         } catch (e: any) { | ||||
|           if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { | ||||
|             this.missingTxCount++; | ||||
|           } | ||||
|           logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       if (Date.now() - intervalTimer > 5_000) { | ||||
|          | ||||
|         if (this.inSync) { | ||||
|           // Break and restart mempool loop if we spend too much time processing
 | ||||
|           // new transactions that may lead to falling behind on block height
 | ||||
|           logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); | ||||
|           break; | ||||
|         } else { | ||||
|           const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; | ||||
|           logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); | ||||
|           loadingIndicators.setProgress('mempool', progress); | ||||
|           intervalTimer = Date.now() | ||||
|     let loaded = false; | ||||
|     if (config.MEMPOOL.BACKEND === 'esplora' && currentMempoolSize < transactions.length * 0.5 && transactions.length > 20_000) { | ||||
|       this.inSync = false; | ||||
|       logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`); | ||||
|       try { | ||||
|         newTransactions = await this.$reloadMempool(transactions.length); | ||||
|         loaded = true; | ||||
|       } catch (e) { | ||||
|         logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions'); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (!loaded) { | ||||
|       for (const txid of transactions) { | ||||
|         if (!this.mempoolCache[txid]) { | ||||
|           try { | ||||
|             const transaction = await transactionUtils.$getMempoolTransactionExtended(txid, false, false, false); | ||||
|             this.updateTimerProgress(timer, 'fetched new transaction'); | ||||
|             this.mempoolCache[txid] = transaction; | ||||
|             if (this.inSync) { | ||||
|               this.txPerSecondArray.push(new Date().getTime()); | ||||
|               this.vBytesPerSecondArray.push({ | ||||
|                 unixTime: new Date().getTime(), | ||||
|                 vSize: transaction.vsize, | ||||
|               }); | ||||
|             } | ||||
|             hasChange = true; | ||||
|             newTransactions.push(transaction); | ||||
|           } catch (e: any) { | ||||
|             if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { | ||||
|               this.missingTxCount++; | ||||
|             } | ||||
|             logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e)); | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|         if (Date.now() - intervalTimer > 5_000) { | ||||
|           if (this.inSync) { | ||||
|             // Break and restart mempool loop if we spend too much time processing
 | ||||
|             // new transactions that may lead to falling behind on block height
 | ||||
|             logger.debug('Breaking mempool loop because the 5s time limit exceeded.'); | ||||
|             break; | ||||
|           } else { | ||||
|             const progress = (currentMempoolSize + newTransactions.length) / transactions.length * 100; | ||||
|             logger.debug(`Mempool is synchronizing. Processed ${newTransactions.length}/${diff} txs (${Math.round(progress)}%)`); | ||||
|             if (Math.floor(progress) < 100) { | ||||
|               loadingIndicators.setProgress('mempool', progress); | ||||
|             } | ||||
|             intervalTimer = Date.now() | ||||
|           } | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| @ -246,12 +301,6 @@ class Mempool { | ||||
|     const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx)); | ||||
|     this.latestTransactions = newTransactionsStripped.concat(this.latestTransactions).slice(0, 6); | ||||
| 
 | ||||
|     if (!this.inSync && transactions.length === newMempoolSize) { | ||||
|       this.inSync = true; | ||||
|       logger.notice('The mempool is now in sync!'); | ||||
|       loadingIndicators.setProgress('mempool', 100); | ||||
|     } | ||||
| 
 | ||||
|     this.mempoolCacheDelta = Math.abs(transactions.length - newMempoolSize); | ||||
| 
 | ||||
|     if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { | ||||
| @ -263,6 +312,12 @@ class Mempool { | ||||
|       this.updateTimerProgress(timer, 'completed async mempool callback'); | ||||
|     } | ||||
| 
 | ||||
|     if (!this.inSync && transactions.length === newMempoolSize) { | ||||
|       this.inSync = true; | ||||
|       logger.notice('The mempool is now in sync!'); | ||||
|       loadingIndicators.setProgress('mempool', 100); | ||||
|     } | ||||
| 
 | ||||
|     const end = new Date().getTime(); | ||||
|     const time = end - start; | ||||
|     logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`); | ||||
|  | ||||
| @ -604,7 +604,7 @@ class WebsocketHandler { | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       if (client['track-mempool-block'] >= 0) { | ||||
|       if (client['track-mempool-block'] >= 0 && memPool.isInSync()) { | ||||
|         const index = client['track-mempool-block']; | ||||
|         if (mBlockDeltas[index]) { | ||||
|           response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, { | ||||
| @ -644,7 +644,7 @@ class WebsocketHandler { | ||||
|     memPool.handleMinedRbfTransactions(rbfTransactions); | ||||
|     memPool.removeFromSpendMap(transactions); | ||||
| 
 | ||||
|     if (config.MEMPOOL.AUDIT) { | ||||
|     if (config.MEMPOOL.AUDIT && memPool.isInSync()) { | ||||
|       let projectedBlocks; | ||||
|       let auditMempool = _memPool; | ||||
|       // template calculation functions have mempool side effects, so calculate audits using
 | ||||
| @ -665,7 +665,7 @@ class WebsocketHandler { | ||||
|         projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions(); | ||||
|       } | ||||
| 
 | ||||
|       if (Common.indexingEnabled() && memPool.isInSync()) { | ||||
|       if (Common.indexingEnabled()) { | ||||
|         const { censored, added, fresh, sigop, fullrbf, score, similarity } = Audit.auditBlock(transactions, projectedBlocks, auditMempool); | ||||
|         const matchRate = Math.round(score * 100 * 100) / 100; | ||||
| 
 | ||||
| @ -858,7 +858,7 @@ class WebsocketHandler { | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       if (client['track-mempool-block'] >= 0) { | ||||
|       if (client['track-mempool-block'] >= 0 && memPool.isInSync()) { | ||||
|         const index = client['track-mempool-block']; | ||||
|         if (mBlockDeltas && mBlockDeltas[index]) { | ||||
|           response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, { | ||||
|  | ||||
| @ -117,7 +117,14 @@ export class MempoolBlocksComponent implements OnInit, OnChanges, OnDestroy { | ||||
|     }); | ||||
|     this.reduceMempoolBlocksToFitScreen(this.mempoolBlocks); | ||||
|     this.stateService.isTabHidden$.subscribe((tabHidden) => this.tabHidden = tabHidden); | ||||
|     this.loadingBlocks$ = this.stateService.isLoadingWebSocket$; | ||||
|     this.loadingBlocks$ = combineLatest([ | ||||
|       this.stateService.isLoadingWebSocket$, | ||||
|       this.stateService.isLoadingMempool$ | ||||
|     ]).pipe( | ||||
|       switchMap(([loadingBlocks, loadingMempool]) => { | ||||
|         return of(loadingBlocks || loadingMempool); | ||||
|       }) | ||||
|     ); | ||||
| 
 | ||||
|     this.mempoolBlocks$ = merge( | ||||
|       of(true), | ||||
|  | ||||
| @ -113,6 +113,7 @@ export class StateService { | ||||
|   mempoolTxPosition$ = new Subject<{ txid: string, position: MempoolPosition}>(); | ||||
|   blockTransactions$ = new Subject<Transaction>(); | ||||
|   isLoadingWebSocket$ = new ReplaySubject<boolean>(1); | ||||
|   isLoadingMempool$ = new BehaviorSubject<boolean>(true); | ||||
|   vbytesPerSecond$ = new ReplaySubject<number>(1); | ||||
|   previousRetarget$ = new ReplaySubject<number>(1); | ||||
|   backendInfo$ = new ReplaySubject<IBackendInfo>(1); | ||||
|  | ||||
| @ -368,6 +368,11 @@ export class WebsocketService { | ||||
| 
 | ||||
|     if (response.loadingIndicators) { | ||||
|       this.stateService.loadingIndicators$.next(response.loadingIndicators); | ||||
|       if (response.loadingIndicators.mempool != null && response.loadingIndicators.mempool < 100) { | ||||
|         this.stateService.isLoadingMempool$.next(true); | ||||
|       } else { | ||||
|         this.stateService.isLoadingMempool$.next(false); | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (response.mempoolInfo) { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user