From ba8ea7d0d158756342a2f4503c4e9e5f3bdc27fd Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 30 Jan 2024 00:38:38 +0000 Subject: [PATCH] Handle Redis errors and disconnects --- backend/src/api/redis-cache.ts | 254 +++++++++++++++++++++++++-------- 1 file changed, 196 insertions(+), 58 deletions(-) diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index edfd2142b..d19d73a7f 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -19,45 +19,90 @@ class RedisCache { private client; private connected = false; private schemaVersion = 1; + private redisConfig: any; + private pauseFlush: boolean = false; private cacheQueue: MempoolTransactionExtended[] = []; + private removeQueue: string[] = []; + private rbfCacheQueue: { type: string, txid: string, value: any }[] = []; + private rbfRemoveQueue: { type: string, txid: string }[] = []; private txFlushLimit: number = 10000; constructor() { if (config.REDIS.ENABLED) { - const redisConfig = { + this.redisConfig = { socket: { path: config.REDIS.UNIX_SOCKET_PATH }, database: NetworkDB[config.MEMPOOL.NETWORK], }; - this.client = createClient(redisConfig); - this.client.on('error', (e) => { - logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); - }); this.$ensureConnected(); + setInterval(() => { this.$ensureConnected(); }, 10000); } } - private async $ensureConnected(): Promise { + private async $ensureConnected(): Promise { if (!this.connected && config.REDIS.ENABLED) { - return this.client.connect().then(async () => { - this.connected = true; - logger.info(`Redis client connected`); - const version = await this.client.get('schema_version'); - if (version !== this.schemaVersion) { - // schema changed - // perform migrations or flush DB if necessary - logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`); - await this.client.set('schema_version', this.schemaVersion); - } - }); + try { + this.client = createClient(this.redisConfig); + this.client.on('error', async (e) => { + logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); + this.connected = false; + await this.client.disconnect(); + }); + await this.client.connect().then(async () => { + try { + const version = await this.client.get('schema_version'); + this.connected = true; + if (version !== this.schemaVersion) { + // schema changed + // perform migrations or flush DB if necessary + logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`); + await this.client.set('schema_version', this.schemaVersion); + } + logger.info(`Redis client connected`); + return true; + } catch (e) { + this.connected = false; + logger.warn('Failed to connect to Redis'); + return false; + } + }); + await this.$onConnected(); + return true; + } catch (e) { + logger.warn('Error connecting to Redis: ' + (e instanceof Error ? e.message : e)); + return false; + } + } else { + try { + // test connection + await this.client.get('schema_version'); + return true; + } catch (e) { + logger.warn('Lost connection to Redis: ' + (e instanceof Error ? e.message : e)); + logger.warn('Attempting to reconnect in 10 seconds'); + this.connected = false; + return false; + } } } - async $updateBlocks(blocks: BlockExtended[]) { + private async $onConnected(): Promise { + await this.$flushTransactions(); + await this.$removeTransactions([]); + await this.$flushRbfQueues(); + } + + async $updateBlocks(blocks: BlockExtended[]): Promise { + if (!config.REDIS.ENABLED) { + return; + } + if (!this.connected) { + logger.warn(`Failed to update blocks in Redis cache: Redis is not connected`); + return; + } try { - await this.$ensureConnected(); await this.client.set('blocks', JSON.stringify(blocks)); logger.debug(`Saved latest blocks to Redis cache`); } catch (e) { @@ -65,9 +110,15 @@ class RedisCache { } } - async $updateBlockSummaries(summaries: BlockSummary[]) { + async $updateBlockSummaries(summaries: BlockSummary[]): Promise { + if (!config.REDIS.ENABLED) { + return; + } + if (!this.connected) { + logger.warn(`Failed to update block summaries in Redis cache: Redis is not connected`); + return; + } try { - await this.$ensureConnected(); await this.client.set('block-summaries', JSON.stringify(summaries)); logger.debug(`Saved latest block summaries to Redis cache`); } catch (e) { @@ -75,30 +126,35 @@ class RedisCache { } } - async $addTransaction(tx: MempoolTransactionExtended) { + async $addTransaction(tx: MempoolTransactionExtended): Promise { + if (!config.REDIS.ENABLED) { + return; + } this.cacheQueue.push(tx); if (this.cacheQueue.length >= this.txFlushLimit) { - await this.$flushTransactions(); + if (!this.pauseFlush) { + await this.$flushTransactions(); + } } } - async $flushTransactions() { - const success = await this.$addTransactions(this.cacheQueue); - if (success) { - logger.debug(`Saved ${this.cacheQueue.length} transactions to Redis cache`); - this.cacheQueue = []; - } else { - logger.err(`Failed to save ${this.cacheQueue.length} transactions to Redis cache`); + async $flushTransactions(): Promise { + if (!config.REDIS.ENABLED) { + return; + } + if (!this.cacheQueue.length) { + return; + } + if (!this.connected) { + logger.warn(`Failed to add ${this.cacheQueue.length} transactions to Redis cache: Redis not connected`); + return; } - } - private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { - if (!newTransactions.length) { - return true; - } + this.pauseFlush = false; + + const toAdd = this.cacheQueue.slice(0, this.txFlushLimit); try { - await this.$ensureConnected(); - const msetData = newTransactions.map(tx => { + const msetData = toAdd.map(tx => { const minified: any = { ...tx }; delete minified.hex; for (const vin of minified.vin) { @@ -112,30 +168,53 @@ class RedisCache { return [`mempool:tx:${tx.txid}`, JSON.stringify(minified)]; }); await this.client.MSET(msetData); - return true; + // successful, remove transactions from cache queue + this.cacheQueue = this.cacheQueue.slice(toAdd.length); + logger.debug(`Saved ${toAdd.length} transactions to Redis cache, ${this.cacheQueue.length} left in queue`); } catch (e) { - logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); - return false; + logger.warn(`Failed to add ${toAdd.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); + this.pauseFlush = true; } } - async $removeTransactions(transactions: string[]) { - try { - await this.$ensureConnected(); + async $removeTransactions(transactions: string[]): Promise { + if (!config.REDIS.ENABLED) { + return; + } + const toRemove = this.removeQueue.concat(transactions); + this.removeQueue = []; + let failed: string[] = []; + let numRemoved = 0; + if (this.connected) { const sliceLength = config.REDIS.BATCH_QUERY_BASE_SIZE; - for (let i = 0; i < Math.ceil(transactions.length / sliceLength); i++) { - const slice = transactions.slice(i * sliceLength, (i + 1) * sliceLength); - await this.client.unlink(slice.map(txid => `mempool:tx:${txid}`)); - logger.debug(`Deleted ${slice.length} transactions from the Redis cache`); + for (let i = 0; i < Math.ceil(toRemove.length / sliceLength); i++) { + const slice = toRemove.slice(i * sliceLength, (i + 1) * sliceLength); + try { + await this.client.unlink(slice.map(txid => `mempool:tx:${txid}`)); + numRemoved+= sliceLength; + logger.debug(`Deleted ${slice.length} transactions from the Redis cache`); + } catch (e) { + logger.warn(`Failed to remove ${slice.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); + failed = failed.concat(slice); + } } - } catch (e) { - logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); + // concat instead of replace, in case more txs have been added in the meantime + this.removeQueue = this.removeQueue.concat(failed); + } else { + this.removeQueue = this.removeQueue.concat(toRemove); } } async $setRbfEntry(type: string, txid: string, value: any): Promise { + if (!config.REDIS.ENABLED) { + return; + } + if (!this.connected) { + this.rbfCacheQueue.push({ type, txid, value }); + logger.warn(`Failed to set RBF ${type} in Redis cache: Redis is not connected`); + return; + } try { - await this.$ensureConnected(); await this.client.set(`rbf:${type}:${txid}`, JSON.stringify(value)); } catch (e) { logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`); @@ -143,17 +222,55 @@ class RedisCache { } async $removeRbfEntry(type: string, txid: string): Promise { + if (!config.REDIS.ENABLED) { + return; + } + if (!this.connected) { + this.rbfRemoveQueue.push({ type, txid }); + logger.warn(`Failed to remove RBF ${type} from Redis cache: Redis is not connected`); + return; + } try { - await this.$ensureConnected(); await this.client.unlink(`rbf:${type}:${txid}`); } catch (e) { logger.warn(`Failed to remove RBF ${type} from Redis cache: ${e instanceof Error ? e.message : e}`); } } - async $getBlocks(): Promise { + private async $flushRbfQueues(): Promise { + if (!config.REDIS.ENABLED) { + return; + } + if (!this.connected) { + return; + } + try { + const toAdd = this.rbfCacheQueue; + this.rbfCacheQueue = []; + for (const { type, txid, value } of toAdd) { + await this.$setRbfEntry(type, txid, value); + } + logger.debug(`Saved ${toAdd.length} queued RBF entries to the Redis cache`); + const toRemove = this.rbfRemoveQueue; + this.rbfRemoveQueue = []; + for (const { type, txid } of toRemove) { + await this.$removeRbfEntry(type, txid); + } + logger.debug(`Removed ${toRemove.length} queued RBF entries from the Redis cache`); + } catch (e) { + logger.warn(`Failed to flush RBF cache event queues after reconnecting to Redis: ${e instanceof Error ? e.message : e}`); + } + } + + async $getBlocks(): Promise { + if (!config.REDIS.ENABLED) { + return []; + } + if (!this.connected) { + logger.warn(`Failed to retrieve blocks from Redis cache: Redis is not connected`); + return []; + } try { - await this.$ensureConnected(); const json = await this.client.get('blocks'); return JSON.parse(json); } catch (e) { @@ -163,8 +280,14 @@ class RedisCache { } async $getBlockSummaries(): Promise { + if (!config.REDIS.ENABLED) { + return []; + } + if (!this.connected) { + logger.warn(`Failed to retrieve blocks from Redis cache: Redis is not connected`); + return []; + } try { - await this.$ensureConnected(); const json = await this.client.get('block-summaries'); return JSON.parse(json); } catch (e) { @@ -174,10 +297,16 @@ class RedisCache { } async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { + if (!config.REDIS.ENABLED) { + return {}; + } + if (!this.connected) { + logger.warn(`Failed to retrieve mempool from Redis cache: Redis is not connected`); + return {}; + } const start = Date.now(); const mempool = {}; try { - await this.$ensureConnected(); const mempoolList = await this.scanKeys('mempool:tx:*'); for (const tx of mempoolList) { mempool[tx.key] = tx.value; @@ -191,8 +320,14 @@ class RedisCache { } async $getRbfEntries(type: string): Promise { + if (!config.REDIS.ENABLED) { + return []; + } + if (!this.connected) { + logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: Redis is not connected`); + return []; + } try { - await this.$ensureConnected(); const rbfEntries = await this.scanKeys(`rbf:${type}:*`); return rbfEntries; } catch (e) { @@ -201,7 +336,10 @@ class RedisCache { } } - async $loadCache() { + async $loadCache(): Promise { + if (!config.REDIS.ENABLED) { + return; + } logger.info('Restoring mempool and blocks data from Redis cache'); // Load block data const loadedBlocks = await this.$getBlocks(); @@ -226,7 +364,7 @@ class RedisCache { }); } - private inflateLoadedTxs(mempool: { [txid: string]: MempoolTransactionExtended }) { + private inflateLoadedTxs(mempool: { [txid: string]: MempoolTransactionExtended }): void { for (const tx of Object.values(mempool)) { for (const vin of tx.vin) { if (vin.scriptsig) {