From 528749089455e667407910c79575d53be5162003 Mon Sep 17 00:00:00 2001
From: nymkappa
Date: Tue, 2 Aug 2022 12:19:57 +0200
Subject: [PATCH] Make sure to not count channels twice

---
 .../tasks/lightning/stats-updater.service.ts  |  1 -
 .../sync-tasks/funding-tx-fetcher.ts          | 34 ++++++++--
 .../lightning/sync-tasks/stats-importer.ts    | 65 +++++++++++--------
 3 files changed, 66 insertions(+), 34 deletions(-)

diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts
index f364629b9..5701ef22a 100644
--- a/backend/src/tasks/lightning/stats-updater.service.ts
+++ b/backend/src/tasks/lightning/stats-updater.service.ts
@@ -1,4 +1,3 @@
-import DB from '../../database';
 import logger from '../../logger';
 import lightningApi from '../../api/lightning/lightning-api-factory';
 import LightningStatsImporter from './sync-tasks/stats-importer';
diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
index b9407c44d..4068de8f1 100644
--- a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
+++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
@@ -1,8 +1,11 @@
-import { existsSync, readFileSync, writeFileSync } from 'fs';
+import { existsSync, promises } from 'fs';
 import bitcoinClient from '../../../api/bitcoin/bitcoin-client';
 import config from '../../../config';
+import DB from '../../../database';
 import logger from '../../../logger';
 
+const fsPromises = promises;
+
 const BLOCKS_CACHE_MAX_SIZE = 100;
 const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json';
 
@@ -21,7 +24,7 @@ class FundingTxFetcher {
     // Load funding tx disk cache
     if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) {
       try {
-        this.fundingTxCache = JSON.parse(readFileSync(CACHE_FILE_NAME, 'utf-8'));
+        this.fundingTxCache = JSON.parse(await fsPromises.readFile(CACHE_FILE_NAME, 'utf-8'));
       } catch (e) {
         logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`);
         this.fundingTxCache = {};
@@ -51,7 +54,7 @@ class FundingTxFetcher {
       elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer);
       if (elapsedSeconds > 60) {
         logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
-        writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
+        fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
         cacheTimer = new Date().getTime() / 1000;
       }
     }
@@ -59,7 +62,7 @@ class FundingTxFetcher {
     if (this.channelNewlyProcessed > 0) {
       logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`);
       logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
-      writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
+      fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
     }
 
     this.running = false;
@@ -76,13 +79,30 @@ class FundingTxFetcher {
     const outputIdx = parts[2];
 
     let block = this.blocksCache[blockHeight];
+    // Check if we have the block in the `blocks_summaries` table to avoid calling core
+    if (!block) {
+      const [rows] = await DB.query(`
+        SELECT UNIX_TIMESTAMP(blocks.blockTimestamp) AS time, blocks_summaries.transactions AS tx
+        FROM blocks_summaries
+        JOIN blocks ON blocks.hash = blocks_summaries.id
+        WHERE blocks_summaries.height = ${blockHeight}
+      `);
+      block = rows[0] ?? null;
+      if (block) {
+        block.tx = JSON.parse(block.tx);
+        if (block.tx.length === 0) {
+          block = null;
+        }
+      }
+    }
+    // Fetch it from core
     if (!block) {
       const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10));
       block = await bitcoinClient.getBlock(blockHash, 2);
-      this.blocksCache[block.height] = block;
     }
+    this.blocksCache[block.height] = block;
 
-    const blocksCacheHashes = Object.keys(this.blocksCache).sort();
+    const blocksCacheHashes = Object.keys(this.blocksCache).sort((a, b) => parseInt(b) - parseInt(a)).reverse();
     if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) {
       for (let i = 0; i < 10; ++i) {
         delete this.blocksCache[blocksCacheHashes[i]];
@@ -92,7 +112,7 @@ class FundingTxFetcher {
     this.fundingTxCache[channelId] = {
       timestamp: block.time,
       txid: block.tx[txIdx].txid,
-      value: block.tx[txIdx].vout[outputIdx].value,
+      value: block.tx[txIdx].value !== undefined ? block.tx[txIdx].value / 100000000 : block.tx[txIdx].vout[outputIdx].value,
     };
 
     ++this.channelNewlyProcessed;
diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts
index f6d70df7d..8482b558c 100644
--- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts
+++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts
@@ -1,10 +1,12 @@
 import DB from '../../../database';
-import { readdirSync, readFileSync } from 'fs';
+import { promises } from 'fs';
 import { XMLParser } from 'fast-xml-parser';
 import logger from '../../../logger';
 import fundingTxFetcher from './funding-tx-fetcher';
 import config from '../../../config';
 
+const fsPromises = promises;
+
 interface Node {
   id: string;
   timestamp: number;
@@ -33,14 +35,12 @@ class LightningStatsImporter {
   topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER;
   parser = new XMLParser();
 
-  latestNodeCount = 1; // Ignore gap in the data
-
   async $run(): Promise<void> {
     logger.info(`Importing historical lightning stats`);
 
-    // const [channels]: any[] = await DB.query('SELECT short_id from channels;');
-    // logger.info('Caching funding txs for currently existing channels');
-    // await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id));
+    const [channels]: any[] = await DB.query('SELECT short_id from channels;');
+    logger.info('Caching funding txs for currently existing channels');
+    await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id));
 
     await this.$importHistoricalLightningStats();
   }
@@ -148,6 +148,8 @@ class LightningStatsImporter {
     const capacities: number[] = [];
     const feeRates: number[] = [];
     const baseFees: number[] = [];
+    const alreadyCountedChannels = {};
+
     for (const channel of networkGraph.channels) {
       const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2));
       if (!tx) {
@@ -173,10 +175,14 @@ class LightningStatsImporter {
       nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000);
       nodeStats[channel.destination].channels++;
 
-      capacity += Math.round(tx.value * 100000000);
+      if (!alreadyCountedChannels[channel.scid.slice(0, -2)]) {
+        capacity += Math.round(tx.value * 100000000);
+        capacities.push(Math.round(tx.value * 100000000));
+        alreadyCountedChannels[channel.scid.slice(0, -2)] = true;
+      }
+
       avgFeeRate += channel.fee_proportional_millionths;
       avgBaseFee += channel.fee_base_msat;
-      capacities.push(Math.round(tx.value * 100000000));
       feeRates.push(channel.fee_proportional_millionths);
       baseFees.push(channel.fee_base_msat);
     }
@@ -186,6 +192,7 @@ class LightningStatsImporter {
     const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
     const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
     const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
+    const avgCapacity = Math.round(capacity / capacities.length);
 
     let query = `INSERT INTO lightning_stats(
       added,
@@ -207,14 +214,14 @@ class LightningStatsImporter {
 
     await DB.query(query, [
       timestamp,
-      networkGraph.channels.length,
+      capacities.length,
       networkGraph.nodes.length,
       capacity,
       torNodes,
       clearnetNodes,
       unannouncedNodes,
       clearnetTorNodes,
-      Math.round(capacity / networkGraph.channels.length),
+      avgCapacity,
       avgFeeRate,
       avgBaseFee,
       medCapacity,
@@ -241,10 +248,10 @@ class LightningStatsImporter {
   }
 
   async $importHistoricalLightningStats(): Promise<void> {
-    const fileList = readdirSync(this.topologiesFolder);
+    const fileList = await fsPromises.readdir(this.topologiesFolder);
     fileList.sort().reverse();
 
-    const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added FROM lightning_stats');
+    const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) AS added FROM lightning_stats');
     const existingStatsTimestamps = {};
     for (const row of rows) {
       existingStatsTimestamps[row.added] = true;
     }
 
     for (const filename of fileList) {
       const timestamp = parseInt(filename.split('_')[1], 10);
-      const fileContent = readFileSync(`${this.topologiesFolder}/${filename}`, 'utf8');
-
-      const graph = this.parseFile(fileContent);
-      if (!graph) {
-        continue;
-      }
-
-      // Ignore drop of more than 90% of the node count as it's probably a missing data point
-      const diffRatio = graph.nodes.length / this.latestNodeCount;
-      if (diffRatio < 0.90) {
-        continue;
-      }
-      this.latestNodeCount = graph.nodes.length;
 
       // Stats exist already, don't calculate/insert them
-      if (existingStatsTimestamps[timestamp] === true) {
+      if (existingStatsTimestamps[timestamp] !== undefined) {
         continue;
       }
 
       logger.debug(`Processing ${this.topologiesFolder}/${filename}`);
+      const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
+
+      let graph;
+      if (filename.indexOf('.json') !== -1) {
+        try {
+          graph = JSON.parse(fileContent);
+        } catch (e) {
+          logger.debug(`Invalid topology file, cannot parse the content`);
+          continue;
+        }
+      } else {
+        graph = this.parseFile(fileContent);
+        if (!graph) {
+          logger.debug(`Invalid topology file, cannot parse the content`);
+          continue;
+        }
+        await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph));
+      }
 
       const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
       logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`);
 
       logger.debug(`Generating LN network stats for ${datestr}`);
       await this.computeNetworkStats(timestamp, graph);
+
+      existingStatsTimestamps[timestamp] = true;
     }
 
     logger.info(`Lightning network stats historical import completed`);
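
Note on the deduplication this patch introduces in computeNetworkStats: a topology snapshot lists every channel once per direction, with a direction suffix on the short channel id (scid), so summing the funding value per entry counted each channel's capacity twice. The guard keyed on channel.scid.slice(0, -2) counts it once, while fee statistics intentionally stay per-direction because each side of a channel announces its own fee policy. A minimal standalone sketch of the idea, not part of the commit (the DirectedChannel shape, totalCapacitySat helper, and sample scids are illustrative assumptions):

// Illustrative sketch: count capacity once per channel even though the
// graph contains one entry per direction.
interface DirectedChannel {
  scid: string;        // short channel id + direction suffix, e.g. '745123x1024x1/0'
  capacitySat: number; // funding output value in satoshis
}

function totalCapacitySat(channels: DirectedChannel[]): number {
  const alreadyCounted: { [scid: string]: boolean } = {};
  let capacity = 0;
  for (const channel of channels) {
    // Strip the trailing '/0' or '/1' so both directions map to the same channel
    const channelId = channel.scid.slice(0, -2);
    if (!alreadyCounted[channelId]) {
      capacity += channel.capacitySat;
      alreadyCounted[channelId] = true;
    }
  }
  return capacity;
}

// Both directions of the same channel are listed, but its capacity is counted once.
console.log(totalCapacitySat([
  { scid: '745123x1024x1/0', capacitySat: 5000000 },
  { scid: '745123x1024x1/1', capacitySat: 5000000 },
])); // 5000000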