From 6be2985b407ef8cdb44ffbbcf59d227169564e08 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Wed, 3 Aug 2022 12:13:55 +0200 Subject: [PATCH] Fix daily LN stats crash --- .../clightning/clightning-convert.ts | 33 ++++- .../tasks/lightning/stats-updater.service.ts | 10 +- .../sync-tasks/funding-tx-fetcher.ts | 2 +- .../lightning/sync-tasks/stats-importer.ts | 132 ++++++++++++------ 4 files changed, 123 insertions(+), 54 deletions(-) diff --git a/backend/src/api/lightning/clightning/clightning-convert.ts b/backend/src/api/lightning/clightning/clightning-convert.ts index 1a267bc65..75c8ec20c 100644 --- a/backend/src/api/lightning/clightning/clightning-convert.ts +++ b/backend/src/api/lightning/clightning/clightning-convert.ts @@ -1,5 +1,6 @@ import { ILightningApi } from '../lightning-api.interface'; import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher'; +import logger from '../../../logger'; /** * Convert a clightning "listnode" entry to a lnd node entry @@ -23,12 +24,17 @@ export function convertNode(clNode: any): ILightningApi.Node { /** * Convert clightning "listchannels" response to lnd "describegraph.edges" format */ - export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise { +export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise { + logger.info('Converting clightning nodes and channels to lnd graph format'); + + let loggerTimer = new Date().getTime() / 1000; + let channelProcessed = 0; + const consolidatedChannelList: ILightningApi.Channel[] = []; const clChannelsDict = {}; const clChannelsDictCount = {}; - for (const clChannel of clChannels) { + for (const clChannel of clChannels) { if (!clChannelsDict[clChannel.short_channel_id]) { clChannelsDict[clChannel.short_channel_id] = clChannel; clChannelsDictCount[clChannel.short_channel_id] = 1; @@ -39,9 +45,26 @@ export function convertNode(clNode: any): ILightningApi.Node { delete clChannelsDict[clChannel.short_channel_id]; clChannelsDictCount[clChannel.short_channel_id]++; } + + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Building complete channels from clightning output. Channels processed: ${channelProcessed + 1} of ${clChannels.length}`); + loggerTimer = new Date().getTime() / 1000; + } + + ++channelProcessed; } - for (const short_channel_id of Object.keys(clChannelsDict)) { + + channelProcessed = 0; + const keys = Object.keys(clChannelsDict); + for (const short_channel_id of keys) { consolidatedChannelList.push(await buildIncompleteChannel(clChannelsDict[short_channel_id])); + + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Building partial channels from clightning output. Channels processed: ${channelProcessed + 1} of ${keys.length}`); + loggerTimer = new Date().getTime() / 1000; + } } return consolidatedChannelList; @@ -79,7 +102,7 @@ async function buildFullChannel(clChannelA: any, clChannelB: any): Promise { +async function buildIncompleteChannel(clChannel: any): Promise { const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannel.short_channel_id); const parts = clChannel.short_channel_id.split('x'); const outputIdx = parts[2]; @@ -99,7 +122,7 @@ async function buildFullChannel(clChannelA: any, clChannelB: any): Promise { this.$runTasks(); }, this.timeUntilMidnight()); @@ -42,9 +43,14 @@ class LightningStatsUpdater { this.setDateMidnight(date); date.setUTCHours(24); + const [rows] = await DB.query(`SELECT UNIX_TIMESTAMP(MAX(added)) as lastAdded from lightning_stats`); + if ((rows[0].lastAdded ?? 0) === date.getTime() / 1000) { + return; + } + logger.info(`Running lightning daily stats log...`); const networkGraph = await lightningApi.$getNetworkGraph(); - LightningStatsImporter.computeNetworkStats(date.getTime(), networkGraph); + LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph); } } diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts index 926d20c91..6ca72aef7 100644 --- a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts +++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts @@ -45,7 +45,7 @@ class FundingTxFetcher { let elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); if (elapsedSeconds > 10) { elapsedSeconds = Math.round((new Date().getTime() / 1000) - globalTimer); - logger.debug(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` + + logger.info(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` + `(${Math.floor(channelProcessed / channelIds.length * 10000) / 100}%) | ` + `elapsed: ${elapsedSeconds} seconds` ); diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index 91e67f77d..d9c441498 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -13,19 +13,19 @@ interface Node { features: string; rgb_color: string; alias: string; - addresses: string; + addresses: unknown[]; out_degree: number; in_degree: number; } interface Channel { - scid: string; - source: string; - destination: string; + channel_id: string; + node1_pub: string; + node2_pub: string; timestamp: number; features: string; fee_base_msat: number; - fee_proportional_millionths: number; + fee_rate_milli_msat: number; htlc_minimim_msat: number; cltv_expiry_delta: number; htlc_maximum_msat: number; @@ -60,10 +60,9 @@ class LightningStatsImporter { let hasClearnet = false; let isUnnanounced = true; - const sockets = node.addresses.split(','); - for (const socket of sockets) { - hasOnion = hasOnion || (socket.indexOf('torv3://') !== -1); - hasClearnet = hasClearnet || (socket.indexOf('ipv4://') !== -1 || socket.indexOf('ipv6://') !== -1); + for (const socket of (node.addresses ?? [])) { + hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network); + hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network); } if (hasOnion && hasClearnet) { clearnetTorNodes++; @@ -90,8 +89,11 @@ class LightningStatsImporter { const baseFees: number[] = []; const alreadyCountedChannels = {}; - for (const channel of networkGraph.channels) { - const short_id = channel.scid.slice(0, -2); + for (const channel of networkGraph.edges) { + let short_id = channel.channel_id; + if (short_id.indexOf('/') !== -1) { + short_id = short_id.slice(0, -2); + } const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id); if (!tx) { @@ -99,23 +101,23 @@ class LightningStatsImporter { continue; } - if (!nodeStats[channel.source]) { - nodeStats[channel.source] = { + if (!nodeStats[channel.node1_pub]) { + nodeStats[channel.node1_pub] = { capacity: 0, channels: 0, }; } - if (!nodeStats[channel.destination]) { - nodeStats[channel.destination] = { + if (!nodeStats[channel.node2_pub]) { + nodeStats[channel.node2_pub] = { capacity: 0, channels: 0, }; } - nodeStats[channel.source].capacity += Math.round(tx.value * 100000000); - nodeStats[channel.source].channels++; - nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); - nodeStats[channel.destination].channels++; + nodeStats[channel.node1_pub].capacity += Math.round(tx.value * 100000000); + nodeStats[channel.node1_pub].channels++; + nodeStats[channel.node2_pub].capacity += Math.round(tx.value * 100000000); + nodeStats[channel.node2_pub].channels++; if (!alreadyCountedChannels[short_id]) { capacity += Math.round(tx.value * 100000000); @@ -123,19 +125,31 @@ class LightningStatsImporter { alreadyCountedChannels[short_id] = true; } - if (channel.fee_proportional_millionths < 5000) { - avgFeeRate += channel.fee_proportional_millionths; - feeRates.push(channel.fee_proportional_millionths); - } - - if (channel.fee_base_msat < 5000) { - avgBaseFee += channel.fee_base_msat; - baseFees.push(channel.fee_base_msat); + if (channel.node1_policy !== undefined) { // Coming from the node + for (const policy of [channel.node1_policy, channel.node2_policy]) { + if (policy && policy.fee_rate_milli_msat < 5000) { + avgFeeRate += policy.fee_rate_milli_msat; + feeRates.push(policy.fee_rate_milli_msat); + } + if (policy && policy.fee_base_msat < 5000) { + avgBaseFee += policy.fee_base_msat; + baseFees.push(policy.fee_base_msat); + } + } + } else { // Coming from the historical import + if (channel.fee_rate_milli_msat < 5000) { + avgFeeRate += channel.fee_rate_milli_msat; + feeRates.push(channel.fee_rate_milli_msat); + } + if (channel.fee_base_msat < 5000) { + avgBaseFee += channel.fee_base_msat; + baseFees.push(channel.fee_base_msat); + } } } - avgFeeRate /= networkGraph.channels.length; - avgBaseFee /= networkGraph.channels.length; + avgFeeRate /= networkGraph.edges.length; + avgBaseFee /= networkGraph.edges.length; const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; @@ -203,15 +217,28 @@ class LightningStatsImporter { let latestNodeCount = 1; const fileList = await fsPromises.readdir(this.topologiesFolder); + // Insert history from the most recent to the oldest + // This also put the .json cached files first fileList.sort().reverse(); - const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added, node_count FROM lightning_stats'); + const [rows]: any[] = await DB.query(` + SELECT UNIX_TIMESTAMP(added) AS added, node_count + FROM lightning_stats + ORDER BY added DESC + `); const existingStatsTimestamps = {}; for (const row of rows) { - existingStatsTimestamps[row.added] = rows[0]; + existingStatsTimestamps[row.added] = row; } + // For logging purpose + let processed = 10; + let totalProcessed = -1; + for (const filename of fileList) { + processed++; + totalProcessed++; + const timestamp = parseInt(filename.split('_')[1], 10); // Stats exist already, don't calculate/insert them @@ -220,7 +247,7 @@ class LightningStatsImporter { continue; } - logger.debug(`Processing ${this.topologiesFolder}/${filename}`); + logger.debug(`Reading ${this.topologiesFolder}/${filename}`); const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8'); let graph; @@ -228,12 +255,13 @@ class LightningStatsImporter { try { graph = JSON.parse(fileContent); } catch (e) { - logger.debug(`Invalid topology file, cannot parse the content`); + logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`); + continue; } } else { graph = this.parseFile(fileContent); if (!graph) { - logger.debug(`Invalid topology file, cannot parse the content`); + logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`); continue; } await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph)); @@ -245,19 +273,22 @@ class LightningStatsImporter { const diffRatio = graph.nodes.length / latestNodeCount; if (diffRatio < 0.9) { // Ignore drop of more than 90% of the node count as it's probably a missing data point + logger.debug(`Nodes count diff ratio threshold reached, ignore the data for this day ${graph.nodes.length} nodes vs ${latestNodeCount}`); continue; } } latestNodeCount = graph.nodes.length; const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; - logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); + logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`); - // Cache funding txs - logger.debug(`Caching funding txs for ${datestr}`); - await fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); - - logger.debug(`Generating LN network stats for ${datestr}`); + if (processed > 10) { + logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); + processed = 0; + } else { + logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); + } + await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2))); const stat = await this.computeNetworkStats(timestamp, graph); existingStatsTimestamps[timestamp] = stat; @@ -290,13 +321,22 @@ class LightningStatsImporter { if (!node.data) { continue; } + const addresses: unknown[] = []; + const sockets = node.data[5].split(','); + for (const socket of sockets) { + const parts = socket.split('://'); + addresses.push({ + network: parts[0], + addr: parts[1], + }); + } nodes.push({ id: node.data[0], timestamp: node.data[1], features: node.data[2], rgb_color: node.data[3], alias: node.data[4], - addresses: node.data[5], + addresses: addresses, out_degree: node.data[6], in_degree: node.data[7], }); @@ -307,13 +347,13 @@ class LightningStatsImporter { continue; } channels.push({ - scid: channel.data[0], - source: channel.data[1], - destination: channel.data[2], + channel_id: channel.data[0], + node1_pub: channel.data[1], + node2_pub: channel.data[2], timestamp: channel.data[3], features: channel.data[4], fee_base_msat: channel.data[5], - fee_proportional_millionths: channel.data[6], + fee_rate_milli_msat: channel.data[6], htlc_minimim_msat: channel.data[7], cltv_expiry_delta: channel.data[8], htlc_maximum_msat: channel.data[9], @@ -322,7 +362,7 @@ class LightningStatsImporter { return { nodes: nodes, - channels: channels, + edges: channels, }; } }