Make sure to not count channels twice

This commit is contained in:
nymkappa 2022-08-02 12:19:57 +02:00
parent b246c6f4c3
commit 5287490894
No known key found for this signature in database
GPG Key ID: E155910B16E8BD04
3 changed files with 66 additions and 34 deletions

View File

@ -1,4 +1,3 @@
import DB from '../../database';
import logger from '../../logger'; import logger from '../../logger';
import lightningApi from '../../api/lightning/lightning-api-factory'; import lightningApi from '../../api/lightning/lightning-api-factory';
import LightningStatsImporter from './sync-tasks/stats-importer'; import LightningStatsImporter from './sync-tasks/stats-importer';

View File

@ -1,8 +1,11 @@
import { existsSync, readFileSync, writeFileSync } from 'fs'; import { existsSync, promises } from 'fs';
import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; import bitcoinClient from '../../../api/bitcoin/bitcoin-client';
import config from '../../../config'; import config from '../../../config';
import DB from '../../../database';
import logger from '../../../logger'; import logger from '../../../logger';
const fsPromises = promises;
const BLOCKS_CACHE_MAX_SIZE = 100; const BLOCKS_CACHE_MAX_SIZE = 100;
const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json'; const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json';
@ -21,7 +24,7 @@ class FundingTxFetcher {
// Load funding tx disk cache // Load funding tx disk cache
if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) { if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) {
try { try {
this.fundingTxCache = JSON.parse(readFileSync(CACHE_FILE_NAME, 'utf-8')); this.fundingTxCache = JSON.parse(await fsPromises.readFile(CACHE_FILE_NAME, 'utf-8'));
} catch (e) { } catch (e) {
logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`); logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`);
this.fundingTxCache = {}; this.fundingTxCache = {};
@ -51,7 +54,7 @@ class FundingTxFetcher {
elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer); elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer);
if (elapsedSeconds > 60) { if (elapsedSeconds > 60) {
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
cacheTimer = new Date().getTime() / 1000; cacheTimer = new Date().getTime() / 1000;
} }
} }
@ -59,7 +62,7 @@ class FundingTxFetcher {
if (this.channelNewlyProcessed > 0) { if (this.channelNewlyProcessed > 0) {
logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`); logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`);
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
} }
this.running = false; this.running = false;
@ -76,13 +79,30 @@ class FundingTxFetcher {
const outputIdx = parts[2]; const outputIdx = parts[2];
let block = this.blocksCache[blockHeight]; let block = this.blocksCache[blockHeight];
// Check if we have the block in the `blocks_summaries` table to avoid calling core
if (!block) {
const [rows] = await DB.query(`
SELECT UNIX_TIMESTAMP(blocks.blockTimestamp) AS time, blocks_summaries.transactions AS tx
FROM blocks_summaries
JOIN blocks ON blocks.hash = blocks_summaries.id
WHERE blocks_summaries.height = ${blockHeight}
`);
block = rows[0] ?? null;
if (block) {
block.tx = JSON.parse(block.tx);
if (block.tx.length === 0) {
block = null;
}
}
}
// Fetch it from core
if (!block) { if (!block) {
const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10)); const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10));
block = await bitcoinClient.getBlock(blockHash, 2); block = await bitcoinClient.getBlock(blockHash, 2);
this.blocksCache[block.height] = block;
} }
this.blocksCache[block.height] = block;
const blocksCacheHashes = Object.keys(this.blocksCache).sort(); const blocksCacheHashes = Object.keys(this.blocksCache).sort((a, b) => parseInt(b) - parseInt(a)).reverse();
if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) { if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) {
for (let i = 0; i < 10; ++i) { for (let i = 0; i < 10; ++i) {
delete this.blocksCache[blocksCacheHashes[i]]; delete this.blocksCache[blocksCacheHashes[i]];
@ -92,7 +112,7 @@ class FundingTxFetcher {
this.fundingTxCache[channelId] = { this.fundingTxCache[channelId] = {
timestamp: block.time, timestamp: block.time,
txid: block.tx[txIdx].txid, txid: block.tx[txIdx].txid,
value: block.tx[txIdx].vout[outputIdx].value, value: block.tx[txIdx].value / 100000000 ?? block.tx[txIdx].vout[outputIdx].value,
}; };
++this.channelNewlyProcessed; ++this.channelNewlyProcessed;

View File

@ -1,10 +1,12 @@
import DB from '../../../database'; import DB from '../../../database';
import { readdirSync, readFileSync } from 'fs'; import { promises } from 'fs';
import { XMLParser } from 'fast-xml-parser'; import { XMLParser } from 'fast-xml-parser';
import logger from '../../../logger'; import logger from '../../../logger';
import fundingTxFetcher from './funding-tx-fetcher'; import fundingTxFetcher from './funding-tx-fetcher';
import config from '../../../config'; import config from '../../../config';
const fsPromises = promises;
interface Node { interface Node {
id: string; id: string;
timestamp: number; timestamp: number;
@ -33,14 +35,12 @@ class LightningStatsImporter {
topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER;
parser = new XMLParser(); parser = new XMLParser();
latestNodeCount = 1; // Ignore gap in the data
async $run(): Promise<void> { async $run(): Promise<void> {
logger.info(`Importing historical lightning stats`); logger.info(`Importing historical lightning stats`);
// const [channels]: any[] = await DB.query('SELECT short_id from channels;'); const [channels]: any[] = await DB.query('SELECT short_id from channels;');
// logger.info('Caching funding txs for currently existing channels'); logger.info('Caching funding txs for currently existing channels');
// await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id));
await this.$importHistoricalLightningStats(); await this.$importHistoricalLightningStats();
} }
@ -148,6 +148,8 @@ class LightningStatsImporter {
const capacities: number[] = []; const capacities: number[] = [];
const feeRates: number[] = []; const feeRates: number[] = [];
const baseFees: number[] = []; const baseFees: number[] = [];
const alreadyCountedChannels = {};
for (const channel of networkGraph.channels) { for (const channel of networkGraph.channels) {
const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2)); const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2));
if (!tx) { if (!tx) {
@ -173,10 +175,14 @@ class LightningStatsImporter {
nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000);
nodeStats[channel.destination].channels++; nodeStats[channel.destination].channels++;
if (!alreadyCountedChannels[channel.scid.slice(0, -2)]) {
capacity += Math.round(tx.value * 100000000); capacity += Math.round(tx.value * 100000000);
capacities.push(Math.round(tx.value * 100000000));
alreadyCountedChannels[channel.scid.slice(0, -2)] = true;
}
avgFeeRate += channel.fee_proportional_millionths; avgFeeRate += channel.fee_proportional_millionths;
avgBaseFee += channel.fee_base_msat; avgBaseFee += channel.fee_base_msat;
capacities.push(Math.round(tx.value * 100000000));
feeRates.push(channel.fee_proportional_millionths); feeRates.push(channel.fee_proportional_millionths);
baseFees.push(channel.fee_base_msat); baseFees.push(channel.fee_base_msat);
} }
@ -186,6 +192,7 @@ class LightningStatsImporter {
const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
const avgCapacity = Math.round(capacity / capacities.length);
let query = `INSERT INTO lightning_stats( let query = `INSERT INTO lightning_stats(
added, added,
@ -207,14 +214,14 @@ class LightningStatsImporter {
await DB.query(query, [ await DB.query(query, [
timestamp, timestamp,
networkGraph.channels.length, capacities.length,
networkGraph.nodes.length, networkGraph.nodes.length,
capacity, capacity,
torNodes, torNodes,
clearnetNodes, clearnetNodes,
unannouncedNodes, unannouncedNodes,
clearnetTorNodes, clearnetTorNodes,
Math.round(capacity / networkGraph.channels.length), avgCapacity,
avgFeeRate, avgFeeRate,
avgBaseFee, avgBaseFee,
medCapacity, medCapacity,
@ -241,10 +248,10 @@ class LightningStatsImporter {
} }
async $importHistoricalLightningStats(): Promise<void> { async $importHistoricalLightningStats(): Promise<void> {
const fileList = readdirSync(this.topologiesFolder); const fileList = await fsPromises.readdir(this.topologiesFolder);
fileList.sort().reverse(); fileList.sort().reverse();
const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added FROM lightning_stats'); const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) AS added FROM lightning_stats');
const existingStatsTimestamps = {}; const existingStatsTimestamps = {};
for (const row of rows) { for (const row of rows) {
existingStatsTimestamps[row.added] = true; existingStatsTimestamps[row.added] = true;
@ -252,26 +259,30 @@ class LightningStatsImporter {
for (const filename of fileList) { for (const filename of fileList) {
const timestamp = parseInt(filename.split('_')[1], 10); const timestamp = parseInt(filename.split('_')[1], 10);
const fileContent = readFileSync(`${this.topologiesFolder}/${filename}`, 'utf8');
const graph = this.parseFile(fileContent);
if (!graph) {
continue;
}
// Ignore drop of more than 90% of the node count as it's probably a missing data point
const diffRatio = graph.nodes.length / this.latestNodeCount;
if (diffRatio < 0.90) {
continue;
}
this.latestNodeCount = graph.nodes.length;
// Stats exist already, don't calculate/insert them // Stats exist already, don't calculate/insert them
if (existingStatsTimestamps[timestamp] === true) { if (existingStatsTimestamps[timestamp] !== undefined) {
continue; continue;
} }
logger.debug(`Processing ${this.topologiesFolder}/${filename}`); logger.debug(`Processing ${this.topologiesFolder}/${filename}`);
const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
let graph;
if (filename.indexOf('.json') !== -1) {
try {
graph = JSON.parse(fileContent);
} catch (e) {
logger.debug(`Invalid topology file, cannot parse the content`);
}
} else {
graph = this.parseFile(fileContent);
if (!graph) {
logger.debug(`Invalid topology file, cannot parse the content`);
continue;
}
await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph));
}
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`);
@ -282,6 +293,8 @@ class LightningStatsImporter {
logger.debug(`Generating LN network stats for ${datestr}`); logger.debug(`Generating LN network stats for ${datestr}`);
await this.computeNetworkStats(timestamp, graph); await this.computeNetworkStats(timestamp, graph);
existingStatsTimestamps[timestamp] = true;
} }
logger.info(`Lightning network stats historical import completed`); logger.info(`Lightning network stats historical import completed`);