From 4ea1e98547c09db50fade30756a7a60ed4064d60 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Mon, 1 Aug 2022 17:25:44 +0200 Subject: [PATCH 01/17] Import LN historical statistics (network wide + per node) --- backend/package-lock.json | 38 +++ backend/package.json | 1 + backend/src/api/database-migration.ts | 6 +- backend/src/config.ts | 4 +- .../tasks/lightning/stats-updater.service.ts | 183 +---------- .../sync-tasks/funding-tx-fetcher.ts | 104 +++++++ .../lightning/sync-tasks/stats-importer.ts | 287 ++++++++++++++++++ 7 files changed, 440 insertions(+), 183 deletions(-) create mode 100644 backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts create mode 100644 backend/src/tasks/lightning/sync-tasks/stats-importer.ts diff --git a/backend/package-lock.json b/backend/package-lock.json index b23a7f874..968cb953b 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -31,6 +31,7 @@ "@typescript-eslint/parser": "^5.30.5", "eslint": "^8.19.0", "eslint-config-prettier": "^8.5.0", + "fast-xml-parser": "^4.0.9", "prettier": "^2.7.1" } }, @@ -1496,6 +1497,22 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, + "node_modules/fast-xml-parser": { + "version": "4.0.9", + "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.0.9.tgz", + "integrity": "sha512-4G8EzDg2Nb1Qurs3f7BpFV4+jpMVsdgLVuG1Uv8O2OHJfVCg7gcA53obuKbmVqzd4Y7YXVBK05oJG7hzGIdyzg==", + "dev": true, + "dependencies": { + "strnum": "^1.0.5" + }, + "bin": { + "fxparser": "src/cli/cli.js" + }, + "funding": { + "type": "paypal", + "url": "https://paypal.me/naturalintelligence" + } + }, "node_modules/fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", @@ -2665,6 +2682,12 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/strnum": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz", + "integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==", + "dev": true + }, "node_modules/text-table": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", @@ -3973,6 +3996,15 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, + "fast-xml-parser": { + "version": "4.0.9", + "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.0.9.tgz", + "integrity": "sha512-4G8EzDg2Nb1Qurs3f7BpFV4+jpMVsdgLVuG1Uv8O2OHJfVCg7gcA53obuKbmVqzd4Y7YXVBK05oJG7hzGIdyzg==", + "dev": true, + "requires": { + "strnum": "^1.0.5" + } + }, "fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", @@ -4817,6 +4849,12 @@ "integrity": "sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==", "dev": true }, + "strnum": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz", + "integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==", + "dev": true + }, "text-table": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", diff --git a/backend/package.json b/backend/package.json index 6345e89da..750380156 100644 --- a/backend/package.json +++ b/backend/package.json @@ -53,6 +53,7 @@ "@typescript-eslint/parser": "^5.30.5", "eslint": "^8.19.0", 
"eslint-config-prettier": "^8.5.0", + "fast-xml-parser": "^4.0.9", "prettier": "^2.7.1" } } diff --git a/backend/src/api/database-migration.ts b/backend/src/api/database-migration.ts index d26bfd6cc..816efc7cc 100644 --- a/backend/src/api/database-migration.ts +++ b/backend/src/api/database-migration.ts @@ -4,7 +4,7 @@ import logger from '../logger'; import { Common } from './common'; class DatabaseMigration { - private static currentVersion = 33; + private static currentVersion = 34; private queryTimeout = 120000; private statisticsAddedIndexed = false; private uniqueLogs: string[] = []; @@ -311,6 +311,10 @@ class DatabaseMigration { if (databaseSchemaVersion < 33 && isBitcoin == true) { await this.$executeQuery('ALTER TABLE `geo_names` CHANGE `type` `type` enum("city","country","division","continent","as_organization", "country_iso_code") NOT NULL'); } + + if (databaseSchemaVersion < 34 && isBitcoin == true) { + await this.$executeQuery('ALTER TABLE `lightning_stats` ADD clearnet_tor_nodes int(11) NOT NULL DEFAULT "0"'); + } } /** diff --git a/backend/src/config.ts b/backend/src/config.ts index 5560a25a7..d480e6c51 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -31,6 +31,7 @@ interface IConfig { LIGHTNING: { ENABLED: boolean; BACKEND: 'lnd' | 'cln' | 'ldk'; + TOPOLOGY_FOLDER: string; }; LND: { TLS_CERT_PATH: string; @@ -177,7 +178,8 @@ const defaults: IConfig = { }, 'LIGHTNING': { 'ENABLED': false, - 'BACKEND': 'lnd' + 'BACKEND': 'lnd', + 'TOPOLOGY_FOLDER': '', }, 'LND': { 'TLS_CERT_PATH': '', diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts index 0a3ade614..c48b683cd 100644 --- a/backend/src/tasks/lightning/stats-updater.service.ts +++ b/backend/src/tasks/lightning/stats-updater.service.ts @@ -3,7 +3,7 @@ import DB from '../../database'; import logger from '../../logger'; import lightningApi from '../../api/lightning/lightning-api-factory'; import channelsApi from '../../api/explorer/channels.api'; -import * as net from 'net'; +import { isIP } from 'net'; class LightningStatsUpdater { hardCodedStartTime = '2018-01-12'; @@ -28,9 +28,6 @@ class LightningStatsUpdater { return; } - await this.$populateHistoricalStatistics(); - await this.$populateHistoricalNodeStatistics(); - setTimeout(() => { this.$runTasks(); }, this.timeUntilMidnight()); @@ -85,7 +82,7 @@ class LightningStatsUpdater { if (hasOnion) { torNodes++; } - const hasClearnet = [4, 6].includes(net.isIP(socket.addr.split(':')[0])); + const hasClearnet = [4, 6].includes(isIP(socket.split(':')[0])); if (hasClearnet) { clearnetNodes++; } @@ -167,182 +164,6 @@ class LightningStatsUpdater { logger.err('$logNodeStatsDaily() error: ' + (e instanceof Error ? 
e.message : e)); } } - - // We only run this on first launch - private async $populateHistoricalStatistics() { - try { - const [rows]: any = await DB.query(`SELECT COUNT(*) FROM lightning_stats`); - // Only run if table is empty - if (rows[0]['COUNT(*)'] > 0) { - return; - } - logger.info(`Running historical stats population...`); - - const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels ORDER BY created ASC`); - const [nodes]: any = await DB.query(`SELECT first_seen, sockets FROM nodes ORDER BY first_seen ASC`); - - const date: Date = new Date(this.hardCodedStartTime); - const currentDate = new Date(); - this.setDateMidnight(currentDate); - - while (date < currentDate) { - let totalCapacity = 0; - let channelsCount = 0; - - for (const channel of channels) { - if (new Date(channel.created) > date) { - break; - } - if (channel.closing_date === null || new Date(channel.closing_date) > date) { - totalCapacity += channel.capacity; - channelsCount++; - } - } - - let nodeCount = 0; - let clearnetNodes = 0; - let torNodes = 0; - let unannouncedNodes = 0; - - for (const node of nodes) { - if (new Date(node.first_seen) > date) { - break; - } - nodeCount++; - - const sockets = node.sockets.split(','); - let isUnnanounced = true; - for (const socket of sockets) { - const hasOnion = socket.indexOf('.onion') !== -1; - if (hasOnion) { - torNodes++; - isUnnanounced = false; - } - const hasClearnet = [4, 6].includes(net.isIP(socket.substring(0, socket.lastIndexOf(':')))); - if (hasClearnet) { - clearnetNodes++; - isUnnanounced = false; - } - } - if (isUnnanounced) { - unannouncedNodes++; - } - } - - const query = `INSERT INTO lightning_stats( - added, - channel_count, - node_count, - total_capacity, - tor_nodes, - clearnet_nodes, - unannounced_nodes, - avg_capacity, - avg_fee_rate, - avg_base_fee_mtokens, - med_capacity, - med_fee_rate, - med_base_fee_mtokens - ) - VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`; - - const rowTimestamp = date.getTime() / 1000; // Save timestamp for the row insertion down below - - date.setUTCDate(date.getUTCDate() + 1); - - // Last iteration, save channels stats - const channelStats = (date >= currentDate ? await channelsApi.$getChannelsStats() : undefined); - - await DB.query(query, [ - rowTimestamp, - channelsCount, - nodeCount, - totalCapacity, - torNodes, - clearnetNodes, - unannouncedNodes, - channelStats?.avgCapacity ?? 0, - channelStats?.avgFeeRate ?? 0, - channelStats?.avgBaseFee ?? 0, - channelStats?.medianCapacity ?? 0, - channelStats?.medianFeeRate ?? 0, - channelStats?.medianBaseFee ?? 0, - ]); - } - - logger.info('Historical stats populated.'); - } catch (e) { - logger.err('$populateHistoricalData() error: ' + (e instanceof Error ? e.message : e)); - } - } - - private async $populateHistoricalNodeStatistics() { - try { - const [rows]: any = await DB.query(`SELECT COUNT(*) FROM node_stats`); - // Only run if table is empty - if (rows[0]['COUNT(*)'] > 0) { - return; - } - logger.info(`Running historical node stats population...`); - - const [nodes]: any = await DB.query(`SELECT public_key, first_seen, alias FROM nodes ORDER BY first_seen ASC`); - - for (const node of nodes) { - const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels WHERE node1_public_key = ? OR node2_public_key = ? 
ORDER BY created ASC`, [node.public_key, node.public_key]); - - const date: Date = new Date(this.hardCodedStartTime); - const currentDate = new Date(); - this.setDateMidnight(currentDate); - - let lastTotalCapacity = 0; - let lastChannelsCount = 0; - - while (date < currentDate) { - let totalCapacity = 0; - let channelsCount = 0; - for (const channel of channels) { - if (new Date(channel.created) > date) { - break; - } - if (channel.closing_date !== null && new Date(channel.closing_date) < date) { - date.setUTCDate(date.getUTCDate() + 1); - continue; - } - totalCapacity += channel.capacity; - channelsCount++; - } - - if (lastTotalCapacity === totalCapacity && lastChannelsCount === channelsCount) { - date.setUTCDate(date.getUTCDate() + 1); - continue; - } - - lastTotalCapacity = totalCapacity; - lastChannelsCount = channelsCount; - - const query = `INSERT INTO node_stats( - public_key, - added, - capacity, - channels - ) - VALUES (?, FROM_UNIXTIME(?), ?, ?)`; - - await DB.query(query, [ - node.public_key, - date.getTime() / 1000, - totalCapacity, - channelsCount, - ]); - date.setUTCDate(date.getUTCDate() + 1); - } - logger.debug('Updated node_stats for: ' + node.alias); - } - logger.info('Historical stats populated.'); - } catch (e) { - logger.err('$populateHistoricalNodeData() error: ' + (e instanceof Error ? e.message : e)); - } - } } export default new LightningStatsUpdater(); diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts new file mode 100644 index 000000000..b9407c44d --- /dev/null +++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts @@ -0,0 +1,104 @@ +import { existsSync, readFileSync, writeFileSync } from 'fs'; +import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; +import config from '../../../config'; +import logger from '../../../logger'; + +const BLOCKS_CACHE_MAX_SIZE = 100; +const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json'; + +class FundingTxFetcher { + private running = false; + private blocksCache = {}; + private channelNewlyProcessed = 0; + public fundingTxCache = {}; + + async $fetchChannelsFundingTxs(channelIds: string[]): Promise { + if (this.running) { + return; + } + this.running = true; + + // Load funding tx disk cache + if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) { + try { + this.fundingTxCache = JSON.parse(readFileSync(CACHE_FILE_NAME, 'utf-8')); + } catch (e) { + logger.err(`Unable to parse channels funding txs disk cache. 
Starting from scratch`); + this.fundingTxCache = {}; + } + logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`); + } + + const globalTimer = new Date().getTime() / 1000; + let cacheTimer = new Date().getTime() / 1000; + let loggerTimer = new Date().getTime() / 1000; + let channelProcessed = 0; + this.channelNewlyProcessed = 0; + for (const channelId of channelIds) { + await this.$fetchChannelOpenTx(channelId); + ++channelProcessed; + + let elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); + if (elapsedSeconds > 10) { + elapsedSeconds = Math.round((new Date().getTime() / 1000) - globalTimer); + logger.debug(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` + + `(${Math.floor(channelProcessed / channelIds.length * 10000) / 100}%) | ` + + `elapsed: ${elapsedSeconds} seconds` + ); + loggerTimer = new Date().getTime() / 1000; + } + + elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer); + if (elapsedSeconds > 60) { + logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); + writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); + cacheTimer = new Date().getTime() / 1000; + } + } + + if (this.channelNewlyProcessed > 0) { + logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`); + logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); + writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); + } + + this.running = false; + } + + public async $fetchChannelOpenTx(channelId: string): Promise { + if (this.fundingTxCache[channelId]) { + return this.fundingTxCache[channelId]; + } + + const parts = channelId.split('x'); + const blockHeight = parts[0]; + const txIdx = parts[1]; + const outputIdx = parts[2]; + + let block = this.blocksCache[blockHeight]; + if (!block) { + const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10)); + block = await bitcoinClient.getBlock(blockHash, 2); + this.blocksCache[block.height] = block; + } + + const blocksCacheHashes = Object.keys(this.blocksCache).sort(); + if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) { + for (let i = 0; i < 10; ++i) { + delete this.blocksCache[blocksCacheHashes[i]]; + } + } + + this.fundingTxCache[channelId] = { + timestamp: block.time, + txid: block.tx[txIdx].txid, + value: block.tx[txIdx].vout[outputIdx].value, + }; + + ++this.channelNewlyProcessed; + + return this.fundingTxCache[channelId]; + } +} + +export default new FundingTxFetcher; \ No newline at end of file diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts new file mode 100644 index 000000000..a0a256457 --- /dev/null +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -0,0 +1,287 @@ +import DB from '../../../database'; +import { readdirSync, readFileSync } from 'fs'; +import { XMLParser } from 'fast-xml-parser'; +import logger from '../../../logger'; +import fundingTxFetcher from './funding-tx-fetcher'; +import config from '../../../config'; + +interface Node { + id: string; + timestamp: number; + features: string; + rgb_color: string; + alias: string; + addresses: string; + out_degree: number; + in_degree: number; +} + +interface Channel { + scid: string; + source: string; + destination: string; + timestamp: number; + features: string; + fee_base_msat: number; + fee_proportional_millionths: number; + 
htlc_minimim_msat: number; + cltv_expiry_delta: number; + htlc_maximum_msat: number; +} + +const topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; +const parser = new XMLParser(); + +let latestNodeCount = 1; // Ignore gap in the data + +async function $run(): Promise { + // const [channels]: any[] = await DB.query('SELECT short_id from channels;'); + // logger.info('Caching funding txs for currently existing channels'); + // await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); + + await $importHistoricalLightningStats(); +} + +/** + * Parse the file content into XML, and return a list of nodes and channels + */ +function parseFile(fileContent): any { + const graph = parser.parse(fileContent); + if (Object.keys(graph).length === 0) { + return null; + } + + const nodes: Node[] = []; + const channels: Channel[] = []; + + // If there is only one entry, the parser does not return an array, so we override this + if (!Array.isArray(graph.graphml.graph.node)) { + graph.graphml.graph.node = [graph.graphml.graph.node]; + } + if (!Array.isArray(graph.graphml.graph.edge)) { + graph.graphml.graph.edge = [graph.graphml.graph.edge]; + } + + for (const node of graph.graphml.graph.node) { + if (!node.data) { + continue; + } + nodes.push({ + id: node.data[0], + timestamp: node.data[1], + features: node.data[2], + rgb_color: node.data[3], + alias: node.data[4], + addresses: node.data[5], + out_degree: node.data[6], + in_degree: node.data[7], + }); + } + + for (const channel of graph.graphml.graph.edge) { + if (!channel.data) { + continue; + } + channels.push({ + scid: channel.data[0], + source: channel.data[1], + destination: channel.data[2], + timestamp: channel.data[3], + features: channel.data[4], + fee_base_msat: channel.data[5], + fee_proportional_millionths: channel.data[6], + htlc_minimim_msat: channel.data[7], + cltv_expiry_delta: channel.data[8], + htlc_maximum_msat: channel.data[9], + }); + } + + return { + nodes: nodes, + channels: channels, + }; +} + +/** + * Generate LN network stats for one day + */ +async function computeNetworkStats(timestamp: number, networkGraph): Promise { + // Node counts and network shares + let clearnetNodes = 0; + let torNodes = 0; + let clearnetTorNodes = 0; + let unannouncedNodes = 0; + + for (const node of networkGraph.nodes) { + let hasOnion = false; + let hasClearnet = false; + let isUnnanounced = true; + + const sockets = node.addresses.split(','); + for (const socket of sockets) { + hasOnion = hasOnion || (socket.indexOf('torv3://') !== -1); + hasClearnet = hasClearnet || (socket.indexOf('ipv4://') !== -1 || socket.indexOf('ipv6://') !== -1); + } + if (hasOnion && hasClearnet) { + clearnetTorNodes++; + isUnnanounced = false; + } else if (hasOnion) { + torNodes++; + isUnnanounced = false; + } else if (hasClearnet) { + clearnetNodes++; + isUnnanounced = false; + } + if (isUnnanounced) { + unannouncedNodes++; + } + } + + // Channels and node historical stats + const nodeStats = {}; + let capacity = 0; + let avgFeeRate = 0; + let avgBaseFee = 0; + const capacities: number[] = []; + const feeRates: number[] = []; + const baseFees: number[] = []; + for (const channel of networkGraph.channels) { + const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2)); + if (!tx) { + logger.err(`Unable to fetch funding tx for channel ${channel.scid}. 
Capacity and creation date will stay unknown.`); + continue; + } + + if (!nodeStats[channel.source]) { + nodeStats[channel.source] = { + capacity: 0, + channels: 0, + }; + } + if (!nodeStats[channel.destination]) { + nodeStats[channel.destination] = { + capacity: 0, + channels: 0, + }; + } + + nodeStats[channel.source].capacity += Math.round(tx.value * 100000000); + nodeStats[channel.source].channels++; + nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); + nodeStats[channel.destination].channels++; + + capacity += Math.round(tx.value * 100000000); + avgFeeRate += channel.fee_proportional_millionths; + avgBaseFee += channel.fee_base_msat; + capacities.push(Math.round(tx.value * 100000000)); + feeRates.push(channel.fee_proportional_millionths); + baseFees.push(channel.fee_base_msat); + } + + avgFeeRate /= networkGraph.channels.length; + avgBaseFee /= networkGraph.channels.length; + const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; + const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; + const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; + + let query = `INSERT INTO lightning_stats( + added, + channel_count, + node_count, + total_capacity, + tor_nodes, + clearnet_nodes, + unannounced_nodes, + clearnet_tor_nodes, + avg_capacity, + avg_fee_rate, + avg_base_fee_mtokens, + med_capacity, + med_fee_rate, + med_base_fee_mtokens + ) + VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`; + + await DB.query(query, [ + timestamp, + networkGraph.channels.length, + networkGraph.nodes.length, + capacity, + torNodes, + clearnetNodes, + unannouncedNodes, + clearnetTorNodes, + Math.round(capacity / networkGraph.channels.length), + avgFeeRate, + avgBaseFee, + medCapacity, + medFeeRate, + medBaseFee, + ]); + + for (const public_key of Object.keys(nodeStats)) { + query = `INSERT INTO node_stats( + public_key, + added, + capacity, + channels + ) + VALUES (?, FROM_UNIXTIME(?), ?, ?)`; + + await DB.query(query, [ + public_key, + timestamp, + nodeStats[public_key].capacity, + nodeStats[public_key].channels, + ]); + } +} + +export async function $importHistoricalLightningStats(): Promise { + const fileList = readdirSync(topologiesFolder); + fileList.sort().reverse(); + + const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added FROM lightning_stats'); + const existingStatsTimestamps = {}; + for (const row of rows) { + existingStatsTimestamps[row.added] = true; + } + + for (const filename of fileList) { + const timestamp = parseInt(filename.split('_')[1], 10); + const fileContent = readFileSync(`${topologiesFolder}/${filename}`, 'utf8'); + + const graph = parseFile(fileContent); + if (!graph) { + continue; + } + + // Ignore drop of more than 90% of the node count as it's probably a missing data point + const diffRatio = graph.nodes.length / latestNodeCount; + if (diffRatio < 0.90) { + continue; + } + latestNodeCount = graph.nodes.length; + + // Stats exist already, don't calculate/insert them + if (existingStatsTimestamps[timestamp] === true) { + continue; + } + + logger.debug(`Processing ${topologiesFolder}/${filename}`); + + const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; + logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); + + // Cache funding txs + logger.debug(`Caching funding txs for ${datestr}`); + await 
fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); + + logger.debug(`Generating LN network stats for ${datestr}`); + await computeNetworkStats(timestamp, graph); + } + + logger.info(`Lightning network stats historical import completed`); +} + +$run().then(() => process.exit(0)); \ No newline at end of file From 91ada9ce751dea183c592f5bad4e4d3a5b1a036e Mon Sep 17 00:00:00 2001 From: nymkappa Date: Mon, 1 Aug 2022 17:48:04 +0200 Subject: [PATCH 02/17] Integrate LN stats importer into the main process --- backend/src/index.ts | 10 +- .../tasks/lightning/stats-updater.service.ts | 5 +- .../lightning/sync-tasks/stats-importer.ts | 472 +++++++++--------- 3 files changed, 246 insertions(+), 241 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index b7159afaf..fa80fb2ad 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -29,11 +29,11 @@ import channelsRoutes from './api/explorer/channels.routes'; import generalLightningRoutes from './api/explorer/general.routes'; import lightningStatsUpdater from './tasks/lightning/stats-updater.service'; import nodeSyncService from './tasks/lightning/node-sync.service'; -import statisticsRoutes from "./api/statistics/statistics.routes"; -import miningRoutes from "./api/mining/mining-routes"; -import bisqRoutes from "./api/bisq/bisq.routes"; -import liquidRoutes from "./api/liquid/liquid.routes"; -import bitcoinRoutes from "./api/bitcoin/bitcoin.routes"; +import statisticsRoutes from './api/statistics/statistics.routes'; +import miningRoutes from './api/mining/mining-routes'; +import bisqRoutes from './api/bisq/bisq.routes'; +import liquidRoutes from './api/liquid/liquid.routes'; +import bitcoinRoutes from './api/bitcoin/bitcoin.routes'; class Server { private wss: WebSocket.Server | undefined; diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts index c48b683cd..c5ca55cd8 100644 --- a/backend/src/tasks/lightning/stats-updater.service.ts +++ b/backend/src/tasks/lightning/stats-updater.service.ts @@ -4,11 +4,12 @@ import logger from '../../logger'; import lightningApi from '../../api/lightning/lightning-api-factory'; import channelsApi from '../../api/explorer/channels.api'; import { isIP } from 'net'; +import LightningStatsImporter from './sync-tasks/stats-importer'; class LightningStatsUpdater { hardCodedStartTime = '2018-01-12'; - public async $startService() { + public async $startService(): Promise { logger.info('Starting Lightning Stats service'); let isInSync = false; let error: any; @@ -28,6 +29,8 @@ class LightningStatsUpdater { return; } + LightningStatsImporter.$run(); + setTimeout(() => { this.$runTasks(); }, this.timeUntilMidnight()); diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index a0a256457..9dd5751b9 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -29,259 +29,261 @@ interface Channel { htlc_maximum_msat: number; } -const topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; -const parser = new XMLParser(); +class LightningStatsImporter { + topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; + parser = new XMLParser(); -let latestNodeCount = 1; // Ignore gap in the data + latestNodeCount = 1; // Ignore gap in the data -async function $run(): Promise { - // const [channels]: any[] = await DB.query('SELECT short_id from channels;'); 
- // logger.info('Caching funding txs for currently existing channels'); - // await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); - - await $importHistoricalLightningStats(); -} - -/** - * Parse the file content into XML, and return a list of nodes and channels - */ -function parseFile(fileContent): any { - const graph = parser.parse(fileContent); - if (Object.keys(graph).length === 0) { - return null; + async $run(): Promise { + // const [channels]: any[] = await DB.query('SELECT short_id from channels;'); + // logger.info('Caching funding txs for currently existing channels'); + // await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); + + await this.$importHistoricalLightningStats(); } - const nodes: Node[] = []; - const channels: Channel[] = []; + /** + * Parse the file content into XML, and return a list of nodes and channels + */ + parseFile(fileContent): any { + const graph = this.parser.parse(fileContent); + if (Object.keys(graph).length === 0) { + return null; + } - // If there is only one entry, the parser does not return an array, so we override this - if (!Array.isArray(graph.graphml.graph.node)) { - graph.graphml.graph.node = [graph.graphml.graph.node]; - } - if (!Array.isArray(graph.graphml.graph.edge)) { - graph.graphml.graph.edge = [graph.graphml.graph.edge]; + const nodes: Node[] = []; + const channels: Channel[] = []; + + // If there is only one entry, the parser does not return an array, so we override this + if (!Array.isArray(graph.graphml.graph.node)) { + graph.graphml.graph.node = [graph.graphml.graph.node]; + } + if (!Array.isArray(graph.graphml.graph.edge)) { + graph.graphml.graph.edge = [graph.graphml.graph.edge]; + } + + for (const node of graph.graphml.graph.node) { + if (!node.data) { + continue; + } + nodes.push({ + id: node.data[0], + timestamp: node.data[1], + features: node.data[2], + rgb_color: node.data[3], + alias: node.data[4], + addresses: node.data[5], + out_degree: node.data[6], + in_degree: node.data[7], + }); + } + + for (const channel of graph.graphml.graph.edge) { + if (!channel.data) { + continue; + } + channels.push({ + scid: channel.data[0], + source: channel.data[1], + destination: channel.data[2], + timestamp: channel.data[3], + features: channel.data[4], + fee_base_msat: channel.data[5], + fee_proportional_millionths: channel.data[6], + htlc_minimim_msat: channel.data[7], + cltv_expiry_delta: channel.data[8], + htlc_maximum_msat: channel.data[9], + }); + } + + return { + nodes: nodes, + channels: channels, + }; } - for (const node of graph.graphml.graph.node) { - if (!node.data) { - continue; - } - nodes.push({ - id: node.data[0], - timestamp: node.data[1], - features: node.data[2], - rgb_color: node.data[3], - alias: node.data[4], - addresses: node.data[5], - out_degree: node.data[6], - in_degree: node.data[7], - }); - } + /** + * Generate LN network stats for one day + */ + async computeNetworkStats(timestamp: number, networkGraph): Promise { + // Node counts and network shares + let clearnetNodes = 0; + let torNodes = 0; + let clearnetTorNodes = 0; + let unannouncedNodes = 0; - for (const channel of graph.graphml.graph.edge) { - if (!channel.data) { - continue; - } - channels.push({ - scid: channel.data[0], - source: channel.data[1], - destination: channel.data[2], - timestamp: channel.data[3], - features: channel.data[4], - fee_base_msat: channel.data[5], - fee_proportional_millionths: channel.data[6], - htlc_minimim_msat: channel.data[7], - cltv_expiry_delta: 
channel.data[8], - htlc_maximum_msat: channel.data[9], - }); - } + for (const node of networkGraph.nodes) { + let hasOnion = false; + let hasClearnet = false; + let isUnnanounced = true; - return { - nodes: nodes, - channels: channels, - }; -} - -/** - * Generate LN network stats for one day - */ -async function computeNetworkStats(timestamp: number, networkGraph): Promise { - // Node counts and network shares - let clearnetNodes = 0; - let torNodes = 0; - let clearnetTorNodes = 0; - let unannouncedNodes = 0; - - for (const node of networkGraph.nodes) { - let hasOnion = false; - let hasClearnet = false; - let isUnnanounced = true; - - const sockets = node.addresses.split(','); - for (const socket of sockets) { - hasOnion = hasOnion || (socket.indexOf('torv3://') !== -1); - hasClearnet = hasClearnet || (socket.indexOf('ipv4://') !== -1 || socket.indexOf('ipv6://') !== -1); - } - if (hasOnion && hasClearnet) { - clearnetTorNodes++; - isUnnanounced = false; - } else if (hasOnion) { - torNodes++; - isUnnanounced = false; - } else if (hasClearnet) { - clearnetNodes++; - isUnnanounced = false; - } - if (isUnnanounced) { - unannouncedNodes++; - } - } - - // Channels and node historical stats - const nodeStats = {}; - let capacity = 0; - let avgFeeRate = 0; - let avgBaseFee = 0; - const capacities: number[] = []; - const feeRates: number[] = []; - const baseFees: number[] = []; - for (const channel of networkGraph.channels) { - const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2)); - if (!tx) { - logger.err(`Unable to fetch funding tx for channel ${channel.scid}. Capacity and creation date will stay unknown.`); - continue; + const sockets = node.addresses.split(','); + for (const socket of sockets) { + hasOnion = hasOnion || (socket.indexOf('torv3://') !== -1); + hasClearnet = hasClearnet || (socket.indexOf('ipv4://') !== -1 || socket.indexOf('ipv6://') !== -1); + } + if (hasOnion && hasClearnet) { + clearnetTorNodes++; + isUnnanounced = false; + } else if (hasOnion) { + torNodes++; + isUnnanounced = false; + } else if (hasClearnet) { + clearnetNodes++; + isUnnanounced = false; + } + if (isUnnanounced) { + unannouncedNodes++; + } } - if (!nodeStats[channel.source]) { - nodeStats[channel.source] = { - capacity: 0, - channels: 0, - }; - } - if (!nodeStats[channel.destination]) { - nodeStats[channel.destination] = { - capacity: 0, - channels: 0, - }; + // Channels and node historical stats + const nodeStats = {}; + let capacity = 0; + let avgFeeRate = 0; + let avgBaseFee = 0; + const capacities: number[] = []; + const feeRates: number[] = []; + const baseFees: number[] = []; + for (const channel of networkGraph.channels) { + const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2)); + if (!tx) { + logger.err(`Unable to fetch funding tx for channel ${channel.scid}. 
Capacity and creation date will stay unknown.`); + continue; + } + + if (!nodeStats[channel.source]) { + nodeStats[channel.source] = { + capacity: 0, + channels: 0, + }; + } + if (!nodeStats[channel.destination]) { + nodeStats[channel.destination] = { + capacity: 0, + channels: 0, + }; + } + + nodeStats[channel.source].capacity += Math.round(tx.value * 100000000); + nodeStats[channel.source].channels++; + nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); + nodeStats[channel.destination].channels++; + + capacity += Math.round(tx.value * 100000000); + avgFeeRate += channel.fee_proportional_millionths; + avgBaseFee += channel.fee_base_msat; + capacities.push(Math.round(tx.value * 100000000)); + feeRates.push(channel.fee_proportional_millionths); + baseFees.push(channel.fee_base_msat); } - nodeStats[channel.source].capacity += Math.round(tx.value * 100000000); - nodeStats[channel.source].channels++; - nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); - nodeStats[channel.destination].channels++; - - capacity += Math.round(tx.value * 100000000); - avgFeeRate += channel.fee_proportional_millionths; - avgBaseFee += channel.fee_base_msat; - capacities.push(Math.round(tx.value * 100000000)); - feeRates.push(channel.fee_proportional_millionths); - baseFees.push(channel.fee_base_msat); - } - - avgFeeRate /= networkGraph.channels.length; - avgBaseFee /= networkGraph.channels.length; - const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; - const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; - const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; - - let query = `INSERT INTO lightning_stats( - added, - channel_count, - node_count, - total_capacity, - tor_nodes, - clearnet_nodes, - unannounced_nodes, - clearnet_tor_nodes, - avg_capacity, - avg_fee_rate, - avg_base_fee_mtokens, - med_capacity, - med_fee_rate, - med_base_fee_mtokens - ) - VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`; - - await DB.query(query, [ - timestamp, - networkGraph.channels.length, - networkGraph.nodes.length, - capacity, - torNodes, - clearnetNodes, - unannouncedNodes, - clearnetTorNodes, - Math.round(capacity / networkGraph.channels.length), - avgFeeRate, - avgBaseFee, - medCapacity, - medFeeRate, - medBaseFee, - ]); - - for (const public_key of Object.keys(nodeStats)) { - query = `INSERT INTO node_stats( - public_key, + avgFeeRate /= networkGraph.channels.length; + avgBaseFee /= networkGraph.channels.length; + const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; + const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; + const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; + + let query = `INSERT INTO lightning_stats( added, - capacity, - channels + channel_count, + node_count, + total_capacity, + tor_nodes, + clearnet_nodes, + unannounced_nodes, + clearnet_tor_nodes, + avg_capacity, + avg_fee_rate, + avg_base_fee_mtokens, + med_capacity, + med_fee_rate, + med_base_fee_mtokens ) - VALUES (?, FROM_UNIXTIME(?), ?, ?)`; - + VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`; + await DB.query(query, [ - public_key, timestamp, - nodeStats[public_key].capacity, - nodeStats[public_key].channels, + networkGraph.channels.length, + networkGraph.nodes.length, + capacity, + torNodes, + clearnetNodes, + unannouncedNodes, + clearnetTorNodes, + 
Math.round(capacity / networkGraph.channels.length), + avgFeeRate, + avgBaseFee, + medCapacity, + medFeeRate, + medBaseFee, ]); + + for (const public_key of Object.keys(nodeStats)) { + query = `INSERT INTO node_stats( + public_key, + added, + capacity, + channels + ) + VALUES (?, FROM_UNIXTIME(?), ?, ?)`; + + await DB.query(query, [ + public_key, + timestamp, + nodeStats[public_key].capacity, + nodeStats[public_key].channels, + ]); + } + } + + async $importHistoricalLightningStats(): Promise { + const fileList = readdirSync(this.topologiesFolder); + fileList.sort().reverse(); + + const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added FROM lightning_stats'); + const existingStatsTimestamps = {}; + for (const row of rows) { + existingStatsTimestamps[row.added] = true; + } + + for (const filename of fileList) { + const timestamp = parseInt(filename.split('_')[1], 10); + const fileContent = readFileSync(`${this.topologiesFolder}/${filename}`, 'utf8'); + + const graph = this.parseFile(fileContent); + if (!graph) { + continue; + } + + // Ignore drop of more than 90% of the node count as it's probably a missing data point + const diffRatio = graph.nodes.length / this.latestNodeCount; + if (diffRatio < 0.90) { + continue; + } + this.latestNodeCount = graph.nodes.length; + + // Stats exist already, don't calculate/insert them + if (existingStatsTimestamps[timestamp] === true) { + continue; + } + + logger.debug(`Processing ${this.topologiesFolder}/${filename}`); + + const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; + logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); + + // Cache funding txs + logger.debug(`Caching funding txs for ${datestr}`); + await fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); + + logger.debug(`Generating LN network stats for ${datestr}`); + await this.computeNetworkStats(timestamp, graph); + } + + logger.info(`Lightning network stats historical import completed`); } } -export async function $importHistoricalLightningStats(): Promise { - const fileList = readdirSync(topologiesFolder); - fileList.sort().reverse(); - - const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added FROM lightning_stats'); - const existingStatsTimestamps = {}; - for (const row of rows) { - existingStatsTimestamps[row.added] = true; - } - - for (const filename of fileList) { - const timestamp = parseInt(filename.split('_')[1], 10); - const fileContent = readFileSync(`${topologiesFolder}/${filename}`, 'utf8'); - - const graph = parseFile(fileContent); - if (!graph) { - continue; - } - - // Ignore drop of more than 90% of the node count as it's probably a missing data point - const diffRatio = graph.nodes.length / latestNodeCount; - if (diffRatio < 0.90) { - continue; - } - latestNodeCount = graph.nodes.length; - - // Stats exist already, don't calculate/insert them - if (existingStatsTimestamps[timestamp] === true) { - continue; - } - - logger.debug(`Processing ${topologiesFolder}/${filename}`); - - const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; - logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); - - // Cache funding txs - logger.debug(`Caching funding txs for ${datestr}`); - await fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); - - logger.debug(`Generating LN network stats for ${datestr}`); - await 
computeNetworkStats(timestamp, graph); - } - - logger.info(`Lightning network stats historical import completed`); -} - -$run().then(() => process.exit(0)); \ No newline at end of file +export default new LightningStatsImporter; \ No newline at end of file From 2daf94f65a5a3687140190800ca780e95737358a Mon Sep 17 00:00:00 2001 From: nymkappa Date: Mon, 1 Aug 2022 18:21:45 +0200 Subject: [PATCH 03/17] Re-use LN stats importer code to log daily LN stats --- .../tasks/lightning/stats-updater.service.ts | 111 ++---------------- .../lightning/sync-tasks/stats-importer.ts | 4 +- 2 files changed, 11 insertions(+), 104 deletions(-) diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts index c5ca55cd8..d093892bb 100644 --- a/backend/src/tasks/lightning/stats-updater.service.ts +++ b/backend/src/tasks/lightning/stats-updater.service.ts @@ -56,116 +56,21 @@ class LightningStatsUpdater { } private async $runTasks(): Promise { - await this.$logLightningStatsDaily(); - await this.$logNodeStatsDaily(); + await this.$logStatsDaily(); setTimeout(() => { this.$runTasks(); }, this.timeUntilMidnight()); } - private async $logLightningStatsDaily() { - try { - logger.info(`Running lightning daily stats log...`); + private async $logStatsDaily(): Promise { + const date = new Date(); + this.setDateMidnight(date); + date.setUTCHours(24); - const networkGraph = await lightningApi.$getNetworkGraph(); - let total_capacity = 0; - for (const channel of networkGraph.edges) { - if (channel.capacity) { - total_capacity += parseInt(channel.capacity); - } - } - - let clearnetNodes = 0; - let torNodes = 0; - let unannouncedNodes = 0; - for (const node of networkGraph.nodes) { - for (const socket of node.addresses) { - const hasOnion = socket.addr.indexOf('.onion') !== -1; - if (hasOnion) { - torNodes++; - } - const hasClearnet = [4, 6].includes(isIP(socket.split(':')[0])); - if (hasClearnet) { - clearnetNodes++; - } - } - if (node.addresses.length === 0) { - unannouncedNodes++; - } - } - - const channelStats = await channelsApi.$getChannelsStats(); - - const query = `INSERT INTO lightning_stats( - added, - channel_count, - node_count, - total_capacity, - tor_nodes, - clearnet_nodes, - unannounced_nodes, - avg_capacity, - avg_fee_rate, - avg_base_fee_mtokens, - med_capacity, - med_fee_rate, - med_base_fee_mtokens - ) - VALUES (NOW() - INTERVAL 1 DAY, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`; - - await DB.query(query, [ - networkGraph.edges.length, - networkGraph.nodes.length, - total_capacity, - torNodes, - clearnetNodes, - unannouncedNodes, - channelStats.avgCapacity, - channelStats.avgFeeRate, - channelStats.avgBaseFee, - channelStats.medianCapacity, - channelStats.medianFeeRate, - channelStats.medianBaseFee, - ]); - logger.info(`Lightning daily stats done.`); - } catch (e) { - logger.err('$logLightningStatsDaily() error: ' + (e instanceof Error ? 
e.message : e)); - } - } - - private async $logNodeStatsDaily() { - try { - logger.info(`Running daily node stats update...`); - - const query = ` - SELECT nodes.public_key, c1.channels_count_left, c2.channels_count_right, c1.channels_capacity_left, - c2.channels_capacity_right - FROM nodes - LEFT JOIN ( - SELECT node1_public_key, COUNT(id) AS channels_count_left, SUM(capacity) AS channels_capacity_left - FROM channels - WHERE channels.status = 1 - GROUP BY node1_public_key - ) c1 ON c1.node1_public_key = nodes.public_key - LEFT JOIN ( - SELECT node2_public_key, COUNT(id) AS channels_count_right, SUM(capacity) AS channels_capacity_right - FROM channels WHERE channels.status = 1 GROUP BY node2_public_key - ) c2 ON c2.node2_public_key = nodes.public_key - `; - - const [nodes]: any = await DB.query(query); - - for (const node of nodes) { - await DB.query( - `INSERT INTO node_stats(public_key, added, capacity, channels) VALUES (?, NOW() - INTERVAL 1 DAY, ?, ?)`, - [node.public_key, (parseInt(node.channels_capacity_left || 0, 10)) + (parseInt(node.channels_capacity_right || 0, 10)), - node.channels_count_left + node.channels_count_right]); - } - logger.info('Daily node stats has updated.'); - } catch (e) { - logger.err('$logNodeStatsDaily() error: ' + (e instanceof Error ? e.message : e)); - } + logger.info(`Running lightning daily stats log...`); + const networkGraph = await lightningApi.$getNetworkGraph(); + LightningStatsImporter.computeNetworkStats(date.getTime(), networkGraph); } } diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index 9dd5751b9..f6d70df7d 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -36,6 +36,8 @@ class LightningStatsImporter { latestNodeCount = 1; // Ignore gap in the data async $run(): Promise { + logger.info(`Importing historical lightning stats`); + // const [channels]: any[] = await DB.query('SELECT short_id from channels;'); // logger.info('Caching funding txs for currently existing channels'); // await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); @@ -106,7 +108,7 @@ class LightningStatsImporter { /** * Generate LN network stats for one day */ - async computeNetworkStats(timestamp: number, networkGraph): Promise { + public async computeNetworkStats(timestamp: number, networkGraph): Promise { // Node counts and network shares let clearnetNodes = 0; let torNodes = 0; From b246c6f4c3a28ce7946b3113a9e7ac848a2752d9 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Mon, 1 Aug 2022 19:50:55 +0200 Subject: [PATCH 04/17] We don't need a synced node to import historical data --- .../tasks/lightning/stats-updater.service.ts | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts index d093892bb..f364629b9 100644 --- a/backend/src/tasks/lightning/stats-updater.service.ts +++ b/backend/src/tasks/lightning/stats-updater.service.ts @@ -1,9 +1,6 @@ - import DB from '../../database'; import logger from '../../logger'; import lightningApi from '../../api/lightning/lightning-api-factory'; -import channelsApi from '../../api/explorer/channels.api'; -import { isIP } from 'net'; import LightningStatsImporter from './sync-tasks/stats-importer'; class LightningStatsUpdater { @@ -11,23 +8,6 @@ class LightningStatsUpdater { public async $startService(): Promise { 
logger.info('Starting Lightning Stats service'); - let isInSync = false; - let error: any; - try { - error = null; - isInSync = await this.$lightningIsSynced(); - } catch (e) { - error = e; - } - if (!isInSync) { - if (error) { - logger.warn('Was not able to fetch Lightning Node status: ' + (error instanceof Error ? error.message : error) + '. Retrying in 1 minute...'); - } else { - logger.notice('The Lightning graph is not yet in sync. Retrying in 1 minute...'); - } - setTimeout(() => this.$startService(), 60 * 1000); - return; - } LightningStatsImporter.$run(); @@ -50,11 +30,6 @@ class LightningStatsUpdater { date.setUTCMilliseconds(0); } - private async $lightningIsSynced(): Promise { - const nodeInfo = await lightningApi.$getInfo(); - return nodeInfo.synced_to_chain && nodeInfo.synced_to_graph; - } - private async $runTasks(): Promise { await this.$logStatsDaily(); From 528749089455e667407910c79575d53be5162003 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 12:19:57 +0200 Subject: [PATCH 05/17] Make sure to not count channels twice --- .../tasks/lightning/stats-updater.service.ts | 1 - .../sync-tasks/funding-tx-fetcher.ts | 34 ++++++++-- .../lightning/sync-tasks/stats-importer.ts | 65 +++++++++++-------- 3 files changed, 66 insertions(+), 34 deletions(-) diff --git a/backend/src/tasks/lightning/stats-updater.service.ts b/backend/src/tasks/lightning/stats-updater.service.ts index f364629b9..5701ef22a 100644 --- a/backend/src/tasks/lightning/stats-updater.service.ts +++ b/backend/src/tasks/lightning/stats-updater.service.ts @@ -1,4 +1,3 @@ -import DB from '../../database'; import logger from '../../logger'; import lightningApi from '../../api/lightning/lightning-api-factory'; import LightningStatsImporter from './sync-tasks/stats-importer'; diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts index b9407c44d..4068de8f1 100644 --- a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts +++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts @@ -1,8 +1,11 @@ -import { existsSync, readFileSync, writeFileSync } from 'fs'; +import { existsSync, promises } from 'fs'; import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; import config from '../../../config'; +import DB from '../../../database'; import logger from '../../../logger'; +const fsPromises = promises; + const BLOCKS_CACHE_MAX_SIZE = 100; const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json'; @@ -21,7 +24,7 @@ class FundingTxFetcher { // Load funding tx disk cache if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) { try { - this.fundingTxCache = JSON.parse(readFileSync(CACHE_FILE_NAME, 'utf-8')); + this.fundingTxCache = JSON.parse(await fsPromises.readFile(CACHE_FILE_NAME, 'utf-8')); } catch (e) { logger.err(`Unable to parse channels funding txs disk cache. 
Starting from scratch`); this.fundingTxCache = {}; @@ -51,7 +54,7 @@ class FundingTxFetcher { elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer); if (elapsedSeconds > 60) { logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); - writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); + fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); cacheTimer = new Date().getTime() / 1000; } } @@ -59,7 +62,7 @@ class FundingTxFetcher { if (this.channelNewlyProcessed > 0) { logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`); logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); - writeFileSync(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); + fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); } this.running = false; @@ -76,13 +79,30 @@ class FundingTxFetcher { const outputIdx = parts[2]; let block = this.blocksCache[blockHeight]; + // Check if we have the block in the `blocks_summaries` table to avoid calling core + if (!block) { + const [rows] = await DB.query(` + SELECT UNIX_TIMESTAMP(blocks.blockTimestamp) AS time, blocks_summaries.transactions AS tx + FROM blocks_summaries + JOIN blocks ON blocks.hash = blocks_summaries.id + WHERE blocks_summaries.height = ${blockHeight} + `); + block = rows[0] ?? null; + if (block) { + block.tx = JSON.parse(block.tx); + if (block.tx.length === 0) { + block = null; + } + } + } + // Fetch it from core if (!block) { const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10)); block = await bitcoinClient.getBlock(blockHash, 2); - this.blocksCache[block.height] = block; } + this.blocksCache[block.height] = block; - const blocksCacheHashes = Object.keys(this.blocksCache).sort(); + const blocksCacheHashes = Object.keys(this.blocksCache).sort((a, b) => parseInt(b) - parseInt(a)).reverse(); if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) { for (let i = 0; i < 10; ++i) { delete this.blocksCache[blocksCacheHashes[i]]; @@ -92,7 +112,7 @@ class FundingTxFetcher { this.fundingTxCache[channelId] = { timestamp: block.time, txid: block.tx[txIdx].txid, - value: block.tx[txIdx].vout[outputIdx].value, + value: block.tx[txIdx].value / 100000000 ?? 
block.tx[txIdx].vout[outputIdx].value, }; ++this.channelNewlyProcessed; diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index f6d70df7d..8482b558c 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -1,10 +1,12 @@ import DB from '../../../database'; -import { readdirSync, readFileSync } from 'fs'; +import { promises } from 'fs'; import { XMLParser } from 'fast-xml-parser'; import logger from '../../../logger'; import fundingTxFetcher from './funding-tx-fetcher'; import config from '../../../config'; +const fsPromises = promises; + interface Node { id: string; timestamp: number; @@ -33,14 +35,12 @@ class LightningStatsImporter { topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; parser = new XMLParser(); - latestNodeCount = 1; // Ignore gap in the data - async $run(): Promise { logger.info(`Importing historical lightning stats`); - // const [channels]: any[] = await DB.query('SELECT short_id from channels;'); - // logger.info('Caching funding txs for currently existing channels'); - // await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); + const [channels]: any[] = await DB.query('SELECT short_id from channels;'); + logger.info('Caching funding txs for currently existing channels'); + await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); await this.$importHistoricalLightningStats(); } @@ -148,6 +148,8 @@ class LightningStatsImporter { const capacities: number[] = []; const feeRates: number[] = []; const baseFees: number[] = []; + const alreadyCountedChannels = {}; + for (const channel of networkGraph.channels) { const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2)); if (!tx) { @@ -173,10 +175,14 @@ class LightningStatsImporter { nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); nodeStats[channel.destination].channels++; - capacity += Math.round(tx.value * 100000000); + if (!alreadyCountedChannels[channel.scid.slice(0, -2)]) { + capacity += Math.round(tx.value * 100000000); + capacities.push(Math.round(tx.value * 100000000)); + alreadyCountedChannels[channel.scid.slice(0, -2)] = true; + } + avgFeeRate += channel.fee_proportional_millionths; avgBaseFee += channel.fee_base_msat; - capacities.push(Math.round(tx.value * 100000000)); feeRates.push(channel.fee_proportional_millionths); baseFees.push(channel.fee_base_msat); } @@ -186,6 +192,7 @@ class LightningStatsImporter { const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; + const avgCapacity = Math.round(capacity / capacities.length); let query = `INSERT INTO lightning_stats( added, @@ -207,14 +214,14 @@ class LightningStatsImporter { await DB.query(query, [ timestamp, - networkGraph.channels.length, + capacities.length, networkGraph.nodes.length, capacity, torNodes, clearnetNodes, unannouncedNodes, clearnetTorNodes, - Math.round(capacity / networkGraph.channels.length), + avgCapacity, avgFeeRate, avgBaseFee, medCapacity, @@ -241,10 +248,10 @@ class LightningStatsImporter { } async $importHistoricalLightningStats(): Promise { - const fileList = readdirSync(this.topologiesFolder); + const fileList = await 
fsPromises.readdir(this.topologiesFolder); fileList.sort().reverse(); - const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added FROM lightning_stats'); + const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) AS added FROM lightning_stats'); const existingStatsTimestamps = {}; for (const row of rows) { existingStatsTimestamps[row.added] = true; @@ -252,26 +259,30 @@ class LightningStatsImporter { for (const filename of fileList) { const timestamp = parseInt(filename.split('_')[1], 10); - const fileContent = readFileSync(`${this.topologiesFolder}/${filename}`, 'utf8'); - - const graph = this.parseFile(fileContent); - if (!graph) { - continue; - } - - // Ignore drop of more than 90% of the node count as it's probably a missing data point - const diffRatio = graph.nodes.length / this.latestNodeCount; - if (diffRatio < 0.90) { - continue; - } - this.latestNodeCount = graph.nodes.length; // Stats exist already, don't calculate/insert them - if (existingStatsTimestamps[timestamp] === true) { + if (existingStatsTimestamps[timestamp] !== undefined) { continue; } logger.debug(`Processing ${this.topologiesFolder}/${filename}`); + const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8'); + + let graph; + if (filename.indexOf('.json') !== -1) { + try { + graph = JSON.parse(fileContent); + } catch (e) { + logger.debug(`Invalid topology file, cannot parse the content`); + } + } else { + graph = this.parseFile(fileContent); + if (!graph) { + logger.debug(`Invalid topology file, cannot parse the content`); + continue; + } + await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph)); + } const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); @@ -282,6 +293,8 @@ class LightningStatsImporter { logger.debug(`Generating LN network stats for ${datestr}`); await this.computeNetworkStats(timestamp, graph); + + existingStatsTimestamps[timestamp] = true; } logger.info(`Lightning network stats historical import completed`); From 7fdf95ad3403cca41dd29aa61f8bbadab5a189b5 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 13:03:32 +0200 Subject: [PATCH 06/17] Remove buggy tx vout value fetching and improve performances --- .../sync-tasks/funding-tx-fetcher.ts | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts index 4068de8f1..9da721876 100644 --- a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts +++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts @@ -1,4 +1,5 @@ import { existsSync, promises } from 'fs'; +import bitcoinApiFactory from '../../../api/bitcoin/bitcoin-api-factory'; import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; import config from '../../../config'; import DB from '../../../database'; @@ -79,26 +80,10 @@ class FundingTxFetcher { const outputIdx = parts[2]; let block = this.blocksCache[blockHeight]; - // Check if we have the block in the `blocks_summaries` table to avoid calling core - if (!block) { - const [rows] = await DB.query(` - SELECT UNIX_TIMESTAMP(blocks.blockTimestamp) AS time, blocks_summaries.transactions AS tx - FROM blocks_summaries - JOIN blocks ON blocks.hash = blocks_summaries.id - WHERE blocks_summaries.height = ${blockHeight} - `); - block = rows[0] ?? 
null; - if (block) { - block.tx = JSON.parse(block.tx); - if (block.tx.length === 0) { - block = null; - } - } - } // Fetch it from core if (!block) { const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10)); - block = await bitcoinClient.getBlock(blockHash, 2); + block = await bitcoinClient.getBlock(blockHash, 1); } this.blocksCache[block.height] = block; @@ -109,10 +94,14 @@ class FundingTxFetcher { } } + const txid = block.tx[txIdx]; + const rawTx = await bitcoinClient.getRawTransaction(txid); + const tx = await bitcoinClient.decodeRawTransaction(rawTx); + this.fundingTxCache[channelId] = { timestamp: block.time, - txid: block.tx[txIdx].txid, - value: block.tx[txIdx].value / 100000000 ?? block.tx[txIdx].vout[outputIdx].value, + txid: txid, + value: tx.vout[outputIdx].value, }; ++this.channelNewlyProcessed; From 5d7e42195f4f666bb112b5f3deba02e90cafbf2e Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 15:02:24 +0200 Subject: [PATCH 07/17] Reduce massive gaps in the imported historical LN data --- .../tasks/lightning/sync-tasks/stats-importer.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index 8482b558c..4f7c5ca04 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -248,6 +248,8 @@ class LightningStatsImporter { } async $importHistoricalLightningStats(): Promise { + let latestNodeCount = 1; + const fileList = await fsPromises.readdir(this.topologiesFolder); fileList.sort().reverse(); @@ -284,6 +286,17 @@ class LightningStatsImporter { await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph)); } + if (timestamp > 1556316000) { + // "No, the reason most likely is just that I started collection in 2019, + // so what I had before that is just the survivors from before, which weren't that many" + const diffRatio = graph.nodes.length / latestNodeCount; + if (diffRatio < 0.9) { + // Ignore drop of more than 90% of the node count as it's probably a missing data point + continue; + } + } + latestNodeCount = graph.nodes.length; + const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); From d7f2f4136c9a0fb13a19bbf7bd0ce407a18af19b Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 15:58:29 +0200 Subject: [PATCH 08/17] Small cleanup --- .../src/tasks/lightning/sync-tasks/stats-importer.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index 4f7c5ca04..5c6a6c5a2 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -151,9 +151,11 @@ class LightningStatsImporter { const alreadyCountedChannels = {}; for (const channel of networkGraph.channels) { - const tx = await fundingTxFetcher.$fetchChannelOpenTx(channel.scid.slice(0, -2)); + const short_id = channel.scid.slice(0, -2); + + const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id); if (!tx) { - logger.err(`Unable to fetch funding tx for channel ${channel.scid}. Capacity and creation date will stay unknown.`); + logger.err(`Unable to fetch funding tx for channel ${short_id}. 
Capacity and creation date is unknown. Skipping channel.`); continue; } @@ -175,10 +177,10 @@ class LightningStatsImporter { nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); nodeStats[channel.destination].channels++; - if (!alreadyCountedChannels[channel.scid.slice(0, -2)]) { + if (!alreadyCountedChannels[short_id]) { capacity += Math.round(tx.value * 100000000); capacities.push(Math.round(tx.value * 100000000)); - alreadyCountedChannels[channel.scid.slice(0, -2)] = true; + alreadyCountedChannels[short_id] = true; } avgFeeRate += channel.fee_proportional_millionths; From 5b521cfc7cc9160a1b722c8beb3fadccf3d31576 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 17:56:46 +0200 Subject: [PATCH 09/17] Don't insert gapped gossip data upon restart --- backend/src/tasks/lightning/sync-tasks/stats-importer.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index 5c6a6c5a2..f99529e02 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -255,10 +255,10 @@ class LightningStatsImporter { const fileList = await fsPromises.readdir(this.topologiesFolder); fileList.sort().reverse(); - const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) AS added FROM lightning_stats'); + const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added, node_count FROM lightning_stats'); const existingStatsTimestamps = {}; for (const row of rows) { - existingStatsTimestamps[row.added] = true; + existingStatsTimestamps[row.added] = rows[0]; } for (const filename of fileList) { @@ -266,6 +266,7 @@ class LightningStatsImporter { // Stats exist already, don't calculate/insert them if (existingStatsTimestamps[timestamp] !== undefined) { + latestNodeCount = existingStatsTimestamps[timestamp].node_count; continue; } From b6ba3c57811693f2deaa86a785eb15ee4fb7b707 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 18:15:34 +0200 Subject: [PATCH 10/17] Ignore channels fee rate > 5000ppm or base fee > 5000 in stats --- .../lightning/sync-tasks/stats-importer.ts | 144 ++++++++++-------- 1 file changed, 77 insertions(+), 67 deletions(-) diff --git a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts index f99529e02..91e67f77d 100644 --- a/backend/src/tasks/lightning/sync-tasks/stats-importer.ts +++ b/backend/src/tasks/lightning/sync-tasks/stats-importer.ts @@ -45,70 +45,10 @@ class LightningStatsImporter { await this.$importHistoricalLightningStats(); } - /** - * Parse the file content into XML, and return a list of nodes and channels - */ - parseFile(fileContent): any { - const graph = this.parser.parse(fileContent); - if (Object.keys(graph).length === 0) { - return null; - } - - const nodes: Node[] = []; - const channels: Channel[] = []; - - // If there is only one entry, the parser does not return an array, so we override this - if (!Array.isArray(graph.graphml.graph.node)) { - graph.graphml.graph.node = [graph.graphml.graph.node]; - } - if (!Array.isArray(graph.graphml.graph.edge)) { - graph.graphml.graph.edge = [graph.graphml.graph.edge]; - } - - for (const node of graph.graphml.graph.node) { - if (!node.data) { - continue; - } - nodes.push({ - id: node.data[0], - timestamp: node.data[1], - features: node.data[2], - rgb_color: node.data[3], - alias: node.data[4], - addresses: 
node.data[5], - out_degree: node.data[6], - in_degree: node.data[7], - }); - } - - for (const channel of graph.graphml.graph.edge) { - if (!channel.data) { - continue; - } - channels.push({ - scid: channel.data[0], - source: channel.data[1], - destination: channel.data[2], - timestamp: channel.data[3], - features: channel.data[4], - fee_base_msat: channel.data[5], - fee_proportional_millionths: channel.data[6], - htlc_minimim_msat: channel.data[7], - cltv_expiry_delta: channel.data[8], - htlc_maximum_msat: channel.data[9], - }); - } - - return { - nodes: nodes, - channels: channels, - }; - } - /** * Generate LN network stats for one day */ - public async computeNetworkStats(timestamp: number, networkGraph): Promise { + public async computeNetworkStats(timestamp: number, networkGraph): Promise { // Node counts and network shares let clearnetNodes = 0; let torNodes = 0; @@ -183,10 +123,15 @@ class LightningStatsImporter { alreadyCountedChannels[short_id] = true; } - avgFeeRate += channel.fee_proportional_millionths; - avgBaseFee += channel.fee_base_msat; - feeRates.push(channel.fee_proportional_millionths); - baseFees.push(channel.fee_base_msat); + if (channel.fee_proportional_millionths < 5000) { + avgFeeRate += channel.fee_proportional_millionths; + feeRates.push(channel.fee_proportional_millionths); + } + + if (channel.fee_base_msat < 5000) { + avgBaseFee += channel.fee_base_msat; + baseFees.push(channel.fee_base_msat); + } } avgFeeRate /= networkGraph.channels.length; @@ -247,6 +192,11 @@ class LightningStatsImporter { nodeStats[public_key].channels, ]); } + + return { + added: timestamp, + node_count: networkGraph.nodes.length + }; } async $importHistoricalLightningStats(): Promise { @@ -308,13 +258,73 @@ class LightningStatsImporter { await fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); logger.debug(`Generating LN network stats for ${datestr}`); - await this.computeNetworkStats(timestamp, graph); + const stat = await this.computeNetworkStats(timestamp, graph); - existingStatsTimestamps[timestamp] = true; + existingStatsTimestamps[timestamp] = stat; } logger.info(`Lightning network stats historical import completed`); } + + /** + * Parse the file content into XML, and return a list of nodes and channels + */ + private parseFile(fileContent): any { + const graph = this.parser.parse(fileContent); + if (Object.keys(graph).length === 0) { + return null; + } + + const nodes: Node[] = []; + const channels: Channel[] = []; + + // If there is only one entry, the parser does not return an array, so we override this + if (!Array.isArray(graph.graphml.graph.node)) { + graph.graphml.graph.node = [graph.graphml.graph.node]; + } + if (!Array.isArray(graph.graphml.graph.edge)) { + graph.graphml.graph.edge = [graph.graphml.graph.edge]; + } + + for (const node of graph.graphml.graph.node) { + if (!node.data) { + continue; + } + nodes.push({ + id: node.data[0], + timestamp: node.data[1], + features: node.data[2], + rgb_color: node.data[3], + alias: node.data[4], + addresses: node.data[5], + out_degree: node.data[6], + in_degree: node.data[7], + }); + } + + for (const channel of graph.graphml.graph.edge) { + if (!channel.data) { + continue; + } + channels.push({ + scid: channel.data[0], + source: channel.data[1], + destination: channel.data[2], + timestamp: channel.data[3], + features: channel.data[4], + fee_base_msat: channel.data[5], + fee_proportional_millionths: channel.data[6], + htlc_minimim_msat: channel.data[7], + cltv_expiry_delta: 
channel.data[8], + htlc_maximum_msat: channel.data[9], + }); + } + + return { + nodes: nodes, + channels: channels, + }; + } } export default new LightningStatsImporter; \ No newline at end of file From 3f83e517f05828af08175cb417d8a2d4b87b8cf4 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Fri, 29 Jul 2022 08:08:22 +0200 Subject: [PATCH 11/17] Create CLightningClient class --- backend/src/rpc-api/core-lightning/jsonrpc.ts | 249 ++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 backend/src/rpc-api/core-lightning/jsonrpc.ts diff --git a/backend/src/rpc-api/core-lightning/jsonrpc.ts b/backend/src/rpc-api/core-lightning/jsonrpc.ts new file mode 100644 index 000000000..037dfff75 --- /dev/null +++ b/backend/src/rpc-api/core-lightning/jsonrpc.ts @@ -0,0 +1,249 @@ +'use strict'; + +const methods = [ + 'addgossip', + 'autocleaninvoice', + 'check', + 'checkmessage', + 'close', + 'connect', + 'createinvoice', + 'createinvoicerequest', + 'createoffer', + 'createonion', + 'decode', + 'decodepay', + 'delexpiredinvoice', + 'delinvoice', + 'delpay', + 'dev-listaddrs', + 'dev-rescan-outputs', + 'disableoffer', + 'disconnect', + 'estimatefees', + 'feerates', + 'fetchinvoice', + 'fundchannel', + 'fundchannel_cancel', + 'fundchannel_complete', + 'fundchannel_start', + 'fundpsbt', + 'getchaininfo', + 'getinfo', + 'getlog', + 'getrawblockbyheight', + 'getroute', + 'getsharedsecret', + 'getutxout', + 'help', + 'invoice', + 'keysend', + 'legacypay', + 'listchannels', + 'listconfigs', + 'listforwards', + 'listfunds', + 'listinvoices', + 'listnodes', + 'listoffers', + 'listpays', + 'listpeers', + 'listsendpays', + 'listtransactions', + 'multifundchannel', + 'multiwithdraw', + 'newaddr', + 'notifications', + 'offer', + 'offerout', + 'openchannel_abort', + 'openchannel_bump', + 'openchannel_init', + 'openchannel_signed', + 'openchannel_update', + 'pay', + 'payersign', + 'paystatus', + 'ping', + 'plugin', + 'reserveinputs', + 'sendinvoice', + 'sendonion', + 'sendonionmessage', + 'sendpay', + 'sendpsbt', + 'sendrawtransaction', + 'setchannelfee', + 'signmessage', + 'signpsbt', + 'stop', + 'txdiscard', + 'txprepare', + 'txsend', + 'unreserveinputs', + 'utxopsbt', + 'waitanyinvoice', + 'waitblockheight', + 'waitinvoice', + 'waitsendpay', + 'withdraw' +]; + + +import EventEmitter from 'events'; +import { existsSync, statSync } from 'fs'; +import { createConnection, Socket } from 'net'; +import { homedir } from 'os'; +import path from 'path'; +import { createInterface, Interface } from 'readline'; +import logger from '../../logger'; + +class LightningError extends Error { + type: string = 'lightning'; + message: string = 'lightning-client error'; + + constructor(error) { + super(); + this.type = error.type; + this.message = error.message; + } +} + +const defaultRpcPath = path.join(homedir(), '.lightning') + , fStat = (...p) => statSync(path.join(...p)) + , fExists = (...p) => existsSync(path.join(...p)) + +class CLightningClient extends EventEmitter { + private rpcPath: string; + private reconnectWait: number; + private reconnectTimeout; + private reqcount: number; + private client: Socket; + private rl: Interface; + private clientConnectionPromise: Promise; + + constructor(rpcPath = defaultRpcPath) { + if (!path.isAbsolute(rpcPath)) { + throw new Error('The rpcPath must be an absolute path'); + } + + if (!fExists(rpcPath) || !fStat(rpcPath).isSocket()) { + // network directory provided, use the lightning-rpc within in + if (fExists(rpcPath, 'lightning-rpc')) { + rpcPath = path.join(rpcPath, 
'lightning-rpc'); + } + + // main data directory provided, default to using the bitcoin mainnet subdirectory + // to be removed in v0.2.0 + else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) { + logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`) + logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`) + rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc') + } + } + + logger.debug(`[CLightningClient] Connecting to ${rpcPath}`); + + super(); + this.rpcPath = rpcPath; + this.reconnectWait = 0.5; + this.reconnectTimeout = null; + this.reqcount = 0; + + const _self = this; + + this.client = createConnection(rpcPath); + this.rl = createInterface({ input: this.client }) + + this.clientConnectionPromise = new Promise(resolve => { + _self.client.on('connect', () => { + logger.debug(`[CLightningClient] Lightning client connected`); + _self.reconnectWait = 1; + resolve(); + }); + + _self.client.on('end', () => { + logger.err('[CLightningClient] Lightning client connection closed, reconnecting'); + _self.increaseWaitTime(); + _self.reconnect(); + }); + + _self.client.on('error', error => { + logger.err(`[CLightningClient] Lightning client connection error: ${error}`); + _self.emit('error', error); + _self.increaseWaitTime(); + _self.reconnect(); + }); + }); + + this.rl.on('line', line => { + line = line.trim(); + if (!line) { + return; + } + const data = JSON.parse(line); + logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`); + _self.emit('res:' + data.id, data); + }); + } + + increaseWaitTime(): void { + if (this.reconnectWait >= 16) { + this.reconnectWait = 16; + } else { + this.reconnectWait *= 2; + } + } + + reconnect(): void { + const _self = this; + + if (this.reconnectTimeout) { + return; + } + + this.reconnectTimeout = setTimeout(() => { + logger.debug('[CLightningClient] Trying to reconnect...'); + + _self.client.connect(_self.rpcPath); + _self.reconnectTimeout = null; + }, this.reconnectWait * 1000); + } + + call(method, args = []): Promise { + const _self = this; + + const callInt = ++this.reqcount; + const sendObj = { + jsonrpc: '2.0', + method, + params: args, + id: '' + callInt + }; + + logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`); + + // Wait for the client to connect + return this.clientConnectionPromise + .then(() => new Promise((resolve, reject) => { + // Wait for a response + this.once('res:' + callInt, res => res.error == null + ? 
resolve(res.result) + : reject(new LightningError(res.error)) + ); + + // Send the command + _self.client.write(JSON.stringify(sendObj)); + })); + } +} + +const protify = s => s.replace(/-([a-z])/g, m => m[1].toUpperCase()); + +methods.forEach(k => { + CLightningClient.prototype[protify(k)] = function (...args: any) { + return this.call(k, args); + }; +}); + +export default new CLightningClient(); From a94403b3a1bd3ec65435d1b4067ad8f90a8bb2f8 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Fri, 29 Jul 2022 16:33:07 +0200 Subject: [PATCH 12/17] Wrote some utility functions to convert clightning output to our db schema --- .../lightning/clightning/clightning-client.ts | 4 + .../clightning/clightning-convert.ts | 95 +++++++++++++++++++ .../lightning/clightning}/jsonrpc.ts | 12 +-- backend/src/config.ts | 8 ++ 4 files changed, 113 insertions(+), 6 deletions(-) create mode 100644 backend/src/api/lightning/clightning/clightning-client.ts create mode 100644 backend/src/api/lightning/clightning/clightning-convert.ts rename backend/src/{rpc-api/core-lightning => api/lightning/clightning}/jsonrpc.ts (94%) diff --git a/backend/src/api/lightning/clightning/clightning-client.ts b/backend/src/api/lightning/clightning/clightning-client.ts new file mode 100644 index 000000000..2b974bca0 --- /dev/null +++ b/backend/src/api/lightning/clightning/clightning-client.ts @@ -0,0 +1,4 @@ +import config from '../../../config'; +import CLightningClient from './jsonrpc'; + +export default new CLightningClient(config.CLIGHTNING.SOCKET); diff --git a/backend/src/api/lightning/clightning/clightning-convert.ts b/backend/src/api/lightning/clightning/clightning-convert.ts new file mode 100644 index 000000000..34ef6f942 --- /dev/null +++ b/backend/src/api/lightning/clightning/clightning-convert.ts @@ -0,0 +1,95 @@ +import logger from "../../../logger"; +import { ILightningApi } from "../lightning-api.interface"; + +export function convertNode(clNode: any): ILightningApi.Node { + return { + alias: clNode.alias ?? '', + color: `#${clNode.color ?? ''}`, + features: [], // TODO parse and return clNode.feature + public_key: clNode.nodeid, + sockets: clNode.addresses?.map(addr => `${addr.address}:${addr.port}`) ?? [], + updated_at: new Date((clNode?.last_timestamp ?? 0) * 1000).toUTCString(), + }; +} + +export function convertAndmergeBidirectionalChannels(clChannels: any[]): ILightningApi.Channel[] { + const consolidatedChannelList: ILightningApi.Channel[] = []; + const clChannelsDict = {}; + const clChannelsDictCount = {}; + + for (const clChannel of clChannels) { + if (!clChannelsDict[clChannel.short_channel_id]) { + clChannelsDict[clChannel.short_channel_id] = clChannel; + clChannelsDictCount[clChannel.short_channel_id] = 1; + } else { + consolidatedChannelList.push( + buildBidirectionalChannel(clChannel, clChannelsDict[clChannel.short_channel_id]) + ); + delete clChannelsDict[clChannel.short_channel_id]; + clChannelsDictCount[clChannel.short_channel_id]++; + } + } + const bidirectionalChannelsCount = consolidatedChannelList.length; + + for (const short_channel_id of Object.keys(clChannelsDict)) { + consolidatedChannelList.push(buildUnidirectionalChannel(clChannelsDict[short_channel_id])); + } + const unidirectionalChannelsCount = consolidatedChannelList.length - bidirectionalChannelsCount; + + logger.debug(`clightning knows ${clChannels.length} channels. 
` + + `We found ${bidirectionalChannelsCount} bidirectional channels ` + + `and ${unidirectionalChannelsCount} unidirectional channels.`); + + return consolidatedChannelList; +} + +function buildBidirectionalChannel(clChannelA: any, clChannelB: any): ILightningApi.Channel { + const lastUpdate = Math.max(clChannelA.last_update ?? 0, clChannelB.last_update ?? 0); + + return { + id: clChannelA.short_channel_id, + capacity: clChannelA.satoshis, + transaction_id: '', // TODO + transaction_vout: 0, // TODO + updated_at: new Date(lastUpdate * 1000).toUTCString(), + policies: [ + convertPolicy(clChannelA), + convertPolicy(clChannelB) + ] + }; +} + +function buildUnidirectionalChannel(clChannel: any): ILightningApi.Channel { + return { + id: clChannel.short_channel_id, + capacity: clChannel.satoshis, + policies: [convertPolicy(clChannel), getEmptyPolicy()], + transaction_id: '', // TODO + transaction_vout: 0, // TODO + updated_at: new Date((clChannel.last_update ?? 0) * 1000).toUTCString(), + }; +} + +function convertPolicy(clChannel: any): ILightningApi.Policy { + return { + public_key: clChannel.source, + base_fee_mtokens: clChannel.base_fee_millisatoshi, + fee_rate: clChannel.fee_per_millionth, + is_disabled: !clChannel.active, + max_htlc_mtokens: clChannel.htlc_maximum_msat.slice(0, -4), + min_htlc_mtokens: clChannel.htlc_minimum_msat.slice(0, -4), + updated_at: new Date((clChannel.last_update ?? 0) * 1000).toUTCString(), + }; +} + +function getEmptyPolicy(): ILightningApi.Policy { + return { + public_key: 'null', + base_fee_mtokens: '0', + fee_rate: 0, + is_disabled: true, + max_htlc_mtokens: '0', + min_htlc_mtokens: '0', + updated_at: new Date(0).toUTCString(), + }; +} diff --git a/backend/src/rpc-api/core-lightning/jsonrpc.ts b/backend/src/api/lightning/clightning/jsonrpc.ts similarity index 94% rename from backend/src/rpc-api/core-lightning/jsonrpc.ts rename to backend/src/api/lightning/clightning/jsonrpc.ts index 037dfff75..d0b187a54 100644 --- a/backend/src/rpc-api/core-lightning/jsonrpc.ts +++ b/backend/src/api/lightning/clightning/jsonrpc.ts @@ -1,3 +1,5 @@ +// Imported from https://github.com/shesek/lightning-client-js + 'use strict'; const methods = [ @@ -96,7 +98,7 @@ import { createConnection, Socket } from 'net'; import { homedir } from 'os'; import path from 'path'; import { createInterface, Interface } from 'readline'; -import logger from '../../logger'; +import logger from '../../../logger'; class LightningError extends Error { type: string = 'lightning'; @@ -113,7 +115,7 @@ const defaultRpcPath = path.join(homedir(), '.lightning') , fStat = (...p) => statSync(path.join(...p)) , fExists = (...p) => existsSync(path.join(...p)) -class CLightningClient extends EventEmitter { +export default class CLightningClient extends EventEmitter { private rpcPath: string; private reconnectWait: number; private reconnectTimeout; @@ -182,7 +184,7 @@ class CLightningClient extends EventEmitter { return; } const data = JSON.parse(line); - logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`); + // logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`); _self.emit('res:' + data.id, data); }); } @@ -210,7 +212,7 @@ class CLightningClient extends EventEmitter { }, this.reconnectWait * 1000); } - call(method, args = []): Promise { + call(method, args = []): Promise { const _self = this; const callInt = ++this.reqcount; @@ -245,5 +247,3 @@ methods.forEach(k => { return this.call(k, args); }; }); - -export default new 
CLightningClient(); diff --git a/backend/src/config.ts b/backend/src/config.ts index d480e6c51..b42a45ab2 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -38,6 +38,9 @@ interface IConfig { MACAROON_PATH: string; REST_API_URL: string; }; + CLIGHTNING: { + SOCKET: string; + }; ELECTRUM: { HOST: string; PORT: number; @@ -186,6 +189,9 @@ const defaults: IConfig = { 'MACAROON_PATH': '', 'REST_API_URL': 'https://localhost:8080', }, + 'CLIGHTNING': { + 'SOCKET': '', + }, 'SOCKS5PROXY': { 'ENABLED': false, 'USE_ONION': true, @@ -226,6 +232,7 @@ class Config implements IConfig { BISQ: IConfig['BISQ']; LIGHTNING: IConfig['LIGHTNING']; LND: IConfig['LND']; + CLIGHTNING: IConfig['CLIGHTNING']; SOCKS5PROXY: IConfig['SOCKS5PROXY']; PRICE_DATA_SERVER: IConfig['PRICE_DATA_SERVER']; EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER']; @@ -244,6 +251,7 @@ class Config implements IConfig { this.BISQ = configs.BISQ; this.LIGHTNING = configs.LIGHTNING; this.LND = configs.LND; + this.CLIGHTNING = configs.CLIGHTNING; this.SOCKS5PROXY = configs.SOCKS5PROXY; this.PRICE_DATA_SERVER = configs.PRICE_DATA_SERVER; this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER; From eb90434c28f7d19b1e172226438291195f8105f0 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Fri, 29 Jul 2022 17:41:09 +0200 Subject: [PATCH 13/17] Delete historical generation code --- .../lightning/clightning/clightning-client.ts | 265 +++++++++++++++++- .../clightning/clightning-convert.ts | 44 +-- .../src/api/lightning/clightning/jsonrpc.ts | 249 ---------------- .../lightning-api-abstract-factory.ts | 2 - .../api/lightning/lightning-api-factory.ts | 5 +- .../src/tasks/lightning/node-sync.service.ts | 2 +- 6 files changed, 295 insertions(+), 272 deletions(-) delete mode 100644 backend/src/api/lightning/clightning/jsonrpc.ts diff --git a/backend/src/api/lightning/clightning/clightning-client.ts b/backend/src/api/lightning/clightning/clightning-client.ts index 2b974bca0..629092d03 100644 --- a/backend/src/api/lightning/clightning/clightning-client.ts +++ b/backend/src/api/lightning/clightning/clightning-client.ts @@ -1,4 +1,263 @@ -import config from '../../../config'; -import CLightningClient from './jsonrpc'; +// Imported from https://github.com/shesek/lightning-client-js -export default new CLightningClient(config.CLIGHTNING.SOCKET); +'use strict'; + +const methods = [ + 'addgossip', + 'autocleaninvoice', + 'check', + 'checkmessage', + 'close', + 'connect', + 'createinvoice', + 'createinvoicerequest', + 'createoffer', + 'createonion', + 'decode', + 'decodepay', + 'delexpiredinvoice', + 'delinvoice', + 'delpay', + 'dev-listaddrs', + 'dev-rescan-outputs', + 'disableoffer', + 'disconnect', + 'estimatefees', + 'feerates', + 'fetchinvoice', + 'fundchannel', + 'fundchannel_cancel', + 'fundchannel_complete', + 'fundchannel_start', + 'fundpsbt', + 'getchaininfo', + 'getinfo', + 'getlog', + 'getrawblockbyheight', + 'getroute', + 'getsharedsecret', + 'getutxout', + 'help', + 'invoice', + 'keysend', + 'legacypay', + 'listchannels', + 'listconfigs', + 'listforwards', + 'listfunds', + 'listinvoices', + 'listnodes', + 'listoffers', + 'listpays', + 'listpeers', + 'listsendpays', + 'listtransactions', + 'multifundchannel', + 'multiwithdraw', + 'newaddr', + 'notifications', + 'offer', + 'offerout', + 'openchannel_abort', + 'openchannel_bump', + 'openchannel_init', + 'openchannel_signed', + 'openchannel_update', + 'pay', + 'payersign', + 'paystatus', + 'ping', + 'plugin', + 'reserveinputs', + 'sendinvoice', + 'sendonion', + 'sendonionmessage', + 
'sendpay', + 'sendpsbt', + 'sendrawtransaction', + 'setchannelfee', + 'signmessage', + 'signpsbt', + 'stop', + 'txdiscard', + 'txprepare', + 'txsend', + 'unreserveinputs', + 'utxopsbt', + 'waitanyinvoice', + 'waitblockheight', + 'waitinvoice', + 'waitsendpay', + 'withdraw' +]; + + +import EventEmitter from 'events'; +import { existsSync, statSync } from 'fs'; +import { createConnection, Socket } from 'net'; +import { homedir } from 'os'; +import path from 'path'; +import { createInterface, Interface } from 'readline'; +import logger from '../../../logger'; +import { AbstractLightningApi } from '../lightning-api-abstract-factory'; +import { ILightningApi } from '../lightning-api.interface'; +import { convertAndmergeBidirectionalChannels, convertNode } from './clightning-convert'; + +class LightningError extends Error { + type: string = 'lightning'; + message: string = 'lightning-client error'; + + constructor(error) { + super(); + this.type = error.type; + this.message = error.message; + } +} + +const defaultRpcPath = path.join(homedir(), '.lightning') + , fStat = (...p) => statSync(path.join(...p)) + , fExists = (...p) => existsSync(path.join(...p)) + +export default class CLightningClient extends EventEmitter implements AbstractLightningApi { + private rpcPath: string; + private reconnectWait: number; + private reconnectTimeout; + private reqcount: number; + private client: Socket; + private rl: Interface; + private clientConnectionPromise: Promise; + + constructor(rpcPath = defaultRpcPath) { + if (!path.isAbsolute(rpcPath)) { + throw new Error('The rpcPath must be an absolute path'); + } + + if (!fExists(rpcPath) || !fStat(rpcPath).isSocket()) { + // network directory provided, use the lightning-rpc within in + if (fExists(rpcPath, 'lightning-rpc')) { + rpcPath = path.join(rpcPath, 'lightning-rpc'); + } + + // main data directory provided, default to using the bitcoin mainnet subdirectory + // to be removed in v0.2.0 + else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) { + logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`) + logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`) + rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc') + } + } + + logger.debug(`[CLightningClient] Connecting to ${rpcPath}`); + + super(); + this.rpcPath = rpcPath; + this.reconnectWait = 0.5; + this.reconnectTimeout = null; + this.reqcount = 0; + + const _self = this; + + this.client = createConnection(rpcPath); + this.rl = createInterface({ input: this.client }) + + this.clientConnectionPromise = new Promise(resolve => { + _self.client.on('connect', () => { + logger.info(`[CLightningClient] Lightning client connected`); + _self.reconnectWait = 1; + resolve(); + }); + + _self.client.on('end', () => { + logger.err('[CLightningClient] Lightning client connection closed, reconnecting'); + _self.increaseWaitTime(); + _self.reconnect(); + }); + + _self.client.on('error', error => { + logger.err(`[CLightningClient] Lightning client connection error: ${error}`); + _self.emit('error', error); + _self.increaseWaitTime(); + _self.reconnect(); + }); + }); + + this.rl.on('line', line => { + line = line.trim(); + if (!line) { + return; + } + const data = JSON.parse(line); + // logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`); + _self.emit('res:' + data.id, data); + }); + } + + 
increaseWaitTime(): void { + if (this.reconnectWait >= 16) { + this.reconnectWait = 16; + } else { + this.reconnectWait *= 2; + } + } + + reconnect(): void { + const _self = this; + + if (this.reconnectTimeout) { + return; + } + + this.reconnectTimeout = setTimeout(() => { + logger.debug('[CLightningClient] Trying to reconnect...'); + + _self.client.connect(_self.rpcPath); + _self.reconnectTimeout = null; + }, this.reconnectWait * 1000); + } + + call(method, args = []): Promise { + const _self = this; + + const callInt = ++this.reqcount; + const sendObj = { + jsonrpc: '2.0', + method, + params: args, + id: '' + callInt + }; + + // logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`); + + // Wait for the client to connect + return this.clientConnectionPromise + .then(() => new Promise((resolve, reject) => { + // Wait for a response + this.once('res:' + callInt, res => res.error == null + ? resolve(res.result) + : reject(new LightningError(res.error)) + ); + + // Send the command + _self.client.write(JSON.stringify(sendObj)); + })); + } + + async $getNetworkGraph(): Promise { + const listnodes: any[] = await this.call('listnodes'); + const listchannels: any[] = await this.call('listchannels'); + const channelsList = convertAndmergeBidirectionalChannels(listchannels['channels']); + + return { + nodes: listnodes['nodes'].map(node => convertNode(node)), + channels: channelsList, + }; + } +} + +const protify = s => s.replace(/-([a-z])/g, m => m[1].toUpperCase()); + +methods.forEach(k => { + CLightningClient.prototype[protify(k)] = function (...args: any) { + return this.call(k, args); + }; +}); diff --git a/backend/src/api/lightning/clightning/clightning-convert.ts b/backend/src/api/lightning/clightning/clightning-convert.ts index 34ef6f942..8ceec3b7e 100644 --- a/backend/src/api/lightning/clightning/clightning-convert.ts +++ b/backend/src/api/lightning/clightning/clightning-convert.ts @@ -1,6 +1,8 @@ -import logger from "../../../logger"; -import { ILightningApi } from "../lightning-api.interface"; +import { ILightningApi } from '../lightning-api.interface'; +/** + * Convert a clightning "listnode" entry to a lnd node entry + */ export function convertNode(clNode: any): ILightningApi.Node { return { alias: clNode.alias ?? 
'', @@ -12,7 +14,10 @@ export function convertNode(clNode: any): ILightningApi.Node { }; } -export function convertAndmergeBidirectionalChannels(clChannels: any[]): ILightningApi.Channel[] { +/** + * Convert clightning "listchannels" response to lnd "describegraph.channels" format + */ + export function convertAndmergeBidirectionalChannels(clChannels: any[]): ILightningApi.Channel[] { const consolidatedChannelList: ILightningApi.Channel[] = []; const clChannelsDict = {}; const clChannelsDictCount = {}; @@ -23,27 +28,24 @@ export function convertAndmergeBidirectionalChannels(clChannels: any[]): ILightn clChannelsDictCount[clChannel.short_channel_id] = 1; } else { consolidatedChannelList.push( - buildBidirectionalChannel(clChannel, clChannelsDict[clChannel.short_channel_id]) + buildFullChannel(clChannel, clChannelsDict[clChannel.short_channel_id]) ); delete clChannelsDict[clChannel.short_channel_id]; clChannelsDictCount[clChannel.short_channel_id]++; } } - const bidirectionalChannelsCount = consolidatedChannelList.length; - for (const short_channel_id of Object.keys(clChannelsDict)) { - consolidatedChannelList.push(buildUnidirectionalChannel(clChannelsDict[short_channel_id])); + consolidatedChannelList.push(buildIncompleteChannel(clChannelsDict[short_channel_id])); } - const unidirectionalChannelsCount = consolidatedChannelList.length - bidirectionalChannelsCount; - - logger.debug(`clightning knows ${clChannels.length} channels. ` + - `We found ${bidirectionalChannelsCount} bidirectional channels ` + - `and ${unidirectionalChannelsCount} unidirectional channels.`); return consolidatedChannelList; } -function buildBidirectionalChannel(clChannelA: any, clChannelB: any): ILightningApi.Channel { +/** + * Convert two clightning "getchannels" entries into a full a lnd "describegraph.channels" format + * In this case, clightning knows the channel policy for both nodes + */ +function buildFullChannel(clChannelA: any, clChannelB: any): ILightningApi.Channel { const lastUpdate = Math.max(clChannelA.last_update ?? 0, clChannelB.last_update ?? 
0); return { @@ -59,7 +61,11 @@ function buildBidirectionalChannel(clChannelA: any, clChannelB: any): ILightning }; } -function buildUnidirectionalChannel(clChannel: any): ILightningApi.Channel { +/** + * Convert one clightning "getchannels" entry into a full a lnd "describegraph.channels" format + * In this case, clightning knows the channel policy of only one node + */ + function buildIncompleteChannel(clChannel: any): ILightningApi.Channel { return { id: clChannel.short_channel_id, capacity: clChannel.satoshis, @@ -70,7 +76,10 @@ function buildUnidirectionalChannel(clChannel: any): ILightningApi.Channel { }; } -function convertPolicy(clChannel: any): ILightningApi.Policy { +/** + * Convert a clightning "listnode" response to a lnd channel policy format + */ + function convertPolicy(clChannel: any): ILightningApi.Policy { return { public_key: clChannel.source, base_fee_mtokens: clChannel.base_fee_millisatoshi, @@ -82,7 +91,10 @@ function convertPolicy(clChannel: any): ILightningApi.Policy { }; } -function getEmptyPolicy(): ILightningApi.Policy { +/** + * Create an empty channel policy in lnd format + */ + function getEmptyPolicy(): ILightningApi.Policy { return { public_key: 'null', base_fee_mtokens: '0', diff --git a/backend/src/api/lightning/clightning/jsonrpc.ts b/backend/src/api/lightning/clightning/jsonrpc.ts deleted file mode 100644 index d0b187a54..000000000 --- a/backend/src/api/lightning/clightning/jsonrpc.ts +++ /dev/null @@ -1,249 +0,0 @@ -// Imported from https://github.com/shesek/lightning-client-js - -'use strict'; - -const methods = [ - 'addgossip', - 'autocleaninvoice', - 'check', - 'checkmessage', - 'close', - 'connect', - 'createinvoice', - 'createinvoicerequest', - 'createoffer', - 'createonion', - 'decode', - 'decodepay', - 'delexpiredinvoice', - 'delinvoice', - 'delpay', - 'dev-listaddrs', - 'dev-rescan-outputs', - 'disableoffer', - 'disconnect', - 'estimatefees', - 'feerates', - 'fetchinvoice', - 'fundchannel', - 'fundchannel_cancel', - 'fundchannel_complete', - 'fundchannel_start', - 'fundpsbt', - 'getchaininfo', - 'getinfo', - 'getlog', - 'getrawblockbyheight', - 'getroute', - 'getsharedsecret', - 'getutxout', - 'help', - 'invoice', - 'keysend', - 'legacypay', - 'listchannels', - 'listconfigs', - 'listforwards', - 'listfunds', - 'listinvoices', - 'listnodes', - 'listoffers', - 'listpays', - 'listpeers', - 'listsendpays', - 'listtransactions', - 'multifundchannel', - 'multiwithdraw', - 'newaddr', - 'notifications', - 'offer', - 'offerout', - 'openchannel_abort', - 'openchannel_bump', - 'openchannel_init', - 'openchannel_signed', - 'openchannel_update', - 'pay', - 'payersign', - 'paystatus', - 'ping', - 'plugin', - 'reserveinputs', - 'sendinvoice', - 'sendonion', - 'sendonionmessage', - 'sendpay', - 'sendpsbt', - 'sendrawtransaction', - 'setchannelfee', - 'signmessage', - 'signpsbt', - 'stop', - 'txdiscard', - 'txprepare', - 'txsend', - 'unreserveinputs', - 'utxopsbt', - 'waitanyinvoice', - 'waitblockheight', - 'waitinvoice', - 'waitsendpay', - 'withdraw' -]; - - -import EventEmitter from 'events'; -import { existsSync, statSync } from 'fs'; -import { createConnection, Socket } from 'net'; -import { homedir } from 'os'; -import path from 'path'; -import { createInterface, Interface } from 'readline'; -import logger from '../../../logger'; - -class LightningError extends Error { - type: string = 'lightning'; - message: string = 'lightning-client error'; - - constructor(error) { - super(); - this.type = error.type; - this.message = error.message; - } -} - -const 
defaultRpcPath = path.join(homedir(), '.lightning') - , fStat = (...p) => statSync(path.join(...p)) - , fExists = (...p) => existsSync(path.join(...p)) - -export default class CLightningClient extends EventEmitter { - private rpcPath: string; - private reconnectWait: number; - private reconnectTimeout; - private reqcount: number; - private client: Socket; - private rl: Interface; - private clientConnectionPromise: Promise; - - constructor(rpcPath = defaultRpcPath) { - if (!path.isAbsolute(rpcPath)) { - throw new Error('The rpcPath must be an absolute path'); - } - - if (!fExists(rpcPath) || !fStat(rpcPath).isSocket()) { - // network directory provided, use the lightning-rpc within in - if (fExists(rpcPath, 'lightning-rpc')) { - rpcPath = path.join(rpcPath, 'lightning-rpc'); - } - - // main data directory provided, default to using the bitcoin mainnet subdirectory - // to be removed in v0.2.0 - else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) { - logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`) - logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`) - rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc') - } - } - - logger.debug(`[CLightningClient] Connecting to ${rpcPath}`); - - super(); - this.rpcPath = rpcPath; - this.reconnectWait = 0.5; - this.reconnectTimeout = null; - this.reqcount = 0; - - const _self = this; - - this.client = createConnection(rpcPath); - this.rl = createInterface({ input: this.client }) - - this.clientConnectionPromise = new Promise(resolve => { - _self.client.on('connect', () => { - logger.debug(`[CLightningClient] Lightning client connected`); - _self.reconnectWait = 1; - resolve(); - }); - - _self.client.on('end', () => { - logger.err('[CLightningClient] Lightning client connection closed, reconnecting'); - _self.increaseWaitTime(); - _self.reconnect(); - }); - - _self.client.on('error', error => { - logger.err(`[CLightningClient] Lightning client connection error: ${error}`); - _self.emit('error', error); - _self.increaseWaitTime(); - _self.reconnect(); - }); - }); - - this.rl.on('line', line => { - line = line.trim(); - if (!line) { - return; - } - const data = JSON.parse(line); - // logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`); - _self.emit('res:' + data.id, data); - }); - } - - increaseWaitTime(): void { - if (this.reconnectWait >= 16) { - this.reconnectWait = 16; - } else { - this.reconnectWait *= 2; - } - } - - reconnect(): void { - const _self = this; - - if (this.reconnectTimeout) { - return; - } - - this.reconnectTimeout = setTimeout(() => { - logger.debug('[CLightningClient] Trying to reconnect...'); - - _self.client.connect(_self.rpcPath); - _self.reconnectTimeout = null; - }, this.reconnectWait * 1000); - } - - call(method, args = []): Promise { - const _self = this; - - const callInt = ++this.reqcount; - const sendObj = { - jsonrpc: '2.0', - method, - params: args, - id: '' + callInt - }; - - logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`); - - // Wait for the client to connect - return this.clientConnectionPromise - .then(() => new Promise((resolve, reject) => { - // Wait for a response - this.once('res:' + callInt, res => res.error == null - ? 
resolve(res.result) - : reject(new LightningError(res.error)) - ); - - // Send the command - _self.client.write(JSON.stringify(sendObj)); - })); - } -} - -const protify = s => s.replace(/-([a-z])/g, m => m[1].toUpperCase()); - -methods.forEach(k => { - CLightningClient.prototype[protify(k)] = function (...args: any) { - return this.call(k, args); - }; -}); diff --git a/backend/src/api/lightning/lightning-api-abstract-factory.ts b/backend/src/api/lightning/lightning-api-abstract-factory.ts index 026568c6d..e6691b0a4 100644 --- a/backend/src/api/lightning/lightning-api-abstract-factory.ts +++ b/backend/src/api/lightning/lightning-api-abstract-factory.ts @@ -1,7 +1,5 @@ import { ILightningApi } from './lightning-api.interface'; export interface AbstractLightningApi { - $getNetworkInfo(): Promise; $getNetworkGraph(): Promise; - $getInfo(): Promise; } diff --git a/backend/src/api/lightning/lightning-api-factory.ts b/backend/src/api/lightning/lightning-api-factory.ts index ab551095c..fdadd8230 100644 --- a/backend/src/api/lightning/lightning-api-factory.ts +++ b/backend/src/api/lightning/lightning-api-factory.ts @@ -1,9 +1,12 @@ import config from '../../config'; +import CLightningClient from './clightning/clightning-client'; import { AbstractLightningApi } from './lightning-api-abstract-factory'; import LndApi from './lnd/lnd-api'; function lightningApiFactory(): AbstractLightningApi { - switch (config.LIGHTNING.BACKEND) { + switch (config.LIGHTNING.ENABLED === true && config.LIGHTNING.BACKEND) { + case 'cln': + return new CLightningClient(config.CLIGHTNING.SOCKET); case 'lnd': default: return new LndApi(); diff --git a/backend/src/tasks/lightning/node-sync.service.ts b/backend/src/tasks/lightning/node-sync.service.ts index 10cd2d744..d3367d51c 100644 --- a/backend/src/tasks/lightning/node-sync.service.ts +++ b/backend/src/tasks/lightning/node-sync.service.ts @@ -5,9 +5,9 @@ import bitcoinClient from '../../api/bitcoin/bitcoin-client'; import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory'; import config from '../../config'; import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface'; -import lightningApi from '../../api/lightning/lightning-api-factory'; import { ILightningApi } from '../../api/lightning/lightning-api.interface'; import { $lookupNodeLocation } from './sync-tasks/node-locations'; +import lightningApi from '../../api/lightning/lightning-api-factory'; class NodeSyncService { constructor() {} From 80f1ee45b5b8a0198fa572a69effef09e4d4fc95 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Mon, 1 Aug 2022 19:42:33 +0200 Subject: [PATCH 14/17] Rebased using the update lightning interfaces --- .../lightning/clightning/clightning-client.ts | 2 +- .../clightning/clightning-convert.ts | 80 +++++++++---------- .../src/tasks/lightning/node-sync.service.ts | 24 ++++-- 3 files changed, 57 insertions(+), 49 deletions(-) diff --git a/backend/src/api/lightning/clightning/clightning-client.ts b/backend/src/api/lightning/clightning/clightning-client.ts index 629092d03..f5643ed01 100644 --- a/backend/src/api/lightning/clightning/clightning-client.ts +++ b/backend/src/api/lightning/clightning/clightning-client.ts @@ -249,7 +249,7 @@ export default class CLightningClient extends EventEmitter implements AbstractLi return { nodes: listnodes['nodes'].map(node => convertNode(node)), - channels: channelsList, + edges: channelsList, }; } } diff --git a/backend/src/api/lightning/clightning/clightning-convert.ts b/backend/src/api/lightning/clightning/clightning-convert.ts index 
8ceec3b7e..008094bf5 100644 --- a/backend/src/api/lightning/clightning/clightning-convert.ts +++ b/backend/src/api/lightning/clightning/clightning-convert.ts @@ -8,14 +8,19 @@ export function convertNode(clNode: any): ILightningApi.Node { alias: clNode.alias ?? '', color: `#${clNode.color ?? ''}`, features: [], // TODO parse and return clNode.feature - public_key: clNode.nodeid, - sockets: clNode.addresses?.map(addr => `${addr.address}:${addr.port}`) ?? [], - updated_at: new Date((clNode?.last_timestamp ?? 0) * 1000).toUTCString(), + pub_key: clNode.nodeid, + addresses: clNode.addresses?.map((addr) => { + return { + network: addr.type, + addr: `${addr.address}:${addr.port}` + }; + }), + last_update: clNode?.last_timestamp ?? 0, }; } /** - * Convert clightning "listchannels" response to lnd "describegraph.channels" format + * Convert clightning "listchannels" response to lnd "describegraph.edges" format */ export function convertAndmergeBidirectionalChannels(clChannels: any[]): ILightningApi.Channel[] { const consolidatedChannelList: ILightningApi.Channel[] = []; @@ -41,67 +46,58 @@ export function convertNode(clNode: any): ILightningApi.Node { return consolidatedChannelList; } +export function convertChannelId(channelId): string { + const s = channelId.split('x').map(part => parseInt(part)); + return BigInt((s[0] << 40) | (s[1] << 16) | s[2]).toString(); +} + /** - * Convert two clightning "getchannels" entries into a full a lnd "describegraph.channels" format + * Convert two clightning "getchannels" entries into a full a lnd "describegraph.edges" format * In this case, clightning knows the channel policy for both nodes */ function buildFullChannel(clChannelA: any, clChannelB: any): ILightningApi.Channel { const lastUpdate = Math.max(clChannelA.last_update ?? 0, clChannelB.last_update ?? 0); return { - id: clChannelA.short_channel_id, + channel_id: clChannelA.short_channel_id, capacity: clChannelA.satoshis, - transaction_id: '', // TODO - transaction_vout: 0, // TODO - updated_at: new Date(lastUpdate * 1000).toUTCString(), - policies: [ - convertPolicy(clChannelA), - convertPolicy(clChannelB) - ] + last_update: lastUpdate, + node1_policy: convertPolicy(clChannelA), + node2_policy: convertPolicy(clChannelB), + chan_point: ':0', // TODO + node1_pub: clChannelA.source, + node2_pub: clChannelB.source, }; } /** - * Convert one clightning "getchannels" entry into a full a lnd "describegraph.channels" format + * Convert one clightning "getchannels" entry into a full a lnd "describegraph.edges" format * In this case, clightning knows the channel policy of only one node */ function buildIncompleteChannel(clChannel: any): ILightningApi.Channel { return { - id: clChannel.short_channel_id, + channel_id: clChannel.short_channel_id, capacity: clChannel.satoshis, - policies: [convertPolicy(clChannel), getEmptyPolicy()], - transaction_id: '', // TODO - transaction_vout: 0, // TODO - updated_at: new Date((clChannel.last_update ?? 0) * 1000).toUTCString(), + last_update: clChannel.last_update ?? 
0, + node1_policy: convertPolicy(clChannel), + node2_policy: null, + chan_point: ':0', // TODO + node1_pub: clChannel.source, + node2_pub: clChannel.destination, }; } /** * Convert a clightning "listnode" response to a lnd channel policy format */ - function convertPolicy(clChannel: any): ILightningApi.Policy { + function convertPolicy(clChannel: any): ILightningApi.RoutingPolicy { return { - public_key: clChannel.source, - base_fee_mtokens: clChannel.base_fee_millisatoshi, - fee_rate: clChannel.fee_per_millionth, - is_disabled: !clChannel.active, - max_htlc_mtokens: clChannel.htlc_maximum_msat.slice(0, -4), - min_htlc_mtokens: clChannel.htlc_minimum_msat.slice(0, -4), - updated_at: new Date((clChannel.last_update ?? 0) * 1000).toUTCString(), - }; -} - -/** - * Create an empty channel policy in lnd format - */ - function getEmptyPolicy(): ILightningApi.Policy { - return { - public_key: 'null', - base_fee_mtokens: '0', - fee_rate: 0, - is_disabled: true, - max_htlc_mtokens: '0', - min_htlc_mtokens: '0', - updated_at: new Date(0).toUTCString(), + time_lock_delta: 0, // TODO + min_htlc: clChannel.htlc_minimum_msat.slice(0, -4), + max_htlc_msat: clChannel.htlc_maximum_msat.slice(0, -4), + fee_base_msat: clChannel.base_fee_millisatoshi, + fee_rate_milli_msat: clChannel.fee_per_millionth, + disabled: !clChannel.active, + last_update: clChannel.last_update ?? 0, }; } diff --git a/backend/src/tasks/lightning/node-sync.service.ts b/backend/src/tasks/lightning/node-sync.service.ts index d3367d51c..863ee30da 100644 --- a/backend/src/tasks/lightning/node-sync.service.ts +++ b/backend/src/tasks/lightning/node-sync.service.ts @@ -8,6 +8,7 @@ import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface'; import { ILightningApi } from '../../api/lightning/lightning-api.interface'; import { $lookupNodeLocation } from './sync-tasks/node-locations'; import lightningApi from '../../api/lightning/lightning-api-factory'; +import { convertChannelId } from '../../api/lightning/clightning/clightning-convert'; class NodeSyncService { constructor() {} @@ -320,7 +321,7 @@ class NodeSyncService { ;`; await DB.query(query, [ - channel.channel_id, + this.toIntegerId(channel.channel_id), this.toShortId(channel.channel_id), channel.capacity, txid, @@ -391,8 +392,7 @@ class NodeSyncService { private async $saveNode(node: ILightningApi.Node): Promise { try { - const updatedAt = this.utcDateToMysql(node.last_update); - const sockets = node.addresses.map(a => a.addr).join(','); + const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? ''; const query = `INSERT INTO nodes( public_key, first_seen, @@ -401,15 +401,16 @@ class NodeSyncService { color, sockets ) - VALUES (?, NOW(), ?, ?, ?, ?) ON DUPLICATE KEY UPDATE updated_at = ?, alias = ?, color = ?, sockets = ?;`; + VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?) 
+ ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?`; await DB.query(query, [ node.pub_key, - updatedAt, + node.last_update, node.alias, node.color, sockets, - updatedAt, + node.last_update, node.alias, node.color, sockets, @@ -419,8 +420,19 @@ class NodeSyncService { } } + private toIntegerId(id: string): string { + if (config.LIGHTNING.BACKEND === 'lnd') { + return id; + } + return convertChannelId(id); + } + /** Decodes a channel id returned by lnd as uint64 to a short channel id */ private toShortId(id: string): string { + if (config.LIGHTNING.BACKEND === 'cln') { + return id; + } + const n = BigInt(id); return [ n >> 40n, // nth block From 00cd3ee9bf6c74487b30ebb7ec092495059b4198 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 16:18:19 +0200 Subject: [PATCH 15/17] Don't run the ln network update if the graph is emtpy --- backend/src/index.ts | 6 +++--- ...node-sync.service.ts => network-sync.service.ts} | 13 +++++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) rename backend/src/tasks/lightning/{node-sync.service.ts => network-sync.service.ts} (97%) diff --git a/backend/src/index.ts b/backend/src/index.ts index fa80fb2ad..0f7cc7aa7 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -28,7 +28,7 @@ import nodesRoutes from './api/explorer/nodes.routes'; import channelsRoutes from './api/explorer/channels.routes'; import generalLightningRoutes from './api/explorer/general.routes'; import lightningStatsUpdater from './tasks/lightning/stats-updater.service'; -import nodeSyncService from './tasks/lightning/node-sync.service'; +import networkSyncService from './tasks/lightning/network-sync.service'; import statisticsRoutes from './api/statistics/statistics.routes'; import miningRoutes from './api/mining/mining-routes'; import bisqRoutes from './api/bisq/bisq.routes'; @@ -136,8 +136,8 @@ class Server { } if (config.LIGHTNING.ENABLED) { - nodeSyncService.$startService() - .then(() => lightningStatsUpdater.$startService()); + networkSyncService.$startService() + .then(() => lightningStatsUpdater.$startService()); } this.server.listen(config.MEMPOOL.HTTP_PORT, () => { diff --git a/backend/src/tasks/lightning/node-sync.service.ts b/backend/src/tasks/lightning/network-sync.service.ts similarity index 97% rename from backend/src/tasks/lightning/node-sync.service.ts rename to backend/src/tasks/lightning/network-sync.service.ts index 863ee30da..826664cf4 100644 --- a/backend/src/tasks/lightning/node-sync.service.ts +++ b/backend/src/tasks/lightning/network-sync.service.ts @@ -10,7 +10,7 @@ import { $lookupNodeLocation } from './sync-tasks/node-locations'; import lightningApi from '../../api/lightning/lightning-api-factory'; import { convertChannelId } from '../../api/lightning/clightning/clightning-convert'; -class NodeSyncService { +class NetworkSyncService { constructor() {} public async $startService() { @@ -28,6 +28,11 @@ class NodeSyncService { logger.info(`Updating nodes and channels...`); const networkGraph = await lightningApi.$getNetworkGraph(); + if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) { + logger.info(`LN Network graph is empty, retrying in 10 seconds`); + setTimeout(this.$runUpdater, 10000); + return; + } for (const node of networkGraph.nodes) { await this.$saveNode(node); @@ -376,6 +381,10 @@ class NodeSyncService { } private async $setChannelsInactive(graphChannelsIds: string[]): Promise { + if (graphChannelsIds.length === 0) { + return; + } + try { await DB.query(` UPDATE channels 
@@ -447,4 +456,4 @@ class NodeSyncService { } } -export default new NodeSyncService(); +export default new NetworkSyncService(); From a25af16f7c0557d1144d035344ee060c56dc04a7 Mon Sep 17 00:00:00 2001 From: nymkappa Date: Tue, 2 Aug 2022 16:39:34 +0200 Subject: [PATCH 16/17] Fetch funding tx for clightning channels --- .../lightning/clightning/clightning-client.ts | 2 +- .../clightning/clightning-convert.ts | 25 +++++++++++++------ backend/src/index.ts | 4 ++- .../sync-tasks/funding-tx-fetcher.ts | 16 ++++++------ 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/backend/src/api/lightning/clightning/clightning-client.ts b/backend/src/api/lightning/clightning/clightning-client.ts index f5643ed01..15f472f2e 100644 --- a/backend/src/api/lightning/clightning/clightning-client.ts +++ b/backend/src/api/lightning/clightning/clightning-client.ts @@ -245,7 +245,7 @@ export default class CLightningClient extends EventEmitter implements AbstractLi async $getNetworkGraph(): Promise { const listnodes: any[] = await this.call('listnodes'); const listchannels: any[] = await this.call('listchannels'); - const channelsList = convertAndmergeBidirectionalChannels(listchannels['channels']); + const channelsList = await convertAndmergeBidirectionalChannels(listchannels['channels']); return { nodes: listnodes['nodes'].map(node => convertNode(node)), diff --git a/backend/src/api/lightning/clightning/clightning-convert.ts b/backend/src/api/lightning/clightning/clightning-convert.ts index 008094bf5..1a267bc65 100644 --- a/backend/src/api/lightning/clightning/clightning-convert.ts +++ b/backend/src/api/lightning/clightning/clightning-convert.ts @@ -1,4 +1,5 @@ import { ILightningApi } from '../lightning-api.interface'; +import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher'; /** * Convert a clightning "listnode" entry to a lnd node entry @@ -22,7 +23,7 @@ export function convertNode(clNode: any): ILightningApi.Node { /** * Convert clightning "listchannels" response to lnd "describegraph.edges" format */ - export function convertAndmergeBidirectionalChannels(clChannels: any[]): ILightningApi.Channel[] { + export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise { const consolidatedChannelList: ILightningApi.Channel[] = []; const clChannelsDict = {}; const clChannelsDictCount = {}; @@ -33,14 +34,14 @@ export function convertNode(clNode: any): ILightningApi.Node { clChannelsDictCount[clChannel.short_channel_id] = 1; } else { consolidatedChannelList.push( - buildFullChannel(clChannel, clChannelsDict[clChannel.short_channel_id]) + await buildFullChannel(clChannel, clChannelsDict[clChannel.short_channel_id]) ); delete clChannelsDict[clChannel.short_channel_id]; clChannelsDictCount[clChannel.short_channel_id]++; } } for (const short_channel_id of Object.keys(clChannelsDict)) { - consolidatedChannelList.push(buildIncompleteChannel(clChannelsDict[short_channel_id])); + consolidatedChannelList.push(await buildIncompleteChannel(clChannelsDict[short_channel_id])); } return consolidatedChannelList; @@ -55,16 +56,20 @@ export function convertChannelId(channelId): string { * Convert two clightning "getchannels" entries into a full a lnd "describegraph.edges" format * In this case, clightning knows the channel policy for both nodes */ -function buildFullChannel(clChannelA: any, clChannelB: any): ILightningApi.Channel { +async function buildFullChannel(clChannelA: any, clChannelB: any): Promise { const lastUpdate = Math.max(clChannelA.last_update ?? 
-
+
+  const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannelA.short_channel_id);
+  const parts = clChannelA.short_channel_id.split('x');
+  const outputIdx = parts[2];
+
   return {
     channel_id: clChannelA.short_channel_id,
     capacity: clChannelA.satoshis,
     last_update: lastUpdate,
     node1_policy: convertPolicy(clChannelA),
     node2_policy: convertPolicy(clChannelB),
-    chan_point: ':0', // TODO
+    chan_point: `${tx.txid}:${outputIdx}`,
     node1_pub: clChannelA.source,
     node2_pub: clChannelB.source,
   };
 }
@@ -74,14 +79,18 @@ function buildFullChannel(clChannelA: any, clChannelB: any): ILightningApi.Chann
  * Convert one clightning "getchannels" entry into a full a lnd "describegraph.edges" format
  * In this case, clightning knows the channel policy of only one node
  */
- function buildIncompleteChannel(clChannel: any): ILightningApi.Channel {
+ async function buildIncompleteChannel(clChannel: any): Promise<ILightningApi.Channel> {
+  const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannel.short_channel_id);
+  const parts = clChannel.short_channel_id.split('x');
+  const outputIdx = parts[2];
+
   return {
     channel_id: clChannel.short_channel_id,
     capacity: clChannel.satoshis,
     last_update: clChannel.last_update ?? 0,
     node1_policy: convertPolicy(clChannel),
     node2_policy: null,
-    chan_point: ':0', // TODO
+    chan_point: `${tx.txid}:${outputIdx}`,
     node1_pub: clChannel.source,
     node2_pub: clChannel.destination,
   };
diff --git a/backend/src/index.ts b/backend/src/index.ts
index 0f7cc7aa7..976ec12df 100644
--- a/backend/src/index.ts
+++ b/backend/src/index.ts
@@ -34,6 +34,7 @@ import miningRoutes from './api/mining/mining-routes';
 import bisqRoutes from './api/bisq/bisq.routes';
 import liquidRoutes from './api/liquid/liquid.routes';
 import bitcoinRoutes from './api/bitcoin/bitcoin.routes';
+import fundingTxFetcher from "./tasks/lightning/sync-tasks/funding-tx-fetcher";
 
 class Server {
   private wss: WebSocket.Server | undefined;
@@ -136,7 +137,8 @@ class Server {
     }
 
     if (config.LIGHTNING.ENABLED) {
-      networkSyncService.$startService()
+      fundingTxFetcher.$init()
+        .then(() => networkSyncService.$startService())
         .then(() => lightningStatsUpdater.$startService());
     }
 
diff --git a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
index 9da721876..926d20c91 100644
--- a/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
+++ b/backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
@@ -1,8 +1,6 @@
 import { existsSync, promises } from 'fs';
-import bitcoinApiFactory from '../../../api/bitcoin/bitcoin-api-factory';
 import bitcoinClient from '../../../api/bitcoin/bitcoin-client';
 import config from '../../../config';
-import DB from '../../../database';
 import logger from '../../../logger';
 
 const fsPromises = promises;
@@ -16,12 +14,7 @@ class FundingTxFetcher {
   private channelNewlyProcessed = 0;
   public fundingTxCache = {};
 
-  async $fetchChannelsFundingTxs(channelIds: string[]): Promise<void> {
-    if (this.running) {
-      return;
-    }
-    this.running = true;
-
+  async $init(): Promise<void> {
     // Load funding tx disk cache
     if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) {
       try {
@@ -32,6 +25,13 @@ class FundingTxFetcher {
       }
       logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`);
     }
+  }
+
+  async $fetchChannelsFundingTxs(channelIds: string[]): Promise<void> {
+    if (this.running) {
+      return;
+    }
+    this.running = true;
 
     const globalTimer = new Date().getTime() / 1000;
     let cacheTimer = new Date().getTime() / 1000;

From 33f3b0006bd4d44c709b151552c6421645e393f8 Mon Sep 17 00:00:00 2001
From: wiz
Date: Tue, 2 Aug 2022 21:49:53 +0200
Subject: [PATCH 17/17] Move fast-xml-parser from devDeps to deps

---
 backend/package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backend/package.json b/backend/package.json
index 750380156..47694ecf8 100644
--- a/backend/package.json
+++ b/backend/package.json
@@ -37,6 +37,7 @@
     "bitcoinjs-lib": "6.0.1",
     "crypto-js": "^4.0.0",
     "express": "^4.18.0",
+    "fast-xml-parser": "^4.0.9",
     "maxmind": "^4.3.6",
     "mysql2": "2.3.3",
     "node-worker-threads-pool": "^1.5.1",
@@ -53,7 +54,6 @@
     "@typescript-eslint/parser": "^5.30.5",
     "eslint": "^8.19.0",
     "eslint-config-prettier": "^8.5.0",
-    "fast-xml-parser": "^4.0.9",
    "prettier": "^2.7.1"
   }
 }
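
The channel id helpers patched above (toShortId, toIntegerId, convertChannelId) and the chan_point assembled in PATCH 16/17 both rely on the standard BOLT #7 short channel id packing: block height in the high bits (shifted left by 40), transaction index within the block shifted left by 16, and the funding output index in the low 16 bits. Below is a minimal standalone sketch of that round trip and of the `txid:output` chan_point format, assuming Core Lightning's `BLOCKxTXINDEXxOUTPUT` string form; the helper names and sample values are illustrative only and are not part of the patch series.

function clnToLndChannelId(shortChannelId: string): string {
  // 'BLOCKxTXINDEXxOUTPUT' -> lnd-style uint64, returned as a decimal string
  const [block, txIndex, outputIndex] = shortChannelId.split('x').map((part) => BigInt(part));
  return ((block << 40n) | (txIndex << 16n) | outputIndex).toString();
}

function lndToClnChannelId(channelId: string): string {
  // lnd-style uint64 (decimal string) -> 'BLOCKxTXINDEXxOUTPUT'
  const n = BigInt(channelId);
  return [
    n >> 40n,               // block height
    (n >> 16n) & 0xFFFFFFn, // transaction index within the block
    n & 0xFFFFn,            // funding output index
  ].join('x');
}

function buildChanPoint(fundingTxid: string, shortChannelId: string): string {
  // chan_point as built in PATCH 16/17: '<funding txid>:<output index>',
  // where the output index is the third component of the short channel id
  const outputIdx = shortChannelId.split('x')[2];
  return `${fundingTxid}:${outputIdx}`;
}

// Hypothetical example values, for illustration only.
const clnId = '700000x1234x1';
const lndId = clnToLndChannelId(clnId);             // decimal uint64 string
console.log(lndToClnChannelId(lndId) === clnId);    // true: the mapping round-trips
console.log(buildChanPoint('f'.repeat(64), clnId)); // "<64 f's>:1"

This also shows why the backend-specific early returns in the patched toShortId and toIntegerId helpers are safe: lnd already reports the uint64 form and Core Lightning already reports the BLOCKxTXINDEXxOUTPUT form, so each backend can pass its native id through unchanged.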