From a9f8bbbcce197dfb62b4a1ad25930942ab861f6b Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 16 Jun 2023 19:00:52 -0400 Subject: [PATCH] Add network and schema versioning to redis cache --- backend/src/api/redis-cache.ts | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 4b3c956c0..facbafd34 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -6,9 +6,18 @@ import config from '../config'; import { BlockExtended, BlockSummary, MempoolTransactionExtended } from '../mempool.interfaces'; import rbfCache from './rbf-cache'; +enum NetworkDB { + mainnet = 0, + testnet, + signet, + liquid, + liquidtestnet, +} + class RedisCache { private client; private connected = false; + private schemaVersion = 1; private cacheQueue: MempoolTransactionExtended[] = []; private txFlushLimit: number = 1000; @@ -18,7 +27,8 @@ class RedisCache { const redisConfig = { socket: { path: config.REDIS.UNIX_SOCKET_PATH - } + }, + database: NetworkDB[config.MEMPOOL.NETWORK], }; this.client = createClient(redisConfig); this.client.on('error', (e) => { @@ -30,9 +40,16 @@ class RedisCache { private async $ensureConnected(): Promise { if (!this.connected && config.REDIS.ENABLED) { - return this.client.connect().then(() => { + return this.client.connect().then(async () => { this.connected = true; logger.info(`Redis client connected`); + const version = await this.client.get('schema_version'); + if (version !== this.schemaVersion) { + // schema changed + // perform migrations or flush DB if necessary + logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`); + await this.client.set('schema_version', this.schemaVersion); + } }); } } @@ -63,18 +80,22 @@ class RedisCache { } async $flushTransactions() { - await this.$addTransactions(this.cacheQueue); - this.cacheQueue = []; + const success = await this.$addTransactions(this.cacheQueue); + if (success) { + this.cacheQueue = []; + } } - async $addTransactions(newTransactions: MempoolTransactionExtended[]) { + async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { try { await this.$ensureConnected(); await Promise.all(newTransactions.map(tx => { return this.client.json.set('tx:' + tx.txid, '$', tx); })); + return true; } catch (e) { logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); + return false; } }