Add network and schema versioning to redis cache
This commit is contained in:
		
							parent
							
								
									d65bddd30b
								
							
						
					
					
						commit
						a9f8bbbcce
					
				@ -6,9 +6,18 @@ import config from '../config';
 | 
			
		||||
import { BlockExtended, BlockSummary, MempoolTransactionExtended } from '../mempool.interfaces';
 | 
			
		||||
import rbfCache from './rbf-cache';
 | 
			
		||||
 | 
			
		||||
enum NetworkDB {
 | 
			
		||||
  mainnet = 0,
 | 
			
		||||
  testnet,
 | 
			
		||||
  signet,
 | 
			
		||||
  liquid,
 | 
			
		||||
  liquidtestnet,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class RedisCache {
 | 
			
		||||
  private client;
 | 
			
		||||
  private connected = false;
 | 
			
		||||
  private schemaVersion = 1;
 | 
			
		||||
 | 
			
		||||
  private cacheQueue: MempoolTransactionExtended[] = [];
 | 
			
		||||
  private txFlushLimit: number = 1000;
 | 
			
		||||
@ -18,7 +27,8 @@ class RedisCache {
 | 
			
		||||
      const redisConfig = {
 | 
			
		||||
        socket: {
 | 
			
		||||
          path: config.REDIS.UNIX_SOCKET_PATH
 | 
			
		||||
        }
 | 
			
		||||
        },
 | 
			
		||||
        database: NetworkDB[config.MEMPOOL.NETWORK],
 | 
			
		||||
      };
 | 
			
		||||
      this.client = createClient(redisConfig);
 | 
			
		||||
      this.client.on('error', (e) => {
 | 
			
		||||
@ -30,9 +40,16 @@ class RedisCache {
 | 
			
		||||
 | 
			
		||||
  private async $ensureConnected(): Promise<void> {
 | 
			
		||||
    if (!this.connected && config.REDIS.ENABLED) {
 | 
			
		||||
      return this.client.connect().then(() => {
 | 
			
		||||
      return this.client.connect().then(async () => {
 | 
			
		||||
        this.connected = true;
 | 
			
		||||
        logger.info(`Redis client connected`);
 | 
			
		||||
        const version = await this.client.get('schema_version');
 | 
			
		||||
        if (version !== this.schemaVersion) {
 | 
			
		||||
          // schema changed
 | 
			
		||||
          // perform migrations or flush DB if necessary
 | 
			
		||||
          logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`);
 | 
			
		||||
          await this.client.set('schema_version', this.schemaVersion);
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -63,18 +80,22 @@ class RedisCache {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async $flushTransactions() {
 | 
			
		||||
    await this.$addTransactions(this.cacheQueue);
 | 
			
		||||
    this.cacheQueue = [];
 | 
			
		||||
    const success = await this.$addTransactions(this.cacheQueue);
 | 
			
		||||
    if (success) {
 | 
			
		||||
      this.cacheQueue = [];
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async $addTransactions(newTransactions: MempoolTransactionExtended[]) {
 | 
			
		||||
  async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise<boolean> {
 | 
			
		||||
    try {
 | 
			
		||||
      await this.$ensureConnected();
 | 
			
		||||
      await Promise.all(newTransactions.map(tx => {
 | 
			
		||||
        return this.client.json.set('tx:' + tx.txid, '$', tx);
 | 
			
		||||
      }));
 | 
			
		||||
      return true;
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      return false;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user