store redis mempool in sharded json object

This commit is contained in:
Mononaut 2023-07-13 09:25:05 +09:00
parent a9f8bbbcce
commit 6ac58f2da7
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E

View File

@ -35,6 +35,13 @@ class RedisCache {
logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
}); });
this.$ensureConnected(); this.$ensureConnected();
this.client.exists('mempool:0').then((mempoolExists) => {
if (!mempoolExists) {
for (let i = 0; i < 16; i++) {
this.client.json.set(`mempool:${i.toString(16)}`, '$', {});
}
}
});
} }
} }
@ -90,7 +97,7 @@ class RedisCache {
try { try {
await this.$ensureConnected(); await this.$ensureConnected();
await Promise.all(newTransactions.map(tx => { await Promise.all(newTransactions.map(tx => {
return this.client.json.set('tx:' + tx.txid, '$', tx); return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, tx);
})); }));
return true; return true;
} catch (e) { } catch (e) {
@ -103,7 +110,7 @@ class RedisCache {
try { try {
await this.$ensureConnected(); await this.$ensureConnected();
await Promise.all(transactions.map(txid => { await Promise.all(transactions.map(txid => {
return this.client.del('tx:' + txid); return this.client.json.del(`mempool:${txid.slice(0,1)}`, txid);
})); }));
} catch (e) { } catch (e) {
logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`);
@ -149,34 +156,22 @@ class RedisCache {
} }
async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> {
const mempool = {}; const start = Date.now();
let mempool = {};
try { try {
await this.$ensureConnected(); await this.$ensureConnected();
const keys = await this.client.keys('tx:*'); for (let i = 0; i < 16; i++) {
const promises: Promise<MempoolTransactionExtended[]>[] = []; const shard = await this.client.json.get(`mempool:${i.toString(16)}`);
let returned = 0; logger.info(`Loaded ${Object.keys(shard).length} transactions from redis cache ${i.toString(16)}`);
for (let i = 0; i < keys.length; i += 10000) { mempool = Object.assign(mempool, shard);
const keySlice = keys.slice(i, i + 10000);
if (!keySlice.length) {
continue;
}
promises.push(this.client.json.mGet(keySlice, '$').then(chunk => {
for (const txs of chunk) {
for (const tx of txs) {
if (tx) {
mempool[tx.txid] = tx;
}
}
}
logger.info(`Loaded ${(returned * 10000) + (chunk.length)}/${keys.length} transactions from Redis cache`);
returned++;
}));
} }
await Promise.all(promises); logger.info(`Total ${Object.keys(mempool).length} transactions loaded from redis cache `);
logger.info(`Loaded redis cache in ${Date.now() - start} ms`);
return mempool || {};
} catch (e) { } catch (e) {
logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`); logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`);
} }
return mempool; return {};
} }
async $getRbfEntries(type: string): Promise<any[]> { async $getRbfEntries(type: string): Promise<any[]> {