/*
 * mempool/backend/src/api/redis-cache.ts
 *
 * Repository-viewer header captured with the file (not part of the source):
 * 254 lines, 8.4 KiB, TypeScript.
 */

import { createClient } from 'redis';
import memPool from './mempool';
import blocks from './blocks';
import logger from '../logger';
import config from '../config';
2023-06-16 18:55:22 -04:00
import { BlockExtended, BlockSummary, MempoolTransactionExtended } from '../mempool.interfaces';
2023-05-12 16:31:01 -06:00
import rbfCache from './rbf-cache';
import transactionUtils from './transaction-utils';
/**
 * Numeric Redis database index per network name.
 *
 * The RedisCache constructor looks the configured network name up in this enum
 * and passes the result as the client's `database` option.
 */
enum NetworkDB {
  mainnet = 0,
  testnet = 1,
  signet = 2,
  liquid = 3,
  liquidtestnet = 4,
}
/**
 * Redis-backed cache for blocks, block summaries, mempool transactions and RBF
 * entries.
 *
 * Mempool transactions are stored in 16 JSON documents (`mempool:0` …
 * `mempool:f`), keyed by the first hex character of the txid. Writes are
 * buffered in `cacheQueue` and pushed in batches by `$flushTransactions`.
 *
 * Every public method is best-effort: Redis failures are logged and turned into
 * an empty result (or `false`) instead of being thrown to the caller.
 */
class RedisCache {
  private client;
  private connected = false;
  /** In-flight connection attempt, shared so that `connect()` is only called once at a time. */
  private connecting: Promise<void> | null = null;
  private schemaVersion = 1;
  private cacheQueue: MempoolTransactionExtended[] = [];
  private txFlushLimit: number = 10000;

  /**
   * Creates the client and starts connecting when Redis is enabled in the config.
   * Initialization runs in the background; failures are logged, never thrown.
   */
  constructor() {
    if (config.REDIS.ENABLED) {
      const redisConfig = {
        socket: {
          path: config.REDIS.UNIX_SOCKET_PATH
        },
        database: NetworkDB[config.MEMPOOL.NETWORK],
      };
      this.client = createClient(redisConfig);
      this.client.on('error', (e) => {
        logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
      });
      // A constructor cannot await, so attach an explicit handler: otherwise a
      // failed connect or shard initialization becomes an unhandled rejection.
      this.$ensureConnected()
        .then(() => this.$initMempoolShards())
        .catch((e) => {
          logger.err(`Failed to initialize Redis cache: ${e instanceof Error ? e.message : e}`);
        });
    }
  }

  /** Creates the 16 empty mempool shard documents when `mempool:0` does not exist yet. */
  private async $initMempoolShards(): Promise<void> {
    const mempoolExists = await this.client.exists('mempool:0');
    if (mempoolExists) {
      return;
    }
    const writes: Promise<unknown>[] = [];
    for (let i = 0; i < 16; i++) {
      writes.push(this.client.json.set(`mempool:${i.toString(16)}`, '$', {}));
    }
    await Promise.all(writes);
  }

  /**
   * Resolves once the client is connected. No-op when Redis is disabled or the
   * client is already connected. Concurrent callers share a single attempt.
   */
  private async $ensureConnected(): Promise<void> {
    if (this.connected || !config.REDIS.ENABLED) {
      return;
    }
    if (!this.connecting) {
      // Clear the marker when the attempt settles so a failed attempt can be retried.
      this.connecting = this.$connect().finally(() => {
        this.connecting = null;
      });
    }
    return this.connecting;
  }

