Merge pull request #4632 from mempool/mononaut/redis-error-handling
Handle Redis errors and disconnects
This commit is contained in:
		
						commit
						035068a72e
					
				| @ -19,45 +19,90 @@ class RedisCache { | ||||
|   private client; | ||||
|   private connected = false; | ||||
|   private schemaVersion = 1; | ||||
|   private redisConfig: any; | ||||
| 
 | ||||
|   private pauseFlush: boolean = false; | ||||
|   private cacheQueue: MempoolTransactionExtended[] = []; | ||||
|   private removeQueue: string[] = []; | ||||
|   private rbfCacheQueue: { type: string, txid: string, value: any }[] = []; | ||||
|   private rbfRemoveQueue: { type: string, txid: string }[] = []; | ||||
|   private txFlushLimit: number = 10000; | ||||
| 
 | ||||
|   constructor() { | ||||
|     if (config.REDIS.ENABLED) { | ||||
|       const redisConfig = { | ||||
|       this.redisConfig = { | ||||
|         socket: { | ||||
|           path: config.REDIS.UNIX_SOCKET_PATH | ||||
|         }, | ||||
|         database: NetworkDB[config.MEMPOOL.NETWORK], | ||||
|       }; | ||||
|       this.client = createClient(redisConfig); | ||||
|       this.client.on('error', (e) => { | ||||
|         logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); | ||||
|       }); | ||||
|       this.$ensureConnected(); | ||||
|       setInterval(() => { this.$ensureConnected(); }, 10000); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $ensureConnected(): Promise<void> { | ||||
|   private async $ensureConnected(): Promise<boolean> { | ||||
|     if (!this.connected && config.REDIS.ENABLED) { | ||||
|       return this.client.connect().then(async () => { | ||||
|         this.connected = true; | ||||
|         logger.info(`Redis client connected`); | ||||
|         const version = await this.client.get('schema_version'); | ||||
|         if (version !== this.schemaVersion) { | ||||
|           // schema changed
 | ||||
|           // perform migrations or flush DB if necessary
 | ||||
|           logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`); | ||||
|           await this.client.set('schema_version', this.schemaVersion); | ||||
|         } | ||||
|       }); | ||||
|       try { | ||||
|         this.client = createClient(this.redisConfig); | ||||
|         this.client.on('error', async (e) => { | ||||
|           logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); | ||||
|           this.connected = false; | ||||
|           await this.client.disconnect(); | ||||
|         }); | ||||
|         await this.client.connect().then(async () => { | ||||
|           try { | ||||
|             const version = await this.client.get('schema_version'); | ||||
|             this.connected = true; | ||||
|             if (version !== this.schemaVersion) { | ||||
|               // schema changed
 | ||||
|               // perform migrations or flush DB if necessary
 | ||||
|               logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`); | ||||
|               await this.client.set('schema_version', this.schemaVersion); | ||||
|             } | ||||
|             logger.info(`Redis client connected`); | ||||
|             return true; | ||||
|           } catch (e) { | ||||
|             this.connected = false; | ||||
|             logger.warn('Failed to connect to Redis'); | ||||
|             return false; | ||||
|           } | ||||
|         }); | ||||
|         await this.$onConnected(); | ||||
|         return true; | ||||
|       } catch (e) { | ||||
|         logger.warn('Error connecting to Redis: ' + (e instanceof Error ? e.message : e)); | ||||
|         return false; | ||||
|       } | ||||
|     } else { | ||||
|       try { | ||||
|         // test connection
 | ||||
|         await this.client.get('schema_version'); | ||||
|         return true; | ||||
|       } catch (e) { | ||||
|         logger.warn('Lost connection to Redis: ' + (e instanceof Error ? e.message : e)); | ||||
|         logger.warn('Attempting to reconnect in 10 seconds'); | ||||
|         this.connected = false; | ||||
|         return false; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $updateBlocks(blocks: BlockExtended[]) { | ||||
|   private async $onConnected(): Promise<void> { | ||||
|     await this.$flushTransactions(); | ||||
|     await this.$removeTransactions([]); | ||||
|     await this.$flushRbfQueues(); | ||||
|   } | ||||
| 
 | ||||
|   async $updateBlocks(blocks: BlockExtended[]): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to update blocks in Redis cache: Redis is not connected`); | ||||
|       return; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       await this.client.set('blocks', JSON.stringify(blocks)); | ||||
|       logger.debug(`Saved latest blocks to Redis cache`); | ||||
|     } catch (e) { | ||||
| @ -65,9 +110,15 @@ class RedisCache { | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $updateBlockSummaries(summaries: BlockSummary[]) { | ||||
|   async $updateBlockSummaries(summaries: BlockSummary[]): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to update block summaries in Redis cache: Redis is not connected`); | ||||
|       return; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       await this.client.set('block-summaries', JSON.stringify(summaries)); | ||||
|       logger.debug(`Saved latest block summaries to Redis cache`); | ||||
|     } catch (e) { | ||||
| @ -75,30 +126,35 @@ class RedisCache { | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $addTransaction(tx: MempoolTransactionExtended) { | ||||
|   async $addTransaction(tx: MempoolTransactionExtended): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     this.cacheQueue.push(tx); | ||||
|     if (this.cacheQueue.length >= this.txFlushLimit) { | ||||
|       await this.$flushTransactions(); | ||||
|       if (!this.pauseFlush) { | ||||
|         await this.$flushTransactions(); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $flushTransactions() { | ||||
|     const success = await this.$addTransactions(this.cacheQueue); | ||||
|     if (success) { | ||||
|       logger.debug(`Saved ${this.cacheQueue.length} transactions to Redis cache`); | ||||
|       this.cacheQueue = []; | ||||
|     } else { | ||||
|       logger.err(`Failed to save ${this.cacheQueue.length} transactions to Redis cache`); | ||||
|   async $flushTransactions(): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.cacheQueue.length) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to add ${this.cacheQueue.length} transactions to Redis cache: Redis not connected`); | ||||
|       return; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise<boolean> { | ||||
|     if (!newTransactions.length) { | ||||
|       return true; | ||||
|     } | ||||
|     this.pauseFlush = false; | ||||
| 
 | ||||
|     const toAdd = this.cacheQueue.slice(0, this.txFlushLimit); | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       const msetData = newTransactions.map(tx => { | ||||
|       const msetData = toAdd.map(tx => { | ||||
|         const minified: any = { ...tx }; | ||||
|         delete minified.hex; | ||||
|         for (const vin of minified.vin) { | ||||
| @ -112,30 +168,53 @@ class RedisCache { | ||||
|         return [`mempool:tx:${tx.txid}`, JSON.stringify(minified)]; | ||||
|       }); | ||||
|       await this.client.MSET(msetData); | ||||
|       return true; | ||||
|       // successful, remove transactions from cache queue
 | ||||
|       this.cacheQueue = this.cacheQueue.slice(toAdd.length); | ||||
|       logger.debug(`Saved ${toAdd.length} transactions to Redis cache, ${this.cacheQueue.length} left in queue`); | ||||
|     } catch (e) { | ||||
|       logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); | ||||
|       return false; | ||||
|       logger.warn(`Failed to add ${toAdd.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); | ||||
|       this.pauseFlush = true; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $removeTransactions(transactions: string[]) { | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|   async $removeTransactions(transactions: string[]): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     const toRemove = this.removeQueue.concat(transactions); | ||||
|     this.removeQueue = []; | ||||
|     let failed: string[] = []; | ||||
|     let numRemoved = 0; | ||||
|     if (this.connected) { | ||||
|       const sliceLength = config.REDIS.BATCH_QUERY_BASE_SIZE; | ||||
|       for (let i = 0; i < Math.ceil(transactions.length / sliceLength); i++) { | ||||
|         const slice = transactions.slice(i * sliceLength, (i + 1) * sliceLength); | ||||
|         await this.client.unlink(slice.map(txid => `mempool:tx:${txid}`)); | ||||
|         logger.debug(`Deleted ${slice.length} transactions from the Redis cache`); | ||||
|       for (let i = 0; i < Math.ceil(toRemove.length / sliceLength); i++) { | ||||
|         const slice = toRemove.slice(i * sliceLength, (i + 1) * sliceLength); | ||||
|         try { | ||||
|           await this.client.unlink(slice.map(txid => `mempool:tx:${txid}`)); | ||||
|           numRemoved+= sliceLength; | ||||
|           logger.debug(`Deleted ${slice.length} transactions from the Redis cache`); | ||||
|         } catch (e) { | ||||
|           logger.warn(`Failed to remove ${slice.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); | ||||
|           failed = failed.concat(slice); | ||||
|         } | ||||
|       } | ||||
|     } catch (e) { | ||||
|       logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); | ||||
|       // concat instead of replace, in case more txs have been added in the meantime
 | ||||
|       this.removeQueue = this.removeQueue.concat(failed); | ||||
|     } else { | ||||
|       this.removeQueue = this.removeQueue.concat(toRemove); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $setRbfEntry(type: string, txid: string, value: any): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       this.rbfCacheQueue.push({ type, txid, value }); | ||||
|       logger.warn(`Failed to set RBF ${type} in Redis cache: Redis is not connected`); | ||||
|       return; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       await this.client.set(`rbf:${type}:${txid}`, JSON.stringify(value)); | ||||
|     } catch (e) { | ||||
|       logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`); | ||||
| @ -143,17 +222,55 @@ class RedisCache { | ||||
|   } | ||||
| 
 | ||||
|   async $removeRbfEntry(type: string, txid: string): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       this.rbfRemoveQueue.push({ type, txid }); | ||||
|       logger.warn(`Failed to remove RBF ${type} from Redis cache: Redis is not connected`); | ||||
|       return; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       await this.client.unlink(`rbf:${type}:${txid}`); | ||||
|     } catch (e) { | ||||
|       logger.warn(`Failed to remove RBF ${type} from Redis cache: ${e instanceof Error ? e.message : e}`); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $getBlocks(): Promise<BlockExtended[]> { | ||||
|   private async $flushRbfQueues(): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       return; | ||||
|     } | ||||
|     try { | ||||
|       const toAdd = this.rbfCacheQueue; | ||||
|       this.rbfCacheQueue = []; | ||||
|       for (const { type, txid, value } of toAdd) { | ||||
|         await this.$setRbfEntry(type, txid, value); | ||||
|       } | ||||
|       logger.debug(`Saved ${toAdd.length} queued RBF entries to the Redis cache`); | ||||
|       const toRemove = this.rbfRemoveQueue; | ||||
|       this.rbfRemoveQueue = []; | ||||
|       for (const { type, txid } of toRemove) { | ||||
|         await this.$removeRbfEntry(type, txid); | ||||
|       } | ||||
|       logger.debug(`Removed ${toRemove.length} queued RBF entries from the Redis cache`); | ||||
|     } catch (e) { | ||||
|       logger.warn(`Failed to flush RBF cache event queues after reconnecting to Redis: ${e instanceof Error ? e.message : e}`); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $getBlocks(): Promise<BlockExtended[]> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return []; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to retrieve blocks from Redis cache: Redis is not connected`); | ||||
|       return []; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       const json = await this.client.get('blocks'); | ||||
|       return JSON.parse(json); | ||||
|     } catch (e) { | ||||
| @ -163,8 +280,14 @@ class RedisCache { | ||||
|   } | ||||
| 
 | ||||
|   async $getBlockSummaries(): Promise<BlockSummary[]> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return []; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to retrieve blocks from Redis cache: Redis is not connected`); | ||||
|       return []; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       const json = await this.client.get('block-summaries'); | ||||
|       return JSON.parse(json); | ||||
|     } catch (e) { | ||||
| @ -174,10 +297,16 @@ class RedisCache { | ||||
|   } | ||||
| 
 | ||||
|   async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return {}; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to retrieve mempool from Redis cache: Redis is not connected`); | ||||
|       return {}; | ||||
|     } | ||||
|     const start = Date.now(); | ||||
|     const mempool = {}; | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       const mempoolList = await this.scanKeys<MempoolTransactionExtended>('mempool:tx:*'); | ||||
|       for (const tx of mempoolList) { | ||||
|         mempool[tx.key] = tx.value; | ||||
| @ -191,8 +320,14 @@ class RedisCache { | ||||
|   } | ||||
| 
 | ||||
|   async $getRbfEntries(type: string): Promise<any[]> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return []; | ||||
|     } | ||||
|     if (!this.connected) { | ||||
|       logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: Redis is not connected`); | ||||
|       return []; | ||||
|     } | ||||
|     try { | ||||
|       await this.$ensureConnected(); | ||||
|       const rbfEntries = await this.scanKeys<MempoolTransactionExtended[]>(`rbf:${type}:*`); | ||||
|       return rbfEntries; | ||||
|     } catch (e) { | ||||
| @ -201,7 +336,10 @@ class RedisCache { | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $loadCache() { | ||||
|   async $loadCache(): Promise<void> { | ||||
|     if (!config.REDIS.ENABLED) { | ||||
|       return; | ||||
|     } | ||||
|     logger.info('Restoring mempool and blocks data from Redis cache'); | ||||
|     // Load block data
 | ||||
|     const loadedBlocks = await this.$getBlocks(); | ||||
| @ -226,7 +364,7 @@ class RedisCache { | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
|   private inflateLoadedTxs(mempool: { [txid: string]: MempoolTransactionExtended }) { | ||||
|   private inflateLoadedTxs(mempool: { [txid: string]: MempoolTransactionExtended }): void { | ||||
|     for (const tx of Object.values(mempool)) { | ||||
|       for (const vin of tx.vin) { | ||||
|         if (vin.scriptsig) { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user