diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index dc3634a6f..73a6fdfeb 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -86,6 +86,10 @@ class Mempool { public async $setMempool(mempoolData: { [txId: string]: MempoolTransactionExtended }) { this.mempoolCache = mempoolData; let count = 0; + const redisTimer = Date.now(); + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + logger.debug(`Migrating ${Object.keys(this.mempoolCache).length} transactions from disk cache to Redis cache`); + } for (const txid of Object.keys(this.mempoolCache)) { if (!this.mempoolCache[txid].sigops || this.mempoolCache[txid].effectiveFeePerVsize == null) { this.mempoolCache[txid] = transactionUtils.extendMempoolTransaction(this.mempoolCache[txid]); @@ -94,6 +98,13 @@ class Mempool { this.mempoolCache[txid].order = transactionUtils.txidToOrdering(txid); } count++; + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + await redisCache.$addTransaction(this.mempoolCache[txid]); + } + } + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + await redisCache.$flushTransactions(); + logger.debug(`Finished migrating cache transactions in ${((Date.now() - redisTimer) / 1000).toFixed(2)} seconds`); } if (this.mempoolChangedCallback) { this.mempoolChangedCallback(this.mempoolCache, [], []); @@ -102,10 +113,6 @@ class Mempool { await this.$asyncMempoolChangedCallback(this.mempoolCache, count, [], []); } this.addToSpendMap(Object.values(this.mempoolCache)); - if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { - logger.debug('copying mempool from disk cache into Redis'); - await redisCache.$addTransactions(Object.values(mempoolData)); - } } public async $reloadMempool(expectedCount: number): Promise { @@ -212,6 +219,7 @@ class Mempool { logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`); try { newTransactions = await this.$reloadMempool(transactions.length); + redisCache.$addTransactions(newTransactions); loaded = true; } catch (e) { logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions'); @@ -234,6 +242,10 @@ class Mempool { } hasChange = true; newTransactions.push(transaction); + + if (config.REDIS.ENABLED) { + await redisCache.$addTransaction(transaction); + } } catch (e: any) { if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { this.missingTxCount++; @@ -324,7 +336,7 @@ class Mempool { // Update Redis cache if (config.REDIS.ENABLED) { - await redisCache.$addTransactions(newTransactions); + await redisCache.$flushTransactions(); await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid)); await rbfCache.updateCache(); } diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index e89b2302d..4b3c956c0 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -10,6 +10,9 @@ class RedisCache { private client; private connected = false; + private cacheQueue: MempoolTransactionExtended[] = []; + private txFlushLimit: number = 1000; + constructor() { if (config.REDIS.ENABLED) { const redisConfig = { @@ -52,6 +55,18 @@ class RedisCache { } } + async $addTransaction(tx: MempoolTransactionExtended) { + this.cacheQueue.push(tx); + if (this.cacheQueue.length > this.txFlushLimit) { + await this.$flushTransactions(); + } + } + + async $flushTransactions() { + await this.$addTransactions(this.cacheQueue); + this.cacheQueue = []; + } + async $addTransactions(newTransactions: MempoolTransactionExtended[]) { try { await this.$ensureConnected(); @@ -118,6 +133,7 @@ class RedisCache { await this.$ensureConnected(); const keys = await this.client.keys('tx:*'); const promises: Promise[] = []; + let returned = 0; for (let i = 0; i < keys.length; i += 10000) { const keySlice = keys.slice(i, i + 10000); if (!keySlice.length) { @@ -131,6 +147,8 @@ class RedisCache { } } } + logger.info(`Loaded ${(returned * 10000) + (chunk.length)}/${keys.length} transactions from Redis cache`); + returned++; })); } await Promise.all(promises);