  /** Connects the client and records the current schema version if the stored one differs. */
  private async $connect(): Promise<void> {
    await this.client.connect();
    this.connected = true;
    logger.info(`Redis client connected`);
    const version = await this.client.get('schema_version');
    // GET yields a string (or null), so compare against the stringified version;
    // comparing against the number would report a change on every connect.
    if (version !== String(this.schemaVersion)) {
      // schema changed
      // perform migrations or flush DB if necessary
      logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`);
      await this.client.set('schema_version', String(this.schemaVersion));
    }
  }

  /** Replaces the cached `blocks` document. */
  async $updateBlocks(blocks: BlockExtended[]) {
    try {
      await this.$ensureConnected();
      await this.client.json.set('blocks', '$', blocks);
    } catch (e) {
      logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`);
    }
  }

  /** Replaces the cached `block-summaries` document. */
  async $updateBlockSummaries(summaries: BlockSummary[]) {
    try {
      await this.$ensureConnected();
      await this.client.json.set('block-summaries', '$', summaries);
    } catch (e) {
      logger.warn(`Failed to update block summaries in Redis cache: ${e instanceof Error ? e.message : e}`);
    }
  }

  /**
   * Queues a transaction for caching and flushes once the queue reaches
   * `txFlushLimit`. No-op when Redis is disabled, so the queue cannot grow
   * without bound.
   */
  async $addTransaction(tx: MempoolTransactionExtended) {
    if (!config.REDIS.ENABLED) {
      return;
    }
    this.cacheQueue.push(tx);
    if (this.cacheQueue.length >= this.txFlushLimit) {
      await this.$flushTransactions();
    }
  }

  /**
   * Writes all currently queued transactions to Redis.
   *
   * The queue is detached before the write starts, so transactions queued while
   * the write is in flight stay queued for the next flush. On failure the batch
   * is put back at the front of the queue.
   */
  async $flushTransactions(): Promise<void> {
    if (!this.cacheQueue.length) {
      return;
    }
    const batch = this.cacheQueue;
    this.cacheQueue = [];
    const success = await this.$addTransactions(batch);
    if (success) {
      logger.info(`Flushed ${batch.length} transactions to Redis cache`);
    } else {
      logger.err(`Failed to flush ${batch.length} transactions to Redis cache`);
      this.cacheQueue = batch.concat(this.cacheQueue);
    }
  }

  /**
   * Stores a minified copy of each transaction in its mempool shard.
   *
   * @returns `true` when every write succeeded, `false` otherwise.
   */
  private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise<boolean> {
    try {
      await this.$ensureConnected();
      await Promise.all(newTransactions.map(tx => {
        // Copy each vin/vout before deleting fields: a plain `{ ...tx }` shares
        // the nested objects with the caller's transaction and would strip the
        // asm fields from it as well.
        const minified: any = {
          ...tx,
          vin: tx.vin.map(vin => ({ ...vin })),
          vout: tx.vout.map(vout => ({ ...vout })),
        };
        delete minified.hex;
        for (const vin of minified.vin) {
          delete vin.inner_redeemscript_asm;
          delete vin.inner_witnessscript_asm;
          delete vin.scriptsig_asm;
        }
        for (const vout of minified.vout) {
          delete vout.scriptpubkey_asm;
        }
        return this.client.json.set(`mempool:${tx.txid.slice(0, 1)}`, tx.txid, minified);
      }));
      return true;
    } catch (e) {
      logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`);
      return false;
    }
  }

  /** Deletes the given txids from their mempool shards. */
  async $removeTransactions(transactions: string[]) {
    try {
      await this.$ensureConnected();
      await Promise.all(transactions.map(txid => {
        return this.client.json.del(`mempool:${txid.slice(0, 1)}`, txid);
      }));
    } catch (e) {
      logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`);
    }
  }

  /** Stores `value` as the JSON document `rbf:<type>:<txid>`. */
  async $setRbfEntry(type: string, txid: string, value: any): Promise<void> {
    try {
      await this.$ensureConnected();
      await this.client.json.set(`rbf:${type}:${txid}`, '$', value);
    } catch (e) {
      logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`);
    }
  }

  /** Deletes the key `rbf:<type>:<txid>`. */
  async $removeRbfEntry(type: string, txid: string): Promise<void> {
    try {
      await this.$ensureConnected();
      await this.client.del(`rbf:${type}:${txid}`);
    } catch (e) {
      logger.warn(`Failed to remove RBF ${type} from Redis cache: ${e instanceof Error ? e.message : e}`);
    }
  }

  /** @returns the cached blocks, or `[]` when the key is missing or Redis fails. */
  async $getBlocks(): Promise<BlockExtended[]> {
    try {
      await this.$ensureConnected();
      // `await` here so that a rejected read is handled by the catch below.
      return (await this.client.json.get('blocks')) ?? [];
    } catch (e) {
      logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`);
      return [];
    }
  }

  /** @returns the cached block summaries, or `[]` when the key is missing or Redis fails. */
  async $getBlockSummaries(): Promise<BlockSummary[]> {
    try {
      await this.$ensureConnected();
      // `await` here so that a rejected read is handled by the catch below.
      return (await this.client.json.get('block-summaries')) ?? [];
    } catch (e) {
      logger.warn(`Failed to retrieve block summaries from Redis cache: ${e instanceof Error ? e.message : e}`);
      return [];
    }
  }

  /**
   * Loads and merges all 16 mempool shards, one after another.
   *
   * @returns a txid → transaction map; `{}` when Redis fails.
   */
  async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> {
    const start = Date.now();
    const mempool: { [txid: string]: MempoolTransactionExtended } = {};
    try {
      await this.$ensureConnected();
      for (let i = 0; i < 16; i++) {
        const shardId = i.toString(16);
        // A shard that does not exist is read as null; treat it as empty rather
        // than throwing and discarding the shards already loaded.
        const shard = (await this.client.json.get(`mempool:${shardId}`)) ?? {};
        logger.info(`Loaded ${Object.keys(shard).length} transactions from redis cache ${shardId}`);
        Object.assign(mempool, shard);
      }
      logger.info(`Total ${Object.keys(mempool).length} transactions loaded from redis cache `);
      logger.info(`Loaded redis cache in ${Date.now() - start} ms`);
      return mempool;
    } catch (e) {
      logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`);
    }
    return {};
  }

  /**
   * Loads every `rbf:<type>:*` document.
   *
   * @returns `[txid, value]` pairs, with the `rbf:<type>:` prefix stripped from
   *          the key; `[]` when Redis fails.
   */
  async $getRbfEntries(type: string): Promise<any[]> {
    try {
      await this.$ensureConnected();
      const prefix = `rbf:${type}:`;
      // NOTE(review): KEYS walks the whole keyspace; consider SCAN if this database grows large.
      const keys = await this.client.keys(`${prefix}*`);
      const batchSize = 10000;
      const batches: Promise<any[]>[] = [];
      for (let offset = 0; offset < keys.length; offset += batchSize) {
        const keySlice = keys.slice(offset, offset + batchSize);
        batches.push(
          this.client.json.mGet(keySlice, '$').then((chunk) =>
            chunk?.length
              // Flatten the per-key results so that index `idx` lines up with `keySlice[idx]`.
              ? chunk.flat().map((value, idx) => [keySlice[idx].slice(prefix.length), value])
              : []
          )
        );
      }
      const entries = await Promise.all(batches);
      return entries.flat();
    } catch (e) {
      logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`);
      return [];
    }
  }

  /** Restores blocks, block summaries, the mempool and RBF state from Redis into the in-memory stores. */
  async $loadCache() {
    logger.info('Restoring mempool and blocks data from Redis cache');
    // Load block data
    const loadedBlocks = await this.$getBlocks();
    const loadedBlockSummaries = await this.$getBlockSummaries();
    // Load mempool
    const loadedMempool = await this.$getMempool();
    this.inflateLoadedTxs(loadedMempool);
    // Load rbf data
    const rbfTxs = await this.$getRbfEntries('tx');
    const rbfTrees = await this.$getRbfEntries('tree');
    const rbfExpirations = await this.$getRbfEntries('exp');
    // Set loaded data
    blocks.setBlocks(loadedBlocks || []);
    blocks.setBlockSummaries(loadedBlockSummaries || []);
    await memPool.$setMempool(loadedMempool);
    await rbfCache.load({
      txs: rbfTxs,
      // Entries are [txid, value] pairs; only the tree values are passed on.
      trees: rbfTrees.map(loadedTree => loadedTree[1]),
      expiring: rbfExpirations,
    });
  }

  /** Recomputes, in place, the asm fields that `$addTransactions` strips before storing. */
  private inflateLoadedTxs(mempool: { [txid: string]: MempoolTransactionExtended }) {
    for (const tx of Object.values(mempool)) {
      for (const vin of tx.vin) {
        // NOTE(review): inner scripts are only restored for inputs with a non-empty
        // scriptsig; confirm that inputs with an empty scriptsig do not need
        // addInnerScriptsToVin as well.
        if (vin.scriptsig) {
          vin.scriptsig_asm = transactionUtils.convertScriptSigAsm(vin.scriptsig);
          transactionUtils.addInnerScriptsToVin(vin);
        }
      }
      for (const vout of tx.vout) {
        if (vout.scriptpubkey) {
          vout.scriptpubkey_asm = transactionUtils.convertScriptSigAsm(vout.scriptpubkey);
        }
      }
    }
  }
}
// Module-level singleton: the RedisCache constructor runs when this module is first imported.
export default new RedisCache();