Merge pull request #4183 from mempool/mononaut/attitude-adjustment
Don't overload core with mempool tx requests
This commit is contained in:
		
						commit
						24696d0408
					
				| @ -5,6 +5,7 @@ import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; | |||||||
| import * as bitcoinjs from 'bitcoinjs-lib'; | import * as bitcoinjs from 'bitcoinjs-lib'; | ||||||
| import logger from '../logger'; | import logger from '../logger'; | ||||||
| import config from '../config'; | import config from '../config'; | ||||||
|  | import pLimit from '../utils/p-limit'; | ||||||
| 
 | 
 | ||||||
| class TransactionUtils { | class TransactionUtils { | ||||||
|   constructor() { } |   constructor() { } | ||||||
| @ -74,8 +75,12 @@ class TransactionUtils { | |||||||
| 
 | 
 | ||||||
|   public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> { |   public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> { | ||||||
|     if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') { |     if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') { | ||||||
|       const results = await Promise.allSettled(txids.map(txid => this.$getTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore, true))); |       const limiter = pLimit(8); // Run 8 requests at a time
 | ||||||
|       return (results.filter(r => r.status === 'fulfilled') as PromiseFulfilledResult<MempoolTransactionExtended>[]).map(r => r.value); |       const results = await Promise.allSettled(txids.map( | ||||||
|  |         txid => limiter(() => this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore)) | ||||||
|  |       )); | ||||||
|  |       return results.filter(reply => reply.status === 'fulfilled') | ||||||
|  |         .map(r => (r as PromiseFulfilledResult<MempoolTransactionExtended>).value); | ||||||
|     } else { |     } else { | ||||||
|       const transactions = await bitcoinApi.$getMempoolTransactions(txids); |       const transactions = await bitcoinApi.$getMempoolTransactions(txids); | ||||||
|       return transactions.map(transaction => { |       return transactions.map(transaction => { | ||||||
|  | |||||||
							
								
								
									
										179
									
								
								backend/src/utils/p-limit.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										179
									
								
								backend/src/utils/p-limit.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,179 @@ | |||||||
|  | /* | ||||||
|  | MIT License | ||||||
|  | 
 | ||||||
|  | Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
 | ||||||
|  | 
 | ||||||
|  | Permission is hereby granted, free of charge, to any person obtaining a copy of this | ||||||
|  | software and associated documentation files (the "Software"), to deal in the Software | ||||||
|  | without restriction, including without limitation the rights to use, copy, modify, | ||||||
|  | merge, publish, distribute, sublicense, and/or sell copies of the Software, and to | ||||||
|  | permit persons to whom the Software is furnished to do so, subject to the following | ||||||
|  | conditions: | ||||||
|  | 
 | ||||||
|  | The above copyright notice and this permission notice shall be included in all copies | ||||||
|  | or substantial portions of the Software. | ||||||
|  | 
 | ||||||
|  | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, | ||||||
|  | INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A | ||||||
|  | PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT | ||||||
|  | HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF | ||||||
|  | CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE | ||||||
|  | OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | /* | ||||||
|  | How it works: | ||||||
|  | `this._head` is an instance of `Node` which keeps track of its current value and nests | ||||||
|  | another instance of `Node` that keeps the value that comes after it. When a value is | ||||||
|  | provided to `.enqueue()`, the code needs to iterate through `this._head`, going deeper | ||||||
|  | and deeper to find the last value. However, iterating through every single item is slow. | ||||||
|  | This problem is solved by saving a reference to the last value as `this._tail` so that | ||||||
|  | it can reference it to add a new value. | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | class Node { | ||||||
|  |   value; | ||||||
|  |   next; | ||||||
|  | 
 | ||||||
|  |   constructor(value) { | ||||||
|  |     this.value = value; | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | class Queue { | ||||||
|  |   private _head; | ||||||
|  |   private _tail; | ||||||
|  |   private _size; | ||||||
|  | 
 | ||||||
|  |   constructor() { | ||||||
|  |     this.clear(); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   enqueue(value) { | ||||||
|  |     const node = new Node(value); | ||||||
|  | 
 | ||||||
|  |     if (this._head) { | ||||||
|  |       this._tail.next = node; | ||||||
|  |       this._tail = node; | ||||||
|  |     } else { | ||||||
|  |       this._head = node; | ||||||
|  |       this._tail = node; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     this._size++; | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   dequeue() { | ||||||
|  |     const current = this._head; | ||||||
|  |     if (!current) { | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     this._head = this._head.next; | ||||||
|  |     this._size--; | ||||||
|  |     return current.value; | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   clear() { | ||||||
|  |     this._head = undefined; | ||||||
|  |     this._tail = undefined; | ||||||
|  |     this._size = 0; | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   get size() { | ||||||
|  |     return this._size; | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   *[Symbol.iterator]() { | ||||||
|  |     let current = this._head; | ||||||
|  | 
 | ||||||
|  |     while (current) { | ||||||
|  |       yield current.value; | ||||||
|  |       current = current.next; | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | interface LimitFunction { | ||||||
|  |   readonly activeCount: number; | ||||||
|  |   readonly pendingCount: number; | ||||||
|  |   clearQueue: () => void; | ||||||
|  |   <Arguments extends unknown[], ReturnType>( | ||||||
|  |     fn: (...args: Arguments) => PromiseLike<ReturnType> | ReturnType, | ||||||
|  |     ...args: Arguments | ||||||
|  |   ): Promise<ReturnType>; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | export default function pLimit(concurrency: number): LimitFunction { | ||||||
|  |   if ( | ||||||
|  |     !( | ||||||
|  |       (Number.isInteger(concurrency) || | ||||||
|  |         concurrency === Number.POSITIVE_INFINITY) && | ||||||
|  |       concurrency > 0 | ||||||
|  |     ) | ||||||
|  |   ) { | ||||||
|  |     throw new TypeError('Expected `concurrency` to be a number from 1 and up'); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   const queue = new Queue(); | ||||||
|  |   let activeCount = 0; | ||||||
|  | 
 | ||||||
|  |   const next = () => { | ||||||
|  |     activeCount--; | ||||||
|  | 
 | ||||||
|  |     if (queue.size > 0) { | ||||||
|  |       queue.dequeue()(); | ||||||
|  |     } | ||||||
|  |   }; | ||||||
|  | 
 | ||||||
|  |   const run = async (fn, resolve, args) => { | ||||||
|  |     activeCount++; | ||||||
|  | 
 | ||||||
|  |     const result = (async () => fn(...args))(); | ||||||
|  | 
 | ||||||
|  |     resolve(result); | ||||||
|  | 
 | ||||||
|  |     try { | ||||||
|  |       await result; | ||||||
|  |     } catch {} | ||||||
|  | 
 | ||||||
|  |     next(); | ||||||
|  |   }; | ||||||
|  | 
 | ||||||
|  |   const enqueue = (fn, resolve, args) => { | ||||||
|  |     queue.enqueue(run.bind(undefined, fn, resolve, args)); | ||||||
|  | 
 | ||||||
|  |     (async () => { | ||||||
|  |       // This function needs to wait until the next microtask before comparing
 | ||||||
|  |       // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
 | ||||||
|  |       // when the run function is dequeued and called. The comparison in the if-statement
 | ||||||
|  |       // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
 | ||||||
|  |       await Promise.resolve(); | ||||||
|  | 
 | ||||||
|  |       if (activeCount < concurrency && queue.size > 0) { | ||||||
|  |         queue.dequeue()(); | ||||||
|  |       } | ||||||
|  |     })(); | ||||||
|  |   }; | ||||||
|  | 
 | ||||||
|  |   const generator = (fn, ...args) => | ||||||
|  |     new Promise((resolve) => { | ||||||
|  |       enqueue(fn, resolve, args); | ||||||
|  |     }); | ||||||
|  | 
 | ||||||
|  |   Object.defineProperties(generator, { | ||||||
|  |     activeCount: { | ||||||
|  |       get: () => activeCount, | ||||||
|  |     }, | ||||||
|  |     pendingCount: { | ||||||
|  |       get: () => queue.size, | ||||||
|  |     }, | ||||||
|  |     clearQueue: { | ||||||
|  |       value: () => { | ||||||
|  |         queue.clear(); | ||||||
|  |       }, | ||||||
|  |     }, | ||||||
|  |   }); | ||||||
|  | 
 | ||||||
|  |   return generator as any; | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user