Merge pull request #2244 from mempool/nymkappa/bugfix/daily-stats-crash
Fix daily LN stats crash
This commit is contained in:
		
						commit
						6796bb94cc
					
				| @ -1,5 +1,6 @@ | ||||
| import { ILightningApi } from '../lightning-api.interface'; | ||||
| import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher'; | ||||
| import logger from '../../../logger'; | ||||
| 
 | ||||
| /** | ||||
|  * Convert a clightning "listnode" entry to a lnd node entry | ||||
| @ -23,12 +24,17 @@ export function convertNode(clNode: any): ILightningApi.Node { | ||||
| /** | ||||
|  * Convert clightning "listchannels" response to lnd "describegraph.edges" format | ||||
|  */ | ||||
|  export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise<ILightningApi.Channel[]> { | ||||
| export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise<ILightningApi.Channel[]> { | ||||
|   logger.info('Converting clightning nodes and channels to lnd graph format'); | ||||
| 
 | ||||
|   let loggerTimer = new Date().getTime() / 1000; | ||||
|   let channelProcessed = 0; | ||||
| 
 | ||||
|   const consolidatedChannelList: ILightningApi.Channel[] = []; | ||||
|   const clChannelsDict = {}; | ||||
|   const clChannelsDictCount = {}; | ||||
| 
 | ||||
|   for (const clChannel of clChannels) {     | ||||
|   for (const clChannel of clChannels) { | ||||
|     if (!clChannelsDict[clChannel.short_channel_id]) { | ||||
|       clChannelsDict[clChannel.short_channel_id] = clChannel; | ||||
|       clChannelsDictCount[clChannel.short_channel_id] = 1; | ||||
| @ -39,9 +45,26 @@ export function convertNode(clNode: any): ILightningApi.Node { | ||||
|       delete clChannelsDict[clChannel.short_channel_id]; | ||||
|       clChannelsDictCount[clChannel.short_channel_id]++; | ||||
|     } | ||||
| 
 | ||||
|     const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); | ||||
|     if (elapsedSeconds > 10) { | ||||
|       logger.info(`Building complete channels from clightning output. Channels processed: ${channelProcessed + 1} of ${clChannels.length}`); | ||||
|       loggerTimer = new Date().getTime() / 1000; | ||||
|     } | ||||
| 
 | ||||
|     ++channelProcessed; | ||||
|   } | ||||
|   for (const short_channel_id of Object.keys(clChannelsDict)) { | ||||
| 
 | ||||
|   channelProcessed = 0; | ||||
|   const keys = Object.keys(clChannelsDict); | ||||
|   for (const short_channel_id of keys) { | ||||
|     consolidatedChannelList.push(await buildIncompleteChannel(clChannelsDict[short_channel_id])); | ||||
| 
 | ||||
|     const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); | ||||
|     if (elapsedSeconds > 10) { | ||||
|       logger.info(`Building partial channels from clightning output. Channels processed: ${channelProcessed + 1} of ${keys.length}`); | ||||
|       loggerTimer = new Date().getTime() / 1000; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   return consolidatedChannelList; | ||||
| @ -79,7 +102,7 @@ async function buildFullChannel(clChannelA: any, clChannelB: any): Promise<ILigh | ||||
|  * Convert one clightning "getchannels" entry into a full a lnd "describegraph.edges" format | ||||
|  * In this case, clightning knows the channel policy of only one node | ||||
|  */ | ||||
|  async function buildIncompleteChannel(clChannel: any): Promise<ILightningApi.Channel> { | ||||
| async function buildIncompleteChannel(clChannel: any): Promise<ILightningApi.Channel> { | ||||
|   const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannel.short_channel_id); | ||||
|   const parts = clChannel.short_channel_id.split('x'); | ||||
|   const outputIdx = parts[2]; | ||||
| @ -99,7 +122,7 @@ async function buildFullChannel(clChannelA: any, clChannelB: any): Promise<ILigh | ||||
| /** | ||||
|  * Convert a clightning "listnode" response to a lnd channel policy format | ||||
|  */ | ||||
|  function convertPolicy(clChannel: any): ILightningApi.RoutingPolicy { | ||||
| function convertPolicy(clChannel: any): ILightningApi.RoutingPolicy { | ||||
|   return { | ||||
|     time_lock_delta: 0, // TODO
 | ||||
|     min_htlc: clChannel.htlc_minimum_msat.slice(0, -4), | ||||
|  | ||||
| @ -137,9 +137,7 @@ class Server { | ||||
|     } | ||||
| 
 | ||||
|     if (config.LIGHTNING.ENABLED) { | ||||
|       fundingTxFetcher.$init() | ||||
|       .then(() => networkSyncService.$startService()) | ||||
|       .then(() => lightningStatsUpdater.$startService()); | ||||
|       this.$runLightningBackend(); | ||||
|     } | ||||
| 
 | ||||
|     this.server.listen(config.MEMPOOL.HTTP_PORT, () => { | ||||
| @ -185,6 +183,18 @@ class Server { | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $runLightningBackend() { | ||||
|     try { | ||||
|       await fundingTxFetcher.$init(); | ||||
|       await networkSyncService.$startService(); | ||||
|       await lightningStatsUpdater.$startService(); | ||||
|     } catch(e) { | ||||
|       logger.err(`Lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`); | ||||
|       await Common.sleep$(1000 * 60); | ||||
|       this.$runLightningBackend(); | ||||
|     }; | ||||
| } | ||||
| 
 | ||||
|   setUpWebsocketHandling() { | ||||
|     if (this.wss) { | ||||
|       websocketHandler.setWebsocketServer(this.wss); | ||||
|  | ||||
| @ -1,3 +1,4 @@ | ||||
| import DB from '../../database'; | ||||
| import logger from '../../logger'; | ||||
| import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||
| import LightningStatsImporter from './sync-tasks/stats-importer'; | ||||
| @ -9,7 +10,7 @@ class LightningStatsUpdater { | ||||
|     logger.info('Starting Lightning Stats service'); | ||||
| 
 | ||||
|     LightningStatsImporter.$run(); | ||||
| 
 | ||||
|      | ||||
|     setTimeout(() => { | ||||
|       this.$runTasks(); | ||||
|     }, this.timeUntilMidnight()); | ||||
| @ -42,9 +43,14 @@ class LightningStatsUpdater { | ||||
|     this.setDateMidnight(date); | ||||
|     date.setUTCHours(24); | ||||
| 
 | ||||
|     const [rows] = await DB.query(`SELECT UNIX_TIMESTAMP(MAX(added)) as lastAdded from lightning_stats`); | ||||
|     if ((rows[0].lastAdded ?? 0) === date.getTime() / 1000) { | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     logger.info(`Running lightning daily stats log...`); | ||||
|     const networkGraph = await lightningApi.$getNetworkGraph(); | ||||
|     LightningStatsImporter.computeNetworkStats(date.getTime(), networkGraph); | ||||
|     LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -45,7 +45,7 @@ class FundingTxFetcher { | ||||
|       let elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); | ||||
|       if (elapsedSeconds > 10) { | ||||
|         elapsedSeconds = Math.round((new Date().getTime() / 1000) - globalTimer); | ||||
|         logger.debug(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` + | ||||
|         logger.info(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` + | ||||
|           `(${Math.floor(channelProcessed / channelIds.length * 10000) / 100}%) | ` + | ||||
|           `elapsed: ${elapsedSeconds} seconds` | ||||
|         ); | ||||
|  | ||||
| @ -13,19 +13,19 @@ interface Node { | ||||
|   features: string; | ||||
|   rgb_color: string; | ||||
|   alias: string; | ||||
|   addresses: string; | ||||
|   addresses: unknown[]; | ||||
|   out_degree: number; | ||||
|   in_degree: number; | ||||
| } | ||||
| 
 | ||||
| interface Channel { | ||||
|   scid: string; | ||||
|   source: string; | ||||
|   destination: string; | ||||
|   channel_id: string; | ||||
|   node1_pub: string; | ||||
|   node2_pub: string; | ||||
|   timestamp: number; | ||||
|   features: string; | ||||
|   fee_base_msat: number; | ||||
|   fee_proportional_millionths: number; | ||||
|   fee_rate_milli_msat: number; | ||||
|   htlc_minimim_msat: number; | ||||
|   cltv_expiry_delta: number; | ||||
|   htlc_maximum_msat: number; | ||||
| @ -60,10 +60,9 @@ class LightningStatsImporter { | ||||
|       let hasClearnet = false; | ||||
|       let isUnnanounced = true; | ||||
| 
 | ||||
|       const sockets = node.addresses.split(','); | ||||
|       for (const socket of sockets) { | ||||
|         hasOnion = hasOnion || (socket.indexOf('torv3://') !== -1); | ||||
|         hasClearnet = hasClearnet || (socket.indexOf('ipv4://') !== -1 || socket.indexOf('ipv6://') !== -1); | ||||
|       for (const socket of (node.addresses ?? [])) { | ||||
|         hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network); | ||||
|         hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network); | ||||
|       } | ||||
|       if (hasOnion && hasClearnet) { | ||||
|         clearnetTorNodes++; | ||||
| @ -90,8 +89,11 @@ class LightningStatsImporter { | ||||
|     const baseFees: number[] = []; | ||||
|     const alreadyCountedChannels = {}; | ||||
|      | ||||
|     for (const channel of networkGraph.channels) { | ||||
|       const short_id = channel.scid.slice(0, -2); | ||||
|     for (const channel of networkGraph.edges) { | ||||
|       let short_id = channel.channel_id; | ||||
|       if (short_id.indexOf('/') !== -1) { | ||||
|         short_id = short_id.slice(0, -2); | ||||
|       } | ||||
| 
 | ||||
|       const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id); | ||||
|       if (!tx) { | ||||
| @ -99,23 +101,23 @@ class LightningStatsImporter { | ||||
|         continue; | ||||
|       } | ||||
| 
 | ||||
|       if (!nodeStats[channel.source]) { | ||||
|         nodeStats[channel.source] = { | ||||
|       if (!nodeStats[channel.node1_pub]) { | ||||
|         nodeStats[channel.node1_pub] = { | ||||
|           capacity: 0, | ||||
|           channels: 0, | ||||
|         }; | ||||
|       } | ||||
|       if (!nodeStats[channel.destination]) { | ||||
|         nodeStats[channel.destination] = { | ||||
|       if (!nodeStats[channel.node2_pub]) { | ||||
|         nodeStats[channel.node2_pub] = { | ||||
|           capacity: 0, | ||||
|           channels: 0, | ||||
|         }; | ||||
|       } | ||||
|        | ||||
|       nodeStats[channel.source].capacity += Math.round(tx.value * 100000000); | ||||
|       nodeStats[channel.source].channels++; | ||||
|       nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); | ||||
|       nodeStats[channel.destination].channels++; | ||||
|       nodeStats[channel.node1_pub].capacity += Math.round(tx.value * 100000000); | ||||
|       nodeStats[channel.node1_pub].channels++; | ||||
|       nodeStats[channel.node2_pub].capacity += Math.round(tx.value * 100000000); | ||||
|       nodeStats[channel.node2_pub].channels++; | ||||
| 
 | ||||
|       if (!alreadyCountedChannels[short_id]) { | ||||
|         capacity += Math.round(tx.value * 100000000); | ||||
| @ -123,19 +125,31 @@ class LightningStatsImporter { | ||||
|         alreadyCountedChannels[short_id] = true; | ||||
|       } | ||||
| 
 | ||||
|       if (channel.fee_proportional_millionths < 5000) { | ||||
|         avgFeeRate += channel.fee_proportional_millionths; | ||||
|         feeRates.push(channel.fee_proportional_millionths); | ||||
|       } | ||||
| 
 | ||||
|       if (channel.fee_base_msat < 5000) { | ||||
|         avgBaseFee += channel.fee_base_msat;       | ||||
|         baseFees.push(channel.fee_base_msat); | ||||
|       if (channel.node1_policy !== undefined) { // Coming from the node
 | ||||
|         for (const policy of [channel.node1_policy, channel.node2_policy]) { | ||||
|           if (policy && policy.fee_rate_milli_msat < 5000) { | ||||
|             avgFeeRate += policy.fee_rate_milli_msat; | ||||
|             feeRates.push(policy.fee_rate_milli_msat); | ||||
|           }   | ||||
|           if (policy && policy.fee_base_msat < 5000) { | ||||
|             avgBaseFee += policy.fee_base_msat;       | ||||
|             baseFees.push(policy.fee_base_msat); | ||||
|           } | ||||
|         } | ||||
|       } else { // Coming from the historical import
 | ||||
|         if (channel.fee_rate_milli_msat < 5000) { | ||||
|           avgFeeRate += channel.fee_rate_milli_msat; | ||||
|           feeRates.push(channel.fee_rate_milli_msat); | ||||
|         }   | ||||
|         if (channel.fee_base_msat < 5000) { | ||||
|           avgBaseFee += channel.fee_base_msat;       | ||||
|           baseFees.push(channel.fee_base_msat); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|      | ||||
|     avgFeeRate /= networkGraph.channels.length; | ||||
|     avgBaseFee /= networkGraph.channels.length; | ||||
|     avgFeeRate /= networkGraph.edges.length; | ||||
|     avgBaseFee /= networkGraph.edges.length; | ||||
|     const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; | ||||
|     const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; | ||||
|     const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; | ||||
| @ -203,15 +217,28 @@ class LightningStatsImporter { | ||||
|     let latestNodeCount = 1; | ||||
| 
 | ||||
|     const fileList = await fsPromises.readdir(this.topologiesFolder); | ||||
|     // Insert history from the most recent to the oldest
 | ||||
|     // This also put the .json cached files first
 | ||||
|     fileList.sort().reverse(); | ||||
| 
 | ||||
|     const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added, node_count FROM lightning_stats'); | ||||
|     const [rows]: any[] = await DB.query(` | ||||
|       SELECT UNIX_TIMESTAMP(added) AS added, node_count | ||||
|       FROM lightning_stats | ||||
|       ORDER BY added DESC | ||||
|     `);
 | ||||
|     const existingStatsTimestamps = {}; | ||||
|     for (const row of rows) { | ||||
|       existingStatsTimestamps[row.added] = rows[0]; | ||||
|       existingStatsTimestamps[row.added] = row; | ||||
|     } | ||||
| 
 | ||||
|     // For logging purpose
 | ||||
|     let processed = 10; | ||||
|     let totalProcessed = -1; | ||||
| 
 | ||||
|     for (const filename of fileList) { | ||||
|       processed++; | ||||
|       totalProcessed++; | ||||
| 
 | ||||
|       const timestamp = parseInt(filename.split('_')[1], 10); | ||||
| 
 | ||||
|       // Stats exist already, don't calculate/insert them
 | ||||
| @ -220,7 +247,7 @@ class LightningStatsImporter { | ||||
|         continue; | ||||
|       } | ||||
| 
 | ||||
|       logger.debug(`Processing ${this.topologiesFolder}/${filename}`); | ||||
|       logger.debug(`Reading ${this.topologiesFolder}/${filename}`); | ||||
|       const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8'); | ||||
| 
 | ||||
|       let graph; | ||||
| @ -228,12 +255,13 @@ class LightningStatsImporter { | ||||
|         try { | ||||
|           graph = JSON.parse(fileContent); | ||||
|         } catch (e) { | ||||
|           logger.debug(`Invalid topology file, cannot parse the content`); | ||||
|           logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`); | ||||
|           continue; | ||||
|         } | ||||
|       } else { | ||||
|         graph = this.parseFile(fileContent); | ||||
|         if (!graph) { | ||||
|           logger.debug(`Invalid topology file, cannot parse the content`); | ||||
|           logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`); | ||||
|           continue; | ||||
|         } | ||||
|         await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph)); | ||||
| @ -245,19 +273,22 @@ class LightningStatsImporter { | ||||
|         const diffRatio = graph.nodes.length / latestNodeCount; | ||||
|         if (diffRatio < 0.9) { | ||||
|           // Ignore drop of more than 90% of the node count as it's probably a missing data point
 | ||||
|           logger.debug(`Nodes count diff ratio threshold reached, ignore the data for this day ${graph.nodes.length} nodes vs ${latestNodeCount}`); | ||||
|           continue; | ||||
|         } | ||||
|       } | ||||
|       latestNodeCount = graph.nodes.length; | ||||
|        | ||||
|       const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; | ||||
|       logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); | ||||
|       logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`); | ||||
| 
 | ||||
|       // Cache funding txs
 | ||||
|       logger.debug(`Caching funding txs for ${datestr}`); | ||||
|       await fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); | ||||
| 
 | ||||
|       logger.debug(`Generating LN network stats for ${datestr}`); | ||||
|       if (processed > 10) { | ||||
|         logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); | ||||
|         processed = 0; | ||||
|       } else { | ||||
|         logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); | ||||
|       } | ||||
|       await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2))); | ||||
|       const stat = await this.computeNetworkStats(timestamp, graph); | ||||
| 
 | ||||
|       existingStatsTimestamps[timestamp] = stat; | ||||
| @ -290,13 +321,22 @@ class LightningStatsImporter { | ||||
|       if (!node.data) { | ||||
|         continue; | ||||
|       } | ||||
|       const addresses: unknown[] = []; | ||||
|       const sockets = node.data[5].split(','); | ||||
|       for (const socket of sockets) { | ||||
|         const parts = socket.split('://'); | ||||
|         addresses.push({ | ||||
|           network: parts[0], | ||||
|           addr: parts[1], | ||||
|         }); | ||||
|       } | ||||
|       nodes.push({ | ||||
|         id: node.data[0], | ||||
|         timestamp: node.data[1], | ||||
|         features: node.data[2], | ||||
|         rgb_color: node.data[3], | ||||
|         alias: node.data[4], | ||||
|         addresses: node.data[5], | ||||
|         addresses: addresses, | ||||
|         out_degree: node.data[6], | ||||
|         in_degree: node.data[7], | ||||
|       }); | ||||
| @ -307,13 +347,13 @@ class LightningStatsImporter { | ||||
|         continue; | ||||
|       } | ||||
|       channels.push({ | ||||
|         scid: channel.data[0], | ||||
|         source: channel.data[1], | ||||
|         destination: channel.data[2], | ||||
|         channel_id: channel.data[0], | ||||
|         node1_pub: channel.data[1], | ||||
|         node2_pub: channel.data[2], | ||||
|         timestamp: channel.data[3], | ||||
|         features: channel.data[4], | ||||
|         fee_base_msat: channel.data[5], | ||||
|         fee_proportional_millionths: channel.data[6], | ||||
|         fee_rate_milli_msat: channel.data[6], | ||||
|         htlc_minimim_msat: channel.data[7], | ||||
|         cltv_expiry_delta: channel.data[8], | ||||
|         htlc_maximum_msat: channel.data[9], | ||||
| @ -322,7 +362,7 @@ class LightningStatsImporter { | ||||
| 
 | ||||
|     return { | ||||
|       nodes: nodes, | ||||
|       channels: channels, | ||||
|       edges: channels, | ||||
|     }; | ||||
|   } | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user