Merge branch 'master' into nymkappa/bugfix/tx-fetcher-crash

This commit is contained in:
softsimon
2023-01-26 17:02:35 +04:00
committed by GitHub
162 changed files with 41212 additions and 17875 deletions

View File

@@ -26,9 +26,9 @@
"INDEXING_BLOCKS_AMOUNT": 14,
"POOLS_JSON_TREE_URL": "__POOLS_JSON_TREE_URL__",
"POOLS_JSON_URL": "__POOLS_JSON_URL__",
"ADVANCED_GBT_AUDIT": "__ADVANCED_GBT_AUDIT__",
"ADVANCED_GBT_MEMPOOL": "__ADVANCED_GBT_MEMPOOL__",
"TRANSACTION_INDEXING": "__TRANSACTION_INDEXING__"
"ADVANCED_GBT_AUDIT": "__MEMPOOL_ADVANCED_GBT_AUDIT__",
"ADVANCED_GBT_MEMPOOL": "__MEMPOOL_ADVANCED_GBT_MEMPOOL__",
"CPFP_INDEXING": "__MEMPOOL_CPFP_INDEXING__"
},
"CORE_RPC": {
"HOST": "__CORE_RPC_HOST__",
@@ -101,7 +101,8 @@
"STATS_REFRESH_INTERVAL": 600,
"GRAPH_REFRESH_INTERVAL": 600,
"LOGGER_UPDATE_INTERVAL": 30,
"FORENSICS_INTERVAL": 43200
"FORENSICS_INTERVAL": 43200,
"FORENSICS_RATE_LIMIT": "__FORENSICS_RATE_LIMIT__"
},
"LND": {
"TLS_CERT_PATH": "",

View File

@@ -40,7 +40,7 @@ describe('Mempool Backend Config', () => {
POOLS_JSON_URL: 'https://raw.githubusercontent.com/mempool/mining-pools/master/pools.json',
ADVANCED_GBT_AUDIT: false,
ADVANCED_GBT_MEMPOOL: false,
TRANSACTION_INDEXING: false,
CPFP_INDEXING: false,
});
expect(config.ELECTRUM).toStrictEqual({ HOST: '127.0.0.1', PORT: 3306, TLS_ENABLED: true });

View File

@@ -1,10 +1,5 @@
import config from '../config';
import bitcoinApi from './bitcoin/bitcoin-api-factory';
import { Common } from './common';
import { TransactionExtended, MempoolBlockWithTransactions, AuditScore } from '../mempool.interfaces';
import blocksRepository from '../repositories/BlocksRepository';
import blocksAuditsRepository from '../repositories/BlocksAuditsRepository';
import blocks from '../api/blocks';
import { TransactionExtended, MempoolBlockWithTransactions } from '../mempool.interfaces';
const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners

View File

@@ -2,6 +2,7 @@ import fs from 'fs';
import path from 'path';
import os from 'os';
import { IBackendInfo } from '../mempool.interfaces';
import config from '../config';
class BackendInfo {
private backendInfo: IBackendInfo;
@@ -22,7 +23,8 @@ class BackendInfo {
this.backendInfo = {
hostname: os.hostname(),
version: versionInfo.version,
gitCommit: versionInfo.gitCommit
gitCommit: versionInfo.gitCommit,
lightning: config.LIGHTNING.ENABLED
};
}

View File

@@ -17,4 +17,6 @@ function bitcoinApiFactory(): AbstractBitcoinApi {
}
}
export const bitcoinCoreApi = new BitcoinApi(bitcoinClient);
export default bitcoinApiFactory();

View File

@@ -18,6 +18,7 @@ import blocks from '../blocks';
import bitcoinClient from './bitcoin-client';
import difficultyAdjustment from '../difficulty-adjustment';
import transactionRepository from '../../repositories/TransactionRepository';
import rbfCache from '../rbf-cache';
class BitcoinRoutes {
public initRoutes(app: Application) {
@@ -31,6 +32,8 @@ class BitcoinRoutes {
.get(config.MEMPOOL.API_URL_PREFIX + 'backend-info', this.getBackendInfo)
.get(config.MEMPOOL.API_URL_PREFIX + 'init-data', this.getInitData)
.get(config.MEMPOOL.API_URL_PREFIX + 'validate-address/:address', this.validateAddress)
.get(config.MEMPOOL.API_URL_PREFIX + 'tx/:txId/replaces', this.getRbfHistory)
.get(config.MEMPOOL.API_URL_PREFIX + 'tx/:txId/cached', this.getCachedTx)
.post(config.MEMPOOL.API_URL_PREFIX + 'tx/push', this.$postTransactionForm)
.get(config.MEMPOOL.API_URL_PREFIX + 'donations', async (req, res) => {
try {
@@ -402,7 +405,8 @@ class BitcoinRoutes {
private async getLegacyBlocks(req: Request, res: Response) {
try {
const returnBlocks: IEsploraApi.Block[] = [];
const fromHeight = parseInt(req.params.height, 10) || blocks.getCurrentBlockHeight();
const tip = blocks.getCurrentBlockHeight();
const fromHeight = Math.min(parseInt(req.params.height, 10) || tip, tip);
// Check if block height exist in local cache to skip the hash lookup
const blockByHeight = blocks.getBlocks().find((b) => b.height === fromHeight);
@@ -588,6 +592,28 @@ class BitcoinRoutes {
}
}
private async getRbfHistory(req: Request, res: Response) {
try {
const result = rbfCache.getReplaces(req.params.txId);
res.json(result || []);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async getCachedTx(req: Request, res: Response) {
try {
const result = rbfCache.getTx(req.params.txId);
if (result) {
res.json(result);
} else {
res.status(404).send('not found');
}
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async getTransactionOutspends(req: Request, res: Response) {
try {
const result = await bitcoinApi.$getOutspends(req.params.txId);

View File

@@ -22,12 +22,10 @@ import poolsParser from './pools-parser';
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
import cpfpRepository from '../repositories/CpfpRepository';
import transactionRepository from '../repositories/TransactionRepository';
import mining from './mining/mining';
import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository';
import PricesRepository from '../repositories/PricesRepository';
import priceUpdater from '../tasks/price-updater';
import { Block } from 'bitcoinjs-lib';
class Blocks {
private blocks: BlockExtended[] = [];
@@ -101,12 +99,23 @@ class Blocks {
transactions.push(tx);
transactionsFetched++;
} catch (e) {
if (i === 0) {
const msg = `Cannot fetch coinbase tx ${txIds[i]}. Reason: ` + (e instanceof Error ? e.message : e);
logger.err(msg);
throw new Error(msg);
} else {
logger.err(`Cannot fetch tx ${txIds[i]}. Reason: ` + (e instanceof Error ? e.message : e));
try {
if (config.MEMPOOL.BACKEND === 'esplora') {
// Try again with core
const tx = await transactionUtils.$getTransactionExtended(txIds[i], false, false, true);
transactions.push(tx);
transactionsFetched++;
} else {
throw e;
}
} catch (e) {
if (i === 0) {
const msg = `Cannot fetch coinbase tx ${txIds[i]}. Reason: ` + (e instanceof Error ? e.message : e);
logger.err(msg);
throw new Error(msg);
} else {
logger.err(`Cannot fetch tx ${txIds[i]}. Reason: ` + (e instanceof Error ? e.message : e));
}
}
}
}
@@ -296,7 +305,7 @@ class Blocks {
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
const blockPerSeconds = Math.max(1, indexedThisRun / elapsedSeconds);
const progress = Math.round(totalIndexed / indexedBlocks.length * 10000) / 100;
logger.debug(`Indexing block summary for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${runningFor} seconds`);
logger.debug(`Indexing block summary for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexedBlocks.length} (${progress}%) | elapsed: ${runningFor} seconds`, logger.tags.mining);
timer = new Date().getTime() / 1000;
indexedThisRun = 0;
}
@@ -309,12 +318,12 @@ class Blocks {
newlyIndexed++;
}
if (newlyIndexed > 0) {
logger.notice(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`);
logger.notice(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
} else {
logger.debug(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`);
logger.debug(`Blocks summaries indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
}
} catch (e) {
logger.err(`Blocks summaries indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.err(`Blocks summaries indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining);
throw e;
}
}
@@ -329,9 +338,10 @@ class Blocks {
try {
// Get all indexed block hash
const unindexedBlocks = await blocksRepository.$getCPFPUnindexedBlocks();
const unindexedBlockHeights = await blocksRepository.$getCPFPUnindexedBlocks();
logger.info(`Indexing cpfp data for ${unindexedBlockHeights.length} blocks`);
if (!unindexedBlocks?.length) {
if (!unindexedBlockHeights?.length) {
return;
}
@@ -340,30 +350,26 @@ class Blocks {
let countThisRun = 0;
let timer = new Date().getTime() / 1000;
const startedAt = new Date().getTime() / 1000;
for (const block of unindexedBlocks) {
for (const height of unindexedBlockHeights) {
// Logging
const hash = await bitcoinApi.$getBlockHash(height);
const elapsedSeconds = Math.max(1, new Date().getTime() / 1000 - timer);
if (elapsedSeconds > 5) {
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
const blockPerSeconds = Math.max(1, countThisRun / elapsedSeconds);
const progress = Math.round(count / unindexedBlocks.length * 10000) / 100;
logger.debug(`Indexing cpfp clusters for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${count}/${unindexedBlocks.length} (${progress}%) | elapsed: ${runningFor} seconds`);
const blockPerSeconds = (countThisRun / elapsedSeconds);
const progress = Math.round(count / unindexedBlockHeights.length * 10000) / 100;
logger.debug(`Indexing cpfp clusters for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${count}/${unindexedBlockHeights.length} (${progress}%) | elapsed: ${runningFor} seconds`);
timer = new Date().getTime() / 1000;
countThisRun = 0;
}
await this.$indexCPFP(block.hash, block.height); // Calculate and save CPFP data for transactions in this block
await this.$indexCPFP(hash, height); // Calculate and save CPFP data for transactions in this block
// Logging
count++;
countThisRun++;
}
if (count > 0) {
logger.notice(`CPFP indexing completed: indexed ${count} blocks`);
} else {
logger.debug(`CPFP indexing completed: indexed ${count} blocks`);
}
logger.notice(`CPFP indexing completed: indexed ${count} blocks`);
} catch (e) {
logger.err(`CPFP indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
throw e;
@@ -385,7 +391,7 @@ class Blocks {
const lastBlockToIndex = Math.max(0, currentBlockHeight - indexingBlockAmount + 1);
logger.debug(`Indexing blocks from #${currentBlockHeight} to #${lastBlockToIndex}`);
logger.debug(`Indexing blocks from #${currentBlockHeight} to #${lastBlockToIndex}`, logger.tags.mining);
loadingIndicators.setProgress('block-indexing', 0);
const chunkSize = 10000;
@@ -405,7 +411,7 @@ class Blocks {
continue;
}
logger.info(`Indexing ${missingBlockHeights.length} blocks from #${currentBlockHeight} to #${endBlock}`);
logger.info(`Indexing ${missingBlockHeights.length} blocks from #${currentBlockHeight} to #${endBlock}`, logger.tags.mining);
for (const blockHeight of missingBlockHeights) {
if (blockHeight < lastBlockToIndex) {
@@ -418,7 +424,7 @@ class Blocks {
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
const blockPerSeconds = Math.max(1, indexedThisRun / elapsedSeconds);
const progress = Math.round(totalIndexed / indexingBlockAmount * 10000) / 100;
logger.debug(`Indexing block #${blockHeight} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress}%) | elapsed: ${runningFor} seconds`);
logger.debug(`Indexing block #${blockHeight} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${totalIndexed}/${indexingBlockAmount} (${progress}%) | elapsed: ${runningFor} seconds`, logger.tags.mining);
timer = new Date().getTime() / 1000;
indexedThisRun = 0;
loadingIndicators.setProgress('block-indexing', progress, false);
@@ -435,13 +441,13 @@ class Blocks {
currentBlockHeight -= chunkSize;
}
if (newlyIndexed > 0) {
logger.notice(`Block indexing completed: indexed ${newlyIndexed} blocks`);
logger.notice(`Block indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
} else {
logger.debug(`Block indexing completed: indexed ${newlyIndexed} blocks`);
logger.debug(`Block indexing completed: indexed ${newlyIndexed} blocks`, logger.tags.mining);
}
loadingIndicators.setProgress('block-indexing', 100);
} catch (e) {
logger.err('Block indexing failed. Trying again in 10 seconds. Reason: ' + (e instanceof Error ? e.message : e));
logger.err('Block indexing failed. Trying again in 10 seconds. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining);
loadingIndicators.setProgress('block-indexing', 100);
throw e;
}
@@ -519,7 +525,7 @@ class Blocks {
for (let i = 10; i >= 0; --i) {
const newBlock = await this.$indexBlock(lastBlock['height'] - i);
await this.$getStrippedBlockTransactions(newBlock.id, true, true);
if (config.MEMPOOL.TRANSACTION_INDEXING) {
if (config.MEMPOOL.CPFP_INDEXING) {
await this.$indexCPFP(newBlock.id, lastBlock['height'] - i);
}
}
@@ -537,7 +543,7 @@ class Blocks {
priceId: lastestPriceId,
}]);
} else {
logger.info(`Cannot save block price for ${blockExtended.height} because the price updater hasnt completed yet. Trying again in 10 seconds.`)
logger.info(`Cannot save block price for ${blockExtended.height} because the price updater hasnt completed yet. Trying again in 10 seconds.`, logger.tags.mining);
setTimeout(() => {
indexer.runSingleTask('blocksPrices');
}, 10000);
@@ -547,7 +553,7 @@ class Blocks {
if (Common.blocksSummariesIndexingEnabled() === true) {
await this.$getStrippedBlockTransactions(blockExtended.id, true);
}
if (config.MEMPOOL.TRANSACTION_INDEXING) {
if (config.MEMPOOL.CPFP_INDEXING) {
this.$indexCPFP(blockExtended.id, this.currentBlockHeight);
}
}
@@ -677,7 +683,12 @@ class Blocks {
}
public async $getBlocks(fromHeight?: number, limit: number = 15): Promise<BlockExtended[]> {
let currentHeight = fromHeight !== undefined ? fromHeight : await blocksRepository.$mostRecentBlockHeight();
let currentHeight = fromHeight !== undefined ? fromHeight : this.currentBlockHeight;
if (currentHeight > this.currentBlockHeight) {
limit -= currentHeight - this.currentBlockHeight;
currentHeight = this.currentBlockHeight;
}
const returnBlocks: BlockExtended[] = [];
if (currentHeight < 0) {
@@ -741,32 +752,15 @@ class Blocks {
}
public async $indexCPFP(hash: string, height: number): Promise<void> {
let transactions;
if (false/*Common.blocksSummariesIndexingEnabled()*/) {
transactions = await this.$getStrippedBlockTransactions(hash);
const rawBlock = await bitcoinApi.$getRawBlock(hash);
const block = Block.fromBuffer(rawBlock);
const txMap = {};
for (const tx of block.transactions || []) {
txMap[tx.getId()] = tx;
}
for (const tx of transactions) {
if (txMap[tx.txid]?.ins) {
tx.vin = txMap[tx.txid].ins.map(vin => {
return {
txid: vin.hash
};
});
}
}
} else {
const block = await bitcoinClient.getBlock(hash, 2);
transactions = block.tx.map(tx => {
tx.vsize = tx.weight / 4;
return tx;
});
}
const block = await bitcoinClient.getBlock(hash, 2);
const transactions = block.tx.map(tx => {
tx.vsize = tx.weight / 4;
tx.fee *= 100_000_000;
return tx;
});
const clusters: any[] = [];
let cluster: TransactionStripped[] = [];
let ancestors: { [txid: string]: boolean } = {};
for (let i = transactions.length - 1; i >= 0; i--) {
@@ -778,12 +772,14 @@ class Blocks {
totalFee += tx?.fee || 0;
totalVSize += tx.vsize;
});
const effectiveFeePerVsize = (totalFee * 100_000_000) / totalVSize;
const effectiveFeePerVsize = totalFee / totalVSize;
if (cluster.length > 1) {
await cpfpRepository.$saveCluster(height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: (tx.fee || 0) * 100_000_000 }; }), effectiveFeePerVsize);
for (const tx of cluster) {
await transactionRepository.$setCluster(tx.txid, cluster[0].txid);
}
clusters.push({
root: cluster[0].txid,
height,
txs: cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }),
effectiveFeePerVsize,
});
}
cluster = [];
ancestors = {};
@@ -793,7 +789,10 @@ class Blocks {
ancestors[vin.txid] = true;
});
}
await blocksRepository.$setCPFPIndexed(hash);
const result = await cpfpRepository.$batchSaveClusters(clusters);
if (!result) {
await cpfpRepository.$insertProgressMarker(height);
}
}
}

View File

@@ -35,24 +35,31 @@ export class Common {
}
static getFeesInRange(transactions: TransactionExtended[], rangeLength: number) {
const arr = [transactions[transactions.length - 1].effectiveFeePerVsize];
const filtered: TransactionExtended[] = [];
let lastValidRate = Infinity;
// filter out anomalous fee rates to ensure monotonic range
for (const tx of transactions) {
if (tx.effectiveFeePerVsize <= lastValidRate) {
filtered.push(tx);
lastValidRate = tx.effectiveFeePerVsize;
}
}
const arr = [filtered[filtered.length - 1].effectiveFeePerVsize];
const chunk = 1 / (rangeLength - 1);
let itemsToAdd = rangeLength - 2;
while (itemsToAdd > 0) {
arr.push(transactions[Math.floor(transactions.length * chunk * itemsToAdd)].effectiveFeePerVsize);
arr.push(filtered[Math.floor(filtered.length * chunk * itemsToAdd)].effectiveFeePerVsize);
itemsToAdd--;
}
arr.push(transactions[0].effectiveFeePerVsize);
arr.push(filtered[0].effectiveFeePerVsize);
return arr;
}
static findRbfTransactions(added: TransactionExtended[], deleted: TransactionExtended[]): { [txid: string]: TransactionExtended } {
const matches: { [txid: string]: TransactionExtended } = {};
deleted
// The replaced tx must have at least one input with nSequence < maxint-1 (Thats the opt-in)
.filter((tx) => tx.vin.some((vin) => vin.sequence < 0xfffffffe))
.forEach((deletedTx) => {
const foundMatches = added.find((addedTx) => {
// The new tx must, absolutely speaking, pay at least as much fee as the replaced tx.
@@ -61,7 +68,7 @@ export class Common {
&& addedTx.feePerVsize > deletedTx.feePerVsize
// Spends one or more of the same inputs
&& deletedTx.vin.some((deletedVin) =>
addedTx.vin.some((vin) => vin.txid === deletedVin.txid));
addedTx.vin.some((vin) => vin.txid === deletedVin.txid && vin.vout === deletedVin.vout));
});
if (foundMatches) {
matches[deletedTx.txid] = foundMatches;
@@ -190,7 +197,7 @@ export class Common {
static cpfpIndexingEnabled(): boolean {
return (
Common.indexingEnabled() &&
config.MEMPOOL.TRANSACTION_INDEXING === true
config.MEMPOOL.CPFP_INDEXING === true
);
}

View File

@@ -2,9 +2,12 @@ import config from '../config';
import DB from '../database';
import logger from '../logger';
import { Common } from './common';
import blocksRepository from '../repositories/BlocksRepository';
import cpfpRepository from '../repositories/CpfpRepository';
import { RowDataPacket } from 'mysql2';
class DatabaseMigration {
private static currentVersion = 49;
private static currentVersion = 52;
private queryTimeout = 3600_000;
private statisticsAddedIndexed = false;
private uniqueLogs: string[] = [];
@@ -442,6 +445,29 @@ class DatabaseMigration {
await this.$executeQuery('TRUNCATE TABLE `blocks_audits`');
await this.updateToSchemaVersion(49);
}
if (databaseSchemaVersion < 50) {
await this.$executeQuery('ALTER TABLE `blocks` DROP COLUMN `cpfp_indexed`');
await this.updateToSchemaVersion(50);
}
if (databaseSchemaVersion < 51) {
await this.$executeQuery('ALTER TABLE `cpfp_clusters` ADD INDEX `height` (`height`)');
await this.updateToSchemaVersion(51);
}
if (databaseSchemaVersion < 52) {
await this.$executeQuery(this.getCreateCompactCPFPTableQuery(), await this.$checkIfTableExists('compact_cpfp_clusters'));
await this.$executeQuery(this.getCreateCompactTransactionsTableQuery(), await this.$checkIfTableExists('compact_transactions'));
try {
await this.$convertCompactCpfpTables();
await this.$executeQuery('DROP TABLE IF EXISTS `transactions`');
await this.$executeQuery('DROP TABLE IF EXISTS `cpfp_clusters`');
await this.updateToSchemaVersion(52);
} catch(e) {
logger.warn('' + (e instanceof Error ? e.message : e));
}
}
}
/**
@@ -913,6 +939,25 @@ class DatabaseMigration {
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
}
private getCreateCompactCPFPTableQuery(): string {
return `CREATE TABLE IF NOT EXISTS compact_cpfp_clusters (
root binary(32) NOT NULL,
height int(10) NOT NULL,
txs BLOB DEFAULT NULL,
fee_rate float unsigned,
PRIMARY KEY (root),
INDEX (height)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
}
private getCreateCompactTransactionsTableQuery(): string {
return `CREATE TABLE IF NOT EXISTS compact_transactions (
txid binary(32) NOT NULL,
cluster binary(32) DEFAULT NULL,
PRIMARY KEY (txid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
}
public async $truncateIndexedData(tables: string[]) {
const allowedTables = ['blocks', 'hashrates', 'prices'];
@@ -933,6 +978,49 @@ class DatabaseMigration {
logger.warn(`Unable to erase indexed data`);
}
}
private async $convertCompactCpfpTables(): Promise<void> {
try {
const batchSize = 250;
const maxHeight = await blocksRepository.$mostRecentBlockHeight() || 0;
const [minHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from cpfp_clusters`);
const minHeight = (minHeightRows.length && minHeightRows[0].minHeight != null) ? minHeightRows[0].minHeight : maxHeight;
let height = maxHeight;
// Logging
let timer = new Date().getTime() / 1000;
const startedAt = new Date().getTime() / 1000;
while (height > minHeight) {
const [rows] = await DB.query(
`
SELECT * from cpfp_clusters
WHERE height <= ? AND height > ?
ORDER BY height
`,
[height, height - batchSize]
) as RowDataPacket[][];
if (rows?.length) {
await cpfpRepository.$batchSaveClusters(rows.map(row => {
return {
root: row.root,
height: row.height,
txs: JSON.parse(row.txs),
effectiveFeePerVsize: row.fee_rate,
};
}));
}
const elapsed = new Date().getTime() / 1000 - timer;
const runningFor = new Date().getTime() / 1000 - startedAt;
logger.debug(`Migrated cpfp data from block ${height} to ${height - batchSize} in ${elapsed.toFixed(2)} seconds | total elapsed: ${runningFor.toFixed(2)} seconds`);
timer = new Date().getTime() / 1000;
height -= batchSize;
}
} catch (e) {
logger.warn(`Failed to migrate cpfp transaction data`);
}
}
}
export default new DatabaseMigration();

View File

@@ -670,9 +670,7 @@ class ChannelsApi {
AND status != 2
`);
if (result[0].changedRows ?? 0 > 0) {
logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`);
} else {
logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`);
logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`, logger.tags.ln);
}
} catch (e) {
logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e));

View File

@@ -538,6 +538,10 @@ class NodesApi {
const IPSIds = ISPId.split(',');
const [rows]: any = await DB.query(query, [IPSIds, IPSIds]);
if (!rows || rows.length === 0) {
return [];
}
const nodes = {};
const intISPIds: number[] = [];
@@ -681,9 +685,7 @@ class NodesApi {
)
`);
if (result[0].changedRows ?? 0 > 0) {
logger.info(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`);
} else {
logger.debug(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`);
logger.debug(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`, logger.tags.ln);
}
} catch (e) {
logger.err('$setNodesInactive() error: ' + (e instanceof Error ? e.message : e));

View File

@@ -41,13 +41,70 @@ class NodesRoutes {
let nodes: any[] = [];
switch (config.MEMPOOL.NETWORK) {
case 'testnet':
nodesList = ['032c7c7819276c4f706a04df1a0f1e10a5495994a7be4c1d3d28ca766e5a2b957b', '025a7e38c2834dd843591a4d23d5f09cdeb77ddca85f673c2d944a14220ff14cf7', '0395e2731a1673ef21d7a16a727c4fc4d4c35a861c428ce2c819c53d2b81c8bd55', '032ab2028c0b614c6d87824e2373529652fd7e4221b4c70cc4da7c7005c49afcf0', '029001b22fe70b48bee12d014df91982eb85ff1bd404ec772d5c83c4ee3e88d2c3', '0212e2848d79f928411da5f2ff0a8c95ec6ccb5a09d2031b6f71e91309dcde63af', '03e871a2229523d34f76e6311ff197cfe7f26c2fbec13554b93a46f4e710c47dab', '032202ec98d976b0e928bd1d91924e8bd3eab07231fc39feb3737b010071073df8', '02fa7c5a948d03d563a9f36940c2205a814e594d17c0042ced242c71a857d72605', '039c14fdec2d958e3d14cebf657451bbd9e039196615785e82c917f274e3fb2205', '033589bbcb233ffc416cefd5437c7f37e9d7cb7942d405e39e72c4c846d9b37f18', '029293110441c6e2eacb57e1255bf6ef05c41a6a676fe474922d33c19f98a7d584'];
nodesList = [
'032c7c7819276c4f706a04df1a0f1e10a5495994a7be4c1d3d28ca766e5a2b957b',
'025a7e38c2834dd843591a4d23d5f09cdeb77ddca85f673c2d944a14220ff14cf7',
'0395e2731a1673ef21d7a16a727c4fc4d4c35a861c428ce2c819c53d2b81c8bd55',
'032ab2028c0b614c6d87824e2373529652fd7e4221b4c70cc4da7c7005c49afcf0',
'029001b22fe70b48bee12d014df91982eb85ff1bd404ec772d5c83c4ee3e88d2c3',
'0212e2848d79f928411da5f2ff0a8c95ec6ccb5a09d2031b6f71e91309dcde63af',
'03e871a2229523d34f76e6311ff197cfe7f26c2fbec13554b93a46f4e710c47dab',
'032202ec98d976b0e928bd1d91924e8bd3eab07231fc39feb3737b010071073df8',
'02fa7c5a948d03d563a9f36940c2205a814e594d17c0042ced242c71a857d72605',
'039c14fdec2d958e3d14cebf657451bbd9e039196615785e82c917f274e3fb2205',
'033589bbcb233ffc416cefd5437c7f37e9d7cb7942d405e39e72c4c846d9b37f18',
'029293110441c6e2eacb57e1255bf6ef05c41a6a676fe474922d33c19f98a7d584',
'0235ad0b56ed8c42c4354444c24e971c05e769ec0b5fb0ccea42880095dc02ea2c',
'029700819a37afea630f80e6cc461f3fd3c4ace2598a21cfbbe64d1c78d0ee69a5',
'02c2d8b2dbf87c7894af2f1d321290e2fe6db5446cd35323987cee98f06e2e0075',
'030b0ca1ea7b1075716d2a555630e6fd47ef11bc7391fe68963ec06cf370a5e382',
'031adb9eb2d66693f85fa31a4adca0319ba68219f3ad5f9a2ef9b34a6b40755fa1',
'02ccd07faa47eda810ecf5591ccf5ca50f6c1034d0d175052898d32a00b9bae24f',
];
break;
case 'signet':
nodesList = ['03ddab321b760433cbf561b615ef62ac7d318630c5f51d523aaf5395b90b751956', '033d92c7bfd213ef1b34c90e985fb5dc77f9ec2409d391492484e57a44c4aca1de', '02ad010dda54253c1eb9efe38b0760657a3b43ecad62198c359c051c9d99d45781', '025196512905b8a3f1597428b867bec63ec9a95e5089eb7dc7e63e2d2691669029', '027c625aa1fbe3768db68ebcb05b53b6dc0ce68b7b54b8900d326d167363e684fe', '03f1629af3101fcc56b7aac2667016be84e3defbf3d0c8719f836c9b41c9a57a43', '02dfb81e2f7a3c4c9e8a51b70ef82b4a24549cc2fab1f5b2fd636501774a918991', '02d01ccf832944c68f10d39006093769c5b8bda886d561b128534e313d729fdb34', '02499ed23027d4698a6904ff4ec1b6085a61f10b9a6937f90438f9947e38e8ea86', '038310e3a786340f2bd7770704c7ccfe560fd163d9a1c99d67894597419d12cbf7', '03e5e9d879b72c7d67ecd483bae023bd33e695bb32b981a4021260f7b9d62bc761', '028d16e1a0ace4c0c0a421536d8d32ce484dfe6e2f726b7b0e7c30f12a195f8cc7'];
nodesList = [
'03ddab321b760433cbf561b615ef62ac7d318630c5f51d523aaf5395b90b751956',
'033d92c7bfd213ef1b34c90e985fb5dc77f9ec2409d391492484e57a44c4aca1de',
'02ad010dda54253c1eb9efe38b0760657a3b43ecad62198c359c051c9d99d45781',
'025196512905b8a3f1597428b867bec63ec9a95e5089eb7dc7e63e2d2691669029',
'027c625aa1fbe3768db68ebcb05b53b6dc0ce68b7b54b8900d326d167363e684fe',
'03f1629af3101fcc56b7aac2667016be84e3defbf3d0c8719f836c9b41c9a57a43',
'02dfb81e2f7a3c4c9e8a51b70ef82b4a24549cc2fab1f5b2fd636501774a918991',
'02d01ccf832944c68f10d39006093769c5b8bda886d561b128534e313d729fdb34',
'02499ed23027d4698a6904ff4ec1b6085a61f10b9a6937f90438f9947e38e8ea86',
'038310e3a786340f2bd7770704c7ccfe560fd163d9a1c99d67894597419d12cbf7',
'03e5e9d879b72c7d67ecd483bae023bd33e695bb32b981a4021260f7b9d62bc761',
'028d16e1a0ace4c0c0a421536d8d32ce484dfe6e2f726b7b0e7c30f12a195f8cc7',
'02ff690d06c187ab994bf83c5a2114fe5bf50112c2c817af0f788f736be9fa2070',
'02a9f570c51a2526a5ee85802e88f9281bed771eb66a0c8a7d898430dd5d0eae45',
'038c3de773255d3bd7a50e31e58d423baac5c90826a74d75e64b74c95475de1097',
'0242c7f7d315095f37ad1421ae0a2fc967d4cbe65b61b079c5395a769436959853',
'02a909e70eb03742f12666ebb1f56ac42a5fbaab0c0e8b5b1df4aa9f10f8a09240',
'03a26efa12489803c07f3ac2f1dba63812e38f0f6e866ce3ebb34df7de1f458cd2',
];
break;
default:
nodesList = ['03fbc17549ec667bccf397ababbcb4cdc0e3394345e4773079ab2774612ec9be61', '03da9a8623241ccf95f19cd645c6cecd4019ac91570e976eb0a128bebbc4d8a437', '03ca5340cf85cb2e7cf076e489f785410838de174e40be62723e8a60972ad75144', '0238bd27f02d67d6c51e269692bc8c9a32357a00e7777cba7f4f1f18a2a700b108', '03f983dcabed6baa1eab5b56c8b2e8fdc846ab3fd931155377897335e85a9fa57c', '03e399589533581e48796e29a825839a010036a61b20744fda929d6709fcbffcc5', '021f5288b5f72c42cd0d8801086af7ce09a816d8ee9a4c47a4b436399b26cb601a', '032b01b7585f781420cd4148841a82831ba37fa952342052cec16750852d4f2dd9', '02848036488d4b8fb1f1c4064261ec36151f43b085f0b51bd239ade3ddfc940c34', '02b6b1640fe029e304c216951af9fbefdb23b0bdc9baaf327540d31b6107841fdf', '03694289827203a5b3156d753071ddd5bf92e371f5a462943f9555eef6d2d6606c', '0283d850db7c3e8ea7cc9c4abc7afaab12bbdf72b677dcba1d608350d2537d7d43'];
nodesList = [
'03fbc17549ec667bccf397ababbcb4cdc0e3394345e4773079ab2774612ec9be61',
'03da9a8623241ccf95f19cd645c6cecd4019ac91570e976eb0a128bebbc4d8a437',
'03ca5340cf85cb2e7cf076e489f785410838de174e40be62723e8a60972ad75144',
'0238bd27f02d67d6c51e269692bc8c9a32357a00e7777cba7f4f1f18a2a700b108',
'03f983dcabed6baa1eab5b56c8b2e8fdc846ab3fd931155377897335e85a9fa57c',
'03e399589533581e48796e29a825839a010036a61b20744fda929d6709fcbffcc5',
'021f5288b5f72c42cd0d8801086af7ce09a816d8ee9a4c47a4b436399b26cb601a',
'032b01b7585f781420cd4148841a82831ba37fa952342052cec16750852d4f2dd9',
'02848036488d4b8fb1f1c4064261ec36151f43b085f0b51bd239ade3ddfc940c34',
'02b6b1640fe029e304c216951af9fbefdb23b0bdc9baaf327540d31b6107841fdf',
'03694289827203a5b3156d753071ddd5bf92e371f5a462943f9555eef6d2d6606c',
'0283d850db7c3e8ea7cc9c4abc7afaab12bbdf72b677dcba1d608350d2537d7d43',
'02521287789f851268a39c9eccc9d6180d2c614315b583c9e6ae0addbd6d79df06',
'0258c2a7b7f8af2585b4411b1ec945f70988f30412bb1df179de941f14d0b1bc3e',
'03c3389ff1a896f84d921ed01a19fc99c6724ce8dc4b960cd3b7b2362b62cd60d7',
'038d118996b3eaa15dcd317b32a539c9ecfdd7698f204acf8a087336af655a9192',
'02a928903d93d78877dacc3642b696128a3636e9566dd42d2d132325b2c8891c09',
'0328cd17f3a9d3d90b532ade0d1a67e05eb8a51835b3dce0a2e38eac04b5a62a57',
];
}
for (let pubKey of nodesList) {

View File

@@ -141,13 +141,13 @@ export default class CLightningClient extends EventEmitter implements AbstractLi
// main data directory provided, default to using the bitcoin mainnet subdirectory
// to be removed in v0.2.0
else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) {
logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`)
logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`)
logger.warn(`${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`, logger.tags.ln)
logger.warn(`specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`, logger.tags.ln)
rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc')
}
}
logger.debug(`[CLightningClient] Connecting to ${rpcPath}`);
logger.debug(`Connecting to ${rpcPath}`, logger.tags.ln);
super();
this.rpcPath = rpcPath;
@@ -172,19 +172,19 @@ export default class CLightningClient extends EventEmitter implements AbstractLi
this.clientConnectionPromise = new Promise<void>(resolve => {
_self.client.on('connect', () => {
logger.info(`[CLightningClient] Lightning client connected`);
logger.info(`CLightning client connected`, logger.tags.ln);
_self.reconnectWait = 1;
resolve();
});
_self.client.on('end', () => {
logger.err('[CLightningClient] Lightning client connection closed, reconnecting');
logger.err(`CLightning client connection closed, reconnecting`, logger.tags.ln);
_self.increaseWaitTime();
_self.reconnect();
});
_self.client.on('error', error => {
logger.err(`[CLightningClient] Lightning client connection error: ${error}`);
logger.err(`CLightning client connection error: ${error}`, logger.tags.ln);
_self.increaseWaitTime();
_self.reconnect();
});
@@ -196,7 +196,6 @@ export default class CLightningClient extends EventEmitter implements AbstractLi
return;
}
const data = JSON.parse(line);
// logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`);
_self.emit('res:' + data.id, data);
});
}
@@ -217,7 +216,7 @@ export default class CLightningClient extends EventEmitter implements AbstractLi
}
this.reconnectTimeout = setTimeout(() => {
logger.debug('[CLightningClient] Trying to reconnect...');
logger.debug(`Trying to reconnect...`, logger.tags.ln);
_self.client.connect(_self.rpcPath);
_self.reconnectTimeout = null;
@@ -235,7 +234,6 @@ export default class CLightningClient extends EventEmitter implements AbstractLi
id: '' + callInt
};
// logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`);
// Wait for the client to connect
return this.clientConnectionPromise

View File

@@ -2,6 +2,7 @@ import { ILightningApi } from '../lightning-api.interface';
import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher';
import logger from '../../../logger';
import { Common } from '../../common';
import config from '../../../config';
/**
* Convert a clightning "listnode" entry to a lnd node entry
@@ -40,7 +41,7 @@ export function convertNode(clNode: any): ILightningApi.Node {
* Convert clightning "listchannels" response to lnd "describegraph.edges" format
*/
export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise<ILightningApi.Channel[]> {
logger.info('Converting clightning nodes and channels to lnd graph format');
logger.debug(`Converting clightning nodes and channels to lnd graph format`, logger.tags.ln);
let loggerTimer = new Date().getTime() / 1000;
let channelProcessed = 0;
@@ -63,8 +64,8 @@ export async function convertAndmergeBidirectionalChannels(clChannels: any[]): P
}
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Building complete channels from clightning output. Channels processed: ${channelProcessed + 1} of ${clChannels.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.info(`Building complete channels from clightning output. Channels processed: ${channelProcessed + 1} of ${clChannels.length}`, logger.tags.ln);
loggerTimer = new Date().getTime() / 1000;
}
@@ -80,7 +81,7 @@ export async function convertAndmergeBidirectionalChannels(clChannels: any[]): P
}
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
if (elapsedSeconds > 10) {
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.info(`Building partial channels from clightning output. Channels processed: ${channelProcessed + 1} of ${keys.length}`);
loggerTimer = new Date().getTime() / 1000;
}

View File

@@ -1,17 +1,14 @@
import logger from '../logger';
import { MempoolBlock, TransactionExtended, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta } from '../mempool.interfaces';
import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces';
import { Common } from './common';
import config from '../config';
import { StaticPool } from 'node-worker-threads-pool';
import { Worker } from 'worker_threads';
import path from 'path';
class MempoolBlocks {
private mempoolBlocks: MempoolBlockWithTransactions[] = [];
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
private makeTemplatesPool = new StaticPool({
size: 1,
task: path.resolve(__dirname, './tx-selection-worker.js'),
});
private txSelectionWorker: Worker | null = null;
constructor() {}
@@ -146,27 +143,159 @@ class MempoolBlocks {
return mempoolBlockDeltas;
}
public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit: number | null = null, condenseRest = false): Promise<void> {
const { mempool, blocks } = await this.makeTemplatesPool.exec({ mempool: newMempool, blockLimit, weightLimit, condenseRest });
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, blocks);
// copy CPFP info across to main thread's mempool
Object.keys(newMempool).forEach((txid) => {
if (newMempool[txid] && mempool[txid]) {
newMempool[txid].effectiveFeePerVsize = mempool[txid].effectiveFeePerVsize;
newMempool[txid].ancestors = mempool[txid].ancestors;
newMempool[txid].descendants = mempool[txid].descendants;
newMempool[txid].bestDescendant = mempool[txid].bestDescendant;
newMempool[txid].cpfpChecked = mempool[txid].cpfpChecked;
}
public async makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }): Promise<void> {
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const strippedMempool: { [txid: string]: ThreadTransaction } = {};
Object.values(newMempool).forEach(entry => {
strippedMempool[entry.txid] = {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
});
this.mempoolBlocks = blocks;
// (re)initialize tx selection worker thread
if (!this.txSelectionWorker) {
this.txSelectionWorker = new Worker(path.resolve(__dirname, './tx-selection-worker.js'));
// if the thread throws an unexpected error, or exits for any other reason,
// reset worker state so that it will be re-initialized on the next run
this.txSelectionWorker.once('error', () => {
this.txSelectionWorker = null;
});
this.txSelectionWorker.once('exit', () => {
this.txSelectionWorker = null;
});
}
// run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener;
try {
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => {
threadErrorListener = reject;
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
const { blocks, clusters } = await workerResultPromise;
this.processBlockTemplates(newMempool, blocks, clusters);
// clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener);
} catch (e) {
logger.err('makeBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
}
}
public async updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[]): Promise<void> {
if (!this.txSelectionWorker) {
// need to reset the worker
return this.makeBlockTemplates(newMempool);
}
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const addedStripped: ThreadTransaction[] = added.map(entry => {
return {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
});
// run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener;
try {
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => {
threadErrorListener = reject;
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed });
const { blocks, clusters } = await workerResultPromise;
this.processBlockTemplates(newMempool, blocks, clusters);
// clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener);
} catch (e) {
logger.err('updateBlockTemplates failed. ' + (e instanceof Error ? e.message : e));
}
}
private processBlockTemplates(mempool, blocks, clusters): void {
// update this thread's mempool with the results
blocks.forEach(block => {
block.forEach(tx => {
if (tx.txid in mempool) {
if (tx.effectiveFeePerVsize != null) {
mempool[tx.txid].effectiveFeePerVsize = tx.effectiveFeePerVsize;
}
if (tx.cpfpRoot && tx.cpfpRoot in clusters) {
const ancestors: Ancestor[] = [];
const descendants: Ancestor[] = [];
const cluster = clusters[tx.cpfpRoot];
let matched = false;
cluster.forEach(txid => {
if (txid === tx.txid) {
matched = true;
} else {
const relative = {
txid: txid,
fee: mempool[txid].fee,
weight: mempool[txid].weight,
};
if (matched) {
descendants.push(relative);
} else {
ancestors.push(relative);
}
}
});
mempool[tx.txid].ancestors = ancestors;
mempool[tx.txid].descendants = descendants;
mempool[tx.txid].bestDescendant = null;
}
mempool[tx.txid].cpfpChecked = tx.cpfpChecked;
}
});
});
// unpack the condensed blocks into proper mempool blocks
const mempoolBlocks = blocks.map((transactions, blockIndex) => {
return this.dataToMempoolBlocks(transactions.map(tx => {
return mempool[tx.txid] || null;
}).filter(tx => !!tx), undefined, undefined, blockIndex);
});
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks);
this.mempoolBlocks = mempoolBlocks;
this.mempoolBlockDeltas = deltas;
}
private dataToMempoolBlocks(transactions: TransactionExtended[],
blockSize: number, blockWeight: number, blocksIndex: number): MempoolBlockWithTransactions {
blockSize: number | undefined, blockWeight: number | undefined, blocksIndex: number): MempoolBlockWithTransactions {
let totalSize = blockSize || 0;
let totalWeight = blockWeight || 0;
if (blockSize === undefined && blockWeight === undefined) {
totalSize = 0;
totalWeight = 0;
transactions.forEach(tx => {
totalSize += tx.size;
totalWeight += tx.weight;
});
}
let rangeLength = 4;
if (blocksIndex === 0) {
rangeLength = 8;
@@ -177,8 +306,8 @@ class MempoolBlocks {
rangeLength = 8;
}
return {
blockSize: blockSize,
blockVSize: blockWeight / 4,
blockSize: totalSize,
blockVSize: totalWeight / 4,
nTx: transactions.length,
totalFees: transactions.reduce((acc, cur) => acc + cur.fee, 0),
medianFee: Common.percentile(transactions.map((tx) => tx.effectiveFeePerVsize), config.MEMPOOL.RECOMMENDED_FEE_PERCENTILE),

View File

@@ -21,7 +21,7 @@ class Mempool {
private mempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[],
deletedTransactions: TransactionExtended[]) => void) | undefined;
private asyncMempoolChangedCallback: ((newMempool: {[txId: string]: TransactionExtended; }, newTransactions: TransactionExtended[],
deletedTransactions: TransactionExtended[]) => void) | undefined;
deletedTransactions: TransactionExtended[]) => Promise<void>) | undefined;
private txPerSecondArray: number[] = [];
private txPerSecond: number = 0;
@@ -210,7 +210,7 @@ class Mempool {
for (const rbfTransaction in rbfTransactions) {
if (this.mempoolCache[rbfTransaction]) {
// Store replaced transactions
rbfCache.add(rbfTransaction, rbfTransactions[rbfTransaction].txid);
rbfCache.add(this.mempoolCache[rbfTransaction], rbfTransactions[rbfTransaction].txid);
// Erase the replaced transactions from the local mempool
delete this.mempoolCache[rbfTransaction];
}
@@ -236,6 +236,7 @@ class Mempool {
const lazyDeleteAt = this.mempoolCache[tx].deleteAfter;
if (lazyDeleteAt && lazyDeleteAt < now) {
delete this.mempoolCache[tx];
rbfCache.evict(tx);
}
}
}

View File

@@ -265,9 +265,9 @@ class Mining {
}
await HashratesRepository.$setLatestRun('last_weekly_hashrates_indexing', new Date().getUTCDate());
if (newlyIndexed > 0) {
logger.notice(`Weekly mining pools hashrates indexing completed: indexed ${newlyIndexed}`);
logger.notice(`Weekly mining pools hashrates indexing completed: indexed ${newlyIndexed}`, logger.tags.mining);
} else {
logger.debug(`Weekly mining pools hashrates indexing completed: indexed ${newlyIndexed}`);
logger.debug(`Weekly mining pools hashrates indexing completed: indexed ${newlyIndexed}`, logger.tags.mining);
}
loadingIndicators.setProgress('weekly-hashrate-indexing', 100);
} catch (e) {
@@ -370,14 +370,14 @@ class Mining {
await HashratesRepository.$setLatestRun('last_hashrates_indexing', new Date().getUTCDate());
if (newlyIndexed > 0) {
logger.notice(`Daily network hashrate indexing completed: indexed ${newlyIndexed} days`);
logger.notice(`Daily network hashrate indexing completed: indexed ${newlyIndexed} days`, logger.tags.mining);
} else {
logger.debug(`Daily network hashrate indexing completed: indexed ${newlyIndexed} days`);
logger.debug(`Daily network hashrate indexing completed: indexed ${newlyIndexed} days`, logger.tags.mining);
}
loadingIndicators.setProgress('daily-hashrate-indexing', 100);
} catch (e) {
loadingIndicators.setProgress('daily-hashrate-indexing', 100);
logger.err(`Daily network hashrate indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.err(`Daily network hashrate indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining);
throw e;
}
}
@@ -449,9 +449,9 @@ class Mining {
}
if (totalIndexed > 0) {
logger.notice(`Indexed ${totalIndexed} difficulty adjustments`);
logger.notice(`Indexed ${totalIndexed} difficulty adjustments`, logger.tags.mining);
} else {
logger.debug(`Indexed ${totalIndexed} difficulty adjustments`);
logger.debug(`Indexed ${totalIndexed} difficulty adjustments`, logger.tags.mining);
}
}

View File

@@ -61,7 +61,7 @@ class PoolsParser {
poolNames.push(poolsDuplicated[i].name);
}
}
logger.debug(`Found ${poolNames.length} unique mining pools`);
logger.debug(`Found ${poolNames.length} unique mining pools`, logger.tags.mining);
// Get existing pools from the db
let existingPools;
@@ -72,7 +72,7 @@ class PoolsParser {
existingPools = [];
}
} catch (e) {
logger.err('Cannot get existing pools from the database, skipping pools.json import');
logger.err('Cannot get existing pools from the database, skipping pools.json import', logger.tags.mining);
return;
}
@@ -99,7 +99,7 @@ class PoolsParser {
slug = poolsJson['slugs'][poolNames[i]];
} catch (e) {
if (this.slugWarnFlag === false) {
logger.warn(`pools.json does not seem to contain the 'slugs' object`);
logger.warn(`pools.json does not seem to contain the 'slugs' object`, logger.tags.mining);
this.slugWarnFlag = true;
}
}
@@ -107,7 +107,7 @@ class PoolsParser {
if (slug === undefined) {
// Only keep alphanumerical
slug = poolNames[i].replace(/[^a-z0-9]/gi, '').toLowerCase();
logger.warn(`No slug found for '${poolNames[i]}', generating it => '${slug}'`);
logger.warn(`No slug found for '${poolNames[i]}', generating it => '${slug}'`, logger.tags.mining);
}
const poolObj = {
@@ -143,9 +143,9 @@ class PoolsParser {
'addresses': allAddresses,
'slug': slug
});
logger.debug(`Rename '${poolToRename[0].name}' mining pool to ${poolObj.name}`);
logger.debug(`Rename '${poolToRename[0].name}' mining pool to ${poolObj.name}`, logger.tags.mining);
} else {
logger.debug(`Add '${finalPoolName}' mining pool`);
logger.debug(`Add '${finalPoolName}' mining pool`, logger.tags.mining);
finalPoolDataAdd.push(poolObj);
}
}
@@ -160,14 +160,14 @@ class PoolsParser {
}
if (config.DATABASE.ENABLED === false) { // Don't run db operations
logger.info('Mining pools.json import completed (no database)');
logger.info('Mining pools.json import completed (no database)', logger.tags.mining);
return;
}
if (finalPoolDataAdd.length > 0 || finalPoolDataUpdate.length > 0 ||
finalPoolDataRename.length > 0
) {
logger.debug(`Update pools table now`);
logger.debug(`Update pools table now`, logger.tags.mining);
// Add new mining pools into the database
let queryAdd: string = 'INSERT INTO pools(name, link, regexes, addresses, slug) VALUES ';
@@ -217,9 +217,9 @@ class PoolsParser {
await DB.query({ sql: query, timeout: 120000 });
}
await this.insertUnknownPool();
logger.info('Mining pools.json import completed');
logger.info('Mining pools.json import completed', logger.tags.mining);
} catch (e) {
logger.err(`Cannot import pools in the database`);
logger.err(`Cannot import pools in the database`, logger.tags.mining);
throw e;
}
}
@@ -227,7 +227,7 @@ class PoolsParser {
try {
await this.insertUnknownPool();
} catch (e) {
logger.err(`Cannot insert unknown pool in the database`);
logger.err(`Cannot insert unknown pool in the database`, logger.tags.mining);
throw e;
}
}
@@ -252,7 +252,7 @@ class PoolsParser {
`);
}
} catch (e) {
logger.err('Unable to insert "Unknown" mining pool');
logger.err('Unable to insert "Unknown" mining pool', logger.tags.mining);
}
}
@@ -272,17 +272,17 @@ class PoolsParser {
for (const updatedPool of finalPoolDataUpdate) {
const [pool]: any[] = await DB.query(`SELECT id, name from pools where slug = "${updatedPool.slug}"`);
if (pool.length > 0) {
logger.notice(`Deleting blocks from ${pool[0].name} mining pool for future re-indexing`);
logger.notice(`Deleting blocks from ${pool[0].name} mining pool for future re-indexing`, logger.tags.mining);
await DB.query(`DELETE FROM blocks WHERE pool_id = ${pool[0].id}`);
}
}
// Ignore early days of Bitcoin as there were not mining pool yet
logger.notice('Deleting blocks with unknown mining pool from height 130635 for future re-indexing');
logger.notice(`Deleting blocks with unknown mining pool from height 130635 for future re-indexing`, logger.tags.mining);
const [unknownPool] = await DB.query(`SELECT id from pools where slug = "unknown"`);
await DB.query(`DELETE FROM blocks WHERE pool_id = ${unknownPool[0].id} AND height > 130635`);
logger.notice('Truncating hashrates for future re-indexing');
logger.notice(`Truncating hashrates for future re-indexing`, logger.tags.mining);
await DB.query(`DELETE FROM hashrates`);
}
}

View File

@@ -1,31 +1,62 @@
export interface CachedRbf {
txid: string;
expires: Date;
}
import { TransactionExtended } from "../mempool.interfaces";
class RbfCache {
private cache: { [txid: string]: CachedRbf; } = {};
private replacedBy: { [txid: string]: string; } = {};
private replaces: { [txid: string]: string[] } = {};
private txs: { [txid: string]: TransactionExtended } = {};
private expiring: { [txid: string]: Date } = {};
constructor() {
setInterval(this.cleanup.bind(this), 1000 * 60 * 60);
}
public add(replacedTxId: string, newTxId: string): void {
this.cache[replacedTxId] = {
expires: new Date(Date.now() + 1000 * 604800), // 1 week
txid: newTxId,
};
public add(replacedTx: TransactionExtended, newTxId: string): void {
this.replacedBy[replacedTx.txid] = newTxId;
this.txs[replacedTx.txid] = replacedTx;
if (!this.replaces[newTxId]) {
this.replaces[newTxId] = [];
}
this.replaces[newTxId].push(replacedTx.txid);
}
public get(txId: string): CachedRbf | undefined {
return this.cache[txId];
public getReplacedBy(txId: string): string | undefined {
return this.replacedBy[txId];
}
public getReplaces(txId: string): string[] | undefined {
return this.replaces[txId];
}
public getTx(txId: string): TransactionExtended | undefined {
return this.txs[txId];
}
// flag a transaction as removed from the mempool
public evict(txid): void {
this.expiring[txid] = new Date(Date.now() + 1000 * 86400); // 24 hours
}
private cleanup(): void {
const currentDate = new Date();
for (const c in this.cache) {
if (this.cache[c].expires < currentDate) {
delete this.cache[c];
for (const txid in this.expiring) {
if (this.expiring[txid] < currentDate) {
delete this.expiring[txid];
this.remove(txid);
}
}
}
// remove a transaction & all previous versions from the cache
private remove(txid): void {
// don't remove a transaction while a newer version remains in the mempool
if (this.replaces[txid] && !this.replacedBy[txid]) {
const replaces = this.replaces[txid];
delete this.replaces[txid];
for (const tx of replaces) {
// recursively remove prior versions from the cache
delete this.replacedBy[tx];
delete this.txs[tx];
this.remove(tx);
}
}
}

View File

@@ -1,8 +1,7 @@
import bitcoinApi from './bitcoin/bitcoin-api-factory';
import { TransactionExtended, TransactionMinerInfo } from '../mempool.interfaces';
import { IEsploraApi } from './bitcoin/esplora-api.interface';
import config from '../config';
import { Common } from './common';
import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
class TransactionUtils {
constructor() { }
@@ -21,8 +20,19 @@ class TransactionUtils {
};
}
public async $getTransactionExtended(txId: string, addPrevouts = false, lazyPrevouts = false): Promise<TransactionExtended> {
const transaction: IEsploraApi.Transaction = await bitcoinApi.$getRawTransaction(txId, false, addPrevouts, lazyPrevouts);
/**
* @param txId
* @param addPrevouts
* @param lazyPrevouts
* @param forceCore - See https://github.com/mempool/mempool/issues/2904
*/
public async $getTransactionExtended(txId: string, addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<TransactionExtended> {
let transaction: IEsploraApi.Transaction;
if (forceCore === true) {
transaction = await bitcoinCoreApi.$getRawTransaction(txId, true);
} else {
transaction = await bitcoinApi.$getRawTransaction(txId, false, addPrevouts, lazyPrevouts);
}
return this.extendTransaction(transaction);
}

View File

@@ -1,17 +1,30 @@
import config from '../config';
import logger from '../logger';
import { TransactionExtended, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces';
import { ThreadTransaction, MempoolBlockWithTransactions, AuditTransaction } from '../mempool.interfaces';
import { PairingHeap } from '../utils/pairing-heap';
import { Common } from './common';
import { parentPort } from 'worker_threads';
let mempool: { [txid: string]: ThreadTransaction } = {};
if (parentPort) {
parentPort.on('message', (params: { mempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit: number | null, condenseRest: boolean}) => {
const { mempool, blocks } = makeBlockTemplates(params);
parentPort.on('message', (params) => {
if (params.type === 'set') {
mempool = params.mempool;
} else if (params.type === 'update') {
params.added.forEach(tx => {
mempool[tx.txid] = tx;
});
params.removed.forEach(txid => {
delete mempool[txid];
});
}
const { blocks, clusters } = makeBlockTemplates(mempool);
// return the result to main thread.
if (parentPort) {
parentPort.postMessage({ mempool, blocks });
parentPort.postMessage({ blocks, clusters });
}
});
}
@@ -19,35 +32,24 @@ if (parentPort) {
/*
* Build projected mempool blocks using an approximation of the transaction selection algorithm from Bitcoin Core
* (see BlockAssembler in https://github.com/bitcoin/bitcoin/blob/master/src/node/miner.cpp)
*
* blockLimit: number of blocks to build in total.
* weightLimit: maximum weight of transactions to consider using the selection algorithm.
* if weightLimit is significantly lower than the mempool size, results may start to diverge from getBlockTemplate
* condenseRest: whether to ignore excess transactions or append them to the final block.
*/
function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }: { mempool: { [txid: string]: TransactionExtended }, blockLimit: number, weightLimit?: number | null, condenseRest?: boolean | null })
: { mempool: { [txid: string]: TransactionExtended }, blocks: MempoolBlockWithTransactions[] } {
function makeBlockTemplates(mempool: { [txid: string]: ThreadTransaction })
: { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } } {
const start = Date.now();
const auditPool: { [txid: string]: AuditTransaction } = {};
const mempoolArray: AuditTransaction[] = [];
const restOfArray: TransactionExtended[] = [];
const restOfArray: ThreadTransaction[] = [];
const cpfpClusters: { [root: string]: string[] } = {};
let weight = 0;
const maxWeight = weightLimit ? Math.max(4_000_000 * blockLimit, weightLimit) : Infinity;
// grab the top feerate txs up to maxWeight
Object.values(mempool).sort((a, b) => b.feePerVsize - a.feePerVsize).forEach(tx => {
weight += tx.weight;
if (weight >= maxWeight) {
restOfArray.push(tx);
return;
}
// initializing everything up front helps V8 optimize property access later
auditPool[tx.txid] = {
txid: tx.txid,
fee: tx.fee,
size: tx.size,
weight: tx.weight,
feePerVsize: tx.feePerVsize,
effectiveFeePerVsize: tx.feePerVsize,
vin: tx.vin,
relativesSet: false,
ancestorMap: new Map<string, AuditTransaction>(),
@@ -74,7 +76,7 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }:
// Build blocks by greedily choosing the highest feerate package
// (i.e. the package rooted in the transaction with the best ancestor score)
const blocks: MempoolBlockWithTransactions[] = [];
const blocks: ThreadTransaction[][] = [];
let blockWeight = 4000;
let blockSize = 0;
let transactions: AuditTransaction[] = [];
@@ -82,7 +84,7 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }:
let overflow: AuditTransaction[] = [];
let failures = 0;
let top = 0;
while ((top < mempoolArray.length || !modified.isEmpty()) && (condenseRest || blocks.length < blockLimit)) {
while ((top < mempoolArray.length || !modified.isEmpty())) {
// skip invalid transactions
while (top < mempoolArray.length && (mempoolArray[top].used || mempoolArray[top].modified)) {
top++;
@@ -106,44 +108,36 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }:
if (nextTx && !nextTx?.used) {
// Check if the package fits into this block
if (blockWeight + nextTx.ancestorWeight < config.MEMPOOL.BLOCK_WEIGHT_UNITS) {
blockWeight += nextTx.ancestorWeight;
const ancestors: AuditTransaction[] = Array.from(nextTx.ancestorMap.values());
const descendants: AuditTransaction[] = [];
// sort ancestors by dependency graph (equivalent to sorting by ascending ancestor count)
const sortedTxSet = [...ancestors.sort((a, b) => { return (a.ancestorMap.size || 0) - (b.ancestorMap.size || 0); }), nextTx];
let isCluster = false;
if (sortedTxSet.length > 1) {
cpfpClusters[nextTx.txid] = sortedTxSet.map(tx => tx.txid);
isCluster = true;
}
const effectiveFeeRate = nextTx.ancestorFee / (nextTx.ancestorWeight / 4);
const used: AuditTransaction[] = [];
while (sortedTxSet.length) {
const ancestor = sortedTxSet.pop();
const mempoolTx = mempool[ancestor.txid];
if (ancestor && !ancestor?.used) {
ancestor.used = true;
// update original copy of this tx with effective fee rate & relatives data
mempoolTx.effectiveFeePerVsize = effectiveFeeRate;
mempoolTx.ancestors = sortedTxSet.map((a) => {
return {
txid: a.txid,
fee: a.fee,
weight: a.weight,
};
}).reverse();
mempoolTx.descendants = descendants.map((a) => {
return {
txid: a.txid,
fee: a.fee,
weight: a.weight,
};
});
descendants.push(ancestor);
mempoolTx.cpfpChecked = true;
transactions.push(ancestor);
blockSize += ancestor.size;
ancestor.used = true;
ancestor.usedBy = nextTx.txid;
// update original copy of this tx with effective fee rate & relatives data
mempoolTx.effectiveFeePerVsize = effectiveFeeRate;
if (isCluster) {
mempoolTx.cpfpRoot = nextTx.txid;
}
mempoolTx.cpfpChecked = true;
transactions.push(ancestor);
blockSize += ancestor.size;
blockWeight += ancestor.weight;
used.push(ancestor);
}
// remove these as valid package ancestors for any descendants remaining in the mempool
if (sortedTxSet.length) {
sortedTxSet.forEach(tx => {
if (used.length) {
used.forEach(tx => {
updateDescendants(tx, auditPool, modified);
});
}
@@ -159,10 +153,10 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }:
// this block is full
const exceededPackageTries = failures > 1000 && blockWeight > (config.MEMPOOL.BLOCK_WEIGHT_UNITS - 4000);
const queueEmpty = top >= mempoolArray.length && modified.isEmpty();
if ((exceededPackageTries || queueEmpty) && (!condenseRest || blocks.length < blockLimit - 1)) {
if ((exceededPackageTries || queueEmpty) && blocks.length < 7) {
// construct this block
if (transactions.length) {
blocks.push(dataToMempoolBlocks(transactions.map(t => mempool[t.txid]), blockSize, blockWeight, blocks.length));
blocks.push(transactions.map(t => mempool[t.txid]));
}
// reset for the next block
transactions = [];
@@ -181,55 +175,40 @@ function makeBlockTemplates({ mempool, blockLimit, weightLimit, condenseRest }:
overflow = [];
}
}
if (condenseRest) {
// pack any leftover transactions into the last block
for (const tx of overflow) {
if (!tx || tx?.used) {
continue;
}
blockWeight += tx.weight;
blockSize += tx.size;
const mempoolTx = mempool[tx.txid];
// update original copy of this tx with effective fee rate & relatives data
mempoolTx.effectiveFeePerVsize = tx.score;
mempoolTx.ancestors = (Array.from(tx.ancestorMap?.values()) as AuditTransaction[]).map((a) => {
return {
txid: a.txid,
fee: a.fee,
weight: a.weight,
};
});
mempoolTx.bestDescendant = null;
mempoolTx.cpfpChecked = true;
transactions.push(tx);
tx.used = true;
// pack any leftover transactions into the last block
for (const tx of overflow) {
if (!tx || tx?.used) {
continue;
}
const blockTransactions = transactions.map(t => mempool[t.txid]);
restOfArray.forEach(tx => {
blockWeight += tx.weight;
blockSize += tx.size;
tx.effectiveFeePerVsize = tx.feePerVsize;
tx.cpfpChecked = false;
tx.ancestors = [];
tx.bestDescendant = null;
blockTransactions.push(tx);
});
if (blockTransactions.length) {
blocks.push(dataToMempoolBlocks(blockTransactions, blockSize, blockWeight, blocks.length));
blockWeight += tx.weight;
const mempoolTx = mempool[tx.txid];
// update original copy of this tx with effective fee rate & relatives data
mempoolTx.effectiveFeePerVsize = tx.score;
if (tx.ancestorMap.size > 0) {
cpfpClusters[tx.txid] = Array.from(tx.ancestorMap?.values()).map(a => a.txid);
mempoolTx.cpfpRoot = tx.txid;
}
transactions = [];
} else if (transactions.length) {
blocks.push(dataToMempoolBlocks(transactions.map(t => mempool[t.txid]), blockSize, blockWeight, blocks.length));
mempoolTx.cpfpChecked = true;
transactions.push(tx);
tx.used = true;
}
const blockTransactions = transactions.map(t => mempool[t.txid]);
restOfArray.forEach(tx => {
blockWeight += tx.weight;
tx.effectiveFeePerVsize = tx.feePerVsize;
tx.cpfpChecked = false;
blockTransactions.push(tx);
});
if (blockTransactions.length) {
blocks.push(blockTransactions);
}
transactions = [];
const end = Date.now();
const time = end - start;
logger.debug('Mempool templates calculated in ' + time / 1000 + ' seconds');
return {
mempool,
blocks
};
return { blocks, clusters: cpfpClusters };
}
// traverse in-mempool ancestors
@@ -239,9 +218,9 @@ function setRelatives(
mempool: { [txid: string]: AuditTransaction },
): void {
for (const parent of tx.vin) {
const parentTx = mempool[parent.txid];
if (parentTx && !tx.ancestorMap?.has(parent.txid)) {
tx.ancestorMap.set(parent.txid, parentTx);
const parentTx = mempool[parent];
if (parentTx && !tx.ancestorMap?.has(parent)) {
tx.ancestorMap.set(parent, parentTx);
parentTx.children.add(tx);
// visit each node only once
if (!parentTx.relativesSet) {
@@ -312,27 +291,4 @@ function updateDescendants(
});
}
}
}
function dataToMempoolBlocks(transactions: TransactionExtended[],
blockSize: number, blockWeight: number, blocksIndex: number): MempoolBlockWithTransactions {
let rangeLength = 4;
if (blocksIndex === 0) {
rangeLength = 8;
}
if (transactions.length > 4000) {
rangeLength = 6;
} else if (transactions.length > 10000) {
rangeLength = 8;
}
return {
blockSize: blockSize,
blockVSize: blockWeight / 4,
nTx: transactions.length,
totalFees: transactions.reduce((acc, cur) => acc + cur.fee, 0),
medianFee: Common.percentile(transactions.map((tx) => tx.effectiveFeePerVsize), config.MEMPOOL.RECOMMENDED_FEE_PERCENTILE),
feeRange: Common.getFeesInRange(transactions, rangeLength),
transactionIds: transactions.map((tx) => tx.txid),
transactions: transactions.map((tx) => Common.stripTransaction(tx)),
};
}

View File

@@ -58,10 +58,10 @@ class WebsocketHandler {
client['track-tx'] = parsedMessage['track-tx'];
// Client is telling the transaction wasn't found
if (parsedMessage['watch-mempool']) {
const rbfCacheTx = rbfCache.get(client['track-tx']);
if (rbfCacheTx) {
const rbfCacheTxid = rbfCache.getReplacedBy(client['track-tx']);
if (rbfCacheTxid) {
response['txReplaced'] = {
txid: rbfCacheTx.txid,
txid: rbfCacheTxid,
};
client['track-tx'] = null;
} else {
@@ -251,7 +251,7 @@ class WebsocketHandler {
}
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
await mempoolBlocks.makeBlockTemplates(newMempool, 8, null, true);
await mempoolBlocks.updateBlockTemplates(newMempool, newTransactions, deletedTransactions.map(tx => tx.txid));
} else {
mempoolBlocks.updateMempoolBlocks(newMempool);
}
@@ -419,7 +419,7 @@ class WebsocketHandler {
const _memPool = memPool.getMempool();
if (config.MEMPOOL.ADVANCED_GBT_AUDIT) {
await mempoolBlocks.makeBlockTemplates(_memPool, 2);
await mempoolBlocks.makeBlockTemplates(_memPool);
} else {
mempoolBlocks.updateMempoolBlocks(_memPool);
}
@@ -439,7 +439,7 @@ class WebsocketHandler {
};
}) : [];
BlocksSummariesRepository.$saveSummary({
BlocksSummariesRepository.$saveTemplate({
height: block.height,
template: {
id: block.id,
@@ -462,13 +462,16 @@ class WebsocketHandler {
}
}
const removed: string[] = [];
// Update mempool to remove transactions included in the new block
for (const txId of txIds) {
delete _memPool[txId];
removed.push(txId);
rbfCache.evict(txId);
}
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
await mempoolBlocks.makeBlockTemplates(_memPool, 2);
await mempoolBlocks.updateBlockTemplates(_memPool, [], removed);
} else {
mempoolBlocks.updateMempoolBlocks(_memPool);
}

View File

@@ -31,7 +31,7 @@ interface IConfig {
POOLS_JSON_TREE_URL: string,
ADVANCED_GBT_AUDIT: boolean;
ADVANCED_GBT_MEMPOOL: boolean;
TRANSACTION_INDEXING: boolean;
CPFP_INDEXING: boolean;
};
ESPLORA: {
REST_API_URL: string;
@@ -44,6 +44,7 @@ interface IConfig {
GRAPH_REFRESH_INTERVAL: number;
LOGGER_UPDATE_INTERVAL: number;
FORENSICS_INTERVAL: number;
FORENSICS_RATE_LIMIT: number;
};
LND: {
TLS_CERT_PATH: string;
@@ -151,7 +152,7 @@ const defaults: IConfig = {
'POOLS_JSON_TREE_URL': 'https://api.github.com/repos/mempool/mining-pools/git/trees/master',
'ADVANCED_GBT_AUDIT': false,
'ADVANCED_GBT_MEMPOOL': false,
'TRANSACTION_INDEXING': false,
'CPFP_INDEXING': false,
},
'ESPLORA': {
'REST_API_URL': 'http://127.0.0.1:3000',
@@ -205,6 +206,7 @@ const defaults: IConfig = {
'GRAPH_REFRESH_INTERVAL': 600,
'LOGGER_UPDATE_INTERVAL': 30,
'FORENSICS_INTERVAL': 43200,
'FORENSICS_RATE_LIMIT': 20,
},
'LND': {
'TLS_CERT_PATH': '',

View File

@@ -32,22 +32,27 @@ class Logger {
local7: 23
};
public tags = {
mining: 'Mining',
ln: 'Lightning',
};
// @ts-ignore
public emerg: ((msg: string) => void);
public emerg: ((msg: string, tag?: string) => void);
// @ts-ignore
public alert: ((msg: string) => void);
public alert: ((msg: string, tag?: string) => void);
// @ts-ignore
public crit: ((msg: string) => void);
public crit: ((msg: string, tag?: string) => void);
// @ts-ignore
public err: ((msg: string) => void);
public err: ((msg: string, tag?: string) => void);
// @ts-ignore
public warn: ((msg: string) => void);
public warn: ((msg: string, tag?: string) => void);
// @ts-ignore
public notice: ((msg: string) => void);
public notice: ((msg: string, tag?: string) => void);
// @ts-ignore
public info: ((msg: string) => void);
public info: ((msg: string, tag?: string) => void);
// @ts-ignore
public debug: ((msg: string) => void);
public debug: ((msg: string, tag?: string) => void);
private name = 'mempool';
private client: dgram.Socket;
@@ -66,8 +71,8 @@ class Logger {
private addprio(prio): void {
this[prio] = (function(_this) {
return function(msg) {
return _this.msg(prio, msg);
return function(msg, tag?: string) {
return _this.msg(prio, msg, tag);
};
})(this);
}
@@ -85,7 +90,7 @@ class Logger {
return '';
}
private msg(priority, msg) {
private msg(priority, msg, tag?: string) {
let consolemsg, prionum, syslogmsg;
if (typeof msg === 'string' && msg.length > 0) {
while (msg[msg.length - 1].charCodeAt(0) === 10) {
@@ -94,10 +99,10 @@ class Logger {
}
const network = this.network ? ' <' + this.network + '>' : '';
prionum = Logger.priorities[priority] || Logger.priorities.info;
consolemsg = `${this.ts()} [${process.pid}] ${priority.toUpperCase()}:${network} ${msg}`;
consolemsg = `${this.ts()} [${process.pid}] ${priority.toUpperCase()}:${network} ${tag ? '[' + tag + '] ' : ''}${msg}`;
if (config.SYSLOG.ENABLED && Logger.priorities[priority] <= Logger.priorities[config.SYSLOG.MIN_PRIORITY]) {
syslogmsg = `<${(Logger.facilities[config.SYSLOG.FACILITY] * 8 + prionum)}> ${this.name}[${process.pid}]: ${priority.toUpperCase()}${network} ${msg}`;
syslogmsg = `<${(Logger.facilities[config.SYSLOG.FACILITY] * 8 + prionum)}> ${this.name}[${process.pid}]: ${priority.toUpperCase()}${network} ${tag ? '[' + tag + '] ' : ''}${msg}`;
this.syslog(syslogmsg);
}
if (Logger.priorities[priority] > Logger.priorities[config.MEMPOOL.STDOUT_LOG_MIN_PRIORITY]) {

View File

@@ -81,10 +81,10 @@ export interface TransactionExtended extends IEsploraApi.Transaction {
export interface AuditTransaction {
txid: string;
fee: number;
size: number;
weight: number;
feePerVsize: number;
vin: IEsploraApi.Vin[];
effectiveFeePerVsize: number;
vin: string[];
relativesSet: boolean;
ancestorMap: Map<string, AuditTransaction>;
children: Set<AuditTransaction>;
@@ -96,6 +96,17 @@ export interface AuditTransaction {
modifiedNode: HeapNode<AuditTransaction>;
}
export interface ThreadTransaction {
txid: string;
fee: number;
weight: number;
feePerVsize: number;
effectiveFeePerVsize?: number;
vin: string[];
cpfpRoot?: string;
cpfpChecked?: boolean;
}
export interface Ancestor {
txid: string;
weight: number;
@@ -263,6 +274,7 @@ export interface IBackendInfo {
hostname: string;
gitCommit: string;
version: string;
lightning: boolean;
}
export interface IDifficultyAdjustment {
@@ -326,4 +338,4 @@ export interface IOldestNodes {
updatedAt?: number,
city?: any,
country?: any,
}
}

View File

@@ -8,6 +8,8 @@ import HashratesRepository from './HashratesRepository';
import { escape } from 'mysql2';
import BlocksSummariesRepository from './BlocksSummariesRepository';
import DifficultyAdjustmentsRepository from './DifficultyAdjustmentsRepository';
import bitcoinClient from '../api/bitcoin/bitcoin-client';
import config from '../config';
class BlocksRepository {
/**
@@ -667,16 +669,32 @@ class BlocksRepository {
*/
public async $getCPFPUnindexedBlocks(): Promise<any[]> {
try {
const [rows]: any = await DB.query(`SELECT height, hash FROM blocks WHERE cpfp_indexed = 0 ORDER BY height DESC`);
return rows;
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
const currentBlockHeight = blockchainInfo.blocks;
let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, currentBlockHeight);
if (indexingBlockAmount <= -1) {
indexingBlockAmount = currentBlockHeight + 1;
}
const minHeight = Math.max(0, currentBlockHeight - indexingBlockAmount + 1);
const [rows]: any[] = await DB.query(`
SELECT height
FROM compact_cpfp_clusters
WHERE height <= ? AND height >= ?
ORDER BY height DESC;
`, [currentBlockHeight, minHeight]);
const indexedHeights = {};
rows.forEach((row) => { indexedHeights[row.height] = true; });
const allHeights: number[] = Array.from(Array(currentBlockHeight - minHeight + 1).keys(), n => n + minHeight).reverse();
const unindexedHeights = allHeights.filter(x => !indexedHeights[x]);
return unindexedHeights;
} catch (e) {
logger.err('Cannot fetch CPFP unindexed blocks. Reason: ' + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $setCPFPIndexed(hash: string): Promise<void> {
await DB.query(`UPDATE blocks SET cpfp_indexed = 1 WHERE hash = ?`, [hash]);
return [];
}
/**

View File

@@ -17,19 +17,16 @@ class BlocksSummariesRepository {
return undefined;
}
public async $saveSummary(params: { height: number, mined?: BlockSummary, template?: BlockSummary}) {
const blockId = params.mined?.id ?? params.template?.id;
public async $saveSummary(params: { height: number, mined?: BlockSummary}) {
const blockId = params.mined?.id;
try {
const [dbSummary]: any[] = await DB.query(`SELECT * FROM blocks_summaries WHERE id = "${blockId}"`);
if (dbSummary.length === 0) { // First insertion
await DB.query(`INSERT INTO blocks_summaries VALUE (?, ?, ?, ?)`, [
params.height, blockId, JSON.stringify(params.mined?.transactions ?? []), JSON.stringify(params.template?.transactions ?? [])
]);
} else if (params.mined !== undefined) { // Update mined block summary
await DB.query(`UPDATE blocks_summaries SET transactions = ? WHERE id = "${params.mined.id}"`, [JSON.stringify(params.mined.transactions)]);
} else if (params.template !== undefined) { // Update template block summary
await DB.query(`UPDATE blocks_summaries SET template = ? WHERE id = "${params.template.id}"`, [JSON.stringify(params.template?.transactions)]);
}
const transactions = JSON.stringify(params.mined?.transactions || []);
await DB.query(`
INSERT INTO blocks_summaries (height, id, transactions, template)
VALUE (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
transactions = ?
`, [params.height, blockId, transactions, '[]', transactions]);
} catch (e: any) {
if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart
logger.debug(`Cannot save block summary for ${blockId} because it has already been indexed, ignoring`);
@@ -40,6 +37,26 @@ class BlocksSummariesRepository {
}
}
public async $saveTemplate(params: { height: number, template: BlockSummary}) {
const blockId = params.template?.id;
try {
const transactions = JSON.stringify(params.template?.transactions || []);
await DB.query(`
INSERT INTO blocks_summaries (height, id, transactions, template)
VALUE (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
template = ?
`, [params.height, blockId, '[]', transactions, transactions]);
} catch (e: any) {
if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart
logger.debug(`Cannot save block template for ${blockId} because it has already been indexed, ignoring`);
} else {
logger.debug(`Cannot save block template for ${blockId}. Reason: ${e instanceof Error ? e.message : e}`);
throw e;
}
}
}
public async $getIndexedSummariesId(): Promise<string[]> {
try {
const [rows]: any[] = await DB.query(`SELECT id from blocks_summaries`);

View File

@@ -1,34 +1,151 @@
import cluster, { Cluster } from 'cluster';
import { RowDataPacket } from 'mysql2';
import DB from '../database';
import logger from '../logger';
import { Ancestor } from '../mempool.interfaces';
import transactionRepository from '../repositories/TransactionRepository';
class CpfpRepository {
public async $saveCluster(height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<boolean> {
if (!txs[0]) {
return false;
}
// skip clusters of transactions with the same fees
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
const equalFee = txs.reduce((acc, tx) => {
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
}, true);
if (equalFee) {
return false;
}
try {
const txsJson = JSON.stringify(txs);
const packedTxs = Buffer.from(this.pack(txs));
await DB.query(
`
INSERT INTO cpfp_clusters(root, height, txs, fee_rate)
VALUE (?, ?, ?, ?)
INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
VALUE (UNHEX(?), ?, ?, ?)
ON DUPLICATE KEY UPDATE
height = ?,
txs = ?,
fee_rate = ?
`,
[txs[0].txid, height, txsJson, effectiveFeePerVsize, height, txsJson, effectiveFeePerVsize, height]
[clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
);
const maxChunk = 10;
let chunkIndex = 0;
while (chunkIndex < txs.length) {
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => {
return { txid: tx.txid, cluster: clusterRoot };
});
await transactionRepository.$batchSetCluster(chunk);
chunkIndex += maxChunk;
}
return true;
} catch (e: any) {
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $batchSaveClusters(clusters: { root: string, height: number, txs: any, effectiveFeePerVsize: number}[]): Promise<boolean> {
try {
const clusterValues: any[] = [];
const txs: any[] = [];
for (const cluster of clusters) {
if (cluster.txs?.length > 1) {
const roundedEffectiveFee = Math.round(cluster.effectiveFeePerVsize * 100) / 100;
const equalFee = cluster.txs.reduce((acc, tx) => {
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
}, true);
if (!equalFee) {
clusterValues.push([
cluster.root,
cluster.height,
Buffer.from(this.pack(cluster.txs)),
cluster.effectiveFeePerVsize
]);
for (const tx of cluster.txs) {
txs.push({ txid: tx.txid, cluster: cluster.root });
}
}
}
}
if (!clusterValues.length) {
return false;
}
const maxChunk = 100;
let chunkIndex = 0;
// insert transactions in batches of up to 100 rows
while (chunkIndex < txs.length) {
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
await transactionRepository.$batchSetCluster(chunk);
chunkIndex += maxChunk;
}
chunkIndex = 0;
// insert clusters in batches of up to 100 rows
while (chunkIndex < clusterValues.length) {
const chunk = clusterValues.slice(chunkIndex, chunkIndex + maxChunk);
let query = `
INSERT IGNORE INTO compact_cpfp_clusters(root, height, txs, fee_rate)
VALUES
`;
query += chunk.map(chunk => {
return (' (UNHEX(?), ?, ?, ?)');
}) + ';';
const values = chunk.flat();
await DB.query(
query,
values
);
chunkIndex += maxChunk;
}
return true;
} catch (e: any) {
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $getCluster(clusterRoot: string): Promise<Cluster> {
const [clusterRows]: any = await DB.query(
`
SELECT *
FROM compact_cpfp_clusters
WHERE root = UNHEX(?)
`,
[clusterRoot]
);
const cluster = clusterRows[0];
cluster.txs = this.unpack(cluster.txs);
return cluster;
}
public async $deleteClustersFrom(height: number): Promise<void> {
logger.info(`Delete newer cpfp clusters from height ${height} from the database`);
try {
const [rows] = await DB.query(
`
SELECT txs, height, root from compact_cpfp_clusters
WHERE height >= ?
`,
[height]
) as RowDataPacket[][];
if (rows?.length) {
for (let clusterToDelete of rows) {
const txs = this.unpack(clusterToDelete.txs);
for (let tx of txs) {
await transactionRepository.$removeTransaction(tx.txid);
}
}
}
await DB.query(
`
DELETE from cpfp_clusters
DELETE from compact_cpfp_clusters
WHERE height >= ?
`,
[height]
@@ -38,6 +155,70 @@ class CpfpRepository {
throw e;
}
}
// insert a dummy row to mark that we've indexed as far as this block
public async $insertProgressMarker(height: number): Promise<void> {
try {
const [rows]: any = await DB.query(
`
SELECT root
FROM compact_cpfp_clusters
WHERE height = ?
`,
[height]
);
if (!rows?.length) {
const rootBuffer = Buffer.alloc(32);
rootBuffer.writeInt32LE(height);
await DB.query(
`
INSERT INTO compact_cpfp_clusters(root, height, fee_rate)
VALUE (?, ?, ?)
`,
[rootBuffer, height, 0]
);
}
} catch (e: any) {
logger.err(`Cannot insert cpfp progress marker. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
public pack(txs: Ancestor[]): ArrayBuffer {
const buf = new ArrayBuffer(44 * txs.length);
const view = new DataView(buf);
txs.forEach((tx, i) => {
const offset = i * 44;
for (let x = 0; x < 32; x++) {
// store txid in little-endian
view.setUint8(offset + (31 - x), parseInt(tx.txid.slice(x * 2, (x * 2) + 2), 16));
}
view.setUint32(offset + 32, tx.weight);
view.setBigUint64(offset + 36, BigInt(Math.round(tx.fee)));
});
return buf;
}
public unpack(buf: Buffer): Ancestor[] {
if (!buf) {
return [];
}
const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
const txs: Ancestor[] = [];
const view = new DataView(arrayBuffer);
for (let offset = 0; offset < arrayBuffer.byteLength; offset += 44) {
const txid = Array.from(new Uint8Array(arrayBuffer, offset, 32)).reverse().map(b => b.toString(16).padStart(2, '0')).join('');
const weight = view.getUint32(offset + 32);
const fee = Number(view.getBigUint64(offset + 36));
txs.push({
txid,
weight,
fee
});
}
return txs;
}
}
export default new CpfpRepository();

View File

@@ -1,6 +1,7 @@
import DB from '../database';
import logger from '../logger';
import { Ancestor, CpfpInfo } from '../mempool.interfaces';
import cpfpRepository from './CpfpRepository';
interface CpfpSummary {
txid: string;
@@ -12,20 +13,20 @@ interface CpfpSummary {
}
class TransactionRepository {
public async $setCluster(txid: string, cluster: string): Promise<void> {
public async $setCluster(txid: string, clusterRoot: string): Promise<void> {
try {
await DB.query(
`
INSERT INTO transactions
INSERT INTO compact_transactions
(
txid,
cluster
)
VALUE (?, ?)
VALUE (UNHEX(?), UNHEX(?))
ON DUPLICATE KEY UPDATE
cluster = ?
cluster = UNHEX(?)
;`,
[txid, cluster, cluster]
[txid, clusterRoot, clusterRoot]
);
} catch (e: any) {
logger.err(`Cannot save transaction cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
@@ -33,18 +34,45 @@ class TransactionRepository {
}
}
public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
public async $batchSetCluster(txs): Promise<void> {
try {
let query = `
SELECT *
FROM transactions
LEFT JOIN cpfp_clusters AS cluster ON cluster.root = transactions.cluster
WHERE transactions.txid = ?
INSERT IGNORE INTO compact_transactions
(
txid,
cluster
)
VALUES
`;
const [rows]: any = await DB.query(query, [txid]);
if (rows.length) {
rows[0].txs = JSON.parse(rows[0].txs) as Ancestor[];
return this.convertCpfp(rows[0]);
query += txs.map(tx => {
return (' (UNHEX(?), UNHEX(?))');
}) + ';';
const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
await DB.query(
query,
values
);
} catch (e: any) {
logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
try {
const [txRows]: any = await DB.query(
`
SELECT HEX(txid) as id, HEX(cluster) as root
FROM compact_transactions
WHERE txid = UNHEX(?)
`,
[txid]
);
if (txRows.length && txRows[0].root != null) {
const txid = txRows[0].id.toLowerCase();
const clusterId = txRows[0].root.toLowerCase();
const cluster = await cpfpRepository.$getCluster(clusterId);
return this.convertCpfp(txid, cluster);
}
} catch (e) {
logger.err('Cannot get transaction cpfp info from db. Reason: ' + (e instanceof Error ? e.message : e));
@@ -52,12 +80,23 @@ class TransactionRepository {
}
}
private convertCpfp(cpfp: CpfpSummary): CpfpInfo {
public async $removeTransaction(txid: string): Promise<void> {
await DB.query(
`
DELETE FROM compact_transactions
WHERE txid = UNHEX(?)
`,
[txid]
);
}
private convertCpfp(txid, cluster): CpfpInfo {
const descendants: Ancestor[] = [];
const ancestors: Ancestor[] = [];
let matched = false;
for (const tx of cpfp.txs) {
if (tx.txid === cpfp.txid) {
for (const tx of cluster.txs) {
if (tx.txid === txid) {
matched = true;
} else if (!matched) {
descendants.push(tx);
@@ -68,7 +107,6 @@ class TransactionRepository {
return {
descendants,
ancestors,
effectiveFeePerVsize: cpfp.fee_rate
};
}
}

View File

@@ -7,7 +7,6 @@ import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface';
import { Common } from '../../api/common';
import { ILightningApi } from '../../api/lightning/lightning-api.interface';
const throttleDelay = 20; //ms
const tempCacheSize = 10000;
class ForensicsService {
@@ -91,7 +90,7 @@ class ForensicsService {
let outspends: IEsploraApi.Outspend[] | undefined;
try {
outspends = await bitcoinApi.$getOutspends(channel.closing_transaction_id);
await Common.sleep$(throttleDelay);
await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
} catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + channel.closing_transaction_id + '/outspends'}. Reason ${e instanceof Error ? e.message : e}`);
continue;
@@ -340,7 +339,7 @@ class ForensicsService {
let outspends: IEsploraApi.Outspend[] | undefined;
try {
outspends = await bitcoinApi.$getOutspends(input.txid);
await Common.sleep$(throttleDelay);
await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
} catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + input.txid + '/outspends'}. Reason ${e instanceof Error ? e.message : e}`);
}
@@ -429,7 +428,7 @@ class ForensicsService {
if (temp) {
this.tempCached.push(txid);
}
await Common.sleep$(throttleDelay);
await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
} catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + txid + '/outspends'}. Reason ${e instanceof Error ? e.message : e}`);
return null;

View File

@@ -23,7 +23,7 @@ class NetworkSyncService {
constructor() {}
public async $startService(): Promise<void> {
logger.info('Starting lightning network sync service');
logger.info(`Starting lightning network sync service`, logger.tags.ln);
this.loggerTimer = new Date().getTime() / 1000;
@@ -33,11 +33,11 @@ class NetworkSyncService {
private async $runTasks(): Promise<void> {
const taskStartTime = Date.now();
try {
logger.info(`Updating nodes and channels`);
logger.debug(`Updating nodes and channels`, logger.tags.ln);
const networkGraph = await lightningApi.$getNetworkGraph();
if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) {
logger.info(`LN Network graph is empty, retrying in 10 seconds`);
logger.info(`LN Network graph is empty, retrying in 10 seconds`, logger.tags.ln);
setTimeout(() => { this.$runTasks(); }, 10000);
return;
}
@@ -55,7 +55,7 @@ class NetworkSyncService {
}
} catch (e) {
logger.err('$runTasks() error: ' + (e instanceof Error ? e.message : e));
logger.err(`$runTasks() error: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
}
setTimeout(() => { this.$runTasks(); }, Math.max(1, (1000 * config.LIGHTNING.GRAPH_REFRESH_INTERVAL) - (Date.now() - taskStartTime)));
@@ -79,8 +79,8 @@ class NetworkSyncService {
++progress;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Updating node ${progress}/${nodes.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.debug(`Updating node ${progress}/${nodes.length}`, logger.tags.ln);
this.loggerTimer = new Date().getTime() / 1000;
}
@@ -106,7 +106,7 @@ class NetworkSyncService {
deletedRecords += await NodeRecordsRepository.$deleteUnusedRecords(node.pub_key, customRecordTypes);
}
}
logger.info(`${progress} nodes updated. ${deletedSockets} sockets deleted. ${deletedRecords} custom records deleted.`);
logger.debug(`${progress} nodes updated. ${deletedSockets} sockets deleted. ${deletedRecords} custom records deleted.`);
// If a channel if not present in the graph, mark it as inactive
await nodesApi.$setNodesInactive(graphNodesPubkeys);
@@ -138,18 +138,18 @@ class NetworkSyncService {
++progress;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Updating channel ${progress}/${channels.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.debug(`Updating channel ${progress}/${channels.length}`, logger.tags.ln);
this.loggerTimer = new Date().getTime() / 1000;
}
}
logger.info(`${progress} channels updated`);
logger.debug(`${progress} channels updated`, logger.tags.ln);
// If a channel if not present in the graph, mark it as inactive
await channelsApi.$setChannelsInactive(graphChannelsIds);
} catch (e) {
logger.err(`Cannot update channel list. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.err(` Cannot update channel list. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.ln);
}
}
@@ -184,26 +184,28 @@ class NetworkSyncService {
if (lowest < node.first_seen) {
const query = `UPDATE nodes SET first_seen = FROM_UNIXTIME(?) WHERE public_key = ?`;
const params = [lowest, node.public_key];
++updated;
await DB.query(query, params);
}
++progress;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Updating node first seen date ${progress}/${nodes.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.debug(`Updating node first seen date ${progress}/${nodes.length}`, logger.tags.ln);
this.loggerTimer = new Date().getTime() / 1000;
++updated;
}
}
logger.info(`Updated ${updated} node first seen dates`);
if (updated > 0) {
logger.debug(`Updated ${updated} node first seen dates`, logger.tags.ln);
}
} catch (e) {
logger.err('$updateNodeFirstSeen() error: ' + (e instanceof Error ? e.message : e));
logger.err(`$updateNodeFirstSeen() error: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
}
}
private async $lookUpCreationDateFromChain(): Promise<void> {
let progress = 0;
logger.info(`Running channel creation date lookup`);
logger.debug(`Running channel creation date lookup`, logger.tags.ln);
try {
const channels = await channelsApi.$getChannelsWithoutCreatedDate();
for (const channel of channels) {
@@ -217,14 +219,17 @@ class NetworkSyncService {
);
++progress;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Updating channel creation date ${progress}/${channels.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.debug(`Updating channel creation date ${progress}/${channels.length}`, logger.tags.ln);
this.loggerTimer = new Date().getTime() / 1000;
}
}
logger.info(`Updated ${channels.length} channels' creation date`);
if (channels.length > 0) {
logger.debug(`Updated ${channels.length} channels' creation date`, logger.tags.ln);
}
} catch (e) {
logger.err('$lookUpCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e));
logger.err(`$lookUpCreationDateFromChain() error: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
}
}
@@ -233,7 +238,7 @@ class NetworkSyncService {
* mark that channel as inactive
*/
private async $deactivateChannelsWithoutActiveNodes(): Promise<void> {
logger.info(`Find channels which nodes are offline`);
logger.debug(`Find channels which nodes are offline`, logger.tags.ln);
try {
const result = await DB.query<ResultSetHeader>(`
@@ -256,12 +261,10 @@ class NetworkSyncService {
`);
if (result[0].changedRows ?? 0 > 0) {
logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`);
} else {
logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`);
logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`, logger.tags.ln);
}
} catch (e) {
logger.err('$deactivateChannelsWithoutActiveNodes() error: ' + (e instanceof Error ? e.message : e));
logger.err(`$deactivateChannelsWithoutActiveNodes() error: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
}
}
@@ -280,13 +283,13 @@ class NetworkSyncService {
} else {
log += ` for the first time`;
}
logger.info(log);
logger.info(`${log}`, logger.tags.ln);
const channels = await channelsApi.$getChannelsByStatus([0, 1]);
for (const channel of channels) {
const spendingTx = await bitcoinApi.$getOutspend(channel.transaction_id, channel.transaction_vout);
if (spendingTx.spent === true && spendingTx.status?.confirmed === true) {
logger.debug('Marking channel: ' + channel.id + ' as closed.');
logger.debug(`Marking channel: ${channel.id} as closed.`, logger.tags.ln);
await DB.query(`UPDATE channels SET status = 2, closing_date = FROM_UNIXTIME(?) WHERE id = ?`,
[spendingTx.status.block_time, channel.id]);
if (spendingTx.txid && !channel.closing_transaction_id) {
@@ -296,16 +299,16 @@ class NetworkSyncService {
++progress;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Checking if channel has been closed ${progress}/${channels.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.info(`Checking if channel has been closed ${progress}/${channels.length}`, logger.tags.ln);
this.loggerTimer = new Date().getTime() / 1000;
}
}
this.closedChannelsScanBlock = blocks.getCurrentBlockHeight();
logger.info(`Closed channels scan completed at block ${this.closedChannelsScanBlock}`);
logger.debug(`Closed channels scan completed at block ${this.closedChannelsScanBlock}`, logger.tags.ln);
} catch (e) {
logger.err('$scanForClosedChannels() error: ' + (e instanceof Error ? e.message : e));
logger.err(`$scanForClosedChannels() error: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
}
}
}

View File

@@ -6,7 +6,7 @@ import { Common } from '../../api/common';
class LightningStatsUpdater {
public async $startService(): Promise<void> {
logger.info('Starting Lightning Stats service');
logger.info(`Starting Lightning Stats service`, logger.tags.ln);
await this.$runTasks();
LightningStatsImporter.$run();
@@ -27,7 +27,7 @@ class LightningStatsUpdater {
const networkGraph = await lightningApi.$getNetworkGraph();
await LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph);
logger.info(`Updated latest network stats`);
logger.debug(`Updated latest network stats`, logger.tags.ln);
}
}

View File

@@ -21,10 +21,10 @@ class FundingTxFetcher {
try {
this.fundingTxCache = JSON.parse(await fsPromises.readFile(CACHE_FILE_NAME, 'utf-8'));
} catch (e) {
logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`);
logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`, logger.tags.ln);
this.fundingTxCache = {};
}
logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`);
logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`, logger.tags.ln);
}
}
@@ -44,26 +44,27 @@ class FundingTxFetcher {
++channelProcessed;
let elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
if (elapsedSeconds > 10) {
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
elapsedSeconds = Math.round((new Date().getTime() / 1000) - globalTimer);
logger.info(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` +
`(${Math.floor(channelProcessed / channelIds.length * 10000) / 100}%) | ` +
`elapsed: ${elapsedSeconds} seconds`
`elapsed: ${elapsedSeconds} seconds`,
logger.tags.ln
);
loggerTimer = new Date().getTime() / 1000;
}
elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer);
if (elapsedSeconds > 60) {
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`, logger.tags.ln);
fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
cacheTimer = new Date().getTime() / 1000;
}
}
if (this.channelNewlyProcessed > 0) {
logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`);
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`, logger.tags.ln);
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`, logger.tags.ln);
fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
}

View File

@@ -14,7 +14,7 @@ export async function $lookupNodeLocation(): Promise<void> {
let nodesUpdated = 0;
let geoNamesInserted = 0;
logger.info(`Running node location updater using Maxmind`);
logger.debug(`Running node location updater using Maxmind`, logger.tags.ln);
try {
const nodes = await nodesApi.$getAllNodes();
const lookupCity = await maxmind.open<CityResponse>(config.MAXMIND.GEOLITE2_CITY);
@@ -152,8 +152,8 @@ export async function $lookupNodeLocation(): Promise<void> {
++progress;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
if (elapsedSeconds > 10) {
logger.info(`Updating node location data ${progress}/${nodes.length}`);
if (elapsedSeconds > config.LIGHTNING.LOGGER_UPDATE_INTERVAL) {
logger.debug(`Updating node location data ${progress}/${nodes.length}`);
loggerTimer = new Date().getTime() / 1000;
}
}
@@ -161,9 +161,7 @@ export async function $lookupNodeLocation(): Promise<void> {
}
if (nodesUpdated > 0) {
logger.info(`${nodesUpdated} nodes maxmind data updated, ${geoNamesInserted} geo names inserted`);
} else {
logger.debug(`${nodesUpdated} nodes maxmind data updated, ${geoNamesInserted} geo names inserted`);
logger.debug(`${nodesUpdated} nodes maxmind data updated, ${geoNamesInserted} geo names inserted`, logger.tags.ln);
}
} catch (e) {
logger.err('$lookupNodeLocation() error: ' + (e instanceof Error ? e.message : e));

View File

@@ -8,7 +8,6 @@ import { isIP } from 'net';
import { Common } from '../../../api/common';
import channelsApi from '../../../api/explorer/channels.api';
import nodesApi from '../../../api/explorer/nodes.api';
import { ResultSetHeader } from 'mysql2';
const fsPromises = promises;
@@ -17,7 +16,7 @@ class LightningStatsImporter {
async $run(): Promise<void> {
const [channels]: any[] = await DB.query('SELECT short_id from channels;');
logger.info('Caching funding txs for currently existing channels');
logger.info(`Caching funding txs for currently existing channels`, logger.tags.ln);
await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id));
if (config.MEMPOOL.NETWORK !== 'mainnet' || config.DATABASE.ENABLED === false) {
@@ -108,7 +107,7 @@ class LightningStatsImporter {
const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id);
if (!tx) {
logger.err(`Unable to fetch funding tx for channel ${short_id}. Capacity and creation date is unknown. Skipping channel.`);
logger.err(`Unable to fetch funding tx for channel ${short_id}. Capacity and creation date is unknown. Skipping channel.`, logger.tags.ln);
continue;
}
@@ -310,13 +309,18 @@ class LightningStatsImporter {
* Import topology files LN historical data into the database
*/
async $importHistoricalLightningStats(): Promise<void> {
if (!config.LIGHTNING.TOPOLOGY_FOLDER) {
logger.info(`Lightning topology folder is not set. Not importing historical LN stats`);
return;
}
logger.debug('Run the historical importer');
try {
let fileList: string[] = [];
try {
fileList = await fsPromises.readdir(this.topologiesFolder);
} catch (e) {
logger.err(`Unable to open topology folder at ${this.topologiesFolder}`);
logger.err(`Unable to open topology folder at ${this.topologiesFolder}`, logger.tags.ln);
throw e;
}
// Insert history from the most recent to the oldest
@@ -354,7 +358,7 @@ class LightningStatsImporter {
continue;
}
logger.debug(`Reading ${this.topologiesFolder}/${filename}`);
logger.debug(`Reading ${this.topologiesFolder}/${filename}`, logger.tags.ln);
let fileContent = '';
try {
fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
@@ -363,7 +367,7 @@ class LightningStatsImporter {
totalProcessed++;
continue;
}
logger.err(`Unable to open ${this.topologiesFolder}/${filename}`);
logger.err(`Unable to open ${this.topologiesFolder}/${filename}`, logger.tags.ln);
totalProcessed++;
continue;
}
@@ -373,7 +377,7 @@ class LightningStatsImporter {
graph = JSON.parse(fileContent);
graph = await this.cleanupTopology(graph);
} catch (e) {
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content. Reason: ${e instanceof Error ? e.message : e}`);
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content. Reason: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
totalProcessed++;
continue;
}
@@ -385,20 +389,20 @@ class LightningStatsImporter {
}
if (!logStarted) {
logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`);
logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`, logger.tags.ln);
logStarted = true;
}
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`);
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`, logger.tags.ln);
totalProcessed++;
if (processed > 10) {
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`, logger.tags.ln);
processed = 0;
} else {
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`, logger.tags.ln);
}
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
const stat = await this.computeNetworkStats(timestamp, graph, true);
@@ -407,10 +411,10 @@ class LightningStatsImporter {
}
if (totalProcessed > 0) {
logger.info(`Lightning network stats historical import completed`);
logger.notice(`Lightning network stats historical import completed`, logger.tags.ln);
}
} catch (e) {
logger.err(`Lightning network stats historical failed. Reason: ${e instanceof Error ? e.message : e}`);
logger.err(`Lightning network stats historical failed. Reason: ${e instanceof Error ? e.message : e}`, logger.tags.ln);
}
}

View File

@@ -32,9 +32,9 @@ class PoolsUpdater {
this.lastRun = now;
if (config.SOCKS5PROXY.ENABLED) {
logger.info(`Updating latest mining pools from ${this.poolsUrl} over the Tor network`);
logger.info(`Updating latest mining pools from ${this.poolsUrl} over the Tor network`, logger.tags.mining);
} else {
logger.info(`Updating latest mining pools from ${this.poolsUrl} over clearnet`);
logger.info(`Updating latest mining pools from ${this.poolsUrl} over clearnet`, logger.tags.mining);
}
try {
@@ -53,9 +53,9 @@ class PoolsUpdater {
}
if (this.currentSha === undefined) {
logger.info(`Downloading pools.json for the first time from ${this.poolsUrl}`);
logger.info(`Downloading pools.json for the first time from ${this.poolsUrl}`, logger.tags.mining);
} else {
logger.warn(`Pools.json is outdated, fetch latest from ${this.poolsUrl}`);
logger.warn(`Pools.json is outdated, fetch latest from ${this.poolsUrl}`, logger.tags.mining);
}
const poolsJson = await this.query(this.poolsUrl);
if (poolsJson === undefined) {
@@ -63,11 +63,11 @@ class PoolsUpdater {
}
await poolsParser.migratePoolsJson(poolsJson);
await this.updateDBSha(githubSha);
logger.notice('PoolsUpdater completed');
logger.notice(`PoolsUpdater completed`, logger.tags.mining);
} catch (e) {
this.lastRun = now - (oneWeek - oneDay); // Try again in 24h instead of waiting next week
logger.err('PoolsUpdater failed. Will try again in 24h. Reason: ' + (e instanceof Error ? e.message : e));
logger.err(`PoolsUpdater failed. Will try again in 24h. Reason: ${e instanceof Error ? e.message : e}`, logger.tags.mining);
}
}
@@ -81,7 +81,7 @@ class PoolsUpdater {
await DB.query('DELETE FROM state where name="pools_json_sha"');
await DB.query(`INSERT INTO state VALUES('pools_json_sha', NULL, '${githubSha}')`);
} catch (e) {
logger.err('Cannot save github pools.json sha into the db. Reason: ' + (e instanceof Error ? e.message : e));
logger.err('Cannot save github pools.json sha into the db. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining);
}
}
}
@@ -94,7 +94,7 @@ class PoolsUpdater {
const [rows]: any[] = await DB.query('SELECT string FROM state WHERE name="pools_json_sha"');
return (rows.length > 0 ? rows[0].string : undefined);
} catch (e) {
logger.err('Cannot fetch pools.json sha from db. Reason: ' + (e instanceof Error ? e.message : e));
logger.err('Cannot fetch pools.json sha from db. Reason: ' + (e instanceof Error ? e.message : e), logger.tags.mining);
return undefined;
}
}
@@ -113,7 +113,7 @@ class PoolsUpdater {
}
}
logger.err(`Cannot find "pools.json" in git tree (${this.treeUrl})`);
logger.err(`Cannot find "pools.json" in git tree (${this.treeUrl})`, logger.tags.mining);
return undefined;
}

View File

@@ -91,7 +91,7 @@ class KrakenApi implements PriceFeed {
}
if (Object.keys(priceHistory).length > 0) {
logger.notice(`Inserted ${Object.keys(priceHistory).length} Kraken EUR, USD, GBP, JPY, CAD, CHF and AUD weekly price history into db`);
logger.notice(`Inserted ${Object.keys(priceHistory).length} Kraken EUR, USD, GBP, JPY, CAD, CHF and AUD weekly price history into db`, logger.tags.mining);
}
}
}

View File

@@ -82,7 +82,7 @@ class PriceUpdater {
await this.$updatePrice();
}
} catch (e) {
logger.err(`Cannot save BTC prices in db. Reason: ${e instanceof Error ? e.message : e}`);
logger.err(`Cannot save BTC prices in db. Reason: ${e instanceof Error ? e.message : e}`, logger.tags.mining);
}
this.running = false;
@@ -115,14 +115,14 @@ class PriceUpdater {
if (price > 0) {
prices.push(price);
}
logger.debug(`${feed.name} BTC/${currency} price: ${price}`);
logger.debug(`${feed.name} BTC/${currency} price: ${price}`, logger.tags.mining);
} catch (e) {
logger.debug(`Could not fetch BTC/${currency} price at ${feed.name}. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.debug(`Could not fetch BTC/${currency} price at ${feed.name}. Reason: ${(e instanceof Error ? e.message : e)}`, logger.tags.mining);
}
}
}
if (prices.length === 1) {
logger.debug(`Only ${prices.length} feed available for BTC/${currency} price`);
logger.debug(`Only ${prices.length} feed available for BTC/${currency} price`, logger.tags.mining);
}
// Compute average price, non weighted
@@ -175,9 +175,9 @@ class PriceUpdater {
++insertedCount;
}
if (insertedCount > 0) {
logger.notice(`Inserted ${insertedCount} MtGox USD weekly price history into db`);
logger.notice(`Inserted ${insertedCount} MtGox USD weekly price history into db`, logger.tags.mining);
} else {
logger.debug(`Inserted ${insertedCount} MtGox USD weekly price history into db`);
logger.debug(`Inserted ${insertedCount} MtGox USD weekly price history into db`, logger.tags.mining);
}
// Insert Kraken weekly prices
@@ -198,7 +198,7 @@ class PriceUpdater {
private async $insertMissingRecentPrices(type: 'hour' | 'day'): Promise<void> {
const existingPriceTimes = await PricesRepository.$getPricesTimes();
logger.info(`Fetching ${type === 'day' ? 'dai' : 'hour'}ly price history from exchanges and saving missing ones into the database, this may take a while`);
logger.info(`Fetching ${type === 'day' ? 'dai' : 'hour'}ly price history from exchanges and saving missing ones into the database`, logger.tags.mining);
const historicalPrices: PriceHistory[] = [];
@@ -207,7 +207,7 @@ class PriceUpdater {
try {
historicalPrices.push(await feed.$fetchRecentPrice(this.currencies, type));
} catch (e) {
logger.err(`Cannot fetch hourly historical price from ${feed.name}. Ignoring this feed. Reason: ${e instanceof Error ? e.message : e}`);
logger.err(`Cannot fetch hourly historical price from ${feed.name}. Ignoring this feed. Reason: ${e instanceof Error ? e.message : e}`, logger.tags.mining);
}
}
@@ -252,9 +252,9 @@ class PriceUpdater {
}
if (totalInserted > 0) {
logger.notice(`Inserted ${totalInserted} ${type === 'day' ? 'dai' : 'hour'}ly historical prices into the db`);
logger.notice(`Inserted ${totalInserted} ${type === 'day' ? 'dai' : 'hour'}ly historical prices into the db`, logger.tags.mining);
} else {
logger.debug(`Inserted ${totalInserted} ${type === 'day' ? 'dai' : 'hour'}ly historical prices into the db`);
logger.debug(`Inserted ${totalInserted} ${type === 'day' ? 'dai' : 'hour'}ly historical prices into the db`, logger.tags.mining);
}
}
}