diff --git a/backend/src/api/common.ts b/backend/src/api/common.ts index fe6b858e0..410d34a01 100644 --- a/backend/src/api/common.ts +++ b/backend/src/api/common.ts @@ -1,5 +1,6 @@ import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces'; import config from '../config'; +import { convertChannelId } from './lightning/clightning/clightning-convert'; export class Common { static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' @@ -184,4 +185,37 @@ export class Common { config.MEMPOOL.BLOCKS_SUMMARIES_INDEXING === true ); } + + static setDateMidnight(date: Date): void { + date.setUTCHours(0); + date.setUTCMinutes(0); + date.setUTCSeconds(0); + date.setUTCMilliseconds(0); + } + + static channelShortIdToIntegerId(id: string): string { + if (config.LIGHTNING.BACKEND === 'lnd') { + return id; + } + return convertChannelId(id); + } + + /** Decodes a channel id returned by lnd as uint64 to a short channel id */ + static channelIntegerIdToShortId(id: string): string { + if (config.LIGHTNING.BACKEND === 'cln') { + return id; + } + + const n = BigInt(id); + return [ + n >> 40n, // nth block + (n >> 16n) & 0xffffffn, // nth tx of the block + n & 0xffffn // nth output of the tx + ].join('x'); + } + + static utcDateToMysql(date?: number): string { + const d = new Date((date || 0) * 1000); + return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; + } } diff --git a/backend/src/api/explorer/channels.api.ts b/backend/src/api/explorer/channels.api.ts index 9928cc85b..6023d4c94 100644 --- a/backend/src/api/explorer/channels.api.ts +++ b/backend/src/api/explorer/channels.api.ts @@ -1,6 +1,9 @@ import logger from '../../logger'; import DB from '../../database'; import nodesApi from './nodes.api'; +import { ResultSetHeader } from 'mysql2'; +import { ILightningApi } from '../lightning/lightning-api.interface'; +import { Common } from '../common'; class ChannelsApi { public async $getAllChannels(): Promise { @@ -302,6 +305,139 @@ class ChannelsApi { }, }; } + + /** + * Save or update a channel present in the graph + */ + public async $saveChannel(channel: ILightningApi.Channel): Promise { + const [ txid, vout ] = channel.chan_point.split(':'); + + const policy1: Partial = channel.node1_policy || {}; + const policy2: Partial = channel.node2_policy || {}; + + try { + const query = `INSERT INTO channels + ( + id, + short_id, + capacity, + transaction_id, + transaction_vout, + updated_at, + status, + node1_public_key, + node1_base_fee_mtokens, + node1_cltv_delta, + node1_fee_rate, + node1_is_disabled, + node1_max_htlc_mtokens, + node1_min_htlc_mtokens, + node1_updated_at, + node2_public_key, + node2_base_fee_mtokens, + node2_cltv_delta, + node2_fee_rate, + node2_is_disabled, + node2_max_htlc_mtokens, + node2_min_htlc_mtokens, + node2_updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + capacity = ?, + updated_at = ?, + status = 1, + node1_public_key = ?, + node1_base_fee_mtokens = ?, + node1_cltv_delta = ?, + node1_fee_rate = ?, + node1_is_disabled = ?, + node1_max_htlc_mtokens = ?, + node1_min_htlc_mtokens = ?, + node1_updated_at = ?, + node2_public_key = ?, + node2_base_fee_mtokens = ?, + node2_cltv_delta = ?, + node2_fee_rate = ?, + node2_is_disabled = ?, + node2_max_htlc_mtokens = ?, + node2_min_htlc_mtokens = ?, + node2_updated_at = ? + ;`; + + await DB.query(query, [ + Common.channelShortIdToIntegerId(channel.channel_id), + Common.channelIntegerIdToShortId(channel.channel_id), + channel.capacity, + txid, + vout, + Common.utcDateToMysql(channel.last_update), + channel.node1_pub, + policy1.fee_base_msat, + policy1.time_lock_delta, + policy1.fee_rate_milli_msat, + policy1.disabled, + policy1.max_htlc_msat, + policy1.min_htlc, + Common.utcDateToMysql(policy1.last_update), + channel.node2_pub, + policy2.fee_base_msat, + policy2.time_lock_delta, + policy2.fee_rate_milli_msat, + policy2.disabled, + policy2.max_htlc_msat, + policy2.min_htlc, + Common.utcDateToMysql(policy2.last_update), + channel.capacity, + Common.utcDateToMysql(channel.last_update), + channel.node1_pub, + policy1.fee_base_msat, + policy1.time_lock_delta, + policy1.fee_rate_milli_msat, + policy1.disabled, + policy1.max_htlc_msat, + policy1.min_htlc, + Common.utcDateToMysql(policy1.last_update), + channel.node2_pub, + policy2.fee_base_msat, + policy2.time_lock_delta, + policy2.fee_rate_milli_msat, + policy2.disabled, + policy2.max_htlc_msat, + policy2.min_htlc, + Common.utcDateToMysql(policy2.last_update) + ]); + } catch (e) { + logger.err('$saveChannel() error: ' + (e instanceof Error ? e.message : e)); + } + } + + /** + * Set all channels not in `graphChannelsIds` as inactive (status = 0) + */ + public async $setChannelsInactive(graphChannelsIds: string[]): Promise { + if (graphChannelsIds.length === 0) { + return; + } + + try { + const result = await DB.query(` + UPDATE channels + SET status = 0 + WHERE short_id NOT IN ( + ${graphChannelsIds.map(id => `"${id}"`).join(',')} + ) + AND status != 2 + `); + if (result[0].changedRows ?? 0 > 0) { + logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`); + } else { + logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`); + } + } catch (e) { + logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e)); + } + } } export default new ChannelsApi(); diff --git a/backend/src/api/explorer/nodes.api.ts b/backend/src/api/explorer/nodes.api.ts index 93eef9a48..d4857a3a4 100644 --- a/backend/src/api/explorer/nodes.api.ts +++ b/backend/src/api/explorer/nodes.api.ts @@ -1,5 +1,7 @@ import logger from '../../logger'; import DB from '../../database'; +import { ResultSetHeader } from 'mysql2'; +import { ILightningApi } from '../lightning/lightning-api.interface'; class NodesApi { public async $getNode(public_key: string): Promise { @@ -321,6 +323,66 @@ class NodesApi { throw e; } } + + /** + * Save or update a node present in the graph + */ + public async $saveNode(node: ILightningApi.Node): Promise { + try { + const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? ''; + const query = `INSERT INTO nodes( + public_key, + first_seen, + updated_at, + alias, + color, + sockets, + status + ) + VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?, 1) + ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?, status = 1`; + + await DB.query(query, [ + node.pub_key, + node.last_update, + node.alias, + node.color, + sockets, + node.last_update, + node.alias, + node.color, + sockets, + ]); + } catch (e) { + logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e)); + } + } + + /** + * Set all nodes not in `nodesPubkeys` as inactive (status = 0) + */ + public async $setNodesInactive(graphNodesPubkeys: string[]): Promise { + if (graphNodesPubkeys.length === 0) { + return; + } + + try { + const result = await DB.query(` + UPDATE nodes + SET status = 0 + WHERE public_key NOT IN ( + ${graphNodesPubkeys.map(pubkey => `"${pubkey}"`).join(',')} + ) + `); + if (result[0].changedRows ?? 0 > 0) { + logger.info(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`); + } else { + logger.debug(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`); + } + } catch (e) { + logger.err('$setNodesInactive() error: ' + (e instanceof Error ? e.message : e)); + } + } } export default new NodesApi(); diff --git a/backend/src/config.ts b/backend/src/config.ts index d4dfc9edd..ddf1fd3d4 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -32,7 +32,8 @@ interface IConfig { ENABLED: boolean; BACKEND: 'lnd' | 'cln' | 'ldk'; TOPOLOGY_FOLDER: string; - NODE_STATS_REFRESH_INTERVAL: number; + STATS_REFRESH_INTERVAL: number; + GRAPH_REFRESH_INTERVAL: number; }; LND: { TLS_CERT_PATH: string; @@ -184,7 +185,8 @@ const defaults: IConfig = { 'ENABLED': false, 'BACKEND': 'lnd', 'TOPOLOGY_FOLDER': '', - 'NODE_STATS_REFRESH_INTERVAL': 600, + 'STATS_REFRESH_INTERVAL': 600, + 'GRAPH_REFRESH_INTERVAL': 600, }, 'LND': { 'TLS_CERT_PATH': '', diff --git a/backend/src/database.ts b/backend/src/database.ts index 66c876378..c2fb0980b 100644 --- a/backend/src/database.ts +++ b/backend/src/database.ts @@ -1,7 +1,7 @@ import config from './config'; import { createPool, Pool, PoolConnection } from 'mysql2/promise'; import logger from './logger'; -import { PoolOptions } from 'mysql2/typings/mysql'; +import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } from 'mysql2/typings/mysql'; class DB { constructor() { @@ -28,7 +28,9 @@ import { PoolOptions } from 'mysql2/typings/mysql'; } } - public async query(query, params?) { + public async query(query, params?): Promise<[T, FieldPacket[]]> + { this.checkDBFlag(); const pool = await this.getPool(); return pool.query(query, params); diff --git a/backend/src/tasks/lightning/network-sync.service.ts b/backend/src/tasks/lightning/network-sync.service.ts index c6bfdcbe3..8f2f77534 100644 --- a/backend/src/tasks/lightning/network-sync.service.ts +++ b/backend/src/tasks/lightning/network-sync.service.ts @@ -1,60 +1,43 @@ import DB from '../../database'; import logger from '../../logger'; import channelsApi from '../../api/explorer/channels.api'; -import bitcoinClient from '../../api/bitcoin/bitcoin-client'; import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory'; import config from '../../config'; import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface'; import { ILightningApi } from '../../api/lightning/lightning-api.interface'; import { $lookupNodeLocation } from './sync-tasks/node-locations'; import lightningApi from '../../api/lightning/lightning-api-factory'; -import { convertChannelId } from '../../api/lightning/clightning/clightning-convert'; -import { Common } from '../../api/common'; +import nodesApi from '../../api/explorer/nodes.api'; +import { ResultSetHeader } from 'mysql2'; +import fundingTxFetcher from './sync-tasks/funding-tx-fetcher'; class NetworkSyncService { + loggerTimer = 0; + constructor() {} - public async $startService() { - logger.info('Starting node sync service'); + public async $startService(): Promise { + logger.info('Starting lightning network sync service'); - await this.$runUpdater(); + this.loggerTimer = new Date().getTime() / 1000; - setInterval(async () => { - await this.$runUpdater(); - }, 1000 * 60 * 60); + await this.$runTasks(); } - private async $runUpdater(): Promise { + private async $runTasks(): Promise { try { - logger.info(`Updating nodes and channels...`); + logger.info(`Updating nodes and channels`); const networkGraph = await lightningApi.$getNetworkGraph(); if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) { logger.info(`LN Network graph is empty, retrying in 10 seconds`); - await Common.sleep$(10000); - this.$runUpdater(); + setTimeout(() => { this.$runTasks(); }, 10000); return; } - for (const node of networkGraph.nodes) { - await this.$saveNode(node); - } - logger.info(`Nodes updated.`); - - if (config.MAXMIND.ENABLED) { - await $lookupNodeLocation(); - } - - const graphChannelsIds: string[] = []; - for (const channel of networkGraph.edges) { - await this.$saveChannel(channel); - graphChannelsIds.push(channel.channel_id); - } - await this.$setChannelsInactive(graphChannelsIds); - - logger.info(`Channels updated.`); - - await this.$findInactiveNodesAndChannels(); + await this.$updateNodesList(networkGraph.nodes); + await this.$updateChannelsList(networkGraph.edges); + await this.$deactivateChannelsWithoutActiveNodes(); await this.$lookUpCreationDateFromChain(); await this.$updateNodeFirstSeen(); await this.$scanForClosedChannels(); @@ -63,60 +46,148 @@ class NetworkSyncService { } } catch (e) { - logger.err('$runUpdater() error: ' + (e instanceof Error ? e.message : e)); + logger.err('$runTasks() error: ' + (e instanceof Error ? e.message : e)); } + + setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.STATS_REFRESH_INTERVAL); + } + + /** + * Update the `nodes` table to reflect the current network graph state + */ + private async $updateNodesList(nodes: ILightningApi.Node[]): Promise { + let progress = 0; + + const graphNodesPubkeys: string[] = []; + for (const node of nodes) { + await nodesApi.$saveNode(node); + graphNodesPubkeys.push(node.pub_key); + ++progress; + + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Updating node ${progress}/${nodes.length}`); + this.loggerTimer = new Date().getTime() / 1000; + } + } + logger.info(`${progress} nodes updated`); + + // If a channel if not present in the graph, mark it as inactive + // nodesApi.$setNodesInactive(graphNodesPubkeys); + + if (config.MAXMIND.ENABLED) { + $lookupNodeLocation(); + } + } + + /** + * Update the `channels` table to reflect the current network graph state + */ + private async $updateChannelsList(channels: ILightningApi.Channel[]): Promise { + let progress = 0; + + const graphChannelsIds: string[] = []; + for (const channel of channels) { + await channelsApi.$saveChannel(channel); + graphChannelsIds.push(channel.channel_id); + ++progress; + + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Updating channel ${progress}/${channels.length}`); + this.loggerTimer = new Date().getTime() / 1000; + } + } + + logger.info(`${progress} channels updated`); + + // If a channel if not present in the graph, mark it as inactive + channelsApi.$setChannelsInactive(graphChannelsIds); } // This method look up the creation date of the earliest channel of the node // and update the node to that date in order to get the earliest first seen date - private async $updateNodeFirstSeen() { + private async $updateNodeFirstSeen(): Promise { + let progress = 0; + let updated = 0; + try { - const [nodes]: any[] = await DB.query(`SELECT nodes.public_key, UNIX_TIMESTAMP(nodes.first_seen) AS first_seen, (SELECT UNIX_TIMESTAMP(created) FROM channels WHERE channels.node1_public_key = nodes.public_key ORDER BY created ASC LIMIT 1) AS created1, (SELECT UNIX_TIMESTAMP(created) FROM channels WHERE channels.node2_public_key = nodes.public_key ORDER BY created ASC LIMIT 1) AS created2 FROM nodes`); + const [nodes]: any[] = await DB.query(` + SELECT nodes.public_key, UNIX_TIMESTAMP(nodes.first_seen) AS first_seen, + ( + SELECT MIN(UNIX_TIMESTAMP(created)) + FROM channels + WHERE channels.node1_public_key = nodes.public_key + ) AS created1, + ( + SELECT MIN(UNIX_TIMESTAMP(created)) + FROM channels + WHERE channels.node2_public_key = nodes.public_key + ) AS created2 + FROM nodes + `); + for (const node of nodes) { - let lowest = 0; - if (node.created1) { - if (node.created2 && node.created2 < node.created1) { - lowest = node.created2; - } else { - lowest = node.created1; - } - } else if (node.created2) { - lowest = node.created2; - } - if (lowest && lowest < node.first_seen) { + const lowest = Math.min( + node.created1 ?? Number.MAX_SAFE_INTEGER, + node.created2 ?? Number.MAX_SAFE_INTEGER, + node.first_seen ?? Number.MAX_SAFE_INTEGER + ); + if (lowest < node.first_seen) { const query = `UPDATE nodes SET first_seen = FROM_UNIXTIME(?) WHERE public_key = ?`; const params = [lowest, node.public_key]; await DB.query(query, params); } + ++progress; + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Updating node first seen date ${progress}/${nodes.length}`); + this.loggerTimer = new Date().getTime() / 1000; + ++updated; + } } - logger.info(`Node first seen dates scan complete.`); + logger.info(`Updated ${updated} node first seen dates`); } catch (e) { logger.err('$updateNodeFirstSeen() error: ' + (e instanceof Error ? e.message : e)); } } - private async $lookUpCreationDateFromChain() { - logger.info(`Running channel creation date lookup...`); + private async $lookUpCreationDateFromChain(): Promise { + let progress = 0; + + logger.info(`Running channel creation date lookup`); try { const channels = await channelsApi.$getChannelsWithoutCreatedDate(); for (const channel of channels) { - const transaction = await bitcoinClient.getRawTransaction(channel.transaction_id, 1); - await DB.query(`UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.id = ?`, [transaction.blocktime, channel.id]); + const transaction = await fundingTxFetcher.$fetchChannelOpenTx(channel.short_id); + await DB.query(` + UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.id = ?`, + [transaction.timestamp, channel.id] + ); + ++progress; + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Updating channel creation date ${progress}/${channels.length}`); + this.loggerTimer = new Date().getTime() / 1000; + } } - logger.info(`Channel creation dates scan complete.`); + logger.info(`Updated ${channels.length} channels' creation date`); } catch (e) { - logger.err('$setCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e)); + logger.err('$lookUpCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e)); } } - // Looking for channels whos nodes are inactive - private async $findInactiveNodesAndChannels(): Promise { - logger.info(`Running inactive channels scan...`); + /** + * If a channel does not have any active node linked to it, then also + * mark that channel as inactive + */ + private async $deactivateChannelsWithoutActiveNodes(): Promise { + logger.info(`Find channels which nodes are offline`); try { - const [channels]: [{ id: string }[]] = await DB.query(` - SELECT channels.id - FROM channels + const result = await DB.query(` + UPDATE channels + SET status = 0 WHERE channels.status = 1 AND ( ( @@ -131,16 +202,19 @@ class NetworkSyncService { ) = 0) `); - for (const channel of channels) { - await this.$updateChannelStatus(channel.id, 0); + if (result[0].changedRows ?? 0 > 0) { + logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`); + } else { + logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`); } - logger.info(`Inactive channels scan complete.`); } catch (e) { - logger.err('$findInactiveNodesAndChannels() error: ' + (e instanceof Error ? e.message : e)); + logger.err('$deactivateChannelsWithoutActiveNodes() error: ' + (e instanceof Error ? e.message : e)); } } private async $scanForClosedChannels(): Promise { + let progress = 0; + try { logger.info(`Starting closed channels scan...`); const channels = await channelsApi.$getChannelsByStatus(0); @@ -154,6 +228,13 @@ class NetworkSyncService { await DB.query(`UPDATE channels SET closing_transaction_id = ? WHERE id = ?`, [spendingTx.txid, channel.id]); } } + + ++progress; + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Checking if channel has been closed ${progress}/${channels.length}`); + this.loggerTimer = new Date().getTime() / 1000; + } } logger.info(`Closed channels scan complete.`); } catch (e) { @@ -171,6 +252,9 @@ class NetworkSyncService { if (!config.ESPLORA.REST_API_URL) { return; } + + let progress = 0; + try { logger.info(`Started running closed channel forensics...`); const channels = await channelsApi.$getClosedChannelsWithoutReason(); @@ -216,6 +300,13 @@ class NetworkSyncService { logger.debug('Setting closing reason ' + reason + ' for channel: ' + channel.id + '.'); await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]); } + + ++progress; + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Updating channel closed channel forensics ${progress}/${channels.length}`); + this.loggerTimer = new Date().getTime() / 1000; + } } logger.info(`Closed channels forensics scan complete.`); } catch (e) { @@ -270,195 +361,6 @@ class NetworkSyncService { } return 1; } - - private async $saveChannel(channel: ILightningApi.Channel): Promise { - const [ txid, vout ] = channel.chan_point.split(':'); - - const policy1: Partial = channel.node1_policy || {}; - const policy2: Partial = channel.node2_policy || {}; - - try { - const query = `INSERT INTO channels - ( - id, - short_id, - capacity, - transaction_id, - transaction_vout, - updated_at, - status, - node1_public_key, - node1_base_fee_mtokens, - node1_cltv_delta, - node1_fee_rate, - node1_is_disabled, - node1_max_htlc_mtokens, - node1_min_htlc_mtokens, - node1_updated_at, - node2_public_key, - node2_base_fee_mtokens, - node2_cltv_delta, - node2_fee_rate, - node2_is_disabled, - node2_max_htlc_mtokens, - node2_min_htlc_mtokens, - node2_updated_at - ) - VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON DUPLICATE KEY UPDATE - capacity = ?, - updated_at = ?, - status = 1, - node1_public_key = ?, - node1_base_fee_mtokens = ?, - node1_cltv_delta = ?, - node1_fee_rate = ?, - node1_is_disabled = ?, - node1_max_htlc_mtokens = ?, - node1_min_htlc_mtokens = ?, - node1_updated_at = ?, - node2_public_key = ?, - node2_base_fee_mtokens = ?, - node2_cltv_delta = ?, - node2_fee_rate = ?, - node2_is_disabled = ?, - node2_max_htlc_mtokens = ?, - node2_min_htlc_mtokens = ?, - node2_updated_at = ? - ;`; - - await DB.query(query, [ - this.toIntegerId(channel.channel_id), - this.toShortId(channel.channel_id), - channel.capacity, - txid, - vout, - this.utcDateToMysql(channel.last_update), - channel.node1_pub, - policy1.fee_base_msat, - policy1.time_lock_delta, - policy1.fee_rate_milli_msat, - policy1.disabled, - policy1.max_htlc_msat, - policy1.min_htlc, - this.utcDateToMysql(policy1.last_update), - channel.node2_pub, - policy2.fee_base_msat, - policy2.time_lock_delta, - policy2.fee_rate_milli_msat, - policy2.disabled, - policy2.max_htlc_msat, - policy2.min_htlc, - this.utcDateToMysql(policy2.last_update), - channel.capacity, - this.utcDateToMysql(channel.last_update), - channel.node1_pub, - policy1.fee_base_msat, - policy1.time_lock_delta, - policy1.fee_rate_milli_msat, - policy1.disabled, - policy1.max_htlc_msat, - policy1.min_htlc, - this.utcDateToMysql(policy1.last_update), - channel.node2_pub, - policy2.fee_base_msat, - policy2.time_lock_delta, - policy2.fee_rate_milli_msat, - policy2.disabled, - policy2.max_htlc_msat, - policy2.min_htlc, - this.utcDateToMysql(policy2.last_update) - ]); - } catch (e) { - logger.err('$saveChannel() error: ' + (e instanceof Error ? e.message : e)); - } - } - - private async $updateChannelStatus(channelId: string, status: number): Promise { - try { - await DB.query(`UPDATE channels SET status = ? WHERE id = ?`, [status, channelId]); - } catch (e) { - logger.err('$updateChannelStatus() error: ' + (e instanceof Error ? e.message : e)); - } - } - - private async $setChannelsInactive(graphChannelsIds: string[]): Promise { - if (graphChannelsIds.length === 0) { - return; - } - - try { - await DB.query(` - UPDATE channels - SET status = 0 - WHERE short_id NOT IN ( - ${graphChannelsIds.map(id => `"${id}"`).join(',')} - ) - AND status != 2 - `); - } catch (e) { - logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e)); - } - } - - private async $saveNode(node: ILightningApi.Node): Promise { - try { - const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? ''; - const query = `INSERT INTO nodes( - public_key, - first_seen, - updated_at, - alias, - color, - sockets - ) - VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?) - ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?`; - - await DB.query(query, [ - node.pub_key, - node.last_update, - node.alias, - node.color, - sockets, - node.last_update, - node.alias, - node.color, - sockets, - ]); - } catch (e) { - logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e)); - } - } - - private toIntegerId(id: string): string { - if (config.LIGHTNING.BACKEND === 'cln') { - return convertChannelId(id); - } - else if (config.LIGHTNING.BACKEND === 'lnd') { - return id; - } - return ''; - } - - /** Decodes a channel id returned by lnd as uint64 to a short channel id */ - private toShortId(id: string): string { - if (config.LIGHTNING.BACKEND === 'cln') { - return id; - } - - const n = BigInt(id); - return [ - n >> 40n, // nth block - (n >> 16n) & 0xffffffn, // nth tx of the block - n & 0xffffn // nth output of the tx - ].join('x'); - } - - private utcDateToMysql(date?: number): string { - const d = new Date((date || 0) * 1000); - return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; - } } export default new NetworkSyncService(); diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts index d58ff0ae6..c0db48976 100644 --- a/backend/src/tasks/lightning/stats-updater.service.ts +++ b/backend/src/tasks/lightning/stats-updater.service.ts @@ -1,8 +1,8 @@ -import DB from '../../database'; import logger from '../../logger'; import lightningApi from '../../api/lightning/lightning-api-factory'; import LightningStatsImporter from './sync-tasks/stats-importer'; import config from '../../config'; +import { Common } from '../../api/common'; class LightningStatsUpdater { public async $startService(): Promise { @@ -12,29 +12,20 @@ class LightningStatsUpdater { LightningStatsImporter.$run(); } - private setDateMidnight(date: Date): void { - date.setUTCHours(0); - date.setUTCMinutes(0); - date.setUTCSeconds(0); - date.setUTCMilliseconds(0); - } - private async $runTasks(): Promise { await this.$logStatsDaily(); - setTimeout(() => { - this.$runTasks(); - }, 1000 * config.LIGHTNING.NODE_STATS_REFRESH_INTERVAL); + setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.STATS_REFRESH_INTERVAL); } /** - * Update the latest entry for each node every config.LIGHTNING.NODE_STATS_REFRESH_INTERVAL seconds + * Update the latest entry for each node every config.LIGHTNING.STATS_REFRESH_INTERVAL seconds */ private async $logStatsDaily(): Promise { const date = new Date(); - this.setDateMidnight(date); + Common.setDateMidnight(date); - logger.info(`Updating latest networks stats`); + logger.info(`Updating latest network stats`); const networkGraph = await lightningApi.$getNetworkGraph(); LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph); } diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts index 6ca72aef7..8ca05b929 100644 --- a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts +++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts @@ -69,7 +69,7 @@ class FundingTxFetcher { this.running = false; } - public async $fetchChannelOpenTx(channelId: string): Promise { + public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> { if (this.fundingTxCache[channelId]) { return this.fundingTxCache[channelId]; } diff --git a/backend/src/tasks/lightning/sync-tasks/node-locations.ts b/backend/src/tasks/lightning/sync-tasks/node-locations.ts index 483131b26..30a6bfc2a 100644 --- a/backend/src/tasks/lightning/sync-tasks/node-locations.ts +++ b/backend/src/tasks/lightning/sync-tasks/node-locations.ts @@ -6,7 +6,10 @@ import DB from '../../../database'; import logger from '../../../logger'; export async function $lookupNodeLocation(): Promise { - logger.info(`Running node location updater using Maxmind...`); + let loggerTimer = new Date().getTime() / 1000; + let progress = 0; + + logger.info(`Running node location updater using Maxmind`); try { const nodes = await nodesApi.$getAllNodes(); const lookupCity = await maxmind.open(config.MAXMIND.GEOLITE2_CITY); @@ -18,21 +21,24 @@ export async function $lookupNodeLocation(): Promise { for (const socket of sockets) { const ip = socket.substring(0, socket.lastIndexOf(':')).replace('[', '').replace(']', ''); const hasClearnet = [4, 6].includes(net.isIP(ip)); + if (hasClearnet && ip !== '127.0.1.1' && ip !== '127.0.0.1') { const city = lookupCity.get(ip); const asn = lookupAsn.get(ip); const isp = lookupIsp.get(ip); if (city && (asn || isp)) { - const query = `UPDATE nodes SET - as_number = ?, - city_id = ?, - country_id = ?, - subdivision_id = ?, - longitude = ?, - latitude = ?, - accuracy_radius = ? - WHERE public_key = ?`; + const query = ` + UPDATE nodes SET + as_number = ?, + city_id = ?, + country_id = ?, + subdivision_id = ?, + longitude = ?, + latitude = ?, + accuracy_radius = ? + WHERE public_key = ? + `; const params = [ isp?.autonomous_system_number ?? asn?.autonomous_system_number, @@ -46,25 +52,25 @@ export async function $lookupNodeLocation(): Promise { ]; await DB.query(query, params); - // Store Continent - if (city.continent?.geoname_id) { - await DB.query( + // Store Continent + if (city.continent?.geoname_id) { + await DB.query( `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'continent', ?)`, [city.continent?.geoname_id, JSON.stringify(city.continent?.names)]); - } + } - // Store Country - if (city.country?.geoname_id) { - await DB.query( + // Store Country + if (city.country?.geoname_id) { + await DB.query( `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country', ?)`, [city.country?.geoname_id, JSON.stringify(city.country?.names)]); - } + } // Store Country ISO code if (city.country?.iso_code) { await DB.query( - `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country_iso_code', ?)`, - [city.country?.geoname_id, city.country?.iso_code]); + `INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country_iso_code', ?)`, + [city.country?.geoname_id, city.country?.iso_code]); } // Store Division @@ -88,10 +94,17 @@ export async function $lookupNodeLocation(): Promise { [isp?.autonomous_system_number ?? asn?.autonomous_system_number, JSON.stringify(isp?.isp ?? asn?.autonomous_system_organization)]); } } + + ++progress; + const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); + if (elapsedSeconds > 10) { + logger.info(`Updating node location data ${progress}/${nodes.length}`); + loggerTimer = new Date().getTime() / 1000; + } } } } - logger.info(`Node location data updated.`); + logger.info(`${progress} nodes location data updated`); } catch (e) { logger.err('$lookupNodeLocation() error: ' + (e instanceof Error ? e.message : e)); }