Merge pull request #2319 from mempool/nymkappa/feature/import-json-topology
Import json topology
This commit is contained in:
		
						commit
						4cf4efd3f2
					
				| @ -207,6 +207,10 @@ export class Common { | |||||||
| 
 | 
 | ||||||
|   /** Decodes a channel id returned by lnd as uint64 to a short channel id */ |   /** Decodes a channel id returned by lnd as uint64 to a short channel id */ | ||||||
|   static channelIntegerIdToShortId(id: string): string { |   static channelIntegerIdToShortId(id: string): string { | ||||||
|  |     if (id.indexOf('/') !== -1) { | ||||||
|  |       id = id.slice(0, -2); | ||||||
|  |     } | ||||||
|  |      | ||||||
|     if (id.indexOf('x') !== -1) { // Already a short id
 |     if (id.indexOf('x') !== -1) { // Already a short id
 | ||||||
|       return id; |       return id; | ||||||
|     } |     } | ||||||
|  | |||||||
| @ -71,9 +71,7 @@ class FundingTxFetcher { | |||||||
|   } |   } | ||||||
|    |    | ||||||
|   public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> { |   public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> { | ||||||
|     if (channelId.indexOf('x') === -1) { |     channelId = Common.channelIntegerIdToShortId(channelId); | ||||||
|       channelId = Common.channelIntegerIdToShortId(channelId); |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     if (this.fundingTxCache[channelId]) { |     if (this.fundingTxCache[channelId]) { | ||||||
|       return this.fundingTxCache[channelId]; |       return this.fundingTxCache[channelId]; | ||||||
|  | |||||||
| @ -8,30 +8,6 @@ import { isIP } from 'net'; | |||||||
| 
 | 
 | ||||||
| const fsPromises = promises; | const fsPromises = promises; | ||||||
| 
 | 
 | ||||||
| interface Node { |  | ||||||
|   id: string; |  | ||||||
|   timestamp: number; |  | ||||||
|   features: string; |  | ||||||
|   rgb_color: string; |  | ||||||
|   alias: string; |  | ||||||
|   addresses: unknown[]; |  | ||||||
|   out_degree: number; |  | ||||||
|   in_degree: number; |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| interface Channel { |  | ||||||
|   channel_id: string; |  | ||||||
|   node1_pub: string; |  | ||||||
|   node2_pub: string; |  | ||||||
|   timestamp: number; |  | ||||||
|   features: string; |  | ||||||
|   fee_base_msat: number; |  | ||||||
|   fee_rate_milli_msat: number; |  | ||||||
|   htlc_minimim_msat: number; |  | ||||||
|   cltv_expiry_delta: number; |  | ||||||
|   htlc_maximum_msat: number; |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| class LightningStatsImporter { | class LightningStatsImporter { | ||||||
|   topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; |   topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; | ||||||
| 
 | 
 | ||||||
| @ -59,11 +35,11 @@ class LightningStatsImporter { | |||||||
|       let isUnnanounced = true; |       let isUnnanounced = true; | ||||||
| 
 | 
 | ||||||
|       for (const socket of (node.addresses ?? [])) { |       for (const socket of (node.addresses ?? [])) { | ||||||
|         if (!socket.network?.length || !socket.addr?.length) { |         if (!socket.network?.length && !socket.addr?.length) { | ||||||
|           continue; |           continue; | ||||||
|         } |         } | ||||||
|         hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network) || socket.addr.indexOf('onion') !== -1; |         hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network) || socket.addr.indexOf('onion') !== -1 || socket.addr.indexOf('torv2') !== -1 || socket.addr.indexOf('torv3') !== -1; | ||||||
|         hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network) || [4, 6].includes(isIP(socket.addr.split(':')[0])); |         hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network) || [4, 6].includes(isIP(socket.addr.split(':')[0])) || socket.addr.indexOf('ipv4') !== -1 || socket.addr.indexOf('ipv6') !== -1;; | ||||||
|       } |       } | ||||||
|       if (hasOnion && hasClearnet) { |       if (hasOnion && hasClearnet) { | ||||||
|         clearnetTorNodes++; |         clearnetTorNodes++; | ||||||
| @ -262,83 +238,152 @@ class LightningStatsImporter { | |||||||
|    * Import topology files LN historical data into the database |    * Import topology files LN historical data into the database | ||||||
|    */ |    */ | ||||||
|   async $importHistoricalLightningStats(): Promise<void> { |   async $importHistoricalLightningStats(): Promise<void> { | ||||||
|     const fileList = await fsPromises.readdir(this.topologiesFolder); |     try { | ||||||
|     // Insert history from the most recent to the oldest
 |       let fileList: string[] = []; | ||||||
|     // This also put the .json cached files first
 |  | ||||||
|     fileList.sort().reverse(); |  | ||||||
| 
 |  | ||||||
|     const [rows]: any[] = await DB.query(` |  | ||||||
|       SELECT UNIX_TIMESTAMP(added) AS added, node_count |  | ||||||
|       FROM lightning_stats |  | ||||||
|       ORDER BY added DESC |  | ||||||
|     `);
 |  | ||||||
|     const existingStatsTimestamps = {}; |  | ||||||
|     for (const row of rows) { |  | ||||||
|       existingStatsTimestamps[row.added] = row; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     // For logging purpose
 |  | ||||||
|     let processed = 10; |  | ||||||
|     let totalProcessed = 0; |  | ||||||
|     let logStarted = false; |  | ||||||
| 
 |  | ||||||
|     for (const filename of fileList) { |  | ||||||
|       processed++; |  | ||||||
| 
 |  | ||||||
|       const timestamp = parseInt(filename.split('_')[1], 10); |  | ||||||
| 
 |  | ||||||
|       // Stats exist already, don't calculate/insert them
 |  | ||||||
|       if (existingStatsTimestamps[timestamp] !== undefined) { |  | ||||||
|         continue; |  | ||||||
|       } |  | ||||||
| 
 |  | ||||||
|       if (filename.indexOf('.topology') === -1) { |  | ||||||
|         continue; |  | ||||||
|       } |  | ||||||
| 
 |  | ||||||
|       logger.debug(`Reading ${this.topologiesFolder}/${filename}`); |  | ||||||
|       let fileContent = ''; |  | ||||||
|       try { |       try { | ||||||
|         fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8'); |         fileList = await fsPromises.readdir(this.topologiesFolder); | ||||||
|       } catch (e: any) { |       } catch (e) { | ||||||
|         if (e.errno == -1) { // EISDIR - Ignore directorie
 |         logger.err(`Unable to open topology folder at ${this.topologiesFolder}`); | ||||||
|  |         throw e; | ||||||
|  |       } | ||||||
|  |       // Insert history from the most recent to the oldest
 | ||||||
|  |       // This also put the .json cached files first
 | ||||||
|  |       fileList.sort().reverse(); | ||||||
|  | 
 | ||||||
|  |       const [rows]: any[] = await DB.query(` | ||||||
|  |         SELECT UNIX_TIMESTAMP(added) AS added, node_count | ||||||
|  |         FROM lightning_stats | ||||||
|  |         ORDER BY added DESC | ||||||
|  |       `);
 | ||||||
|  |       const existingStatsTimestamps = {}; | ||||||
|  |       for (const row of rows) { | ||||||
|  |         existingStatsTimestamps[row.added] = row; | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       // For logging purpose
 | ||||||
|  |       let processed = 10; | ||||||
|  |       let totalProcessed = 0; | ||||||
|  |       let logStarted = false; | ||||||
|  | 
 | ||||||
|  |       for (const filename of fileList) { | ||||||
|  |         processed++; | ||||||
|  | 
 | ||||||
|  |         const timestamp = parseInt(filename.split('_')[1], 10); | ||||||
|  | 
 | ||||||
|  |         // Stats exist already, don't calculate/insert them
 | ||||||
|  |         if (existingStatsTimestamps[timestamp] !== undefined) { | ||||||
|           continue; |           continue; | ||||||
|         } |         } | ||||||
|  | 
 | ||||||
|  |         if (filename.indexOf('topology_') === -1) { | ||||||
|  |           continue; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         logger.debug(`Reading ${this.topologiesFolder}/${filename}`); | ||||||
|  |         let fileContent = ''; | ||||||
|  |         try { | ||||||
|  |           fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8'); | ||||||
|  |         } catch (e: any) { | ||||||
|  |           if (e.errno == -1) { // EISDIR - Ignore directorie
 | ||||||
|  |             continue; | ||||||
|  |           } | ||||||
|  |           logger.err(`Unable to open ${this.topologiesFolder}/${filename}`); | ||||||
|  |           continue; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let graph; | ||||||
|  |         try { | ||||||
|  |           graph = JSON.parse(fileContent); | ||||||
|  |           graph = await this.cleanupTopology(graph); | ||||||
|  |         } catch (e) { | ||||||
|  |           logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`); | ||||||
|  |           continue; | ||||||
|  |         } | ||||||
|  |      | ||||||
|  |         if (!logStarted) { | ||||||
|  |           logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`); | ||||||
|  |           logStarted = true; | ||||||
|  |         } | ||||||
|  |          | ||||||
|  |         const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; | ||||||
|  |         logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`); | ||||||
|  | 
 | ||||||
|  |         totalProcessed++; | ||||||
|  | 
 | ||||||
|  |         if (processed > 10) { | ||||||
|  |           logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); | ||||||
|  |           processed = 0; | ||||||
|  |         } else { | ||||||
|  |           logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); | ||||||
|  |         } | ||||||
|  |         await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2))); | ||||||
|  |         const stat = await this.computeNetworkStats(timestamp, graph); | ||||||
|  | 
 | ||||||
|  |         existingStatsTimestamps[timestamp] = stat; | ||||||
|       } |       } | ||||||
| 
 | 
 | ||||||
|       let graph; |       if (totalProcessed > 0) { | ||||||
|       try { |         logger.info(`Lightning network stats historical import completed`); | ||||||
|         graph = JSON.parse(fileContent); |       } | ||||||
|       } catch (e) { |     } catch (e) { | ||||||
|         logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`); |       logger.err(`Lightning network stats historical failed. Reason: ${e instanceof Error ? e.message : e}`); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   async cleanupTopology(graph) { | ||||||
|  |     const newGraph = { | ||||||
|  |       nodes: <ILightningApi.Node[]>[], | ||||||
|  |       edges: <ILightningApi.Channel[]>[], | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     for (const node of graph.nodes) { | ||||||
|  |       const addressesParts = (node.addresses ?? '').split(','); | ||||||
|  |       const addresses: any[] = []; | ||||||
|  |       for (const address of addressesParts) { | ||||||
|  |         addresses.push({ | ||||||
|  |           network: '', | ||||||
|  |           addr: address | ||||||
|  |         }); | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       newGraph.nodes.push({ | ||||||
|  |         last_update: node.timestamp ?? 0, | ||||||
|  |         pub_key: node.id ?? null, | ||||||
|  |         alias: node.alias ?? null, | ||||||
|  |         addresses: addresses, | ||||||
|  |         color: node.rgb_color ?? null, | ||||||
|  |         features: {}, | ||||||
|  |       }); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     for (const adjacency of graph.adjacency) { | ||||||
|  |       if (adjacency.length === 0) { | ||||||
|         continue; |         continue; | ||||||
|       } |  | ||||||
|    |  | ||||||
|       if (!logStarted) { |  | ||||||
|         logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`); |  | ||||||
|         logStarted = true; |  | ||||||
|       } |  | ||||||
|        |  | ||||||
|       const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; |  | ||||||
|       logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`); |  | ||||||
| 
 |  | ||||||
|       totalProcessed++; |  | ||||||
| 
 |  | ||||||
|       if (processed > 10) { |  | ||||||
|         logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); |  | ||||||
|         processed = 0; |  | ||||||
|       } else { |       } else { | ||||||
|         logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`); |         for (const edge of adjacency) { | ||||||
|  |           newGraph.edges.push({ | ||||||
|  |             channel_id: edge.scid, | ||||||
|  |             chan_point: '', | ||||||
|  |             last_update: edge.timestamp, | ||||||
|  |             node1_pub: edge.source ?? null, | ||||||
|  |             node2_pub: edge.destination ?? null, | ||||||
|  |             capacity: '0', // Will be fetch later
 | ||||||
|  |             node1_policy: { | ||||||
|  |               time_lock_delta: edge.cltv_expiry_delta, | ||||||
|  |               min_htlc: edge.htlc_minimim_msat, | ||||||
|  |               fee_base_msat: edge.fee_base_msat, | ||||||
|  |               fee_rate_milli_msat: edge.fee_proportional_millionths, | ||||||
|  |               max_htlc_msat: edge.htlc_maximum_msat, | ||||||
|  |               last_update: edge.timestamp, | ||||||
|  |               disabled: false,           | ||||||
|  |             }, | ||||||
|  |             node2_policy: null, | ||||||
|  |           }); | ||||||
|  |         } | ||||||
|       } |       } | ||||||
|       await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2))); |  | ||||||
|       const stat = await this.computeNetworkStats(timestamp, graph); |  | ||||||
| 
 |  | ||||||
|       existingStatsTimestamps[timestamp] = stat; |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     if (totalProcessed > 0) { |     return newGraph; | ||||||
|       logger.info(`Lightning network stats historical import completed`); |  | ||||||
|     } |  | ||||||
|   } |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user