diff --git a/backend/src/api/disk-cache.ts b/backend/src/api/disk-cache.ts index 04328a72a..6f603489a 100644 --- a/backend/src/api/disk-cache.ts +++ b/backend/src/api/disk-cache.ts @@ -179,6 +179,7 @@ class DiskCache { return; } try { + const start = Date.now(); let data: any = {}; const cacheData = fs.readFileSync(DiskCache.FILE_NAME, 'utf8'); if (cacheData) { @@ -220,6 +221,8 @@ class DiskCache { } } + logger.info(`Loaded mempool from disk cache in ${Date.now() - start} ms`); + await memPool.$setMempool(data.mempool); if (!this.ignoreBlocksCache) { blocks.setBlocks(data.blocks); diff --git a/backend/src/api/rbf-cache.ts b/backend/src/api/rbf-cache.ts index b5ae74072..b5592252c 100644 --- a/backend/src/api/rbf-cache.ts +++ b/backend/src/api/rbf-cache.ts @@ -360,14 +360,14 @@ class RbfCache { public async load({ txs, trees, expiring }): Promise { txs.forEach(txEntry => { - this.txs.set(txEntry[0], txEntry[1]); + this.txs.set(txEntry.key, txEntry.value); }); for (const deflatedTree of trees) { await this.importTree(deflatedTree.root, deflatedTree.root, deflatedTree, this.txs); } expiring.forEach(expiringEntry => { - if (this.txs.has(expiringEntry[0])) { - this.expiring.set(expiringEntry[0], new Date(expiringEntry[1]).getTime()); + if (this.txs.has(expiringEntry.key)) { + this.expiring.set(expiringEntry.key, new Date(expiringEntry.value).getTime()); } }); this.cleanup(); diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 540467caf..4a1375419 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -36,13 +36,6 @@ class RedisCache { logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); }); this.$ensureConnected(); - this.client.exists('mempool:0').then((mempoolExists) => { - if (!mempoolExists) { - for (let i = 0; i < 16; i++) { - this.client.json.set(`mempool:${i.toString(16)}`, '$', {}); - } - } - }); } } @@ -65,7 +58,7 @@ class RedisCache { async $updateBlocks(blocks: BlockExtended[]) { try { await this.$ensureConnected(); - await this.client.json.set('blocks', '$', blocks); + await this.client.set('blocks', JSON.stringify(blocks)); } catch (e) { logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -74,7 +67,7 @@ class RedisCache { async $updateBlockSummaries(summaries: BlockSummary[]) { try { await this.$ensureConnected(); - await this.client.json.set('block-summaries', '$', summaries); + await this.client.set('block-summaries', JSON.stringify(summaries)); } catch (e) { logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -98,9 +91,12 @@ class RedisCache { } private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { + if (!newTransactions.length) { + return true; + } try { await this.$ensureConnected(); - await Promise.all(newTransactions.map(tx => { + const msetData = newTransactions.map(tx => { const minified: any = { ...tx }; delete minified.hex; for (const vin of minified.vin) { @@ -111,8 +107,9 @@ class RedisCache { for (const vout of minified.vout) { delete vout.scriptpubkey_asm; } - return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, minified); - })); + return [`mempool:tx:${tx.txid}`, JSON.stringify(minified)]; + }); + await this.client.MSET(msetData); return true; } catch (e) { logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); @@ -123,9 +120,9 @@ class RedisCache { async $removeTransactions(transactions: string[]) { try { await this.$ensureConnected(); - await Promise.all(transactions.map(txid => { - return this.client.json.del(`mempool:${txid.slice(0,1)}`, txid); - })); + for (let i = 0; i < Math.ceil(transactions.length / 1000); i++) { + await this.client.del(transactions.slice(i * 1000, (i + 1) * 1000).map(txid => `mempool:tx:${txid}`)); + } } catch (e) { logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -134,7 +131,7 @@ class RedisCache { async $setRbfEntry(type: string, txid: string, value: any): Promise { try { await this.$ensureConnected(); - await this.client.json.set(`rbf:${type}:${txid}`, '$', value); + await this.client.set(`rbf:${type}:${txid}`, JSON.stringify(value)); } catch (e) { logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -152,7 +149,8 @@ class RedisCache { async $getBlocks(): Promise { try { await this.$ensureConnected(); - return this.client.json.get('blocks'); + const json = await this.client.get('blocks'); + return JSON.parse(json); } catch (e) { logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`); return []; @@ -162,7 +160,8 @@ class RedisCache { async $getBlockSummaries(): Promise { try { await this.$ensureConnected(); - return this.client.json.get('block-summaries'); + const json = await this.client.get('block-summaries'); + return JSON.parse(json); } catch (e) { logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`); return []; @@ -171,16 +170,14 @@ class RedisCache { async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { const start = Date.now(); - let mempool = {}; + const mempool = {}; try { await this.$ensureConnected(); - for (let i = 0; i < 16; i++) { - const shard = await this.client.json.get(`mempool:${i.toString(16)}`); - logger.info(`Loaded ${Object.keys(shard).length} transactions from redis cache ${i.toString(16)}`); - mempool = Object.assign(mempool, shard); + const mempoolList = await this.scanKeys('mempool:tx:*'); + for (const tx of mempoolList) { + mempool[tx.key] = tx.value; } - logger.info(`Total ${Object.keys(mempool).length} transactions loaded from redis cache `); - logger.info(`Loaded redis cache in ${Date.now() - start} ms`); + logger.info(`Loaded mempool from Redis cache in ${Date.now() - start} ms`); return mempool || {}; } catch (e) { logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`); @@ -191,17 +188,8 @@ class RedisCache { async $getRbfEntries(type: string): Promise { try { await this.$ensureConnected(); - const keys = await this.client.keys(`rbf:${type}:*`); - const promises: Promise[] = []; - for (let i = 0; i < keys.length; i += 10000) { - const keySlice = keys.slice(i, i + 10000); - if (!keySlice.length) { - continue; - } - promises.push(this.client.json.mGet(keySlice, '$').then(chunk => chunk?.length ? chunk.flat().map((v, i) => [keySlice[i].slice(`rbf:${type}:`.length), v]) : [] )); - } - const entries = await Promise.all(promises); - return entries.flat(); + const rbfEntries = await this.scanKeys(`rbf:${type}:*`); + return rbfEntries; } catch (e) { logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`); return []; @@ -227,7 +215,7 @@ class RedisCache { await memPool.$setMempool(loadedMempool); await rbfCache.load({ txs: rbfTxs, - trees: rbfTrees.map(loadedTree => loadedTree[1]), + trees: rbfTrees.map(loadedTree => loadedTree.value), expiring: rbfExpirations, }); } @@ -248,6 +236,37 @@ class RedisCache { } } + private async scanKeys(pattern): Promise<{ key: string, value: T }[]> { + logger.info(`loading Redis entries for ${pattern}`); + let keys: string[] = []; + const result: { key: string, value: T }[] = []; + const patternLength = pattern.length - 1; + let count = 0; + const processValues = async (keys): Promise => { + const values = await this.client.MGET(keys); + for (let i = 0; i < values.length; i++) { + if (values[i]) { + result.push({ key: keys[i].slice(patternLength), value: JSON.parse(values[i]) }); + count++; + } + } + logger.info(`loaded ${count} entries from Redis cache`); + }; + for await (const key of this.client.scanIterator({ + MATCH: pattern, + COUNT: 100 + })) { + keys.push(key); + if (keys.length >= 10000) { + await processValues(keys); + keys = []; + } + } + if (keys.length) { + await processValues(keys); + } + return result; + } } export default new RedisCache();