switch from redis-json to simple key-value redis entries
This commit is contained in:
		
							parent
							
								
									a393f42b5e
								
							
						
					
					
						commit
						c79a597c96
					
				@ -179,6 +179,7 @@ class DiskCache {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    try {
 | 
			
		||||
      const start = Date.now();
 | 
			
		||||
      let data: any = {};
 | 
			
		||||
      const cacheData = fs.readFileSync(DiskCache.FILE_NAME, 'utf8');
 | 
			
		||||
      if (cacheData) {
 | 
			
		||||
@ -220,6 +221,8 @@ class DiskCache {
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      logger.info(`Loaded mempool from disk cache in ${Date.now() - start} ms`);
 | 
			
		||||
 | 
			
		||||
      await memPool.$setMempool(data.mempool);
 | 
			
		||||
      if (!this.ignoreBlocksCache) {
 | 
			
		||||
        blocks.setBlocks(data.blocks);
 | 
			
		||||
 | 
			
		||||
@ -360,14 +360,14 @@ class RbfCache {
 | 
			
		||||
 | 
			
		||||
  public async load({ txs, trees, expiring }): Promise<void> {
 | 
			
		||||
    txs.forEach(txEntry => {
 | 
			
		||||
      this.txs.set(txEntry[0], txEntry[1]);
 | 
			
		||||
      this.txs.set(txEntry.key, txEntry.value);
 | 
			
		||||
    });
 | 
			
		||||
    for (const deflatedTree of trees) {
 | 
			
		||||
      await this.importTree(deflatedTree.root, deflatedTree.root, deflatedTree, this.txs);
 | 
			
		||||
    }
 | 
			
		||||
    expiring.forEach(expiringEntry => {
 | 
			
		||||
      if (this.txs.has(expiringEntry[0])) {
 | 
			
		||||
        this.expiring.set(expiringEntry[0], new Date(expiringEntry[1]).getTime());
 | 
			
		||||
      if (this.txs.has(expiringEntry.key)) {
 | 
			
		||||
        this.expiring.set(expiringEntry.key, new Date(expiringEntry.value).getTime());
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
    this.cleanup();
 | 
			
		||||
 | 
			
		||||
@ -36,13 +36,6 @@ class RedisCache {
 | 
			
		||||
        logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      });
 | 
			
		||||
      this.$ensureConnected();
 | 
			
		||||
      this.client.exists('mempool:0').then((mempoolExists) => {
 | 
			
		||||
        if (!mempoolExists) {
 | 
			
		||||
          for (let i = 0; i < 16; i++) {
 | 
			
		||||
            this.client.json.set(`mempool:${i.toString(16)}`, '$', {});
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -65,7 +58,7 @@ class RedisCache {
 | 
			
		||||
  async $updateBlocks(blocks: BlockExtended[]) {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      await this.client.json.set('blocks', '$', blocks);
 | 
			
		||||
      await this.client.set('blocks', JSON.stringify(blocks));
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
    }
 | 
			
		||||
@ -74,7 +67,7 @@ class RedisCache {
 | 
			
		||||
  async $updateBlockSummaries(summaries: BlockSummary[]) {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      await this.client.json.set('block-summaries', '$', summaries);
 | 
			
		||||
      await this.client.set('block-summaries', JSON.stringify(summaries));
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
    }
 | 
			
		||||
@ -98,9 +91,12 @@ class RedisCache {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise<boolean> {
 | 
			
		||||
    if (!newTransactions.length) {
 | 
			
		||||
      return true;
 | 
			
		||||
    }
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      await Promise.all(newTransactions.map(tx => {
 | 
			
		||||
      const msetData = newTransactions.map(tx => {
 | 
			
		||||
        const minified: any = { ...tx };
 | 
			
		||||
        delete minified.hex;
 | 
			
		||||
        for (const vin of minified.vin) {
 | 
			
		||||
@ -111,8 +107,9 @@ class RedisCache {
 | 
			
		||||
        for (const vout of minified.vout) {
 | 
			
		||||
          delete vout.scriptpubkey_asm;
 | 
			
		||||
        }
 | 
			
		||||
        return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, minified);
 | 
			
		||||
      }));
 | 
			
		||||
        return [`mempool:tx:${tx.txid}`, JSON.stringify(minified)];
 | 
			
		||||
      });
 | 
			
		||||
      await this.client.MSET(msetData);
 | 
			
		||||
      return true;
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
@ -123,9 +120,9 @@ class RedisCache {
 | 
			
		||||
  async $removeTransactions(transactions: string[]) {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      await Promise.all(transactions.map(txid => {
 | 
			
		||||
        return this.client.json.del(`mempool:${txid.slice(0,1)}`, txid);
 | 
			
		||||
      }));
 | 
			
		||||
      for (let i = 0; i < Math.ceil(transactions.length / 1000); i++) {
 | 
			
		||||
        await this.client.del(transactions.slice(i * 1000, (i + 1) * 1000).map(txid => `mempool:tx:${txid}`));
 | 
			
		||||
      }
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
    }
 | 
			
		||||
@ -134,7 +131,7 @@ class RedisCache {
 | 
			
		||||
  async $setRbfEntry(type: string, txid: string, value: any): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      await this.client.json.set(`rbf:${type}:${txid}`, '$', value);
 | 
			
		||||
      await this.client.set(`rbf:${type}:${txid}`, JSON.stringify(value));
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
    }
 | 
			
		||||
@ -152,7 +149,8 @@ class RedisCache {
 | 
			
		||||
  async $getBlocks(): Promise<BlockExtended[]> {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      return this.client.json.get('blocks');
 | 
			
		||||
      const json = await this.client.get('blocks');
 | 
			
		||||
      return JSON.parse(json);
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      return [];
 | 
			
		||||
@ -162,7 +160,8 @@ class RedisCache {
 | 
			
		||||
  async $getBlockSummaries(): Promise<BlockSummary[]> {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      return this.client.json.get('block-summaries');
 | 
			
		||||
      const json = await this.client.get('block-summaries');
 | 
			
		||||
      return JSON.parse(json);
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      return [];
 | 
			
		||||
@ -171,16 +170,14 @@ class RedisCache {
 | 
			
		||||
 | 
			
		||||
  async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> {
 | 
			
		||||
    const start = Date.now();
 | 
			
		||||
    let mempool = {};
 | 
			
		||||
    const mempool = {};
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      for (let i = 0; i < 16; i++) {
 | 
			
		||||
        const shard = await this.client.json.get(`mempool:${i.toString(16)}`);
 | 
			
		||||
        logger.info(`Loaded ${Object.keys(shard).length} transactions from redis cache ${i.toString(16)}`);
 | 
			
		||||
        mempool = Object.assign(mempool, shard);
 | 
			
		||||
      const mempoolList = await this.scanKeys<MempoolTransactionExtended>('mempool:tx:*');
 | 
			
		||||
      for (const tx of mempoolList) {
 | 
			
		||||
        mempool[tx.key] = tx.value;
 | 
			
		||||
      }
 | 
			
		||||
      logger.info(`Total ${Object.keys(mempool).length} transactions loaded from redis cache `);
 | 
			
		||||
      logger.info(`Loaded redis cache in ${Date.now() - start} ms`);
 | 
			
		||||
      logger.info(`Loaded mempool from Redis cache in ${Date.now() - start} ms`);
 | 
			
		||||
      return mempool || {};
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
@ -191,17 +188,8 @@ class RedisCache {
 | 
			
		||||
  async $getRbfEntries(type: string): Promise<any[]> {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      const keys = await this.client.keys(`rbf:${type}:*`);
 | 
			
		||||
      const promises: Promise<MempoolTransactionExtended[]>[] = [];
 | 
			
		||||
      for (let i = 0; i < keys.length; i += 10000) {
 | 
			
		||||
        const keySlice = keys.slice(i, i + 10000);
 | 
			
		||||
        if (!keySlice.length) {
 | 
			
		||||
          continue;
 | 
			
		||||
        }
 | 
			
		||||
        promises.push(this.client.json.mGet(keySlice, '$').then(chunk => chunk?.length ? chunk.flat().map((v, i) => [keySlice[i].slice(`rbf:${type}:`.length), v]) : [] ));
 | 
			
		||||
      }
 | 
			
		||||
      const entries = await Promise.all(promises);
 | 
			
		||||
      return entries.flat();
 | 
			
		||||
      const rbfEntries = await this.scanKeys<MempoolTransactionExtended[]>(`rbf:${type}:*`);
 | 
			
		||||
      return rbfEntries;
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      return [];
 | 
			
		||||
@ -227,7 +215,7 @@ class RedisCache {
 | 
			
		||||
    await memPool.$setMempool(loadedMempool);
 | 
			
		||||
    await rbfCache.load({
 | 
			
		||||
      txs: rbfTxs,
 | 
			
		||||
      trees: rbfTrees.map(loadedTree => loadedTree[1]),
 | 
			
		||||
      trees: rbfTrees.map(loadedTree => loadedTree.value),
 | 
			
		||||
      expiring: rbfExpirations,
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
@ -248,6 +236,37 @@ class RedisCache {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async scanKeys<T>(pattern): Promise<{ key: string, value: T }[]> {
 | 
			
		||||
    logger.info(`loading Redis entries for ${pattern}`);
 | 
			
		||||
    let keys: string[] = [];
 | 
			
		||||
    const result: { key: string, value: T }[] = [];
 | 
			
		||||
    const patternLength = pattern.length - 1;
 | 
			
		||||
    let count = 0;
 | 
			
		||||
    const processValues = async (keys): Promise<void> => {
 | 
			
		||||
      const values = await this.client.MGET(keys);
 | 
			
		||||
      for (let i = 0; i < values.length; i++) {
 | 
			
		||||
        if (values[i]) {
 | 
			
		||||
          result.push({ key: keys[i].slice(patternLength), value: JSON.parse(values[i]) });
 | 
			
		||||
          count++;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      logger.info(`loaded ${count} entries from Redis cache`);
 | 
			
		||||
    };
 | 
			
		||||
    for await (const key of this.client.scanIterator({
 | 
			
		||||
      MATCH: pattern,
 | 
			
		||||
      COUNT: 100
 | 
			
		||||
    })) {
 | 
			
		||||
      keys.push(key);
 | 
			
		||||
      if (keys.length >= 10000) {
 | 
			
		||||
        await processValues(keys);
 | 
			
		||||
        keys = [];
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (keys.length) {
 | 
			
		||||
      await processValues(keys);
 | 
			
		||||
    }
 | 
			
		||||
    return result;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default new RedisCache();
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user