Add transactions to Redis cache in manageable batches
This commit is contained in:
parent
b6cb539470
commit
d65bddd30b
@ -86,6 +86,10 @@ class Mempool {
|
|||||||
public async $setMempool(mempoolData: { [txId: string]: MempoolTransactionExtended }) {
|
public async $setMempool(mempoolData: { [txId: string]: MempoolTransactionExtended }) {
|
||||||
this.mempoolCache = mempoolData;
|
this.mempoolCache = mempoolData;
|
||||||
let count = 0;
|
let count = 0;
|
||||||
|
const redisTimer = Date.now();
|
||||||
|
if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) {
|
||||||
|
logger.debug(`Migrating ${Object.keys(this.mempoolCache).length} transactions from disk cache to Redis cache`);
|
||||||
|
}
|
||||||
for (const txid of Object.keys(this.mempoolCache)) {
|
for (const txid of Object.keys(this.mempoolCache)) {
|
||||||
if (!this.mempoolCache[txid].sigops || this.mempoolCache[txid].effectiveFeePerVsize == null) {
|
if (!this.mempoolCache[txid].sigops || this.mempoolCache[txid].effectiveFeePerVsize == null) {
|
||||||
this.mempoolCache[txid] = transactionUtils.extendMempoolTransaction(this.mempoolCache[txid]);
|
this.mempoolCache[txid] = transactionUtils.extendMempoolTransaction(this.mempoolCache[txid]);
|
||||||
@ -94,6 +98,13 @@ class Mempool {
|
|||||||
this.mempoolCache[txid].order = transactionUtils.txidToOrdering(txid);
|
this.mempoolCache[txid].order = transactionUtils.txidToOrdering(txid);
|
||||||
}
|
}
|
||||||
count++;
|
count++;
|
||||||
|
if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) {
|
||||||
|
await redisCache.$addTransaction(this.mempoolCache[txid]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) {
|
||||||
|
await redisCache.$flushTransactions();
|
||||||
|
logger.debug(`Finished migrating cache transactions in ${((Date.now() - redisTimer) / 1000).toFixed(2)} seconds`);
|
||||||
}
|
}
|
||||||
if (this.mempoolChangedCallback) {
|
if (this.mempoolChangedCallback) {
|
||||||
this.mempoolChangedCallback(this.mempoolCache, [], []);
|
this.mempoolChangedCallback(this.mempoolCache, [], []);
|
||||||
@ -102,10 +113,6 @@ class Mempool {
|
|||||||
await this.$asyncMempoolChangedCallback(this.mempoolCache, count, [], []);
|
await this.$asyncMempoolChangedCallback(this.mempoolCache, count, [], []);
|
||||||
}
|
}
|
||||||
this.addToSpendMap(Object.values(this.mempoolCache));
|
this.addToSpendMap(Object.values(this.mempoolCache));
|
||||||
if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) {
|
|
||||||
logger.debug('copying mempool from disk cache into Redis');
|
|
||||||
await redisCache.$addTransactions(Object.values(mempoolData));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $reloadMempool(expectedCount: number): Promise<MempoolTransactionExtended[]> {
|
public async $reloadMempool(expectedCount: number): Promise<MempoolTransactionExtended[]> {
|
||||||
@ -212,6 +219,7 @@ class Mempool {
|
|||||||
logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`);
|
logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`);
|
||||||
try {
|
try {
|
||||||
newTransactions = await this.$reloadMempool(transactions.length);
|
newTransactions = await this.$reloadMempool(transactions.length);
|
||||||
|
redisCache.$addTransactions(newTransactions);
|
||||||
loaded = true;
|
loaded = true;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions');
|
logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions');
|
||||||
@ -234,6 +242,10 @@ class Mempool {
|
|||||||
}
|
}
|
||||||
hasChange = true;
|
hasChange = true;
|
||||||
newTransactions.push(transaction);
|
newTransactions.push(transaction);
|
||||||
|
|
||||||
|
if (config.REDIS.ENABLED) {
|
||||||
|
await redisCache.$addTransaction(transaction);
|
||||||
|
}
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
|
if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) {
|
||||||
this.missingTxCount++;
|
this.missingTxCount++;
|
||||||
@ -324,7 +336,7 @@ class Mempool {
|
|||||||
|
|
||||||
// Update Redis cache
|
// Update Redis cache
|
||||||
if (config.REDIS.ENABLED) {
|
if (config.REDIS.ENABLED) {
|
||||||
await redisCache.$addTransactions(newTransactions);
|
await redisCache.$flushTransactions();
|
||||||
await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid));
|
await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid));
|
||||||
await rbfCache.updateCache();
|
await rbfCache.updateCache();
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,9 @@ class RedisCache {
|
|||||||
private client;
|
private client;
|
||||||
private connected = false;
|
private connected = false;
|
||||||
|
|
||||||
|
private cacheQueue: MempoolTransactionExtended[] = [];
|
||||||
|
private txFlushLimit: number = 1000;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
if (config.REDIS.ENABLED) {
|
if (config.REDIS.ENABLED) {
|
||||||
const redisConfig = {
|
const redisConfig = {
|
||||||
@ -52,6 +55,18 @@ class RedisCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async $addTransaction(tx: MempoolTransactionExtended) {
|
||||||
|
this.cacheQueue.push(tx);
|
||||||
|
if (this.cacheQueue.length > this.txFlushLimit) {
|
||||||
|
await this.$flushTransactions();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async $flushTransactions() {
|
||||||
|
await this.$addTransactions(this.cacheQueue);
|
||||||
|
this.cacheQueue = [];
|
||||||
|
}
|
||||||
|
|
||||||
async $addTransactions(newTransactions: MempoolTransactionExtended[]) {
|
async $addTransactions(newTransactions: MempoolTransactionExtended[]) {
|
||||||
try {
|
try {
|
||||||
await this.$ensureConnected();
|
await this.$ensureConnected();
|
||||||
@ -118,6 +133,7 @@ class RedisCache {
|
|||||||
await this.$ensureConnected();
|
await this.$ensureConnected();
|
||||||
const keys = await this.client.keys('tx:*');
|
const keys = await this.client.keys('tx:*');
|
||||||
const promises: Promise<MempoolTransactionExtended[]>[] = [];
|
const promises: Promise<MempoolTransactionExtended[]>[] = [];
|
||||||
|
let returned = 0;
|
||||||
for (let i = 0; i < keys.length; i += 10000) {
|
for (let i = 0; i < keys.length; i += 10000) {
|
||||||
const keySlice = keys.slice(i, i + 10000);
|
const keySlice = keys.slice(i, i + 10000);
|
||||||
if (!keySlice.length) {
|
if (!keySlice.length) {
|
||||||
@ -131,6 +147,8 @@ class RedisCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
logger.info(`Loaded ${(returned * 10000) + (chunk.length)}/${keys.length} transactions from Redis cache`);
|
||||||
|
returned++;
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
await Promise.all(promises);
|
await Promise.all(promises);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user