Merge pull request #2267 from mempool/nymkappa/feature/node-status
Add `nodes.status` field
This commit is contained in:
		
						commit
						7012a480e8
					
				| @ -1,5 +1,6 @@ | |||||||
| import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces'; | import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces'; | ||||||
| import config from '../config'; | import config from '../config'; | ||||||
|  | import { convertChannelId } from './lightning/clightning/clightning-convert'; | ||||||
| export class Common { | export class Common { | ||||||
|   static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? |   static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? | ||||||
|     '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' |     '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' | ||||||
| @ -184,4 +185,37 @@ export class Common { | |||||||
|       config.MEMPOOL.BLOCKS_SUMMARIES_INDEXING === true |       config.MEMPOOL.BLOCKS_SUMMARIES_INDEXING === true | ||||||
|     ); |     ); | ||||||
|   } |   } | ||||||
|  | 
 | ||||||
|  |   static setDateMidnight(date: Date): void { | ||||||
|  |     date.setUTCHours(0); | ||||||
|  |     date.setUTCMinutes(0); | ||||||
|  |     date.setUTCSeconds(0); | ||||||
|  |     date.setUTCMilliseconds(0); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   static channelShortIdToIntegerId(id: string): string { | ||||||
|  |     if (config.LIGHTNING.BACKEND === 'lnd') { | ||||||
|  |       return id; | ||||||
|  |     } | ||||||
|  |     return convertChannelId(id); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   /** Decodes a channel id returned by lnd as uint64 to a short channel id */ | ||||||
|  |   static channelIntegerIdToShortId(id: string): string { | ||||||
|  |     if (config.LIGHTNING.BACKEND === 'cln') { | ||||||
|  |       return id; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     const n = BigInt(id); | ||||||
|  |     return [ | ||||||
|  |       n >> 40n, // nth block
 | ||||||
|  |       (n >> 16n) & 0xffffffn, // nth tx of the block
 | ||||||
|  |       n & 0xffffn // nth output of the tx
 | ||||||
|  |     ].join('x'); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   static utcDateToMysql(date?: number): string { | ||||||
|  |     const d = new Date((date || 0) * 1000); | ||||||
|  |     return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; | ||||||
|  |   } | ||||||
| } | } | ||||||
|  | |||||||
| @ -4,7 +4,7 @@ import logger from '../logger'; | |||||||
| import { Common } from './common'; | import { Common } from './common'; | ||||||
| 
 | 
 | ||||||
| class DatabaseMigration { | class DatabaseMigration { | ||||||
|   private static currentVersion = 35; |   private static currentVersion = 36; | ||||||
|   private queryTimeout = 120000; |   private queryTimeout = 120000; | ||||||
|   private statisticsAddedIndexed = false; |   private statisticsAddedIndexed = false; | ||||||
|   private uniqueLogs: string[] = []; |   private uniqueLogs: string[] = []; | ||||||
| @ -320,6 +320,10 @@ class DatabaseMigration { | |||||||
|       await this.$executeQuery('DELETE from `lightning_stats` WHERE added > "2021-09-19"'); |       await this.$executeQuery('DELETE from `lightning_stats` WHERE added > "2021-09-19"'); | ||||||
|       await this.$executeQuery('ALTER TABLE `lightning_stats` ADD CONSTRAINT added_unique UNIQUE (added);'); |       await this.$executeQuery('ALTER TABLE `lightning_stats` ADD CONSTRAINT added_unique UNIQUE (added);'); | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     if (databaseSchemaVersion < 36 && isBitcoin == true) { | ||||||
|  |       await this.$executeQuery('ALTER TABLE `nodes` ADD status TINYINT NOT NULL DEFAULT "1"'); | ||||||
|  |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   /** |   /** | ||||||
|  | |||||||
| @ -1,6 +1,9 @@ | |||||||
| import logger from '../../logger'; | import logger from '../../logger'; | ||||||
| import DB from '../../database'; | import DB from '../../database'; | ||||||
| import nodesApi from './nodes.api'; | import nodesApi from './nodes.api'; | ||||||
|  | import { ResultSetHeader } from 'mysql2'; | ||||||
|  | import { ILightningApi } from '../lightning/lightning-api.interface'; | ||||||
|  | import { Common } from '../common'; | ||||||
| 
 | 
 | ||||||
| class ChannelsApi { | class ChannelsApi { | ||||||
|   public async $getAllChannels(): Promise<any[]> { |   public async $getAllChannels(): Promise<any[]> { | ||||||
| @ -302,6 +305,135 @@ class ChannelsApi { | |||||||
|       }, |       }, | ||||||
|     }; |     }; | ||||||
|   } |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * Save or update a channel present in the graph | ||||||
|  |    */ | ||||||
|  |   public async $saveChannel(channel: ILightningApi.Channel): Promise<void> { | ||||||
|  |     const [ txid, vout ] = channel.chan_point.split(':'); | ||||||
|  | 
 | ||||||
|  |     const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {}; | ||||||
|  |     const policy2: Partial<ILightningApi.RoutingPolicy> = channel.node2_policy || {}; | ||||||
|  | 
 | ||||||
|  |     const query = `INSERT INTO channels
 | ||||||
|  |       ( | ||||||
|  |         id, | ||||||
|  |         short_id, | ||||||
|  |         capacity, | ||||||
|  |         transaction_id, | ||||||
|  |         transaction_vout, | ||||||
|  |         updated_at, | ||||||
|  |         status, | ||||||
|  |         node1_public_key, | ||||||
|  |         node1_base_fee_mtokens, | ||||||
|  |         node1_cltv_delta, | ||||||
|  |         node1_fee_rate, | ||||||
|  |         node1_is_disabled, | ||||||
|  |         node1_max_htlc_mtokens, | ||||||
|  |         node1_min_htlc_mtokens, | ||||||
|  |         node1_updated_at, | ||||||
|  |         node2_public_key, | ||||||
|  |         node2_base_fee_mtokens, | ||||||
|  |         node2_cltv_delta, | ||||||
|  |         node2_fee_rate, | ||||||
|  |         node2_is_disabled, | ||||||
|  |         node2_max_htlc_mtokens, | ||||||
|  |         node2_min_htlc_mtokens, | ||||||
|  |         node2_updated_at | ||||||
|  |       ) | ||||||
|  |       VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | ||||||
|  |       ON DUPLICATE KEY UPDATE | ||||||
|  |         capacity = ?, | ||||||
|  |         updated_at = ?, | ||||||
|  |         status = 1, | ||||||
|  |         node1_public_key = ?, | ||||||
|  |         node1_base_fee_mtokens = ?, | ||||||
|  |         node1_cltv_delta = ?, | ||||||
|  |         node1_fee_rate = ?, | ||||||
|  |         node1_is_disabled = ?, | ||||||
|  |         node1_max_htlc_mtokens = ?, | ||||||
|  |         node1_min_htlc_mtokens = ?, | ||||||
|  |         node1_updated_at = ?, | ||||||
|  |         node2_public_key = ?, | ||||||
|  |         node2_base_fee_mtokens = ?, | ||||||
|  |         node2_cltv_delta = ?, | ||||||
|  |         node2_fee_rate = ?, | ||||||
|  |         node2_is_disabled = ?, | ||||||
|  |         node2_max_htlc_mtokens = ?, | ||||||
|  |         node2_min_htlc_mtokens = ?, | ||||||
|  |         node2_updated_at = ? | ||||||
|  |       ;`;
 | ||||||
|  | 
 | ||||||
|  |     await DB.query(query, [ | ||||||
|  |       Common.channelShortIdToIntegerId(channel.channel_id), | ||||||
|  |       Common.channelIntegerIdToShortId(channel.channel_id), | ||||||
|  |       channel.capacity, | ||||||
|  |       txid, | ||||||
|  |       vout, | ||||||
|  |       Common.utcDateToMysql(channel.last_update), | ||||||
|  |       channel.node1_pub, | ||||||
|  |       policy1.fee_base_msat, | ||||||
|  |       policy1.time_lock_delta, | ||||||
|  |       policy1.fee_rate_milli_msat, | ||||||
|  |       policy1.disabled, | ||||||
|  |       policy1.max_htlc_msat, | ||||||
|  |       policy1.min_htlc, | ||||||
|  |       Common.utcDateToMysql(policy1.last_update), | ||||||
|  |       channel.node2_pub, | ||||||
|  |       policy2.fee_base_msat, | ||||||
|  |       policy2.time_lock_delta, | ||||||
|  |       policy2.fee_rate_milli_msat, | ||||||
|  |       policy2.disabled, | ||||||
|  |       policy2.max_htlc_msat, | ||||||
|  |       policy2.min_htlc, | ||||||
|  |       Common.utcDateToMysql(policy2.last_update), | ||||||
|  |       channel.capacity, | ||||||
|  |       Common.utcDateToMysql(channel.last_update), | ||||||
|  |       channel.node1_pub, | ||||||
|  |       policy1.fee_base_msat, | ||||||
|  |       policy1.time_lock_delta, | ||||||
|  |       policy1.fee_rate_milli_msat, | ||||||
|  |       policy1.disabled, | ||||||
|  |       policy1.max_htlc_msat, | ||||||
|  |       policy1.min_htlc, | ||||||
|  |       Common.utcDateToMysql(policy1.last_update), | ||||||
|  |       channel.node2_pub, | ||||||
|  |       policy2.fee_base_msat, | ||||||
|  |       policy2.time_lock_delta, | ||||||
|  |       policy2.fee_rate_milli_msat, | ||||||
|  |       policy2.disabled, | ||||||
|  |       policy2.max_htlc_msat, | ||||||
|  |       policy2.min_htlc, | ||||||
|  |       Common.utcDateToMysql(policy2.last_update) | ||||||
|  |     ]); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * Set all channels not in `graphChannelsIds` as inactive (status = 0) | ||||||
|  |    */ | ||||||
|  |   public async $setChannelsInactive(graphChannelsIds: string[]): Promise<void> { | ||||||
|  |     if (graphChannelsIds.length === 0) { | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     try { | ||||||
|  |       const result = await DB.query<ResultSetHeader>(` | ||||||
|  |         UPDATE channels | ||||||
|  |         SET status = 0 | ||||||
|  |         WHERE short_id NOT IN ( | ||||||
|  |           ${graphChannelsIds.map(id => `"${id}"`).join(',')} | ||||||
|  |         ) | ||||||
|  |         AND status != 2 | ||||||
|  |       `);
 | ||||||
|  |       if (result[0].changedRows ?? 0 > 0) { | ||||||
|  |         logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`); | ||||||
|  |       } else { | ||||||
|  |         logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`); | ||||||
|  |       } | ||||||
|  |     } catch (e) { | ||||||
|  |       logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| export default new ChannelsApi(); | export default new ChannelsApi(); | ||||||
|  | |||||||
| @ -1,5 +1,7 @@ | |||||||
| import logger from '../../logger'; | import logger from '../../logger'; | ||||||
| import DB from '../../database'; | import DB from '../../database'; | ||||||
|  | import { ResultSetHeader } from 'mysql2'; | ||||||
|  | import { ILightningApi } from '../lightning/lightning-api.interface'; | ||||||
| 
 | 
 | ||||||
| class NodesApi { | class NodesApi { | ||||||
|   public async $getNode(public_key: string): Promise<any> { |   public async $getNode(public_key: string): Promise<any> { | ||||||
| @ -321,6 +323,66 @@ class NodesApi { | |||||||
|       throw e; |       throw e; | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * Save or update a node present in the graph | ||||||
|  |    */ | ||||||
|  |   public async $saveNode(node: ILightningApi.Node): Promise<void> { | ||||||
|  |     try { | ||||||
|  |       const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? ''; | ||||||
|  |       const query = `INSERT INTO nodes(
 | ||||||
|  |           public_key, | ||||||
|  |           first_seen, | ||||||
|  |           updated_at, | ||||||
|  |           alias, | ||||||
|  |           color, | ||||||
|  |           sockets, | ||||||
|  |           status | ||||||
|  |         ) | ||||||
|  |         VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?, 1) | ||||||
|  |         ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?, status = 1`;
 | ||||||
|  | 
 | ||||||
|  |       await DB.query(query, [ | ||||||
|  |         node.pub_key, | ||||||
|  |         node.last_update, | ||||||
|  |         node.alias, | ||||||
|  |         node.color, | ||||||
|  |         sockets, | ||||||
|  |         node.last_update, | ||||||
|  |         node.alias, | ||||||
|  |         node.color, | ||||||
|  |         sockets, | ||||||
|  |       ]); | ||||||
|  |     } catch (e) { | ||||||
|  |       logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * Set all nodes not in `nodesPubkeys` as inactive (status = 0) | ||||||
|  |    */ | ||||||
|  |    public async $setNodesInactive(graphNodesPubkeys: string[]): Promise<void> { | ||||||
|  |     if (graphNodesPubkeys.length === 0) { | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     try { | ||||||
|  |       const result = await DB.query<ResultSetHeader>(` | ||||||
|  |         UPDATE nodes | ||||||
|  |         SET status = 0 | ||||||
|  |         WHERE public_key NOT IN ( | ||||||
|  |           ${graphNodesPubkeys.map(pubkey => `"${pubkey}"`).join(',')} | ||||||
|  |         ) | ||||||
|  |       `);
 | ||||||
|  |       if (result[0].changedRows ?? 0 > 0) { | ||||||
|  |         logger.info(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`); | ||||||
|  |       } else { | ||||||
|  |         logger.debug(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`); | ||||||
|  |       } | ||||||
|  |     } catch (e) { | ||||||
|  |       logger.err('$setNodesInactive() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| export default new NodesApi(); | export default new NodesApi(); | ||||||
|  | |||||||
| @ -32,7 +32,8 @@ interface IConfig { | |||||||
|     ENABLED: boolean; |     ENABLED: boolean; | ||||||
|     BACKEND: 'lnd' | 'cln' | 'ldk'; |     BACKEND: 'lnd' | 'cln' | 'ldk'; | ||||||
|     TOPOLOGY_FOLDER: string; |     TOPOLOGY_FOLDER: string; | ||||||
|     NODE_STATS_REFRESH_INTERVAL: number; |     STATS_REFRESH_INTERVAL: number; | ||||||
|  |     GRAPH_REFRESH_INTERVAL: number; | ||||||
|   }; |   }; | ||||||
|   LND: { |   LND: { | ||||||
|     TLS_CERT_PATH: string; |     TLS_CERT_PATH: string; | ||||||
| @ -184,7 +185,8 @@ const defaults: IConfig = { | |||||||
|     'ENABLED': false, |     'ENABLED': false, | ||||||
|     'BACKEND': 'lnd', |     'BACKEND': 'lnd', | ||||||
|     'TOPOLOGY_FOLDER': '', |     'TOPOLOGY_FOLDER': '', | ||||||
|     'NODE_STATS_REFRESH_INTERVAL': 600, |     'STATS_REFRESH_INTERVAL': 600, | ||||||
|  |     'GRAPH_REFRESH_INTERVAL': 600, | ||||||
|   }, |   }, | ||||||
|   'LND': { |   'LND': { | ||||||
|     'TLS_CERT_PATH': '', |     'TLS_CERT_PATH': '', | ||||||
|  | |||||||
| @ -1,7 +1,7 @@ | |||||||
| import config from './config'; | import config from './config'; | ||||||
| import { createPool, Pool, PoolConnection } from 'mysql2/promise'; | import { createPool, Pool, PoolConnection } from 'mysql2/promise'; | ||||||
| import logger from './logger'; | import logger from './logger'; | ||||||
| import { PoolOptions } from 'mysql2/typings/mysql'; | import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } from 'mysql2/typings/mysql'; | ||||||
| 
 | 
 | ||||||
|  class DB { |  class DB { | ||||||
|   constructor() { |   constructor() { | ||||||
| @ -28,7 +28,9 @@ import { PoolOptions } from 'mysql2/typings/mysql'; | |||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   public async query(query, params?) { |   public async query<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket | | ||||||
|  |     OkPacket[] | ResultSetHeader>(query, params?): Promise<[T, FieldPacket[]]> | ||||||
|  |   { | ||||||
|     this.checkDBFlag(); |     this.checkDBFlag(); | ||||||
|     const pool = await this.getPool(); |     const pool = await this.getPool(); | ||||||
|     return pool.query(query, params); |     return pool.query(query, params); | ||||||
|  | |||||||
| @ -1,60 +1,43 @@ | |||||||
| import DB from '../../database'; | import DB from '../../database'; | ||||||
| import logger from '../../logger'; | import logger from '../../logger'; | ||||||
| import channelsApi from '../../api/explorer/channels.api'; | import channelsApi from '../../api/explorer/channels.api'; | ||||||
| import bitcoinClient from '../../api/bitcoin/bitcoin-client'; |  | ||||||
| import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory'; | import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory'; | ||||||
| import config from '../../config'; | import config from '../../config'; | ||||||
| import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface'; | import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface'; | ||||||
| import { ILightningApi } from '../../api/lightning/lightning-api.interface'; | import { ILightningApi } from '../../api/lightning/lightning-api.interface'; | ||||||
| import { $lookupNodeLocation } from './sync-tasks/node-locations'; | import { $lookupNodeLocation } from './sync-tasks/node-locations'; | ||||||
| import lightningApi from '../../api/lightning/lightning-api-factory'; | import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||||
| import { convertChannelId } from '../../api/lightning/clightning/clightning-convert'; | import nodesApi from '../../api/explorer/nodes.api'; | ||||||
| import { Common } from '../../api/common'; | import { ResultSetHeader } from 'mysql2'; | ||||||
|  | import fundingTxFetcher from './sync-tasks/funding-tx-fetcher'; | ||||||
| 
 | 
 | ||||||
| class NetworkSyncService { | class NetworkSyncService { | ||||||
|  |   loggerTimer = 0; | ||||||
|  | 
 | ||||||
|   constructor() {} |   constructor() {} | ||||||
| 
 | 
 | ||||||
|   public async $startService() { |   public async $startService(): Promise<void> { | ||||||
|     logger.info('Starting node sync service'); |     logger.info('Starting lightning network sync service'); | ||||||
| 
 | 
 | ||||||
|     await this.$runUpdater(); |     this.loggerTimer = new Date().getTime() / 1000; | ||||||
| 
 | 
 | ||||||
|     setInterval(async () => { |     await this.$runTasks(); | ||||||
|       await this.$runUpdater(); |  | ||||||
|     }, 1000 * 60 * 60); |  | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private async $runUpdater(): Promise<void> { |   private async $runTasks(): Promise<void> { | ||||||
|     try { |     try { | ||||||
|       logger.info(`Updating nodes and channels...`); |       logger.info(`Updating nodes and channels`); | ||||||
| 
 | 
 | ||||||
|       const networkGraph = await lightningApi.$getNetworkGraph(); |       const networkGraph = await lightningApi.$getNetworkGraph(); | ||||||
|       if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) { |       if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) { | ||||||
|         logger.info(`LN Network graph is empty, retrying in 10 seconds`); |         logger.info(`LN Network graph is empty, retrying in 10 seconds`); | ||||||
|         await Common.sleep$(10000); |         setTimeout(() => { this.$runTasks(); }, 10000); | ||||||
|         this.$runUpdater(); |  | ||||||
|         return; |         return; | ||||||
|       } |       } | ||||||
| 
 | 
 | ||||||
|       for (const node of networkGraph.nodes) { |       await this.$updateNodesList(networkGraph.nodes); | ||||||
|         await this.$saveNode(node); |       await this.$updateChannelsList(networkGraph.edges); | ||||||
|       } |       await this.$deactivateChannelsWithoutActiveNodes(); | ||||||
|       logger.info(`Nodes updated.`); |  | ||||||
| 
 |  | ||||||
|       if (config.MAXMIND.ENABLED) { |  | ||||||
|         await $lookupNodeLocation(); |  | ||||||
|       } |  | ||||||
| 
 |  | ||||||
|       const graphChannelsIds: string[] = []; |  | ||||||
|       for (const channel of networkGraph.edges) { |  | ||||||
|         await this.$saveChannel(channel); |  | ||||||
|         graphChannelsIds.push(channel.channel_id); |  | ||||||
|       } |  | ||||||
|       await this.$setChannelsInactive(graphChannelsIds); |  | ||||||
| 
 |  | ||||||
|       logger.info(`Channels updated.`); |  | ||||||
| 
 |  | ||||||
|       await this.$findInactiveNodesAndChannels(); |  | ||||||
|       await this.$lookUpCreationDateFromChain(); |       await this.$lookUpCreationDateFromChain(); | ||||||
|       await this.$updateNodeFirstSeen(); |       await this.$updateNodeFirstSeen(); | ||||||
|       await this.$scanForClosedChannels(); |       await this.$scanForClosedChannels(); | ||||||
| @ -63,84 +46,183 @@ class NetworkSyncService { | |||||||
|       } |       } | ||||||
| 
 | 
 | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
|       logger.err('$runUpdater() error: ' + (e instanceof Error ? e.message : e)); |       logger.err('$runTasks() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.GRAPH_REFRESH_INTERVAL); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * Update the `nodes` table to reflect the current network graph state | ||||||
|  |    */ | ||||||
|  |   private async $updateNodesList(nodes: ILightningApi.Node[]): Promise<void> { | ||||||
|  |     let progress = 0; | ||||||
|  | 
 | ||||||
|  |     const graphNodesPubkeys: string[] = []; | ||||||
|  |     for (const node of nodes) { | ||||||
|  |       await nodesApi.$saveNode(node); | ||||||
|  |       graphNodesPubkeys.push(node.pub_key); | ||||||
|  |       ++progress; | ||||||
|  | 
 | ||||||
|  |       const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); | ||||||
|  |       if (elapsedSeconds > 10) { | ||||||
|  |         logger.info(`Updating node ${progress}/${nodes.length}`); | ||||||
|  |         this.loggerTimer = new Date().getTime() / 1000; | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |     logger.info(`${progress} nodes updated`); | ||||||
|  | 
 | ||||||
|  |     // If a channel if not present in the graph, mark it as inactive
 | ||||||
|  |     nodesApi.$setNodesInactive(graphNodesPubkeys); | ||||||
|  | 
 | ||||||
|  |     if (config.MAXMIND.ENABLED) { | ||||||
|  |       $lookupNodeLocation(); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   /** | ||||||
|  |    * Update the `channels` table to reflect the current network graph state | ||||||
|  |    */ | ||||||
|  |   private async $updateChannelsList(channels: ILightningApi.Channel[]): Promise<void> { | ||||||
|  |     try { | ||||||
|  |       let progress = 0; | ||||||
|  | 
 | ||||||
|  |       const graphChannelsIds: string[] = []; | ||||||
|  |       for (const channel of channels) { | ||||||
|  |         await channelsApi.$saveChannel(channel); | ||||||
|  |         graphChannelsIds.push(channel.channel_id); | ||||||
|  |         ++progress; | ||||||
|  | 
 | ||||||
|  |         const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); | ||||||
|  |         if (elapsedSeconds > 10) { | ||||||
|  |           logger.info(`Updating channel ${progress}/${channels.length}`); | ||||||
|  |           this.loggerTimer = new Date().getTime() / 1000; | ||||||
|  |         } | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       logger.info(`${progress} channels updated`); | ||||||
|  | 
 | ||||||
|  |       // If a channel if not present in the graph, mark it as inactive
 | ||||||
|  |       channelsApi.$setChannelsInactive(graphChannelsIds); | ||||||
|  |     } catch (e) { | ||||||
|  |       logger.err(`Cannot update channel list. Reason: ${(e instanceof Error ? e.message : e)}`); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.STATS_REFRESH_INTERVAL); | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   // This method look up the creation date of the earliest channel of the node
 |   // This method look up the creation date of the earliest channel of the node
 | ||||||
|   // and update the node to that date in order to get the earliest first seen date
 |   // and update the node to that date in order to get the earliest first seen date
 | ||||||
|   private async $updateNodeFirstSeen() { |   private async $updateNodeFirstSeen(): Promise<void> { | ||||||
|  |     let progress = 0; | ||||||
|  |     let updated = 0; | ||||||
|  | 
 | ||||||
|     try { |     try { | ||||||
|       const [nodes]: any[] = await DB.query(`SELECT nodes.public_key, UNIX_TIMESTAMP(nodes.first_seen) AS first_seen, (SELECT UNIX_TIMESTAMP(created) FROM channels WHERE channels.node1_public_key = nodes.public_key ORDER BY created ASC LIMIT 1) AS created1, (SELECT UNIX_TIMESTAMP(created) FROM channels WHERE channels.node2_public_key = nodes.public_key ORDER BY created ASC LIMIT 1) AS created2 FROM nodes`); |       const [nodes]: any[] = await DB.query(` | ||||||
|  |         SELECT nodes.public_key, UNIX_TIMESTAMP(nodes.first_seen) AS first_seen, | ||||||
|  |         ( | ||||||
|  |           SELECT MIN(UNIX_TIMESTAMP(created)) | ||||||
|  |           FROM channels | ||||||
|  |           WHERE channels.node1_public_key = nodes.public_key | ||||||
|  |         ) AS created1, | ||||||
|  |         ( | ||||||
|  |           SELECT MIN(UNIX_TIMESTAMP(created)) | ||||||
|  |           FROM channels | ||||||
|  |           WHERE channels.node2_public_key = nodes.public_key | ||||||
|  |         ) AS created2 | ||||||
|  |         FROM nodes | ||||||
|  |       `);
 | ||||||
|  | 
 | ||||||
|       for (const node of nodes) { |       for (const node of nodes) { | ||||||
|         let lowest = 0; |         const lowest = Math.min( | ||||||
|         if (node.created1) { |           node.created1 ?? Number.MAX_SAFE_INTEGER, | ||||||
|           if (node.created2 && node.created2 < node.created1) { |           node.created2 ?? Number.MAX_SAFE_INTEGER, | ||||||
|             lowest = node.created2; |           node.first_seen ?? Number.MAX_SAFE_INTEGER | ||||||
|           } else { |         ); | ||||||
|             lowest = node.created1; |         if (lowest < node.first_seen) { | ||||||
|           } |  | ||||||
|         } else if (node.created2) { |  | ||||||
|           lowest = node.created2; |  | ||||||
|         } |  | ||||||
|         if (lowest && lowest < node.first_seen) { |  | ||||||
|           const query = `UPDATE nodes SET first_seen = FROM_UNIXTIME(?) WHERE public_key = ?`; |           const query = `UPDATE nodes SET first_seen = FROM_UNIXTIME(?) WHERE public_key = ?`; | ||||||
|           const params = [lowest, node.public_key]; |           const params = [lowest, node.public_key]; | ||||||
|           await DB.query(query, params); |           await DB.query(query, params); | ||||||
|         } |         } | ||||||
|  |         ++progress; | ||||||
|  |         const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); | ||||||
|  |         if (elapsedSeconds > 10) { | ||||||
|  |           logger.info(`Updating node first seen date ${progress}/${nodes.length}`); | ||||||
|  |           this.loggerTimer = new Date().getTime() / 1000; | ||||||
|  |           ++updated; | ||||||
|  |         } | ||||||
|       } |       } | ||||||
|       logger.info(`Node first seen dates scan complete.`); |       logger.info(`Updated ${updated} node first seen dates`); | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
|       logger.err('$updateNodeFirstSeen() error: ' + (e instanceof Error ? e.message : e)); |       logger.err('$updateNodeFirstSeen() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private async $lookUpCreationDateFromChain() { |   private async $lookUpCreationDateFromChain(): Promise<void> { | ||||||
|     logger.info(`Running channel creation date lookup...`); |     let progress = 0; | ||||||
|  | 
 | ||||||
|  |     logger.info(`Running channel creation date lookup`); | ||||||
|     try { |     try { | ||||||
|       const channels = await channelsApi.$getChannelsWithoutCreatedDate(); |       const channels = await channelsApi.$getChannelsWithoutCreatedDate(); | ||||||
|       for (const channel of channels) { |       for (const channel of channels) { | ||||||
|         const transaction = await bitcoinClient.getRawTransaction(channel.transaction_id, 1); |         const transaction = await fundingTxFetcher.$fetchChannelOpenTx(channel.short_id); | ||||||
|         await DB.query(`UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.id = ?`, [transaction.blocktime, channel.id]); |         await DB.query(` | ||||||
|  |           UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.id = ?`,
 | ||||||
|  |           [transaction.timestamp, channel.id] | ||||||
|  |         ); | ||||||
|  |         ++progress; | ||||||
|  |         const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); | ||||||
|  |         if (elapsedSeconds > 10) { | ||||||
|  |           logger.info(`Updating channel creation date ${progress}/${channels.length}`); | ||||||
|  |           this.loggerTimer = new Date().getTime() / 1000; | ||||||
|  |         } | ||||||
|       } |       } | ||||||
|       logger.info(`Channel creation dates scan complete.`); |       logger.info(`Updated ${channels.length} channels' creation date`); | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
|       logger.err('$setCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e)); |       logger.err('$lookUpCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   // Looking for channels whos nodes are inactive
 |   /** | ||||||
|   private async $findInactiveNodesAndChannels(): Promise<void> { |    * If a channel does not have any active node linked to it, then also | ||||||
|     logger.info(`Running inactive channels scan...`); |    * mark that channel as inactive | ||||||
|  |    */ | ||||||
|  |   private async $deactivateChannelsWithoutActiveNodes(): Promise<void> { | ||||||
|  |     logger.info(`Find channels which nodes are offline`); | ||||||
| 
 | 
 | ||||||
|     try { |     try { | ||||||
|       const [channels]: [{ id: string }[]] = await <any>DB.query(` |       const result = await DB.query<ResultSetHeader>(` | ||||||
|         SELECT channels.id |         UPDATE channels | ||||||
|         FROM channels |         SET status = 0 | ||||||
|         WHERE channels.status = 1 |         WHERE channels.status = 1 | ||||||
|         AND ( |         AND ( | ||||||
|           ( |           ( | ||||||
|             SELECT COUNT(*) |             SELECT COUNT(*) | ||||||
|             FROM nodes |             FROM nodes | ||||||
|             WHERE nodes.public_key = channels.node1_public_key |             WHERE nodes.public_key = channels.node1_public_key | ||||||
|  |             AND nodes.status = 1 | ||||||
|           ) = 0 |           ) = 0 | ||||||
|         OR ( |         OR ( | ||||||
|             SELECT COUNT(*) |             SELECT COUNT(*) | ||||||
|             FROM nodes |             FROM nodes | ||||||
|             WHERE nodes.public_key = channels.node2_public_key |             WHERE nodes.public_key = channels.node2_public_key | ||||||
|  |             AND nodes.status = 1 | ||||||
|           ) = 0) |           ) = 0) | ||||||
|         `);
 |         `);
 | ||||||
| 
 | 
 | ||||||
|       for (const channel of channels) { |       if (result[0].changedRows ?? 0 > 0) { | ||||||
|         await this.$updateChannelStatus(channel.id, 0); |         logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`); | ||||||
|  |       } else { | ||||||
|  |         logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`); | ||||||
|       } |       } | ||||||
|       logger.info(`Inactive channels scan complete.`); |  | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
|       logger.err('$findInactiveNodesAndChannels() error: ' + (e instanceof Error ? e.message : e)); |       logger.err('$deactivateChannelsWithoutActiveNodes() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private async $scanForClosedChannels(): Promise<void> { |   private async $scanForClosedChannels(): Promise<void> { | ||||||
|  |     let progress = 0; | ||||||
|  | 
 | ||||||
|     try { |     try { | ||||||
|       logger.info(`Starting closed channels scan...`); |       logger.info(`Starting closed channels scan...`); | ||||||
|       const channels = await channelsApi.$getChannelsByStatus(0); |       const channels = await channelsApi.$getChannelsByStatus(0); | ||||||
| @ -154,6 +236,13 @@ class NetworkSyncService { | |||||||
|             await DB.query(`UPDATE channels SET closing_transaction_id = ? WHERE id = ?`, [spendingTx.txid, channel.id]); |             await DB.query(`UPDATE channels SET closing_transaction_id = ? WHERE id = ?`, [spendingTx.txid, channel.id]); | ||||||
|           } |           } | ||||||
|         } |         } | ||||||
|  | 
 | ||||||
|  |         ++progress; | ||||||
|  |         const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); | ||||||
|  |         if (elapsedSeconds > 10) { | ||||||
|  |           logger.info(`Checking if channel has been closed ${progress}/${channels.length}`); | ||||||
|  |           this.loggerTimer = new Date().getTime() / 1000; | ||||||
|  |         } | ||||||
|       } |       } | ||||||
|       logger.info(`Closed channels scan complete.`); |       logger.info(`Closed channels scan complete.`); | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
| @ -171,6 +260,9 @@ class NetworkSyncService { | |||||||
|     if (!config.ESPLORA.REST_API_URL) { |     if (!config.ESPLORA.REST_API_URL) { | ||||||
|       return; |       return; | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     let progress = 0; | ||||||
|  | 
 | ||||||
|     try { |     try { | ||||||
|       logger.info(`Started running closed channel forensics...`); |       logger.info(`Started running closed channel forensics...`); | ||||||
|       const channels = await channelsApi.$getClosedChannelsWithoutReason(); |       const channels = await channelsApi.$getClosedChannelsWithoutReason(); | ||||||
| @ -216,6 +308,13 @@ class NetworkSyncService { | |||||||
|           logger.debug('Setting closing reason ' + reason + ' for channel: ' + channel.id + '.'); |           logger.debug('Setting closing reason ' + reason + ' for channel: ' + channel.id + '.'); | ||||||
|           await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]); |           await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]); | ||||||
|         } |         } | ||||||
|  | 
 | ||||||
|  |         ++progress; | ||||||
|  |         const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); | ||||||
|  |         if (elapsedSeconds > 10) { | ||||||
|  |           logger.info(`Updating channel closed channel forensics ${progress}/${channels.length}`); | ||||||
|  |           this.loggerTimer = new Date().getTime() / 1000; | ||||||
|  |         } | ||||||
|       } |       } | ||||||
|       logger.info(`Closed channels forensics scan complete.`); |       logger.info(`Closed channels forensics scan complete.`); | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
| @ -270,195 +369,6 @@ class NetworkSyncService { | |||||||
|       } |       } | ||||||
|       return 1; |       return 1; | ||||||
|   } |   } | ||||||
| 
 |  | ||||||
|   private async $saveChannel(channel: ILightningApi.Channel): Promise<void> { |  | ||||||
|     const [ txid, vout ] = channel.chan_point.split(':'); |  | ||||||
| 
 |  | ||||||
|     const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {}; |  | ||||||
|     const policy2: Partial<ILightningApi.RoutingPolicy> = channel.node2_policy || {}; |  | ||||||
| 
 |  | ||||||
|     try { |  | ||||||
|       const query = `INSERT INTO channels
 |  | ||||||
|         ( |  | ||||||
|           id, |  | ||||||
|           short_id, |  | ||||||
|           capacity, |  | ||||||
|           transaction_id, |  | ||||||
|           transaction_vout, |  | ||||||
|           updated_at, |  | ||||||
|           status, |  | ||||||
|           node1_public_key, |  | ||||||
|           node1_base_fee_mtokens, |  | ||||||
|           node1_cltv_delta, |  | ||||||
|           node1_fee_rate, |  | ||||||
|           node1_is_disabled, |  | ||||||
|           node1_max_htlc_mtokens, |  | ||||||
|           node1_min_htlc_mtokens, |  | ||||||
|           node1_updated_at, |  | ||||||
|           node2_public_key, |  | ||||||
|           node2_base_fee_mtokens, |  | ||||||
|           node2_cltv_delta, |  | ||||||
|           node2_fee_rate, |  | ||||||
|           node2_is_disabled, |  | ||||||
|           node2_max_htlc_mtokens, |  | ||||||
|           node2_min_htlc_mtokens, |  | ||||||
|           node2_updated_at |  | ||||||
|         ) |  | ||||||
|         VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |  | ||||||
|         ON DUPLICATE KEY UPDATE |  | ||||||
|           capacity = ?, |  | ||||||
|           updated_at = ?, |  | ||||||
|           status = 1, |  | ||||||
|           node1_public_key = ?, |  | ||||||
|           node1_base_fee_mtokens = ?, |  | ||||||
|           node1_cltv_delta = ?, |  | ||||||
|           node1_fee_rate = ?, |  | ||||||
|           node1_is_disabled = ?, |  | ||||||
|           node1_max_htlc_mtokens = ?, |  | ||||||
|           node1_min_htlc_mtokens = ?, |  | ||||||
|           node1_updated_at = ?, |  | ||||||
|           node2_public_key = ?, |  | ||||||
|           node2_base_fee_mtokens = ?, |  | ||||||
|           node2_cltv_delta = ?, |  | ||||||
|           node2_fee_rate = ?, |  | ||||||
|           node2_is_disabled = ?, |  | ||||||
|           node2_max_htlc_mtokens = ?, |  | ||||||
|           node2_min_htlc_mtokens = ?, |  | ||||||
|           node2_updated_at = ? |  | ||||||
|         ;`;
 |  | ||||||
| 
 |  | ||||||
|       await DB.query(query, [ |  | ||||||
|         this.toIntegerId(channel.channel_id), |  | ||||||
|         this.toShortId(channel.channel_id), |  | ||||||
|         channel.capacity, |  | ||||||
|         txid, |  | ||||||
|         vout, |  | ||||||
|         this.utcDateToMysql(channel.last_update), |  | ||||||
|         channel.node1_pub, |  | ||||||
|         policy1.fee_base_msat, |  | ||||||
|         policy1.time_lock_delta, |  | ||||||
|         policy1.fee_rate_milli_msat, |  | ||||||
|         policy1.disabled, |  | ||||||
|         policy1.max_htlc_msat, |  | ||||||
|         policy1.min_htlc, |  | ||||||
|         this.utcDateToMysql(policy1.last_update), |  | ||||||
|         channel.node2_pub, |  | ||||||
|         policy2.fee_base_msat, |  | ||||||
|         policy2.time_lock_delta, |  | ||||||
|         policy2.fee_rate_milli_msat, |  | ||||||
|         policy2.disabled, |  | ||||||
|         policy2.max_htlc_msat, |  | ||||||
|         policy2.min_htlc, |  | ||||||
|         this.utcDateToMysql(policy2.last_update), |  | ||||||
|         channel.capacity, |  | ||||||
|         this.utcDateToMysql(channel.last_update), |  | ||||||
|         channel.node1_pub, |  | ||||||
|         policy1.fee_base_msat, |  | ||||||
|         policy1.time_lock_delta, |  | ||||||
|         policy1.fee_rate_milli_msat, |  | ||||||
|         policy1.disabled, |  | ||||||
|         policy1.max_htlc_msat, |  | ||||||
|         policy1.min_htlc, |  | ||||||
|         this.utcDateToMysql(policy1.last_update), |  | ||||||
|         channel.node2_pub, |  | ||||||
|         policy2.fee_base_msat, |  | ||||||
|         policy2.time_lock_delta, |  | ||||||
|         policy2.fee_rate_milli_msat, |  | ||||||
|         policy2.disabled, |  | ||||||
|         policy2.max_htlc_msat, |  | ||||||
|         policy2.min_htlc, |  | ||||||
|         this.utcDateToMysql(policy2.last_update) |  | ||||||
|       ]); |  | ||||||
|     } catch (e) { |  | ||||||
|       logger.err('$saveChannel() error: ' + (e instanceof Error ? e.message : e)); |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   private async $updateChannelStatus(channelId: string, status: number): Promise<void> { |  | ||||||
|     try { |  | ||||||
|       await DB.query(`UPDATE channels SET status = ? WHERE id = ?`, [status, channelId]); |  | ||||||
|     } catch (e) { |  | ||||||
|       logger.err('$updateChannelStatus() error: ' + (e instanceof Error ? e.message : e)); |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   private async $setChannelsInactive(graphChannelsIds: string[]): Promise<void> { |  | ||||||
|     if (graphChannelsIds.length === 0) { |  | ||||||
|       return; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     try { |  | ||||||
|       await DB.query(` |  | ||||||
|         UPDATE channels |  | ||||||
|         SET status = 0 |  | ||||||
|         WHERE short_id NOT IN ( |  | ||||||
|           ${graphChannelsIds.map(id => `"${id}"`).join(',')} |  | ||||||
|         ) |  | ||||||
|         AND status != 2 |  | ||||||
|       `);
 |  | ||||||
|     } catch (e) { |  | ||||||
|       logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e)); |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   private async $saveNode(node: ILightningApi.Node): Promise<void> { |  | ||||||
|     try { |  | ||||||
|       const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? ''; |  | ||||||
|       const query = `INSERT INTO nodes(
 |  | ||||||
|           public_key, |  | ||||||
|           first_seen, |  | ||||||
|           updated_at, |  | ||||||
|           alias, |  | ||||||
|           color, |  | ||||||
|           sockets |  | ||||||
|         ) |  | ||||||
|         VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?) |  | ||||||
|         ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?`;
 |  | ||||||
| 
 |  | ||||||
|       await DB.query(query, [ |  | ||||||
|         node.pub_key, |  | ||||||
|         node.last_update, |  | ||||||
|         node.alias, |  | ||||||
|         node.color, |  | ||||||
|         sockets, |  | ||||||
|         node.last_update, |  | ||||||
|         node.alias, |  | ||||||
|         node.color, |  | ||||||
|         sockets, |  | ||||||
|       ]); |  | ||||||
|     } catch (e) { |  | ||||||
|       logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e)); |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   private toIntegerId(id: string): string { |  | ||||||
|     if (config.LIGHTNING.BACKEND === 'cln') { |  | ||||||
|       return convertChannelId(id); |  | ||||||
|     } |  | ||||||
|     else if (config.LIGHTNING.BACKEND === 'lnd') { |  | ||||||
|       return id; |  | ||||||
|     } |  | ||||||
|     return ''; |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   /** Decodes a channel id returned by lnd as uint64 to a short channel id */ |  | ||||||
|   private toShortId(id: string): string { |  | ||||||
|     if (config.LIGHTNING.BACKEND === 'cln') { |  | ||||||
|       return id; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     const n = BigInt(id); |  | ||||||
|     return [ |  | ||||||
|       n >> 40n, // nth block
 |  | ||||||
|       (n >> 16n) & 0xffffffn, // nth tx of the block
 |  | ||||||
|       n & 0xffffn // nth output of the tx
 |  | ||||||
|     ].join('x'); |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   private utcDateToMysql(date?: number): string { |  | ||||||
|     const d = new Date((date || 0) * 1000); |  | ||||||
|     return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; |  | ||||||
|   } |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| export default new NetworkSyncService(); | export default new NetworkSyncService(); | ||||||
|  | |||||||
| @ -1,8 +1,8 @@ | |||||||
| import DB from '../../database'; |  | ||||||
| import logger from '../../logger'; | import logger from '../../logger'; | ||||||
| import lightningApi from '../../api/lightning/lightning-api-factory'; | import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||||
| import LightningStatsImporter from './sync-tasks/stats-importer'; | import LightningStatsImporter from './sync-tasks/stats-importer'; | ||||||
| import config from '../../config'; | import config from '../../config'; | ||||||
|  | import { Common } from '../../api/common'; | ||||||
| 
 | 
 | ||||||
| class LightningStatsUpdater { | class LightningStatsUpdater { | ||||||
|   public async $startService(): Promise<void> { |   public async $startService(): Promise<void> { | ||||||
| @ -12,31 +12,22 @@ class LightningStatsUpdater { | |||||||
|     LightningStatsImporter.$run(); |     LightningStatsImporter.$run(); | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private setDateMidnight(date: Date): void { |  | ||||||
|     date.setUTCHours(0); |  | ||||||
|     date.setUTCMinutes(0); |  | ||||||
|     date.setUTCSeconds(0); |  | ||||||
|     date.setUTCMilliseconds(0); |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   private async $runTasks(): Promise<void> { |   private async $runTasks(): Promise<void> { | ||||||
|     await this.$logStatsDaily(); |     await this.$logStatsDaily(); | ||||||
| 
 | 
 | ||||||
|     setTimeout(() => { |     setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.STATS_REFRESH_INTERVAL); | ||||||
|       this.$runTasks(); |  | ||||||
|     }, 1000 * config.LIGHTNING.NODE_STATS_REFRESH_INTERVAL); |  | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   /** |   /** | ||||||
|    * Update the latest entry for each node every config.LIGHTNING.NODE_STATS_REFRESH_INTERVAL seconds |    * Update the latest entry for each node every config.LIGHTNING.STATS_REFRESH_INTERVAL seconds | ||||||
|    */ |    */ | ||||||
|   private async $logStatsDaily(): Promise<void> { |   private async $logStatsDaily(): Promise<void> { | ||||||
|     const date = new Date(); |     const date = new Date(); | ||||||
|     this.setDateMidnight(date); |     Common.setDateMidnight(date); | ||||||
| 
 |  | ||||||
|     logger.info(`Updating latest networks stats`); |  | ||||||
|     const networkGraph = await lightningApi.$getNetworkGraph(); |     const networkGraph = await lightningApi.$getNetworkGraph(); | ||||||
|     LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph); |     LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph); | ||||||
|  |      | ||||||
|  |     logger.info(`Updated latest network stats`); | ||||||
|   } |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -1,5 +1,6 @@ | |||||||
| import { existsSync, promises } from 'fs'; | import { existsSync, promises } from 'fs'; | ||||||
| import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; | import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; | ||||||
|  | import { Common } from '../../../api/common'; | ||||||
| import config from '../../../config'; | import config from '../../../config'; | ||||||
| import logger from '../../../logger'; | import logger from '../../../logger'; | ||||||
| 
 | 
 | ||||||
| @ -69,7 +70,11 @@ class FundingTxFetcher { | |||||||
|     this.running = false; |     this.running = false; | ||||||
|   } |   } | ||||||
|    |    | ||||||
|   public async $fetchChannelOpenTx(channelId: string): Promise<any> { |   public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> { | ||||||
|  |     if (channelId.indexOf('x') === -1) { | ||||||
|  |       channelId = Common.channelIntegerIdToShortId(channelId); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     if (this.fundingTxCache[channelId]) { |     if (this.fundingTxCache[channelId]) { | ||||||
|       return this.fundingTxCache[channelId]; |       return this.fundingTxCache[channelId]; | ||||||
|     } |     } | ||||||
| @ -110,4 +115,4 @@ class FundingTxFetcher { | |||||||
|   } |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| export default new FundingTxFetcher; | export default new FundingTxFetcher; | ||||||
|  | |||||||
| @ -6,7 +6,10 @@ import DB from '../../../database'; | |||||||
| import logger from '../../../logger'; | import logger from '../../../logger'; | ||||||
| 
 | 
 | ||||||
| export async function $lookupNodeLocation(): Promise<void> { | export async function $lookupNodeLocation(): Promise<void> { | ||||||
|   logger.info(`Running node location updater using Maxmind...`); |   let loggerTimer = new Date().getTime() / 1000; | ||||||
|  |   let progress = 0; | ||||||
|  | 
 | ||||||
|  |   logger.info(`Running node location updater using Maxmind`); | ||||||
|   try { |   try { | ||||||
|     const nodes = await nodesApi.$getAllNodes(); |     const nodes = await nodesApi.$getAllNodes(); | ||||||
|     const lookupCity = await maxmind.open<CityResponse>(config.MAXMIND.GEOLITE2_CITY); |     const lookupCity = await maxmind.open<CityResponse>(config.MAXMIND.GEOLITE2_CITY); | ||||||
| @ -18,21 +21,24 @@ export async function $lookupNodeLocation(): Promise<void> { | |||||||
|       for (const socket of sockets) { |       for (const socket of sockets) { | ||||||
|         const ip = socket.substring(0, socket.lastIndexOf(':')).replace('[', '').replace(']', ''); |         const ip = socket.substring(0, socket.lastIndexOf(':')).replace('[', '').replace(']', ''); | ||||||
|         const hasClearnet = [4, 6].includes(net.isIP(ip)); |         const hasClearnet = [4, 6].includes(net.isIP(ip)); | ||||||
|  | 
 | ||||||
|         if (hasClearnet && ip !== '127.0.1.1' && ip !== '127.0.0.1') { |         if (hasClearnet && ip !== '127.0.1.1' && ip !== '127.0.0.1') { | ||||||
|           const city = lookupCity.get(ip); |           const city = lookupCity.get(ip); | ||||||
|           const asn = lookupAsn.get(ip); |           const asn = lookupAsn.get(ip); | ||||||
|           const isp = lookupIsp.get(ip); |           const isp = lookupIsp.get(ip); | ||||||
| 
 | 
 | ||||||
|           if (city && (asn || isp)) { |           if (city && (asn || isp)) { | ||||||
|             const query = `UPDATE nodes SET 
 |             const query = ` | ||||||
|               as_number = ?,  |               UPDATE nodes SET  | ||||||
|               city_id = ?,  |                 as_number = ?,  | ||||||
|               country_id = ?,  |                 city_id = ?,  | ||||||
|               subdivision_id = ?,  |                 country_id = ?,  | ||||||
|               longitude = ?,  |                 subdivision_id = ?,  | ||||||
|               latitude = ?,  |                 longitude = ?,  | ||||||
|               accuracy_radius = ? |                 latitude = ?,  | ||||||
|             WHERE public_key = ?`;
 |                 accuracy_radius = ? | ||||||
|  |               WHERE public_key = ? | ||||||
|  |             `;
 | ||||||
| 
 | 
 | ||||||
|             const params = [ |             const params = [ | ||||||
|               isp?.autonomous_system_number ?? asn?.autonomous_system_number, |               isp?.autonomous_system_number ?? asn?.autonomous_system_number, | ||||||
| @ -46,25 +52,25 @@ export async function $lookupNodeLocation(): Promise<void> { | |||||||
|             ]; |             ]; | ||||||
|             await DB.query(query, params); |             await DB.query(query, params); | ||||||
| 
 | 
 | ||||||
|              // Store Continent
 |             // Store Continent
 | ||||||
|              if (city.continent?.geoname_id) { |             if (city.continent?.geoname_id) { | ||||||
|                await DB.query( |               await DB.query( | ||||||
|                 `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'continent', ?)`, |                 `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'continent', ?)`, | ||||||
|                 [city.continent?.geoname_id, JSON.stringify(city.continent?.names)]); |                 [city.continent?.geoname_id, JSON.stringify(city.continent?.names)]); | ||||||
|              } |             } | ||||||
| 
 | 
 | ||||||
|              // Store Country
 |             // Store Country
 | ||||||
|              if (city.country?.geoname_id) { |             if (city.country?.geoname_id) { | ||||||
|                await DB.query( |               await DB.query( | ||||||
|                 `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country', ?)`, |                 `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country', ?)`, | ||||||
|                 [city.country?.geoname_id, JSON.stringify(city.country?.names)]); |                 [city.country?.geoname_id, JSON.stringify(city.country?.names)]); | ||||||
|              } |             } | ||||||
| 
 | 
 | ||||||
|             // Store Country ISO code
 |             // Store Country ISO code
 | ||||||
|             if (city.country?.iso_code) { |             if (city.country?.iso_code) { | ||||||
|               await DB.query( |               await DB.query( | ||||||
|                `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country_iso_code', ?)`, |                 `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country_iso_code', ?)`, | ||||||
|                [city.country?.geoname_id, city.country?.iso_code]); |                 [city.country?.geoname_id, city.country?.iso_code]); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             // Store Division
 |             // Store Division
 | ||||||
| @ -88,10 +94,17 @@ export async function $lookupNodeLocation(): Promise<void> { | |||||||
|                 [isp?.autonomous_system_number ?? asn?.autonomous_system_number, JSON.stringify(isp?.isp ?? asn?.autonomous_system_organization)]); |                 [isp?.autonomous_system_number ?? asn?.autonomous_system_number, JSON.stringify(isp?.isp ?? asn?.autonomous_system_organization)]); | ||||||
|             } |             } | ||||||
|           } |           } | ||||||
|  | 
 | ||||||
|  |           ++progress; | ||||||
|  |           const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); | ||||||
|  |           if (elapsedSeconds > 10) { | ||||||
|  |             logger.info(`Updating node location data ${progress}/${nodes.length}`); | ||||||
|  |             loggerTimer = new Date().getTime() / 1000; | ||||||
|  |           } | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|     logger.info(`Node location data updated.`); |     logger.info(`${progress} nodes location data updated`); | ||||||
|   } catch (e) { |   } catch (e) { | ||||||
|     logger.err('$lookupNodeLocation() error: ' + (e instanceof Error ? e.message : e)); |     logger.err('$lookupNodeLocation() error: ' + (e instanceof Error ? e.message : e)); | ||||||
|   } |   } | ||||||
|  | |||||||
| @ -128,32 +128,32 @@ class LightningStatsImporter { | |||||||
|       if (channel.node1_policy !== undefined) { // Coming from the node
 |       if (channel.node1_policy !== undefined) { // Coming from the node
 | ||||||
|         for (const policy of [channel.node1_policy, channel.node2_policy]) { |         for (const policy of [channel.node1_policy, channel.node2_policy]) { | ||||||
|           if (policy && policy.fee_rate_milli_msat < 5000) { |           if (policy && policy.fee_rate_milli_msat < 5000) { | ||||||
|             avgFeeRate += policy.fee_rate_milli_msat; |             avgFeeRate += parseInt(policy.fee_rate_milli_msat, 10); | ||||||
|             feeRates.push(policy.fee_rate_milli_msat); |             feeRates.push(parseInt(policy.fee_rate_milli_msat, 10)); | ||||||
|           }   |           }   | ||||||
|           if (policy && policy.fee_base_msat < 5000) { |           if (policy && policy.fee_base_msat < 5000) { | ||||||
|             avgBaseFee += policy.fee_base_msat;       |             avgBaseFee += parseInt(policy.fee_base_msat, 10); | ||||||
|             baseFees.push(policy.fee_base_msat); |             baseFees.push(parseInt(policy.fee_base_msat, 10)); | ||||||
|           } |           } | ||||||
|         } |         } | ||||||
|       } else { // Coming from the historical import
 |       } else { // Coming from the historical import
 | ||||||
|         if (channel.fee_rate_milli_msat < 5000) { |         if (channel.fee_rate_milli_msat < 5000) { | ||||||
|           avgFeeRate += channel.fee_rate_milli_msat; |           avgFeeRate += parseInt(channel.fee_rate_milli_msat, 10); | ||||||
|           feeRates.push(channel.fee_rate_milli_msat); |           feeRates.push(parseInt(channel.fee_rate_milli_msat), 10); | ||||||
|         }   |         }   | ||||||
|         if (channel.fee_base_msat < 5000) { |         if (channel.fee_base_msat < 5000) { | ||||||
|           avgBaseFee += channel.fee_base_msat;       |           avgBaseFee += parseInt(channel.fee_base_msat, 10); | ||||||
|           baseFees.push(channel.fee_base_msat); |           baseFees.push(parseInt(channel.fee_base_msat), 10); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|      | 
 | ||||||
|     avgFeeRate /= networkGraph.edges.length; |     avgFeeRate /= Math.max(networkGraph.edges.length, 1); | ||||||
|     avgBaseFee /= networkGraph.edges.length; |     avgBaseFee /= Math.max(networkGraph.edges.length, 1); | ||||||
|     const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; |     const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; | ||||||
|     const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; |     const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; | ||||||
|     const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; |     const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; | ||||||
|     const avgCapacity = Math.round(capacity / capacities.length); |     const avgCapacity = Math.round(capacity / Math.max(capacities.length, 1)); | ||||||
| 
 | 
 | ||||||
|     let query = `INSERT INTO lightning_stats(
 |     let query = `INSERT INTO lightning_stats(
 | ||||||
|         added, |         added, | ||||||
| @ -251,6 +251,9 @@ class LightningStatsImporter { | |||||||
|     }; |     }; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  |   /** | ||||||
|  |    * Import topology files LN historical data into the database | ||||||
|  |    */ | ||||||
|   async $importHistoricalLightningStats(): Promise<void> { |   async $importHistoricalLightningStats(): Promise<void> { | ||||||
|     let latestNodeCount = 1; |     let latestNodeCount = 1; | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user