Merge pull request #3700 from mempool/mononaut/fix-async-cache-load

await for mempool change handler after loading disk cache
This commit is contained in:
softsimon 2023-05-01 01:54:33 +04:00 committed by GitHub
commit a87b604153
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 18 deletions

View File

@ -124,7 +124,7 @@ class DiskCache {
} }
} }
loadMempoolCache(): void { async $loadMempoolCache(): Promise<void> {
if (!fs.existsSync(DiskCache.FILE_NAME)) { if (!fs.existsSync(DiskCache.FILE_NAME)) {
return; return;
} }
@ -168,7 +168,7 @@ class DiskCache {
} }
} }
memPool.setMempool(data.mempool); await memPool.$setMempool(data.mempool);
blocks.setBlocks(data.blocks); blocks.setBlocks(data.blocks);
blocks.setBlockSummaries(data.blockSummaries || []); blocks.setBlockSummaries(data.blockSummaries || []);
} catch (e) { } catch (e) {

View File

@ -144,7 +144,7 @@ class MempoolBlocks {
return mempoolBlockDeltas; return mempoolBlockDeltas;
} }
public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise<MempoolBlockWithTransactions[]> { public async $makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise<MempoolBlockWithTransactions[]> {
// prepare a stripped down version of the mempool with only the minimum necessary data // prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread // to reduce the overhead of passing this data to the worker thread
const strippedMempool: { [txid: string]: ThreadTransaction } = {}; const strippedMempool: { [txid: string]: ThreadTransaction } = {};
@ -202,10 +202,10 @@ class MempoolBlocks {
return this.mempoolBlocks; return this.mempoolBlocks;
} }
public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false): Promise<void> { public async $updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false): Promise<void> {
if (!this.txSelectionWorker) { if (!this.txSelectionWorker) {
// need to reset the worker // need to reset the worker
this.makeBlockTemplates(newMempool, saveResults); await this.$makeBlockTemplates(newMempool, saveResults);
return; return;
} }
// prepare a stripped down version of the mempool with only the minimum necessary data // prepare a stripped down version of the mempool with only the minimum necessary data

View File

@ -20,7 +20,7 @@ class Mempool {
maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 }; maxmempool: 300000000, mempoolminfee: 0.00001000, minrelaytxfee: 0.00001000 };
private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[],
deletedTransactions: TransactionExtended[]) => void) | undefined; deletedTransactions: TransactionExtended[]) => void) | undefined;
private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[], private $asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[],
deletedTransactions: TransactionExtended[]) => Promise<void>) | undefined; deletedTransactions: TransactionExtended[]) => Promise<void>) | undefined;
private txPerSecondArray: number[] = []; private txPerSecondArray: number[] = [];
@ -73,20 +73,20 @@ class Mempool {
public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; }, public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: TransactionExtended; },
newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => Promise<void>) { newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]) => Promise<void>) {
this.asyncMempoolChangedCallback = fn; this.$asyncMempoolChangedCallback = fn;
} }
public getMempool(): { [txid: string]: TransactionExtended } { public getMempool(): { [txid: string]: TransactionExtended } {
return this.mempoolCache; return this.mempoolCache;
} }
public setMempool(mempoolData: { [txId: string]: TransactionExtended }) { public async $setMempool(mempoolData: { [txId: string]: TransactionExtended }) {
this.mempoolCache = mempoolData; this.mempoolCache = mempoolData;
if (this.mempoolChangedCallback) { if (this.mempoolChangedCallback) {
this.mempoolChangedCallback(this.mempoolCache, [], []); this.mempoolChangedCallback(this.mempoolCache, [], []);
} }
if (this.asyncMempoolChangedCallback) { if (this.$asyncMempoolChangedCallback) {
this.asyncMempoolChangedCallback(this.mempoolCache, [], []); await this.$asyncMempoolChangedCallback(this.mempoolCache, [], []);
} }
} }
@ -230,9 +230,9 @@ class Mempool {
if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) {
this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions);
} }
if (this.asyncMempoolChangedCallback && (hasChange || deletedTransactions.length)) { if (this.$asyncMempoolChangedCallback && (hasChange || deletedTransactions.length)) {
this.updateTimerProgress(timer, 'running async mempool callback'); this.updateTimerProgress(timer, 'running async mempool callback');
await this.asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions); await this.$asyncMempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions);
this.updateTimerProgress(timer, 'completed async mempool callback'); this.updateTimerProgress(timer, 'completed async mempool callback');
} }

