Implement Redis cache for block and mempool data
This commit is contained in:
@@ -26,6 +26,8 @@ import PricesRepository from '../repositories/PricesRepository';
|
||||
import priceUpdater from '../tasks/price-updater';
|
||||
import chainTips from './chain-tips';
|
||||
import websocketHandler from './websocket-handler';
|
||||
import redisCache from './redis-cache';
|
||||
import rbfCache from './rbf-cache';
|
||||
|
||||
class Blocks {
|
||||
private blocks: BlockExtended[] = [];
|
||||
@@ -804,10 +806,18 @@ class Blocks {
|
||||
if (this.newBlockCallbacks.length) {
|
||||
this.newBlockCallbacks.forEach((cb) => cb(blockExtended, txIds, transactions));
|
||||
}
|
||||
if (!memPool.hasPriority() && (block.height % config.MEMPOOL.DISK_CACHE_BLOCK_INTERVAL === 0)) {
|
||||
if (config.MEMPOOL.CACHE_ENABLED && !memPool.hasPriority() && (block.height % config.MEMPOOL.DISK_CACHE_BLOCK_INTERVAL === 0)) {
|
||||
diskCache.$saveCacheToDisk();
|
||||
}
|
||||
|
||||
// Update Redis cache
|
||||
if (config.REDIS.ENABLED) {
|
||||
await redisCache.$updateBlocks(this.blocks);
|
||||
await redisCache.$updateBlockSummaries(this.blockSummaries);
|
||||
await redisCache.$removeTransactions(txIds);
|
||||
await rbfCache.updateCache();
|
||||
}
|
||||
|
||||
handledBlocks++;
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ class DiskCache {
|
||||
};
|
||||
|
||||
constructor() {
|
||||
if (!cluster.isPrimary) {
|
||||
if (!cluster.isPrimary || !config.MEMPOOL.CACHE_ENABLED) {
|
||||
return;
|
||||
}
|
||||
process.on('SIGINT', (e) => {
|
||||
@@ -39,7 +39,7 @@ class DiskCache {
|
||||
}
|
||||
|
||||
async $saveCacheToDisk(sync: boolean = false): Promise<void> {
|
||||
if (!cluster.isPrimary) {
|
||||
if (!cluster.isPrimary || !config.MEMPOOL.CACHE_ENABLED) {
|
||||
return;
|
||||
}
|
||||
if (this.isWritingCache) {
|
||||
@@ -175,7 +175,7 @@ class DiskCache {
|
||||
}
|
||||
|
||||
async $loadMempoolCache(): Promise<void> {
|
||||
if (!fs.existsSync(DiskCache.FILE_NAME)) {
|
||||
if (!config.MEMPOOL.CACHE_ENABLED || !fs.existsSync(DiskCache.FILE_NAME)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
||||
@@ -9,7 +9,7 @@ import loadingIndicators from './loading-indicators';
|
||||
import bitcoinClient from './bitcoin/bitcoin-client';
|
||||
import bitcoinSecondClient from './bitcoin/bitcoin-second-client';
|
||||
import rbfCache from './rbf-cache';
|
||||
import { IEsploraApi } from './bitcoin/esplora-api.interface';
|
||||
import redisCache from './redis-cache';
|
||||
|
||||
class Mempool {
|
||||
private inSync: boolean = false;
|
||||
@@ -102,6 +102,10 @@ class Mempool {
|
||||
await this.$asyncMempoolChangedCallback(this.mempoolCache, count, [], []);
|
||||
}
|
||||
this.addToSpendMap(Object.values(this.mempoolCache));
|
||||
if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) {
|
||||
logger.debug('copying mempool from disk cache into Redis');
|
||||
await redisCache.$addTransactions(Object.values(mempoolData));
|
||||
}
|
||||
}
|
||||
|
||||
public async $reloadMempool(expectedCount: number): Promise<MempoolTransactionExtended[]> {
|
||||
@@ -318,6 +322,12 @@ class Mempool {
|
||||
loadingIndicators.setProgress('mempool', 100);
|
||||
}
|
||||
|
||||
// Update Redis cache
|
||||
if (config.REDIS.ENABLED) {
|
||||
await redisCache.$addTransactions(newTransactions);
|
||||
await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid));
|
||||
}
|
||||
|
||||
const end = new Date().getTime();
|
||||
const time = end - start;
|
||||
logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`);
|
||||
|
||||
140
backend/src/api/redis-cache.ts
Normal file
140
backend/src/api/redis-cache.ts
Normal file
@@ -0,0 +1,140 @@
|
||||
import { createClient } from 'redis';
|
||||
import memPool from './mempool';
|
||||
import blocks from './blocks';
|
||||
import logger from '../logger';
|
||||
import config from '../config';
|
||||
import { BlockExtended, BlockSummary, TransactionExtended } from '../mempool.interfaces';
|
||||
|
||||
class RedisCache {
|
||||
private client;
|
||||
private connected = false;
|
||||
|
||||
constructor() {
|
||||
if (config.REDIS.ENABLED) {
|
||||
const redisConfig = {
|
||||
socket: {
|
||||
path: config.REDIS.UNIX_SOCKET_PATH
|
||||
}
|
||||
};
|
||||
this.client = createClient(redisConfig);
|
||||
this.client.on('error', (e) => {
|
||||
logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`);
|
||||
});
|
||||
this.$ensureConnected();
|
||||
}
|
||||
}
|
||||
|
||||
private async $ensureConnected(): Promise<void> {
|
||||
if (!this.connected && config.REDIS.ENABLED) {
|
||||
return this.client.connect().then(() => {
|
||||
this.connected = true;
|
||||
logger.info(`Redis client connected`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async $updateBlocks(blocks: BlockExtended[]) {
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
await this.client.json.set('blocks', '$', blocks);
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
}
|
||||
}
|
||||
|
||||
async $updateBlockSummaries(summaries: BlockSummary[]) {
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
await this.client.json.set('block-summaries', '$', summaries);
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
}
|
||||
}
|
||||
|
||||
async $addTransactions(newTransactions: TransactionExtended[]) {
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
await Promise.all(newTransactions.map(tx => {
|
||||
return this.client.json.set('tx:' + tx.txid, '$', tx);
|
||||
}));
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
}
|
||||
}
|
||||
|
||||
async $removeTransactions(transactions: string[]) {
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
await Promise.all(transactions.map(txid => {
|
||||
return this.client.del('tx:' + txid);
|
||||
}));
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
}
|
||||
}
|
||||
|
||||
async $getBlocks(): Promise<BlockExtended[]> {
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
return this.client.json.get('blocks');
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async $getBlockSummaries(): Promise<BlockSummary[]> {
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
return this.client.json.get('block-summaries');
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async $getMempool(): Promise<{ [txid: string]: TransactionExtended }> {
|
||||
const mempool = {};
|
||||
try {
|
||||
await this.$ensureConnected();
|
||||
const keys = await this.client.keys('tx:*');
|
||||
const promises: Promise<TransactionExtended[]>[] = [];
|
||||
for (let i = 0; i < keys.length; i += 10000) {
|
||||
const keySlice = keys.slice(i, i + 10000);
|
||||
if (!keySlice.length) {
|
||||
continue;
|
||||
}
|
||||
promises.push(this.client.json.mGet(keySlice, '$').then(chunk => {
|
||||
for (const txs of chunk) {
|
||||
for (const tx of txs) {
|
||||
if (tx) {
|
||||
mempool[tx.txid] = tx;
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
await Promise.all(promises);
|
||||
} catch (e) {
|
||||
logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`);
|
||||
}
|
||||
return mempool;
|
||||
}
|
||||
|
||||
async $loadCache() {
|
||||
logger.info('Restoring mempool and blocks data from Redis cache');
|
||||
// Load block data
|
||||
const loadedBlocks = await this.$getBlocks();
|
||||
const loadedBlockSummaries = await this.$getBlockSummaries();
|
||||
// Load mempool
|
||||
const loadedMempool = await this.$getMempool();
|
||||
|
||||
// Set loaded data
|
||||
blocks.setBlocks(loadedBlocks || []);
|
||||
blocks.setBlockSummaries(loadedBlockSummaries || []);
|
||||
await memPool.$setMempool(loadedMempool);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
export default new RedisCache();
|
||||
Reference in New Issue
Block a user