[accelerator] on-demand polling support
This commit is contained in:
		
							parent
							
								
									b0db348605
								
							
						
					
					
						commit
						b49a6c4cac
					
				| @ -396,10 +396,6 @@ class Mempool { | ||||
|   } | ||||
| 
 | ||||
|   public $updateAccelerations(newAccelerations: Acceleration[]): string[] { | ||||
|     if (!config.MEMPOOL_SERVICES.ACCELERATIONS) { | ||||
|       return []; | ||||
|     } | ||||
| 
 | ||||
|     try { | ||||
|       const changed: string[] = []; | ||||
| 
 | ||||
|  | ||||
| @ -9,6 +9,7 @@ import bitcoinClient from '../bitcoin/bitcoin-client'; | ||||
| import mining from "./mining"; | ||||
| import PricesRepository from '../../repositories/PricesRepository'; | ||||
| import AccelerationRepository from '../../repositories/AccelerationRepository'; | ||||
| import accelerationApi from '../services/acceleration'; | ||||
| 
 | ||||
| class MiningRoutes { | ||||
|   public initRoutes(app: Application) { | ||||
| @ -41,6 +42,8 @@ class MiningRoutes { | ||||
|       .get(config.MEMPOOL.API_URL_PREFIX + 'accelerations/block/:height', this.$getAccelerationsByHeight) | ||||
|       .get(config.MEMPOOL.API_URL_PREFIX + 'accelerations/recent/:interval', this.$getRecentAccelerations) | ||||
|       .get(config.MEMPOOL.API_URL_PREFIX + 'accelerations/total', this.$getAccelerationTotals) | ||||
|       .get(config.MEMPOOL.API_URL_PREFIX + 'accelerations', this.$getActiveAccelerations) | ||||
|       .post(config.MEMPOOL.API_URL_PREFIX + 'acceleration/request/:txid', this.$requestAcceleration) | ||||
|     ; | ||||
|   } | ||||
| 
 | ||||
| @ -445,6 +448,37 @@ class MiningRoutes { | ||||
|       res.status(500).send(e instanceof Error ? e.message : e); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $getActiveAccelerations(req: Request, res: Response): Promise<void> { | ||||
|     try { | ||||
|       res.header('Pragma', 'public'); | ||||
|       res.header('Cache-control', 'public'); | ||||
|       res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString()); | ||||
|       if (!config.MEMPOOL_SERVICES.ACCELERATIONS || ['testnet', 'signet', 'liquidtestnet', 'liquid'].includes(config.MEMPOOL.NETWORK)) { | ||||
|         res.status(400).send('Acceleration data is not available.'); | ||||
|         return; | ||||
|       } | ||||
|       res.status(200).send(accelerationApi.accelerations || []); | ||||
|     } catch (e) { | ||||
|       res.status(500).send(e instanceof Error ? e.message : e); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $requestAcceleration(req: Request, res: Response): Promise<void> { | ||||
|     if (config.MEMPOOL_SERVICES.ACCELERATIONS || config.MEMPOOL.OFFICIAL) { | ||||
|       res.status(405).send('not available.'); | ||||
|       return; | ||||
|     } | ||||
|     res.setHeader('Pragma', 'no-cache'); | ||||
|     res.setHeader('Cache-control', 'private, no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0'); | ||||
|     res.setHeader('expires', -1); | ||||
|     try { | ||||
|       accelerationApi.accelerationRequested(req.params.txid); | ||||
|       res.status(200).send('ok'); | ||||
|     } catch (e) { | ||||
|       res.status(500).send(e instanceof Error ? e.message : e); | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| export default new MiningRoutes(); | ||||
|  | ||||
| @ -1,8 +1,10 @@ | ||||
| import config from '../../config'; | ||||
| import logger from '../../logger'; | ||||
| import { BlockExtended, PoolTag } from '../../mempool.interfaces'; | ||||
| import { BlockExtended } from '../../mempool.interfaces'; | ||||
| import axios from 'axios'; | ||||
| 
 | ||||
| type MyAccelerationStatus = 'requested' | 'accelerating' | 'done'; | ||||
| 
 | ||||
| export interface Acceleration { | ||||
|   txid: string, | ||||
|   added: number, | ||||
| @ -35,18 +37,88 @@ export interface AccelerationHistory { | ||||
| }; | ||||
| 
 | ||||
| class AccelerationApi { | ||||
|   public async $fetchAccelerations(): Promise<Acceleration[] | null> { | ||||
|   private apiPath = config.MEMPOOL.OFFICIAL ? (config.MEMPOOL_SERVICES.API + '/accelerator/accelerations') : (config.EXTERNAL_DATA_SERVER.MEMPOOL_API + '/accelerations'); | ||||
|   private _accelerations: Acceleration[] | null = null; | ||||
|   private lastPoll = 0; | ||||
|   private forcePoll = false; | ||||
|   private myAccelerations: Record<string, { status: MyAccelerationStatus, added: number, acceleration?: Acceleration }> = {}; | ||||
| 
 | ||||
|   public get accelerations(): Acceleration[] | null { | ||||
|     return this._accelerations; | ||||
|   } | ||||
| 
 | ||||
|   public countMyAccelerationsWithStatus(filter: MyAccelerationStatus): number { | ||||
|     return Object.values(this.myAccelerations).reduce((count, {status}) => { return count + (status === filter ? 1 : 0); }, 0); | ||||
|   } | ||||
| 
 | ||||
|   public accelerationRequested(txid: string): void { | ||||
|     this.myAccelerations[txid] = { status: 'requested', added: Date.now() }; | ||||
|   } | ||||
| 
 | ||||
|   public accelerationConfirmed(): void { | ||||
|     this.forcePoll = true; | ||||
|   } | ||||
| 
 | ||||
|   private async $fetchAccelerations(): Promise<Acceleration[] | null> { | ||||
|     try { | ||||
|       const response = await axios.get(this.apiPath, { responseType: 'json', timeout: 10000 }); | ||||
|       return response?.data || []; | ||||
|     } catch (e) { | ||||
|       logger.warn('Failed to fetch current accelerations from the mempool services backend: ' + (e instanceof Error ? e.message : e)); | ||||
|       return null; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public async $updateAccelerations(): Promise<Acceleration[] | null> { | ||||
|     if (config.MEMPOOL_SERVICES.ACCELERATIONS) { | ||||
|       try { | ||||
|         const response = await axios.get(`${config.MEMPOOL_SERVICES.API}/accelerator/accelerations`, { responseType: 'json', timeout: 10000 }); | ||||
|         return response.data as Acceleration[]; | ||||
|       } catch (e) { | ||||
|         logger.warn('Failed to fetch current accelerations from the mempool services backend: ' + (e instanceof Error ? e.message : e)); | ||||
|         return null; | ||||
|       const accelerations = await this.$fetchAccelerations(); | ||||
|       if (accelerations) { | ||||
|         this._accelerations = accelerations; | ||||
|         return this._accelerations; | ||||
|       } | ||||
|     } else { | ||||
|       return []; | ||||
|       return this.$updateAccelerationsOnDemand(); | ||||
|     } | ||||
|     return null; | ||||
|   } | ||||
| 
 | ||||
|   private async $updateAccelerationsOnDemand(): Promise<Acceleration[] | null> { | ||||
|     const shouldUpdate = this.forcePoll | ||||
|       || this.countMyAccelerationsWithStatus('requested') > 0 | ||||
|       || (this.countMyAccelerationsWithStatus('accelerating') > 0 && this.lastPoll < (Date.now() - (10 * 60 * 1000))); | ||||
| 
 | ||||
|     // update accelerations if necessary
 | ||||
|     if (shouldUpdate) { | ||||
|       const accelerations = await this.$fetchAccelerations(); | ||||
|       this.lastPoll = Date.now(); | ||||
|       this.forcePoll = false; | ||||
|       if (accelerations) { | ||||
|         const latestAccelerations: Record<string, Acceleration> = {}; | ||||
|         // set relevant accelerations to 'accelerating'
 | ||||
|         for (const acc of accelerations) { | ||||
|           if (this.myAccelerations[acc.txid]) { | ||||
|             latestAccelerations[acc.txid] = acc; | ||||
|             this.myAccelerations[acc.txid] = { status: 'accelerating', added: Date.now(), acceleration: acc }; | ||||
|           } | ||||
|         } | ||||
|         // txs that are no longer accelerating are either confirmed or canceled, so mark for expiry
 | ||||
|         for (const [txid, { status, acceleration }] of Object.entries(this.myAccelerations)) { | ||||
|           if (status === 'accelerating' && !latestAccelerations[txid]) { | ||||
|             this.myAccelerations[txid] = { status: 'done', added: Date.now(), acceleration }; | ||||
|           } | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     // clear expired accelerations (confirmed / failed / not accepted) after 10 minutes
 | ||||
|     for (const [txid, { status, added }] of Object.entries(this.myAccelerations)) { | ||||
|       if (['requested', 'done'].includes(status) && added < (Date.now() - (1000 * 60 * 10))) { | ||||
|         delete this.myAccelerations[txid]; | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     this._accelerations = Object.values(this.myAccelerations).map(({ acceleration }) => acceleration).filter(acc => acc) as Acceleration[]; | ||||
|     return this._accelerations; | ||||
|   } | ||||
| 
 | ||||
|   public async $fetchAccelerationHistory(page?: number, status?: string): Promise<AccelerationHistory[] | null> { | ||||
|  | ||||
| @ -538,9 +538,9 @@ class WebsocketHandler { | ||||
|     } | ||||
| 
 | ||||
|     if (config.MEMPOOL.RUST_GBT) { | ||||
|       await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, newMempool, added, removed, candidates, config.MEMPOOL_SERVICES.ACCELERATIONS); | ||||
|       await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, newMempool, added, removed, candidates, true); | ||||
|     } else { | ||||
|       await mempoolBlocks.$updateBlockTemplates(transactionIds, newMempool, added, removed, candidates, accelerationDelta, true, config.MEMPOOL_SERVICES.ACCELERATIONS); | ||||
|       await mempoolBlocks.$updateBlockTemplates(transactionIds, newMempool, added, removed, candidates, accelerationDelta, true, true); | ||||
|     } | ||||
| 
 | ||||
|     const mBlocks = mempoolBlocks.getMempoolBlocks(); | ||||
| @ -949,18 +949,14 @@ class WebsocketHandler { | ||||
|     if (config.MEMPOOL.AUDIT && memPool.isInSync()) { | ||||
|       let projectedBlocks; | ||||
|       const auditMempool = _memPool; | ||||
|       const isAccelerated = config.MEMPOOL_SERVICES.ACCELERATIONS && accelerationApi.isAcceleratedBlock(block, Object.values(mempool.getAccelerations())); | ||||
|       const isAccelerated = accelerationApi.isAcceleratedBlock(block, Object.values(mempool.getAccelerations())); | ||||
| 
 | ||||
|       if ((config.MEMPOOL_SERVICES.ACCELERATIONS)) { | ||||
|         if (config.MEMPOOL.RUST_GBT) { | ||||
|           const added = memPool.limitGBT ? (candidates?.added || []) : []; | ||||
|           const removed = memPool.limitGBT ? (candidates?.removed || []) : []; | ||||
|           projectedBlocks = await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, auditMempool, added, removed, candidates, isAccelerated, block.extras.pool.id); | ||||
|         } else { | ||||
|           projectedBlocks = await mempoolBlocks.$makeBlockTemplates(transactionIds, auditMempool, candidates, false, isAccelerated, block.extras.pool.id); | ||||
|         } | ||||
|       if (config.MEMPOOL.RUST_GBT) { | ||||
|         const added = memPool.limitGBT ? (candidates?.added || []) : []; | ||||
|         const removed = memPool.limitGBT ? (candidates?.removed || []) : []; | ||||
|         projectedBlocks = await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, auditMempool, added, removed, candidates, isAccelerated, block.extras.pool.id); | ||||
|       } else { | ||||
|         projectedBlocks = mempoolBlocks.getMempoolBlocksWithTransactions(); | ||||
|         projectedBlocks = await mempoolBlocks.$makeBlockTemplates(transactionIds, auditMempool, candidates, false, isAccelerated, block.extras.pool.id); | ||||
|       } | ||||
| 
 | ||||
|       if (Common.indexingEnabled()) { | ||||
| @ -1040,7 +1036,7 @@ class WebsocketHandler { | ||||
|       const removed = memPool.limitGBT ? (candidates?.removed || []) : transactions; | ||||
|       await mempoolBlocks.$rustUpdateBlockTemplates(transactionIds, _memPool, added, removed, candidates, true); | ||||
|     } else { | ||||
|       await mempoolBlocks.$makeBlockTemplates(transactionIds, _memPool, candidates, true, config.MEMPOOL_SERVICES.ACCELERATIONS); | ||||
|       await mempoolBlocks.$makeBlockTemplates(transactionIds, _memPool, candidates, true, true); | ||||
|     } | ||||
|     const mBlocks = mempoolBlocks.getMempoolBlocks(); | ||||
|     const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); | ||||
|  | ||||
| @ -229,7 +229,7 @@ class Server { | ||||
|       const newMempool = await bitcoinApi.$getRawMempool(); | ||||
|       const minFeeMempool = memPool.limitGBT ? await bitcoinSecondClient.getRawMemPool() : null; | ||||
|       const minFeeTip = memPool.limitGBT ? await bitcoinSecondClient.getBlockCount() : -1; | ||||
|       const newAccelerations = await accelerationApi.$fetchAccelerations(); | ||||
|       const newAccelerations = await accelerationApi.$updateAccelerations(); | ||||
|       const numHandledBlocks = await blocks.$updateBlocks(); | ||||
|       const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerIsRunning() ? 10 : 1); | ||||
|       if (numHandledBlocks === 0) { | ||||
|  | ||||
| @ -213,6 +213,15 @@ class AccelerationRepository { | ||||
|         this.$saveAcceleration(accelerationInfo, block, block.extras.pool.id, successfulAccelerations); | ||||
|       } | ||||
|     } | ||||
|     let anyConfirmed = false; | ||||
|     for (const acc of accelerations) { | ||||
|       if (blockTxs[acc.txid]) { | ||||
|         anyConfirmed = true; | ||||
|       } | ||||
|     } | ||||
|     if (anyConfirmed) { | ||||
|       accelerationApi.accelerationConfirmed(); | ||||
|     } | ||||
|     const lastSyncedHeight = await this.$getLastSyncedHeight(); | ||||
|     // if we've missed any blocks, let the indexer catch up from the last synced height on the next run
 | ||||
|     if (block.height === lastSyncedHeight + 1) { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user