From 582fd0149f5b009db3884f3d3a097f65539c660f Mon Sep 17 00:00:00 2001 From: softsimon Date: Sun, 24 Apr 2022 01:33:38 +0400 Subject: [PATCH] Store nodes and channels --- lightning-backend/mempool-config.sample.json | 6 +- .../src/api/lightning-api-abstract-factory.ts | 5 +- .../src/api/lightning-api.interface.ts | 7 + lightning-backend/src/api/lnd/lnd-api.ts | 8 +- lightning-backend/src/database-migration.ts | 232 ++++++++++++++++++ lightning-backend/src/index.ts | 11 +- .../src/tasks/node-sync.service.ts | 163 ++++++++++++ ...ts-updater.ts => stats-updater.service.ts} | 19 +- 8 files changed, 422 insertions(+), 29 deletions(-) create mode 100644 lightning-backend/src/database-migration.ts create mode 100644 lightning-backend/src/tasks/node-sync.service.ts rename lightning-backend/src/tasks/{stats-updater.ts => stats-updater.service.ts} (69%) diff --git a/lightning-backend/mempool-config.sample.json b/lightning-backend/mempool-config.sample.json index 8df468cc6..e45742042 100644 --- a/lightning-backend/mempool-config.sample.json +++ b/lightning-backend/mempool-config.sample.json @@ -20,8 +20,8 @@ "HOST": "127.0.0.1", "PORT": 3306, "SOCKET": "/var/run/mysql/mysql.sock", - "DATABASE": "mempool", - "USERNAME": "mempool", - "PASSWORD": "mempool" + "DATABASE": "lightning", + "USERNAME": "root", + "PASSWORD": "root" } } diff --git a/lightning-backend/src/api/lightning-api-abstract-factory.ts b/lightning-backend/src/api/lightning-api-abstract-factory.ts index 086498dd4..2e78b52f8 100644 --- a/lightning-backend/src/api/lightning-api-abstract-factory.ts +++ b/lightning-backend/src/api/lightning-api-abstract-factory.ts @@ -1,6 +1,7 @@ import { ILightningApi } from './lightning-api.interface'; export interface AbstractLightningApi { - getNetworkInfo(): Promise; - getNetworkGraph(): Promise; + $getNetworkInfo(): Promise; + $getNetworkGraph(): Promise; + $getChanInfo(id: string): Promise; } diff --git a/lightning-backend/src/api/lightning-api.interface.ts b/lightning-backend/src/api/lightning-api.interface.ts index 4540185ff..26999c119 100644 --- a/lightning-backend/src/api/lightning-api.interface.ts +++ b/lightning-backend/src/api/lightning-api.interface.ts @@ -26,6 +26,13 @@ export namespace ILightningApi { interface Policy { public_key: string; + base_fee_mtokens?: number; + cltv_delta?: number; + fee_rate?: number; + is_disabled?: boolean; + max_htlc_mtokens?: number; + min_htlc_mtokens?: number; + updated_at?: string; } export interface Node { diff --git a/lightning-backend/src/api/lnd/lnd-api.ts b/lightning-backend/src/api/lnd/lnd-api.ts index ebf477ca4..edddc31c6 100644 --- a/lightning-backend/src/api/lnd/lnd-api.ts +++ b/lightning-backend/src/api/lnd/lnd-api.ts @@ -25,13 +25,17 @@ class LndApi implements AbstractLightningApi { } } - async getNetworkInfo(): Promise { + async $getNetworkInfo(): Promise { return await lnService.getNetworkInfo({ lnd: this.lnd }); } - async getNetworkGraph(): Promise { + async $getNetworkGraph(): Promise { return await lnService.getNetworkGraph({ lnd: this.lnd }); } + + async $getChanInfo(id: string): Promise { + return await lnService.getChannel({ lnd: this.lnd, id }); + } } export default LndApi; diff --git a/lightning-backend/src/database-migration.ts b/lightning-backend/src/database-migration.ts new file mode 100644 index 000000000..4690fa0e0 --- /dev/null +++ b/lightning-backend/src/database-migration.ts @@ -0,0 +1,232 @@ +import config from './config'; +import DB from './database'; +import logger from './logger'; + +const sleep = (ms: number) => new Promise(res => setTimeout(res, ms)); + +class DatabaseMigration { + private static currentVersion = 1; + private queryTimeout = 120000; + + constructor() { } + /** + * Entry point + */ + public async $initializeOrMigrateDatabase(): Promise { + logger.debug('MIGRATIONS: Running migrations'); + + await this.$printDatabaseVersion(); + + // First of all, if the `state` database does not exist, create it so we can track migration version + if (!await this.$checkIfTableExists('state')) { + logger.debug('MIGRATIONS: `state` table does not exist. Creating it.'); + try { + await this.$createMigrationStateTable(); + } catch (e) { + logger.err('MIGRATIONS: Unable to create `state` table, aborting in 10 seconds. ' + e); + await sleep(10000); + process.exit(-1); + } + logger.debug('MIGRATIONS: `state` table initialized.'); + } + + let databaseSchemaVersion = 0; + try { + databaseSchemaVersion = await this.$getSchemaVersionFromDatabase(); + } catch (e) { + logger.err('MIGRATIONS: Unable to get current database migration version, aborting in 10 seconds. ' + e); + await sleep(10000); + process.exit(-1); + } + + logger.debug('MIGRATIONS: Current state.schema_version ' + databaseSchemaVersion); + logger.debug('MIGRATIONS: Latest DatabaseMigration.version is ' + DatabaseMigration.currentVersion); + if (databaseSchemaVersion >= DatabaseMigration.currentVersion) { + logger.debug('MIGRATIONS: Nothing to do.'); + return; + } + + // Now, create missing tables. Those queries cannot be wrapped into a transaction unfortunately + try { + await this.$createMissingTablesAndIndexes(databaseSchemaVersion); + } catch (e) { + logger.err('MIGRATIONS: Unable to create required tables, aborting in 10 seconds. ' + e); + await sleep(10000); + process.exit(-1); + } + + if (DatabaseMigration.currentVersion > databaseSchemaVersion) { + logger.notice('MIGRATIONS: Upgrading datababse schema'); + try { + await this.$migrateTableSchemaFromVersion(databaseSchemaVersion); + logger.notice(`MIGRATIONS: OK. Database schema have been migrated from version ${databaseSchemaVersion} to ${DatabaseMigration.currentVersion} (latest version)`); + } catch (e) { + logger.err('MIGRATIONS: Unable to migrate database, aborting. ' + e); + } + } + + return; + } + + /** + * Create all missing tables + */ + private async $createMissingTablesAndIndexes(databaseSchemaVersion: number) { + try { + await this.$executeQuery(this.getCreateStatisticsQuery(), await this.$checkIfTableExists('statistics')); + await this.$executeQuery(this.getCreateNodesQuery(), await this.$checkIfTableExists('nodes')); + await this.$executeQuery(this.getCreateChannelsQuery(), await this.$checkIfTableExists('channels')); + } catch (e) { + throw e; + } + } + + /** + * Small query execution wrapper to log all executed queries + */ + private async $executeQuery(query: string, silent: boolean = false): Promise { + if (!silent) { + logger.debug('MIGRATIONS: Execute query:\n' + query); + } + return DB.query({ sql: query, timeout: this.queryTimeout }); + } + + /** + * Check if 'table' exists in the database + */ + private async $checkIfTableExists(table: string): Promise { + const query = `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '${config.DATABASE.DATABASE}' AND TABLE_NAME = '${table}'`; + const [rows] = await DB.query({ sql: query, timeout: this.queryTimeout }); + return rows[0]['COUNT(*)'] === 1; + } + + /** + * Get current database version + */ + private async $getSchemaVersionFromDatabase(): Promise { + const query = `SELECT number FROM state WHERE name = 'schema_version';`; + const [rows] = await this.$executeQuery(query, true); + return rows[0]['number']; + } + + /** + * Create the `state` table + */ + private async $createMigrationStateTable(): Promise { + try { + const query = `CREATE TABLE IF NOT EXISTS state ( + name varchar(25) NOT NULL, + number int(11) NULL, + string varchar(100) NULL, + CONSTRAINT name_unique UNIQUE (name) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; + await this.$executeQuery(query); + + // Set initial values + await this.$executeQuery(`INSERT INTO state VALUES('schema_version', 0, NULL);`); + } catch (e) { + throw e; + } + } + + /** + * We actually execute the migrations queries here + */ + private async $migrateTableSchemaFromVersion(version: number): Promise { + const transactionQueries: string[] = []; + for (const query of this.getMigrationQueriesFromVersion(version)) { + transactionQueries.push(query); + } + transactionQueries.push(this.getUpdateToLatestSchemaVersionQuery()); + + try { + await this.$executeQuery('START TRANSACTION;'); + for (const query of transactionQueries) { + await this.$executeQuery(query); + } + await this.$executeQuery('COMMIT;'); + } catch (e) { + await this.$executeQuery('ROLLBACK;'); + throw e; + } + } + + /** + * Generate migration queries based on schema version + */ + private getMigrationQueriesFromVersion(version: number): string[] { + const queries: string[] = []; + return queries; + } + + /** + * Save the schema version in the database + */ + private getUpdateToLatestSchemaVersionQuery(): string { + return `UPDATE state SET number = ${DatabaseMigration.currentVersion} WHERE name = 'schema_version';`; + } + + /** + * Print current database version + */ + private async $printDatabaseVersion() { + try { + const [rows] = await this.$executeQuery('SELECT VERSION() as version;', true); + logger.debug(`MIGRATIONS: Database engine version '${rows[0].version}'`); + } catch (e) { + logger.debug(`MIGRATIONS: Could not fetch database engine version. ` + e); + } + } + + private getCreateStatisticsQuery(): string { + return `CREATE TABLE IF NOT EXISTS statistics ( + id int(11) NOT NULL AUTO_INCREMENT, + added datetime NOT NULL, + channel_count int(11) NOT NULL, + node_count int(11) NOT NULL, + total_capacity double unsigned NOT NULL, + average_channel_size double unsigned NOT NULL, + CONSTRAINT PRIMARY KEY (id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; + } + + private getCreateNodesQuery(): string { + return `CREATE TABLE IF NOT EXISTS nodes ( + public_key varchar(66) NOT NULL, + first_seen datetime NOT NULL, + updated_at datetime NOT NULL, + alias varchar(200) COLLATE utf8mb4_general_ci NOT NULL, + color varchar(200) NOT NULL, + CONSTRAINT PRIMARY KEY (public_key) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; + } + + private getCreateChannelsQuery(): string { + return `CREATE TABLE IF NOT EXISTS channels ( + id varchar(15) NOT NULL, + capacity double unsigned NOT NULL, + transaction_id varchar(64) NOT NULL, + transaction_vout int(11) NOT NULL, + updated_at datetime NOT NULL, + node1_public_key varchar(66) NOT NULL, + node1_base_fee_mtokens double unsigned NULL, + node1_cltv_delta int(11) NULL, + node1_fee_rate int(11) NULL, + node1_is_disabled boolean NULL, + node1_max_htlc_mtokens double unsigned NULL, + node1_min_htlc_mtokens double unsigned NULL, + node1_updated_at datetime NULL, + node2_public_key varchar(66) NOT NULL, + node2_base_fee_mtokens double unsigned NULL, + node2_cltv_delta int(11) NULL, + node2_fee_rate int(11) NULL, + node2_is_disabled boolean NULL, + node2_max_htlc_mtokens double unsigned NULL, + node2_min_htlc_mtokens double unsigned NULL, + node2_updated_at datetime NULL, + CONSTRAINT PRIMARY KEY (id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; + } +} + +export default new DatabaseMigration(); diff --git a/lightning-backend/src/index.ts b/lightning-backend/src/index.ts index f2dc0fb91..a9e487ef5 100644 --- a/lightning-backend/src/index.ts +++ b/lightning-backend/src/index.ts @@ -1,8 +1,9 @@ import config from './config'; import logger from './logger'; import DB from './database'; -import lightningApi from './api/lightning-api-factory'; -import statsUpdater from './tasks/stats-updater'; +import databaseMigration from './database-migration'; +import statsUpdater from './tasks/stats-updater.service'; +import nodeSyncService from './tasks/node-sync.service'; logger.notice(`Mempool Server is running on port ${config.MEMPOOL.HTTP_PORT}`); @@ -13,12 +14,10 @@ class LightningServer { async init() { await DB.checkDbConnection(); + await databaseMigration.$initializeOrMigrateDatabase(); statsUpdater.startService(); - - const networkGraph = await lightningApi.getNetworkGraph(); - logger.info('Network graph channels: ' + networkGraph.channels.length); - logger.info('Network graph nodes: ' + networkGraph.nodes.length); + nodeSyncService.startService(); } } diff --git a/lightning-backend/src/tasks/node-sync.service.ts b/lightning-backend/src/tasks/node-sync.service.ts new file mode 100644 index 000000000..8b5116be9 --- /dev/null +++ b/lightning-backend/src/tasks/node-sync.service.ts @@ -0,0 +1,163 @@ + +import DB from '../database'; +import logger from '../logger'; +import lightningApi from '../api/lightning-api-factory'; +import { ILightningApi } from '../api/lightning-api.interface'; + +class NodeSyncService { + constructor() {} + + public async startService() { + logger.info('Starting node sync service'); + + this.$updateNodes(); + + setInterval(async () => { + await this.$updateNodes(); + }, 1000 * 60 * 60); + } + + private async $updateNodes() { + try { + const networkGraph = await lightningApi.$getNetworkGraph(); + + for (const node of networkGraph.nodes) { + await this.$saveNode(node); + } + + for (const channel of networkGraph.channels) { + await this.$saveChannel(channel); + } + } catch (e) { + logger.err('$updateNodes() error: ' + (e instanceof Error ? e.message : e)); + } + } + + private async $saveChannel(channel: ILightningApi.Channel) { + try { + const d = new Date(Date.parse(channel.updated_at)); + const query = `INSERT INTO channels + ( + id, + capacity, + transaction_id, + transaction_vout, + updated_at, + node1_public_key, + node1_base_fee_mtokens, + node1_cltv_delta, + node1_fee_rate, + node1_is_disabled, + node1_max_htlc_mtokens, + node1_min_htlc_mtokens, + node1_updated_at, + node2_public_key, + node2_base_fee_mtokens, + node2_cltv_delta, + node2_fee_rate, + node2_is_disabled, + node2_max_htlc_mtokens, + node2_min_htlc_mtokens, + node2_updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + capacity = ?, + updated_at = ?, + node1_public_key = ?, + node1_base_fee_mtokens = ?, + node1_cltv_delta = ?, + node1_fee_rate = ?, + node1_is_disabled = ?, + node1_max_htlc_mtokens = ?, + node1_min_htlc_mtokens = ?, + node1_updated_at = ?, + node2_public_key = ?, + node2_base_fee_mtokens = ?, + node2_cltv_delta = ?, + node2_fee_rate = ?, + node2_is_disabled = ?, + node2_max_htlc_mtokens = ?, + node2_min_htlc_mtokens = ?, + node2_updated_at = ? + ;`; + + await DB.query(query, [ + channel.id, + channel.capacity, + channel.transaction_id, + channel.transaction_vout, + channel.updated_at ? this.utcDateToMysql(channel.updated_at) : 0, + channel.policies[0].public_key, + channel.policies[0].base_fee_mtokens, + channel.policies[0].cltv_delta, + channel.policies[0].fee_rate, + channel.policies[0].is_disabled, + channel.policies[0].max_htlc_mtokens, + channel.policies[0].min_htlc_mtokens, + channel.policies[0].updated_at ? this.utcDateToMysql(channel.policies[0].updated_at) : 0, + channel.policies[1].public_key, + channel.policies[1].base_fee_mtokens, + channel.policies[1].cltv_delta, + channel.policies[1].fee_rate, + channel.policies[1].is_disabled, + channel.policies[1].max_htlc_mtokens, + channel.policies[1].min_htlc_mtokens, + channel.policies[1].updated_at ? this.utcDateToMysql(channel.policies[1].updated_at) : 0, + channel.capacity, + channel.updated_at ? this.utcDateToMysql(channel.updated_at) : 0, + channel.policies[0].public_key, + channel.policies[0].base_fee_mtokens, + channel.policies[0].cltv_delta, + channel.policies[0].fee_rate, + channel.policies[0].is_disabled, + channel.policies[0].max_htlc_mtokens, + channel.policies[0].min_htlc_mtokens, + channel.policies[0].updated_at ? this.utcDateToMysql(channel.policies[0].updated_at) : 0, + channel.policies[1].public_key, + channel.policies[1].base_fee_mtokens, + channel.policies[1].cltv_delta, + channel.policies[1].fee_rate, + channel.policies[1].is_disabled, + channel.policies[1].max_htlc_mtokens, + channel.policies[1].min_htlc_mtokens, + channel.policies[1].updated_at ? this.utcDateToMysql(channel.policies[1].updated_at) : 0, + ]); + } catch (e) { + logger.err('$saveChannel() error: ' + (e instanceof Error ? e.message : e)); + } + } + + private async $saveNode(node: ILightningApi.Node) { + try { + const updatedAt = this.utcDateToMysql(node.updated_at); + const query = `INSERT INTO nodes( + public_key, + first_seen, + updated_at, + alias, + color + ) + VALUES (?, NOW(), ?, ?, ?) ON DUPLICATE KEY UPDATE updated_at = ?, alias = ?, color = ?;`; + + await DB.query(query, [ + node.public_key, + updatedAt, + node.alias, + node.color, + updatedAt, + node.alias, + node.color, + ]); + } catch (e) { + logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e)); + } + } + + private utcDateToMysql(dateString: string): string { + const d = new Date(Date.parse(dateString)); + return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; + } +} + +export default new NodeSyncService(); diff --git a/lightning-backend/src/tasks/stats-updater.ts b/lightning-backend/src/tasks/stats-updater.service.ts similarity index 69% rename from lightning-backend/src/tasks/stats-updater.ts rename to lightning-backend/src/tasks/stats-updater.service.ts index 251e30b15..2946977bc 100644 --- a/lightning-backend/src/tasks/stats-updater.ts +++ b/lightning-backend/src/tasks/stats-updater.service.ts @@ -3,24 +3,11 @@ import DB from '../database'; import logger from '../logger'; import lightningApi from '../api/lightning-api-factory'; -/* -CREATE TABLE IF NOT EXISTS lightning_stats ( - id int(11) NOT NULL AUTO_INCREMENT, - added datetime NOT NULL, - channel_count int(11) NOT NULL, - node_count int(11) NOT NULL, - total_capacity double unsigned NOT NULL, - average_channel_size double unsigned NOT NULL, - CONSTRAINT PRIMARY KEY (id) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -*/ - - class LightningStatsUpdater { constructor() {} public async startService() { - logger.info('Starting Lightning Stats service'); + logger.info('Starting Stats service'); const now = new Date(); const nextHourInterval = new Date(now.getFullYear(), now.getMonth(), now.getDate(), Math.floor(now.getHours() / 1) + 1, 0, 0, 0); @@ -35,10 +22,10 @@ class LightningStatsUpdater { } private async $logLightningStats() { - const networkInfo = await lightningApi.getNetworkInfo(); + const networkInfo = await lightningApi.$getNetworkInfo(); try { - const query = `INSERT INTO lightning_stats( + const query = `INSERT INTO statistics( added, channel_count, node_count,