View File

@ -247,14 +247,14 @@ class WebsocketHandler {
}); });
} }
async handleMempoolChange(newMempool: { [txid: string]: TransactionExtended }, async $handleMempoolChange(newMempool: { [txid: string]: TransactionExtended },
newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]): Promise<void> { newTransactions: TransactionExtended[], deletedTransactions: TransactionExtended[]): Promise<void> {
if (!this.wss) { if (!this.wss) {
throw new Error('WebSocket.Server is not set'); throw new Error('WebSocket.Server is not set');
} }
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true); await mempoolBlocks.$updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid), true);
} else { } else {
mempoolBlocks.updateMempoolBlocks(newMempool, true); mempoolBlocks.updateMempoolBlocks(newMempool, true);
} }
@ -429,7 +429,7 @@ class WebsocketHandler {
// a cloned copy of the mempool if we're running a different algorithm for mempool updates // a cloned copy of the mempool if we're running a different algorithm for mempool updates
const auditMempool = (config.MEMPOOL.ADVANCED_GBT_AUDIT === config.MEMPOOL.ADVANCED_GBT_MEMPOOL) ? _memPool : deepClone(_memPool); const auditMempool = (config.MEMPOOL.ADVANCED_GBT_AUDIT === config.MEMPOOL.ADVANCED_GBT_MEMPOOL) ? _memPool : deepClone(_memPool);
if (config.MEMPOOL.ADVANCED_GBT_AUDIT) { if (config.MEMPOOL.ADVANCED_GBT_AUDIT) {
projectedBlocks = await mempoolBlocks.makeBlockTemplates(auditMempool, false); projectedBlocks = await mempoolBlocks.$makeBlockTemplates(auditMempool, false);
} else { } else {
projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false); projectedBlocks = mempoolBlocks.updateMempoolBlocks(auditMempool, false);
} }
@ -486,7 +486,7 @@ class WebsocketHandler {
} }
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) { if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
await mempoolBlocks.updateBlockTemplates(_memPool, [], removed, true); await mempoolBlocks.$updateBlockTemplates(_memPool, [], removed, true);
} else { } else {
mempoolBlocks.updateMempoolBlocks(_memPool, true); mempoolBlocks.updateMempoolBlocks(_memPool, true);
} }

View File

@ -120,7 +120,7 @@ class Server {
await poolsUpdater.updatePoolsJson(); // Needs to be done before loading the disk cache because we sometimes wipe it await poolsUpdater.updatePoolsJson(); // Needs to be done before loading the disk cache because we sometimes wipe it
await syncAssets.syncAssets$(); await syncAssets.syncAssets$();
if (config.MEMPOOL.ENABLED) { if (config.MEMPOOL.ENABLED) {
diskCache.loadMempoolCache(); await diskCache.$loadMempoolCache();
} }
if (config.STATISTICS.ENABLED && config.DATABASE.ENABLED && cluster.isPrimary) { if (config.STATISTICS.ENABLED && config.DATABASE.ENABLED && cluster.isPrimary) {
@ -238,7 +238,7 @@ class Server {
websocketHandler.setupConnectionHandling(); websocketHandler.setupConnectionHandling();
if (config.MEMPOOL.ENABLED) { if (config.MEMPOOL.ENABLED) {
statistics.setNewStatisticsEntryCallback(websocketHandler.handleNewStatistic.bind(websocketHandler)); statistics.setNewStatisticsEntryCallback(websocketHandler.handleNewStatistic.bind(websocketHandler));
memPool.setAsyncMempoolChangedCallback(websocketHandler.handleMempoolChange.bind(websocketHandler)); memPool.setAsyncMempoolChangedCallback(websocketHandler.$handleMempoolChange.bind(websocketHandler));
blocks.setNewAsyncBlockCallback(websocketHandler.handleNewBlock.bind(websocketHandler)); blocks.setNewAsyncBlockCallback(websocketHandler.handleNewBlock.bind(websocketHandler));
} }
priceUpdater.setRatesChangedCallback(websocketHandler.handleNewConversionRates.bind(websocketHandler)); priceUpdater.setRatesChangedCallback(websocketHandler.handleNewConversionRates.bind(websocketHandler));