Merge branch 'master' into nymkappa/bugfix/index-blocks-prices-often
This commit is contained in:
@@ -9,6 +9,7 @@ export interface AbstractBitcoinApi {
|
||||
$getBlockHash(height: number): Promise<string>;
|
||||
$getBlockHeader(hash: string): Promise<string>;
|
||||
$getBlock(hash: string): Promise<IEsploraApi.Block>;
|
||||
$getRawBlock(hash: string): Promise<string>;
|
||||
$getAddress(address: string): Promise<IEsploraApi.Address>;
|
||||
$getAddressTransactions(address: string, lastSeenTxId: string): Promise<IEsploraApi.Transaction[]>;
|
||||
$getAddressPrefix(prefix: string): string[];
|
||||
|
||||
@@ -77,7 +77,8 @@ class BitcoinApi implements AbstractBitcoinApi {
|
||||
}
|
||||
|
||||
$getRawBlock(hash: string): Promise<string> {
|
||||
return this.bitcoindClient.getBlock(hash, 0);
|
||||
return this.bitcoindClient.getBlock(hash, 0)
|
||||
.then((raw: string) => Buffer.from(raw, "hex"));
|
||||
}
|
||||
|
||||
$getBlockHash(height: number): Promise<string> {
|
||||
|
||||
@@ -103,9 +103,10 @@ class BitcoinRoutes {
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block/:hash/header', this.getBlockHeader)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'blocks/tip/height', this.getBlockTipHeight)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'blocks/tip/hash', this.getBlockTipHash)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block/:hash/raw', this.getRawBlock)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block/:hash/txids', this.getTxIdsForBlock)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block/:hash/txs', this.getBlockTransactions)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block/:hash/txs/:index', this.getBlockTransactions)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block/:hash/txids', this.getTxIdsForBlock)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'block-height/:height', this.getBlockHeight)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'address/:address', this.getAddress)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'address/:address/txs', this.getAddressTransactions)
|
||||
@@ -470,6 +471,16 @@ class BitcoinRoutes {
|
||||
}
|
||||
}
|
||||
|
||||
private async getRawBlock(req: Request, res: Response) {
|
||||
try {
|
||||
const result = await bitcoinApi.$getRawBlock(req.params.hash);
|
||||
res.setHeader('content-type', 'application/octet-stream');
|
||||
res.send(result);
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
}
|
||||
}
|
||||
|
||||
private async getTxIdsForBlock(req: Request, res: Response) {
|
||||
try {
|
||||
const result = await bitcoinApi.$getTxIdsForBlock(req.params.hash);
|
||||
|
||||
@@ -50,6 +50,11 @@ class ElectrsApi implements AbstractBitcoinApi {
|
||||
.then((response) => response.data);
|
||||
}
|
||||
|
||||
$getRawBlock(hash: string): Promise<string> {
|
||||
return axios.get<string>(config.ESPLORA.REST_API_URL + '/block/' + hash + "/raw", this.axiosConfig)
|
||||
.then((response) => response.data);
|
||||
}
|
||||
|
||||
$getAddress(address: string): Promise<IEsploraApi.Address> {
|
||||
throw new Error('Method getAddress not implemented.');
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces';
|
||||
import config from '../config';
|
||||
import { convertChannelId } from './lightning/clightning/clightning-convert';
|
||||
export class Common {
|
||||
static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ?
|
||||
'144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49'
|
||||
@@ -184,4 +185,37 @@ export class Common {
|
||||
config.MEMPOOL.BLOCKS_SUMMARIES_INDEXING === true
|
||||
);
|
||||
}
|
||||
|
||||
static setDateMidnight(date: Date): void {
|
||||
date.setUTCHours(0);
|
||||
date.setUTCMinutes(0);
|
||||
date.setUTCSeconds(0);
|
||||
date.setUTCMilliseconds(0);
|
||||
}
|
||||
|
||||
static channelShortIdToIntegerId(id: string): string {
|
||||
if (config.LIGHTNING.BACKEND === 'lnd') {
|
||||
return id;
|
||||
}
|
||||
return convertChannelId(id);
|
||||
}
|
||||
|
||||
/** Decodes a channel id returned by lnd as uint64 to a short channel id */
|
||||
static channelIntegerIdToShortId(id: string): string {
|
||||
if (config.LIGHTNING.BACKEND === 'cln') {
|
||||
return id;
|
||||
}
|
||||
|
||||
const n = BigInt(id);
|
||||
return [
|
||||
n >> 40n, // nth block
|
||||
(n >> 16n) & 0xffffffn, // nth tx of the block
|
||||
n & 0xffffn // nth output of the tx
|
||||
].join('x');
|
||||
}
|
||||
|
||||
static utcDateToMysql(date?: number): string {
|
||||
const d = new Date((date || 0) * 1000);
|
||||
return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,13 +4,13 @@ import logger from '../logger';
|
||||
import { Common } from './common';
|
||||
|
||||
class DatabaseMigration {
|
||||
private static currentVersion = 33;
|
||||
private static currentVersion = 36;
|
||||
private queryTimeout = 120000;
|
||||
private statisticsAddedIndexed = false;
|
||||
private uniqueLogs: string[] = [];
|
||||
|
||||
private blocksTruncatedMessage = `'blocks' table has been truncated. Re-indexing from scratch.`;
|
||||
private hashratesTruncatedMessage = `'hashrates' table has been truncated. Re-indexing from scratch.`;
|
||||
private blocksTruncatedMessage = `'blocks' table has been truncated.`;
|
||||
private hashratesTruncatedMessage = `'hashrates' table has been truncated.`;
|
||||
|
||||
/**
|
||||
* Avoid printing multiple time the same message
|
||||
@@ -256,7 +256,9 @@ class DatabaseMigration {
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 26 && isBitcoin === true) {
|
||||
this.uniqueLog(logger.notice, `'lightning_stats' table has been truncated. Will re-generate historical data from scratch.`);
|
||||
if (config.LIGHTNING.ENABLED) {
|
||||
this.uniqueLog(logger.notice, `'lightning_stats' table has been truncated.`);
|
||||
}
|
||||
await this.$executeQuery(`TRUNCATE lightning_stats`);
|
||||
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD tor_nodes int(11) NOT NULL DEFAULT "0"');
|
||||
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD clearnet_nodes int(11) NOT NULL DEFAULT "0"');
|
||||
@@ -273,6 +275,9 @@ class DatabaseMigration {
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 28 && isBitcoin === true) {
|
||||
if (config.LIGHTNING.ENABLED) {
|
||||
this.uniqueLog(logger.notice, `'lightning_stats' and 'node_stats' tables have been truncated.`);
|
||||
}
|
||||
await this.$executeQuery(`TRUNCATE lightning_stats`);
|
||||
await this.$executeQuery(`TRUNCATE node_stats`);
|
||||
await this.$executeQuery(`ALTER TABLE lightning_stats MODIFY added DATE`);
|
||||
@@ -306,6 +311,19 @@ class DatabaseMigration {
|
||||
if (databaseSchemaVersion < 33 && isBitcoin == true) {
|
||||
await this.$executeQuery('ALTER TABLE `geo_names` CHANGE `type` `type` enum("city","country","division","continent","as_organization", "country_iso_code") NOT NULL');
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 34 && isBitcoin == true) {
|
||||
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD clearnet_tor_nodes int(11) NOT NULL DEFAULT "0"');
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 35 && isBitcoin == true) {
|
||||
await this.$executeQuery('DELETE from `lightning_stats` WHERE added > "2021-09-19"');
|
||||
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD CONSTRAINT added_unique UNIQUE (added);');
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 36 && isBitcoin == true) {
|
||||
await this.$executeQuery('ALTER TABLE `nodes` ADD status TINYINT NOT NULL DEFAULT "1"');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import logger from '../../logger';
|
||||
import DB from '../../database';
|
||||
import nodesApi from './nodes.api';
|
||||
import { ResultSetHeader } from 'mysql2';
|
||||
import { ILightningApi } from '../lightning/lightning-api.interface';
|
||||
import { Common } from '../common';
|
||||
|
||||
class ChannelsApi {
|
||||
public async $getAllChannels(): Promise<any[]> {
|
||||
@@ -13,9 +17,10 @@ class ChannelsApi {
|
||||
}
|
||||
}
|
||||
|
||||
public async $getAllChannelsGeo(): Promise<any[]> {
|
||||
public async $getAllChannelsGeo(publicKey?: string): Promise<any[]> {
|
||||
try {
|
||||
const query = `SELECT nodes_1.public_key as node1_public_key, nodes_1.alias AS node1_alias,
|
||||
const params: string[] = [];
|
||||
let query = `SELECT nodes_1.public_key as node1_public_key, nodes_1.alias AS node1_alias,
|
||||
nodes_1.latitude AS node1_latitude, nodes_1.longitude AS node1_longitude,
|
||||
nodes_2.public_key as node2_public_key, nodes_2.alias AS node2_alias,
|
||||
nodes_2.latitude AS node2_latitude, nodes_2.longitude AS node2_longitude,
|
||||
@@ -26,7 +31,14 @@ class ChannelsApi {
|
||||
WHERE nodes_1.latitude IS NOT NULL AND nodes_1.longitude IS NOT NULL
|
||||
AND nodes_2.latitude IS NOT NULL AND nodes_2.longitude IS NOT NULL
|
||||
`;
|
||||
const [rows]: any = await DB.query(query);
|
||||
|
||||
if (publicKey !== undefined) {
|
||||
query += ' AND (nodes_1.public_key = ? OR nodes_2.public_key = ?)';
|
||||
params.push(publicKey);
|
||||
params.push(publicKey);
|
||||
}
|
||||
|
||||
const [rows]: any = await DB.query(query, params);
|
||||
return rows.map((row) => [
|
||||
row.node1_public_key, row.node1_alias, row.node1_longitude, row.node1_latitude,
|
||||
row.node2_public_key, row.node2_alias, row.node2_longitude, row.node2_latitude,
|
||||
@@ -173,15 +185,57 @@ class ChannelsApi {
|
||||
|
||||
public async $getChannelsForNode(public_key: string, index: number, length: number, status: string): Promise<any[]> {
|
||||
try {
|
||||
// Default active and inactive channels
|
||||
let statusQuery = '< 2';
|
||||
// Closed channels only
|
||||
if (status === 'closed') {
|
||||
statusQuery = '= 2';
|
||||
let channelStatusFilter;
|
||||
if (status === 'open') {
|
||||
channelStatusFilter = '< 2';
|
||||
} else if (status === 'closed') {
|
||||
channelStatusFilter = '= 2';
|
||||
}
|
||||
const query = `SELECT n1.alias AS alias_left, n2.alias AS alias_right, channels.*, ns1.channels AS channels_left, ns1.capacity AS capacity_left, ns2.channels AS channels_right, ns2.capacity AS capacity_right FROM channels LEFT JOIN nodes AS n1 ON n1.public_key = channels.node1_public_key LEFT JOIN nodes AS n2 ON n2.public_key = channels.node2_public_key LEFT JOIN node_stats AS ns1 ON ns1.public_key = channels.node1_public_key LEFT JOIN node_stats AS ns2 ON ns2.public_key = channels.node2_public_key WHERE (ns1.id = (SELECT MAX(id) FROM node_stats WHERE public_key = channels.node1_public_key) AND ns2.id = (SELECT MAX(id) FROM node_stats WHERE public_key = channels.node2_public_key)) AND (node1_public_key = ? OR node2_public_key = ?) AND status ${statusQuery} ORDER BY channels.capacity DESC LIMIT ?, ?`;
|
||||
const [rows]: any = await DB.query(query, [public_key, public_key, index, length]);
|
||||
const channels = rows.map((row) => this.convertChannel(row));
|
||||
|
||||
// Channels originating from node
|
||||
let query = `
|
||||
SELECT node2.alias, node2.public_key, channels.status, channels.node1_fee_rate,
|
||||
channels.capacity, channels.short_id, channels.id
|
||||
FROM channels
|
||||
JOIN nodes AS node2 ON node2.public_key = channels.node2_public_key
|
||||
WHERE node1_public_key = ? AND channels.status ${channelStatusFilter}
|
||||
`;
|
||||
const [channelsFromNode]: any = await DB.query(query, [public_key, index, length]);
|
||||
|
||||
// Channels incoming to node
|
||||
query = `
|
||||
SELECT node1.alias, node1.public_key, channels.status, channels.node2_fee_rate,
|
||||
channels.capacity, channels.short_id, channels.id
|
||||
FROM channels
|
||||
JOIN nodes AS node1 ON node1.public_key = channels.node1_public_key
|
||||
WHERE node2_public_key = ? AND channels.status ${channelStatusFilter}
|
||||
`;
|
||||
const [channelsToNode]: any = await DB.query(query, [public_key, index, length]);
|
||||
|
||||
let allChannels = channelsFromNode.concat(channelsToNode);
|
||||
allChannels.sort((a, b) => {
|
||||
return b.capacity - a.capacity;
|
||||
});
|
||||
allChannels = allChannels.slice(index, index + length);
|
||||
|
||||
const channels: any[] = []
|
||||
for (const row of allChannels) {
|
||||
const activeChannelsStats: any = await nodesApi.$getActiveChannelsStats(row.public_key);
|
||||
channels.push({
|
||||
status: row.status,
|
||||
capacity: row.capacity ?? 0,
|
||||
short_id: row.short_id,
|
||||
id: row.id,
|
||||
fee_rate: row.node1_fee_rate ?? row.node2_fee_rate ?? 0,
|
||||
node: {
|
||||
alias: row.alias.length > 0 ? row.alias : row.public_key.slice(0, 20),
|
||||
public_key: row.public_key,
|
||||
channels: activeChannelsStats.active_channel_count ?? 0,
|
||||
capacity: activeChannelsStats.capacity ?? 0,
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return channels;
|
||||
} catch (e) {
|
||||
logger.err('$getChannelsForNode error: ' + (e instanceof Error ? e.message : e));
|
||||
@@ -197,7 +251,12 @@ class ChannelsApi {
|
||||
if (status === 'closed') {
|
||||
statusQuery = '= 2';
|
||||
}
|
||||
const query = `SELECT COUNT(*) AS count FROM channels WHERE (node1_public_key = ? OR node2_public_key = ?) AND status ${statusQuery}`;
|
||||
const query = `
|
||||
SELECT COUNT(*) AS count
|
||||
FROM channels
|
||||
WHERE (node1_public_key = ? OR node2_public_key = ?)
|
||||
AND status ${statusQuery}
|
||||
`;
|
||||
const [rows]: any = await DB.query(query, [public_key, public_key]);
|
||||
return rows[0]['count'];
|
||||
} catch (e) {
|
||||
@@ -246,6 +305,135 @@ class ChannelsApi {
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Save or update a channel present in the graph
|
||||
*/
|
||||
public async $saveChannel(channel: ILightningApi.Channel): Promise<void> {
|
||||
const [ txid, vout ] = channel.chan_point.split(':');
|
||||
|
||||
const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {};
|
||||
const policy2: Partial<ILightningApi.RoutingPolicy> = channel.node2_policy || {};
|
||||
|
||||
const query = `INSERT INTO channels
|
||||
(
|
||||
id,
|
||||
short_id,
|
||||
capacity,
|
||||
transaction_id,
|
||||
transaction_vout,
|
||||
updated_at,
|
||||
status,
|
||||
node1_public_key,
|
||||
node1_base_fee_mtokens,
|
||||
node1_cltv_delta,
|
||||
node1_fee_rate,
|
||||
node1_is_disabled,
|
||||
node1_max_htlc_mtokens,
|
||||
node1_min_htlc_mtokens,
|
||||
node1_updated_at,
|
||||
node2_public_key,
|
||||
node2_base_fee_mtokens,
|
||||
node2_cltv_delta,
|
||||
node2_fee_rate,
|
||||
node2_is_disabled,
|
||||
node2_max_htlc_mtokens,
|
||||
node2_min_htlc_mtokens,
|
||||
node2_updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
capacity = ?,
|
||||
updated_at = ?,
|
||||
status = 1,
|
||||
node1_public_key = ?,
|
||||
node1_base_fee_mtokens = ?,
|
||||
node1_cltv_delta = ?,
|
||||
node1_fee_rate = ?,
|
||||
node1_is_disabled = ?,
|
||||
node1_max_htlc_mtokens = ?,
|
||||
node1_min_htlc_mtokens = ?,
|
||||
node1_updated_at = ?,
|
||||
node2_public_key = ?,
|
||||
node2_base_fee_mtokens = ?,
|
||||
node2_cltv_delta = ?,
|
||||
node2_fee_rate = ?,
|
||||
node2_is_disabled = ?,
|
||||
node2_max_htlc_mtokens = ?,
|
||||
node2_min_htlc_mtokens = ?,
|
||||
node2_updated_at = ?
|
||||
;`;
|
||||
|
||||
await DB.query(query, [
|
||||
Common.channelShortIdToIntegerId(channel.channel_id),
|
||||
Common.channelIntegerIdToShortId(channel.channel_id),
|
||||
channel.capacity,
|
||||
txid,
|
||||
vout,
|
||||
Common.utcDateToMysql(channel.last_update),
|
||||
channel.node1_pub,
|
||||
policy1.fee_base_msat,
|
||||
policy1.time_lock_delta,
|
||||
policy1.fee_rate_milli_msat,
|
||||
policy1.disabled,
|
||||
policy1.max_htlc_msat,
|
||||
policy1.min_htlc,
|
||||
Common.utcDateToMysql(policy1.last_update),
|
||||
channel.node2_pub,
|
||||
policy2.fee_base_msat,
|
||||
policy2.time_lock_delta,
|
||||
policy2.fee_rate_milli_msat,
|
||||
policy2.disabled,
|
||||
policy2.max_htlc_msat,
|
||||
policy2.min_htlc,
|
||||
Common.utcDateToMysql(policy2.last_update),
|
||||
channel.capacity,
|
||||
Common.utcDateToMysql(channel.last_update),
|
||||
channel.node1_pub,
|
||||
policy1.fee_base_msat,
|
||||
policy1.time_lock_delta,
|
||||
policy1.fee_rate_milli_msat,
|
||||
policy1.disabled,
|
||||
policy1.max_htlc_msat,
|
||||
policy1.min_htlc,
|
||||
Common.utcDateToMysql(policy1.last_update),
|
||||
channel.node2_pub,
|
||||
policy2.fee_base_msat,
|
||||
policy2.time_lock_delta,
|
||||
policy2.fee_rate_milli_msat,
|
||||
policy2.disabled,
|
||||
policy2.max_htlc_msat,
|
||||
policy2.min_htlc,
|
||||
Common.utcDateToMysql(policy2.last_update)
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set all channels not in `graphChannelsIds` as inactive (status = 0)
|
||||
*/
|
||||
public async $setChannelsInactive(graphChannelsIds: string[]): Promise<void> {
|
||||
if (graphChannelsIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await DB.query<ResultSetHeader>(`
|
||||
UPDATE channels
|
||||
SET status = 0
|
||||
WHERE short_id NOT IN (
|
||||
${graphChannelsIds.map(id => `"${id}"`).join(',')}
|
||||
)
|
||||
AND status != 2
|
||||
`);
|
||||
if (result[0].changedRows ?? 0 > 0) {
|
||||
logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`);
|
||||
} else {
|
||||
logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not in the graph`);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new ChannelsApi();
|
||||
|
||||
@@ -11,7 +11,8 @@ class ChannelsRoutes {
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/channels/search/:search', this.$searchChannelsById)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/channels/:short_id', this.$getChannel)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/channels', this.$getChannelsForNode)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/channels-geo', this.$getChannelsGeo)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/channels-geo', this.$getAllChannelsGeo)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/channels-geo/:publicKey', this.$getAllChannelsGeo)
|
||||
;
|
||||
}
|
||||
|
||||
@@ -45,9 +46,11 @@ class ChannelsRoutes {
|
||||
}
|
||||
const index = parseInt(typeof req.query.index === 'string' ? req.query.index : '0', 10) || 0;
|
||||
const status: string = typeof req.query.status === 'string' ? req.query.status : '';
|
||||
const length = 25;
|
||||
const channels = await channelsApi.$getChannelsForNode(req.query.public_key, index, length, status);
|
||||
const channels = await channelsApi.$getChannelsForNode(req.query.public_key, index, 10, status);
|
||||
const channelsCount = await channelsApi.$getChannelsCountForNode(req.query.public_key, status);
|
||||
res.header('Pragma', 'public');
|
||||
res.header('Cache-control', 'public');
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
|
||||
res.header('X-Total-Count', channelsCount.toString());
|
||||
res.json(channels);
|
||||
} catch (e) {
|
||||
@@ -94,9 +97,9 @@ class ChannelsRoutes {
|
||||
}
|
||||
}
|
||||
|
||||
private async $getChannelsGeo(req: Request, res: Response) {
|
||||
private async $getAllChannelsGeo(req: Request, res: Response) {
|
||||
try {
|
||||
const channels = await channelsApi.$getAllChannelsGeo();
|
||||
const channels = await channelsApi.$getAllChannelsGeo(req.params?.publicKey);
|
||||
res.json(channels);
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
|
||||
@@ -1,43 +1,90 @@
|
||||
import logger from '../../logger';
|
||||
import DB from '../../database';
|
||||
import { ResultSetHeader } from 'mysql2';
|
||||
import { ILightningApi } from '../lightning/lightning-api.interface';
|
||||
|
||||
class NodesApi {
|
||||
public async $getNode(public_key: string): Promise<any> {
|
||||
try {
|
||||
const query = `
|
||||
SELECT nodes.*, geo_names_as.names as as_organization, geo_names_city.names as city,
|
||||
geo_names_country.names as country, geo_names_subdivision.names as subdivision,
|
||||
(SELECT Count(*)
|
||||
FROM channels
|
||||
WHERE channels.status < 2 AND ( channels.node1_public_key = ? OR channels.node2_public_key = ? )) AS channel_count,
|
||||
(SELECT Sum(capacity)
|
||||
FROM channels
|
||||
WHERE channels.status < 2 AND ( channels.node1_public_key = ? OR channels.node2_public_key = ? )) AS capacity,
|
||||
(SELECT Avg(capacity)
|
||||
FROM channels
|
||||
WHERE status < 2 AND ( node1_public_key = ? OR node2_public_key = ? )) AS channels_capacity_avg
|
||||
// General info
|
||||
let query = `
|
||||
SELECT public_key, alias, UNIX_TIMESTAMP(first_seen) AS first_seen,
|
||||
UNIX_TIMESTAMP(updated_at) AS updated_at, color, sockets as sockets,
|
||||
as_number, city_id, country_id, subdivision_id, longitude, latitude,
|
||||
geo_names_iso.names as iso_code, geo_names_as.names as as_organization, geo_names_city.names as city,
|
||||
geo_names_country.names as country, geo_names_subdivision.names as subdivision
|
||||
FROM nodes
|
||||
LEFT JOIN geo_names geo_names_as on geo_names_as.id = as_number
|
||||
LEFT JOIN geo_names geo_names_city on geo_names_city.id = city_id
|
||||
LEFT JOIN geo_names geo_names_subdivision on geo_names_subdivision.id = subdivision_id
|
||||
LEFT JOIN geo_names geo_names_country on geo_names_country.id = country_id
|
||||
LEFT JOIN geo_names geo_names_iso ON geo_names_iso.id = nodes.country_id AND geo_names_iso.type = 'country_iso_code'
|
||||
WHERE public_key = ?
|
||||
`;
|
||||
const [rows]: any = await DB.query(query, [public_key, public_key, public_key, public_key, public_key, public_key, public_key]);
|
||||
if (rows.length > 0) {
|
||||
rows[0].as_organization = JSON.parse(rows[0].as_organization);
|
||||
rows[0].subdivision = JSON.parse(rows[0].subdivision);
|
||||
rows[0].city = JSON.parse(rows[0].city);
|
||||
rows[0].country = JSON.parse(rows[0].country);
|
||||
return rows[0];
|
||||
let [rows]: any[] = await DB.query(query, [public_key]);
|
||||
if (rows.length === 0) {
|
||||
throw new Error(`This node does not exist, or our node is not seeing it yet`);
|
||||
}
|
||||
return null;
|
||||
|
||||
const node = rows[0];
|
||||
node.as_organization = JSON.parse(node.as_organization);
|
||||
node.subdivision = JSON.parse(node.subdivision);
|
||||
node.city = JSON.parse(node.city);
|
||||
node.country = JSON.parse(node.country);
|
||||
|
||||
// Active channels and capacity
|
||||
const activeChannelsStats: any = await this.$getActiveChannelsStats(public_key);
|
||||
node.active_channel_count = activeChannelsStats.active_channel_count ?? 0;
|
||||
node.capacity = activeChannelsStats.capacity ?? 0;
|
||||
|
||||
// Opened channels count
|
||||
query = `
|
||||
SELECT count(short_id) as opened_channel_count
|
||||
FROM channels
|
||||
WHERE status != 2 AND (channels.node1_public_key = ? OR channels.node2_public_key = ?)
|
||||
`;
|
||||
[rows] = await DB.query(query, [public_key, public_key]);
|
||||
node.opened_channel_count = 0;
|
||||
if (rows.length > 0) {
|
||||
node.opened_channel_count = rows[0].opened_channel_count;
|
||||
}
|
||||
|
||||
// Closed channels count
|
||||
query = `
|
||||
SELECT count(short_id) as closed_channel_count
|
||||
FROM channels
|
||||
WHERE status = 2 AND (channels.node1_public_key = ? OR channels.node2_public_key = ?)
|
||||
`;
|
||||
[rows] = await DB.query(query, [public_key, public_key]);
|
||||
node.closed_channel_count = 0;
|
||||
if (rows.length > 0) {
|
||||
node.closed_channel_count = rows[0].closed_channel_count;
|
||||
}
|
||||
|
||||
return node;
|
||||
} catch (e) {
|
||||
logger.err('$getNode error: ' + (e instanceof Error ? e.message : e));
|
||||
logger.err(`Cannot get node information for ${public_key}. Reason: ${(e instanceof Error ? e.message : e)}`);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public async $getActiveChannelsStats(node_public_key: string): Promise<unknown> {
|
||||
const query = `
|
||||
SELECT count(short_id) as active_channel_count, sum(capacity) as capacity
|
||||
FROM channels
|
||||
WHERE status = 1 AND (channels.node1_public_key = ? OR channels.node2_public_key = ?)
|
||||
`;
|
||||
const [rows]: any[] = await DB.query(query, [node_public_key, node_public_key]);
|
||||
if (rows.length > 0) {
|
||||
return {
|
||||
active_channel_count: rows[0].active_channel_count,
|
||||
capacity: rows[0].capacity
|
||||
};
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public async $getAllNodes(): Promise<any> {
|
||||
try {
|
||||
const query = `SELECT * FROM nodes`;
|
||||
@@ -51,7 +98,12 @@ class NodesApi {
|
||||
|
||||
public async $getNodeStats(public_key: string): Promise<any> {
|
||||
try {
|
||||
const query = `SELECT UNIX_TIMESTAMP(added) AS added, capacity, channels FROM node_stats WHERE public_key = ? ORDER BY added DESC`;
|
||||
const query = `
|
||||
SELECT UNIX_TIMESTAMP(added) AS added, capacity, channels
|
||||
FROM node_stats
|
||||
WHERE public_key = ?
|
||||
ORDER BY added DESC
|
||||
`;
|
||||
const [rows]: any = await DB.query(query, [public_key]);
|
||||
return rows;
|
||||
} catch (e) {
|
||||
@@ -62,8 +114,19 @@ class NodesApi {
|
||||
|
||||
public async $getTopCapacityNodes(): Promise<any> {
|
||||
try {
|
||||
const query = `SELECT nodes.*, node_stats.capacity, node_stats.channels FROM nodes LEFT JOIN node_stats ON node_stats.public_key = nodes.public_key ORDER BY node_stats.added DESC, node_stats.capacity DESC LIMIT 10`;
|
||||
const [rows]: any = await DB.query(query);
|
||||
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
|
||||
const latestDate = rows[0].maxAdded;
|
||||
|
||||
const query = `
|
||||
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias, node_stats.capacity, node_stats.channels
|
||||
FROM node_stats
|
||||
JOIN nodes ON nodes.public_key = node_stats.public_key
|
||||
WHERE added = FROM_UNIXTIME(${latestDate})
|
||||
ORDER BY capacity DESC
|
||||
LIMIT 10;
|
||||
`;
|
||||
[rows] = await DB.query(query);
|
||||
|
||||
return rows;
|
||||
} catch (e) {
|
||||
logger.err('$getTopCapacityNodes error: ' + (e instanceof Error ? e.message : e));
|
||||
@@ -73,8 +136,19 @@ class NodesApi {
|
||||
|
||||
public async $getTopChannelsNodes(): Promise<any> {
|
||||
try {
|
||||
const query = `SELECT nodes.*, node_stats.capacity, node_stats.channels FROM nodes LEFT JOIN node_stats ON node_stats.public_key = nodes.public_key ORDER BY node_stats.added DESC, node_stats.channels DESC LIMIT 10`;
|
||||
const [rows]: any = await DB.query(query);
|
||||
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
|
||||
const latestDate = rows[0].maxAdded;
|
||||
|
||||
const query = `
|
||||
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias, node_stats.capacity, node_stats.channels
|
||||
FROM node_stats
|
||||
JOIN nodes ON nodes.public_key = node_stats.public_key
|
||||
WHERE added = FROM_UNIXTIME(${latestDate})
|
||||
ORDER BY channels DESC
|
||||
LIMIT 10;
|
||||
`;
|
||||
[rows] = await DB.query(query);
|
||||
|
||||
return rows;
|
||||
} catch (e) {
|
||||
logger.err('$getTopChannelsNodes error: ' + (e instanceof Error ? e.message : e));
|
||||
@@ -94,29 +168,59 @@ class NodesApi {
|
||||
}
|
||||
}
|
||||
|
||||
public async $getNodesISP() {
|
||||
public async $getNodesISP(groupBy: string, showTor: boolean) {
|
||||
try {
|
||||
let query = `SELECT nodes.as_number as ispId, geo_names.names as names, COUNT(DISTINCT nodes.public_key) as nodesCount, SUM(capacity) as capacity
|
||||
const orderBy = groupBy === 'capacity' ? `CAST(SUM(capacity) as INT)` : `COUNT(DISTINCT nodes.public_key)`;
|
||||
|
||||
// Clearnet
|
||||
let query = `SELECT GROUP_CONCAT(DISTINCT(nodes.as_number)) as ispId, geo_names.names as names,
|
||||
COUNT(DISTINCT nodes.public_key) as nodesCount, CAST(SUM(capacity) as INT) as capacity
|
||||
FROM nodes
|
||||
JOIN geo_names ON geo_names.id = nodes.as_number
|
||||
JOIN channels ON channels.node1_public_key = nodes.public_key OR channels.node2_public_key = nodes.public_key
|
||||
GROUP BY as_number
|
||||
ORDER BY COUNT(DISTINCT nodes.public_key) DESC
|
||||
`;
|
||||
GROUP BY geo_names.names
|
||||
ORDER BY ${orderBy} DESC
|
||||
`;
|
||||
const [nodesCountPerAS]: any = await DB.query(query);
|
||||
|
||||
query = `SELECT COUNT(*) as total FROM nodes WHERE as_number IS NOT NULL`;
|
||||
const [nodesWithAS]: any = await DB.query(query);
|
||||
|
||||
let total = 0;
|
||||
const nodesPerAs: any[] = [];
|
||||
|
||||
for (const asGroup of nodesCountPerAS) {
|
||||
if (groupBy === 'capacity') {
|
||||
total += asGroup.capacity;
|
||||
} else {
|
||||
total += asGroup.nodesCount;
|
||||
}
|
||||
}
|
||||
|
||||
// Tor
|
||||
if (showTor) {
|
||||
query = `SELECT COUNT(DISTINCT nodes.public_key) as nodesCount, CAST(SUM(capacity) as INT) as capacity
|
||||
FROM nodes
|
||||
JOIN channels ON channels.node1_public_key = nodes.public_key OR channels.node2_public_key = nodes.public_key
|
||||
ORDER BY ${orderBy} DESC
|
||||
`;
|
||||
const [nodesCountTor]: any = await DB.query(query);
|
||||
|
||||
total += groupBy === 'capacity' ? nodesCountTor[0].capacity : nodesCountTor[0].nodesCount;
|
||||
nodesPerAs.push({
|
||||
ispId: null,
|
||||
name: 'Tor',
|
||||
count: nodesCountTor[0].nodesCount,
|
||||
share: Math.floor((groupBy === 'capacity' ? nodesCountTor[0].capacity : nodesCountTor[0].nodesCount) / total * 10000) / 100,
|
||||
capacity: nodesCountTor[0].capacity,
|
||||
});
|
||||
}
|
||||
|
||||
for (const as of nodesCountPerAS) {
|
||||
nodesPerAs.push({
|
||||
ispId: as.ispId,
|
||||
name: JSON.parse(as.names),
|
||||
count: as.nodesCount,
|
||||
share: Math.floor(as.nodesCount / nodesWithAS[0].total * 10000) / 100,
|
||||
share: Math.floor((groupBy === 'capacity' ? as.capacity : as.nodesCount) / total * 10000) / 100,
|
||||
capacity: as.capacity,
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
return nodesPerAs;
|
||||
@@ -129,8 +233,8 @@ class NodesApi {
|
||||
public async $getNodesPerCountry(countryId: string) {
|
||||
try {
|
||||
const query = `
|
||||
SELECT node_stats.public_key, node_stats.capacity, node_stats.channels, nodes.alias,
|
||||
UNIX_TIMESTAMP(nodes.first_seen) as first_seen, UNIX_TIMESTAMP(nodes.updated_at) as updated_at,
|
||||
SELECT nodes.public_key, CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity, CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
|
||||
nodes.alias, UNIX_TIMESTAMP(nodes.first_seen) as first_seen, UNIX_TIMESTAMP(nodes.updated_at) as updated_at,
|
||||
geo_names_city.names as city
|
||||
FROM node_stats
|
||||
JOIN (
|
||||
@@ -138,7 +242,7 @@ class NodesApi {
|
||||
FROM node_stats
|
||||
GROUP BY public_key
|
||||
) as b ON b.public_key = node_stats.public_key AND b.last_added = node_stats.added
|
||||
JOIN nodes ON nodes.public_key = node_stats.public_key
|
||||
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
|
||||
JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
|
||||
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
|
||||
WHERE geo_names_country.id = ?
|
||||
@@ -159,8 +263,8 @@ class NodesApi {
|
||||
public async $getNodesPerISP(ISPId: string) {
|
||||
try {
|
||||
const query = `
|
||||
SELECT node_stats.public_key, node_stats.capacity, node_stats.channels, nodes.alias,
|
||||
UNIX_TIMESTAMP(nodes.first_seen) as first_seen, UNIX_TIMESTAMP(nodes.updated_at) as updated_at,
|
||||
SELECT nodes.public_key, CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity, CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
|
||||
nodes.alias, UNIX_TIMESTAMP(nodes.first_seen) as first_seen, UNIX_TIMESTAMP(nodes.updated_at) as updated_at,
|
||||
geo_names_city.names as city, geo_names_country.names as country
|
||||
FROM node_stats
|
||||
JOIN (
|
||||
@@ -168,14 +272,14 @@ class NodesApi {
|
||||
FROM node_stats
|
||||
GROUP BY public_key
|
||||
) as b ON b.public_key = node_stats.public_key AND b.last_added = node_stats.added
|
||||
JOIN nodes ON nodes.public_key = node_stats.public_key
|
||||
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
|
||||
JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
|
||||
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
|
||||
WHERE nodes.as_number = ?
|
||||
WHERE nodes.as_number IN (?)
|
||||
ORDER BY capacity DESC
|
||||
`;
|
||||
|
||||
const [rows]: any = await DB.query(query, [ISPId]);
|
||||
const [rows]: any = await DB.query(query, [ISPId.split(',')]);
|
||||
for (let i = 0; i < rows.length; ++i) {
|
||||
rows[i].country = JSON.parse(rows[i].country);
|
||||
rows[i].city = JSON.parse(rows[i].city);
|
||||
@@ -219,6 +323,66 @@ class NodesApi {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save or update a node present in the graph
|
||||
*/
|
||||
public async $saveNode(node: ILightningApi.Node): Promise<void> {
|
||||
try {
|
||||
const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? '';
|
||||
const query = `INSERT INTO nodes(
|
||||
public_key,
|
||||
first_seen,
|
||||
updated_at,
|
||||
alias,
|
||||
color,
|
||||
sockets,
|
||||
status
|
||||
)
|
||||
VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?, 1)
|
||||
ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?, status = 1`;
|
||||
|
||||
await DB.query(query, [
|
||||
node.pub_key,
|
||||
node.last_update,
|
||||
node.alias,
|
||||
node.color,
|
||||
sockets,
|
||||
node.last_update,
|
||||
node.alias,
|
||||
node.color,
|
||||
sockets,
|
||||
]);
|
||||
} catch (e) {
|
||||
logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set all nodes not in `nodesPubkeys` as inactive (status = 0)
|
||||
*/
|
||||
public async $setNodesInactive(graphNodesPubkeys: string[]): Promise<void> {
|
||||
if (graphNodesPubkeys.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await DB.query<ResultSetHeader>(`
|
||||
UPDATE nodes
|
||||
SET status = 0
|
||||
WHERE public_key NOT IN (
|
||||
${graphNodesPubkeys.map(pubkey => `"${pubkey}"`).join(',')}
|
||||
)
|
||||
`);
|
||||
if (result[0].changedRows ?? 0 > 0) {
|
||||
logger.info(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`);
|
||||
} else {
|
||||
logger.debug(`Marked ${result[0].changedRows} nodes as inactive because they are not in the graph`);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.err('$setNodesInactive() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new NodesApi();
|
||||
|
||||
@@ -9,10 +9,10 @@ class NodesRoutes {
|
||||
public initRoutes(app: Application) {
|
||||
app
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/country/:country', this.$getNodesPerCountry)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp/:isp', this.$getNodesPerISP)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/search/:search', this.$searchNode)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/top', this.$getTopNodes)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp', this.$getNodesISP)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp-ranking', this.$getISPRanking)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp/:isp', this.$getNodesPerISP)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/countries', this.$getNodesCountries)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/:public_key/statistics', this.$getHistoricalNodeStats)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/:public_key', this.$getNode)
|
||||
@@ -35,6 +35,9 @@ class NodesRoutes {
|
||||
res.status(404).send('Node not found');
|
||||
return;
|
||||
}
|
||||
res.header('Pragma', 'public');
|
||||
res.header('Cache-control', 'public');
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
|
||||
res.json(node);
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
@@ -44,6 +47,9 @@ class NodesRoutes {
|
||||
private async $getHistoricalNodeStats(req: Request, res: Response) {
|
||||
try {
|
||||
const statistics = await nodesApi.$getNodeStats(req.params.public_key);
|
||||
res.header('Pragma', 'public');
|
||||
res.header('Cache-control', 'public');
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
|
||||
res.json(statistics);
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
@@ -63,9 +69,18 @@ class NodesRoutes {
|
||||
}
|
||||
}
|
||||
|
||||
private async $getNodesISP(req: Request, res: Response) {
|
||||
private async $getISPRanking(req: Request, res: Response): Promise<void> {
|
||||
try {
|
||||
const nodesPerAs = await nodesApi.$getNodesISP();
|
||||
const groupBy = req.query.groupBy as string;
|
||||
const showTor = req.query.showTor as string === 'true' ? true : false;
|
||||
|
||||
if (!['capacity', 'node-count'].includes(groupBy)) {
|
||||
res.status(400).send(`groupBy must be one of 'capacity' or 'node-count'`);
|
||||
return;
|
||||
}
|
||||
|
||||
const nodesPerAs = await nodesApi.$getNodesISP(groupBy, showTor);
|
||||
|
||||
res.header('Pragma', 'public');
|
||||
res.header('Cache-control', 'public');
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 600).toUTCString());
|
||||
|
||||
@@ -6,14 +6,14 @@ class StatisticsApi {
|
||||
public async $getStatistics(interval: string | null = null): Promise<any> {
|
||||
interval = Common.getSqlInterval(interval);
|
||||
|
||||
let query = `SELECT UNIX_TIMESTAMP(added) AS added, channel_count, node_count, total_capacity, tor_nodes, clearnet_nodes, unannounced_nodes
|
||||
let query = `SELECT UNIX_TIMESTAMP(added) AS added, channel_count, total_capacity, tor_nodes, clearnet_nodes, unannounced_nodes
|
||||
FROM lightning_stats`;
|
||||
|
||||
if (interval) {
|
||||
query += ` WHERE added BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW()`;
|
||||
}
|
||||
|
||||
query += ` ORDER BY id DESC`;
|
||||
query += ` ORDER BY added DESC`;
|
||||
|
||||
try {
|
||||
const [rows]: any = await DB.query(query);
|
||||
@@ -26,8 +26,8 @@ class StatisticsApi {
|
||||
|
||||
public async $getLatestStatistics(): Promise<any> {
|
||||
try {
|
||||
const [rows]: any = await DB.query(`SELECT * FROM lightning_stats ORDER BY id DESC LIMIT 1`);
|
||||
const [rows2]: any = await DB.query(`SELECT * FROM lightning_stats ORDER BY id DESC LIMIT 1 OFFSET 7`);
|
||||
const [rows]: any = await DB.query(`SELECT * FROM lightning_stats ORDER BY added DESC LIMIT 1`);
|
||||
const [rows2]: any = await DB.query(`SELECT * FROM lightning_stats ORDER BY added DESC LIMIT 1 OFFSET 7`);
|
||||
return {
|
||||
latest: rows[0],
|
||||
previous: rows2[0],
|
||||
|
||||
272
backend/src/api/lightning/clightning/clightning-client.ts
Normal file
272
backend/src/api/lightning/clightning/clightning-client.ts
Normal file
@@ -0,0 +1,272 @@
|
||||
// Imported from https://github.com/shesek/lightning-client-js
|
||||
|
||||
'use strict';
|
||||
|
||||
const methods = [
|
||||
'addgossip',
|
||||
'autocleaninvoice',
|
||||
'check',
|
||||
'checkmessage',
|
||||
'close',
|
||||
'connect',
|
||||
'createinvoice',
|
||||
'createinvoicerequest',
|
||||
'createoffer',
|
||||
'createonion',
|
||||
'decode',
|
||||
'decodepay',
|
||||
'delexpiredinvoice',
|
||||
'delinvoice',
|
||||
'delpay',
|
||||
'dev-listaddrs',
|
||||
'dev-rescan-outputs',
|
||||
'disableoffer',
|
||||
'disconnect',
|
||||
'estimatefees',
|
||||
'feerates',
|
||||
'fetchinvoice',
|
||||
'fundchannel',
|
||||
'fundchannel_cancel',
|
||||
'fundchannel_complete',
|
||||
'fundchannel_start',
|
||||
'fundpsbt',
|
||||
'getchaininfo',
|
||||
'getinfo',
|
||||
'getlog',
|
||||
'getrawblockbyheight',
|
||||
'getroute',
|
||||
'getsharedsecret',
|
||||
'getutxout',
|
||||
'help',
|
||||
'invoice',
|
||||
'keysend',
|
||||
'legacypay',
|
||||
'listchannels',
|
||||
'listconfigs',
|
||||
'listforwards',
|
||||
'listfunds',
|
||||
'listinvoices',
|
||||
'listnodes',
|
||||
'listoffers',
|
||||
'listpays',
|
||||
'listpeers',
|
||||
'listsendpays',
|
||||
'listtransactions',
|
||||
'multifundchannel',
|
||||
'multiwithdraw',
|
||||
'newaddr',
|
||||
'notifications',
|
||||
'offer',
|
||||
'offerout',
|
||||
'openchannel_abort',
|
||||
'openchannel_bump',
|
||||
'openchannel_init',
|
||||
'openchannel_signed',
|
||||
'openchannel_update',
|
||||
'pay',
|
||||
'payersign',
|
||||
'paystatus',
|
||||
'ping',
|
||||
'plugin',
|
||||
'reserveinputs',
|
||||
'sendinvoice',
|
||||
'sendonion',
|
||||
'sendonionmessage',
|
||||
'sendpay',
|
||||
'sendpsbt',
|
||||
'sendrawtransaction',
|
||||
'setchannelfee',
|
||||
'signmessage',
|
||||
'signpsbt',
|
||||
'stop',
|
||||
'txdiscard',
|
||||
'txprepare',
|
||||
'txsend',
|
||||
'unreserveinputs',
|
||||
'utxopsbt',
|
||||
'waitanyinvoice',
|
||||
'waitblockheight',
|
||||
'waitinvoice',
|
||||
'waitsendpay',
|
||||
'withdraw'
|
||||
];
|
||||
|
||||
|
||||
import EventEmitter from 'events';
|
||||
import { existsSync, statSync } from 'fs';
|
||||
import { createConnection, Socket } from 'net';
|
||||
import { homedir } from 'os';
|
||||
import path from 'path';
|
||||
import { createInterface, Interface } from 'readline';
|
||||
import logger from '../../../logger';
|
||||
import { AbstractLightningApi } from '../lightning-api-abstract-factory';
|
||||
import { ILightningApi } from '../lightning-api.interface';
|
||||
import { convertAndmergeBidirectionalChannels, convertNode } from './clightning-convert';
|
||||
|
||||
class LightningError extends Error {
|
||||
type: string = 'lightning';
|
||||
message: string = 'lightning-client error';
|
||||
|
||||
constructor(error) {
|
||||
super();
|
||||
this.type = error.type;
|
||||
this.message = error.message;
|
||||
}
|
||||
}
|
||||
|
||||
const defaultRpcPath = path.join(homedir(), '.lightning')
|
||||
, fStat = (...p) => statSync(path.join(...p))
|
||||
, fExists = (...p) => existsSync(path.join(...p))
|
||||
|
||||
export default class CLightningClient extends EventEmitter implements AbstractLightningApi {
|
||||
private rpcPath: string;
|
||||
private reconnectWait: number;
|
||||
private reconnectTimeout;
|
||||
private reqcount: number;
|
||||
private client: Socket;
|
||||
private rl: Interface;
|
||||
private clientConnectionPromise: Promise<unknown>;
|
||||
|
||||
constructor(rpcPath = defaultRpcPath) {
|
||||
if (!path.isAbsolute(rpcPath)) {
|
||||
throw new Error('The rpcPath must be an absolute path');
|
||||
}
|
||||
|
||||
if (!fExists(rpcPath) || !fStat(rpcPath).isSocket()) {
|
||||
// network directory provided, use the lightning-rpc within in
|
||||
if (fExists(rpcPath, 'lightning-rpc')) {
|
||||
rpcPath = path.join(rpcPath, 'lightning-rpc');
|
||||
}
|
||||
|
||||
// main data directory provided, default to using the bitcoin mainnet subdirectory
|
||||
// to be removed in v0.2.0
|
||||
else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) {
|
||||
logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`)
|
||||
logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`)
|
||||
rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc')
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug(`[CLightningClient] Connecting to ${rpcPath}`);
|
||||
|
||||
super();
|
||||
this.rpcPath = rpcPath;
|
||||
this.reconnectWait = 0.5;
|
||||
this.reconnectTimeout = null;
|
||||
this.reqcount = 0;
|
||||
|
||||
const _self = this;
|
||||
|
||||
this.client = createConnection(rpcPath).on(
|
||||
'error', () => {
|
||||
_self.increaseWaitTime();
|
||||
_self.reconnect();
|
||||
}
|
||||
);
|
||||
this.rl = createInterface({ input: this.client }).on(
|
||||
'error', () => {
|
||||
_self.increaseWaitTime();
|
||||
_self.reconnect();
|
||||
}
|
||||
);
|
||||
|
||||
this.clientConnectionPromise = new Promise<void>(resolve => {
|
||||
_self.client.on('connect', () => {
|
||||
logger.info(`[CLightningClient] Lightning client connected`);
|
||||
_self.reconnectWait = 1;
|
||||
resolve();
|
||||
});
|
||||
|
||||
_self.client.on('end', () => {
|
||||
logger.err('[CLightningClient] Lightning client connection closed, reconnecting');
|
||||
_self.increaseWaitTime();
|
||||
_self.reconnect();
|
||||
});
|
||||
|
||||
_self.client.on('error', error => {
|
||||
logger.err(`[CLightningClient] Lightning client connection error: ${error}`);
|
||||
_self.increaseWaitTime();
|
||||
_self.reconnect();
|
||||
});
|
||||
});
|
||||
|
||||
this.rl.on('line', line => {
|
||||
line = line.trim();
|
||||
if (!line) {
|
||||
return;
|
||||
}
|
||||
const data = JSON.parse(line);
|
||||
// logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`);
|
||||
_self.emit('res:' + data.id, data);
|
||||
});
|
||||
}
|
||||
|
||||
increaseWaitTime(): void {
|
||||
if (this.reconnectWait >= 16) {
|
||||
this.reconnectWait = 16;
|
||||
} else {
|
||||
this.reconnectWait *= 2;
|
||||
}
|
||||
}
|
||||
|
||||
reconnect(): void {
|
||||
const _self = this;
|
||||
|
||||
if (this.reconnectTimeout) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.reconnectTimeout = setTimeout(() => {
|
||||
logger.debug('[CLightningClient] Trying to reconnect...');
|
||||
|
||||
_self.client.connect(_self.rpcPath);
|
||||
_self.reconnectTimeout = null;
|
||||
}, this.reconnectWait * 1000);
|
||||
}
|
||||
|
||||
call(method, args = []): Promise<any> {
|
||||
const _self = this;
|
||||
|
||||
const callInt = ++this.reqcount;
|
||||
const sendObj = {
|
||||
jsonrpc: '2.0',
|
||||
method,
|
||||
params: args,
|
||||
id: '' + callInt
|
||||
};
|
||||
|
||||
// logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`);
|
||||
|
||||
// Wait for the client to connect
|
||||
return this.clientConnectionPromise
|
||||
.then(() => new Promise((resolve, reject) => {
|
||||
// Wait for a response
|
||||
this.once('res:' + callInt, res => res.error == null
|
||||
? resolve(res.result)
|
||||
: reject(new LightningError(res.error))
|
||||
);
|
||||
|
||||
// Send the command
|
||||
_self.client.write(JSON.stringify(sendObj));
|
||||
}));
|
||||
}
|
||||
|
||||
async $getNetworkGraph(): Promise<ILightningApi.NetworkGraph> {
|
||||
const listnodes: any[] = await this.call('listnodes');
|
||||
const listchannels: any[] = await this.call('listchannels');
|
||||
const channelsList = await convertAndmergeBidirectionalChannels(listchannels['channels']);
|
||||
|
||||
return {
|
||||
nodes: listnodes['nodes'].map(node => convertNode(node)),
|
||||
edges: channelsList,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const protify = s => s.replace(/-([a-z])/g, m => m[1].toUpperCase());
|
||||
|
||||
methods.forEach(k => {
|
||||
CLightningClient.prototype[protify(k)] = function (...args: any) {
|
||||
return this.call(k, args);
|
||||
};
|
||||
});
|
||||
138
backend/src/api/lightning/clightning/clightning-convert.ts
Normal file
138
backend/src/api/lightning/clightning/clightning-convert.ts
Normal file
@@ -0,0 +1,138 @@
|
||||
import { ILightningApi } from '../lightning-api.interface';
|
||||
import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher';
|
||||
import logger from '../../../logger';
|
||||
|
||||
/**
|
||||
* Convert a clightning "listnode" entry to a lnd node entry
|
||||
*/
|
||||
export function convertNode(clNode: any): ILightningApi.Node {
|
||||
return {
|
||||
alias: clNode.alias ?? '',
|
||||
color: `#${clNode.color ?? ''}`,
|
||||
features: [], // TODO parse and return clNode.feature
|
||||
pub_key: clNode.nodeid,
|
||||
addresses: clNode.addresses?.map((addr) => {
|
||||
return {
|
||||
network: addr.type,
|
||||
addr: `${addr.address}:${addr.port}`
|
||||
};
|
||||
}),
|
||||
last_update: clNode?.last_timestamp ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert clightning "listchannels" response to lnd "describegraph.edges" format
|
||||
*/
|
||||
export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise<ILightningApi.Channel[]> {
|
||||
logger.info('Converting clightning nodes and channels to lnd graph format');
|
||||
|
||||
let loggerTimer = new Date().getTime() / 1000;
|
||||
let channelProcessed = 0;
|
||||
|
||||
const consolidatedChannelList: ILightningApi.Channel[] = [];
|
||||
const clChannelsDict = {};
|
||||
const clChannelsDictCount = {};
|
||||
|
||||
for (const clChannel of clChannels) {
|
||||
if (!clChannelsDict[clChannel.short_channel_id]) {
|
||||
clChannelsDict[clChannel.short_channel_id] = clChannel;
|
||||
clChannelsDictCount[clChannel.short_channel_id] = 1;
|
||||
} else {
|
||||
consolidatedChannelList.push(
|
||||
await buildFullChannel(clChannel, clChannelsDict[clChannel.short_channel_id])
|
||||
);
|
||||
delete clChannelsDict[clChannel.short_channel_id];
|
||||
clChannelsDictCount[clChannel.short_channel_id]++;
|
||||
}
|
||||
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Building complete channels from clightning output. Channels processed: ${channelProcessed + 1} of ${clChannels.length}`);
|
||||
loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
|
||||
++channelProcessed;
|
||||
}
|
||||
|
||||
channelProcessed = 0;
|
||||
const keys = Object.keys(clChannelsDict);
|
||||
for (const short_channel_id of keys) {
|
||||
consolidatedChannelList.push(await buildIncompleteChannel(clChannelsDict[short_channel_id]));
|
||||
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Building partial channels from clightning output. Channels processed: ${channelProcessed + 1} of ${keys.length}`);
|
||||
loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
|
||||
return consolidatedChannelList;
|
||||
}
|
||||
|
||||
export function convertChannelId(channelId): string {
|
||||
if (channelId.indexOf('/') !== -1) {
|
||||
channelId = channelId.slice(0, -2);
|
||||
}
|
||||
const s = channelId.split('x').map(part => BigInt(part));
|
||||
return ((s[0] << 40n) | (s[1] << 16n) | s[2]).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert two clightning "getchannels" entries into a full a lnd "describegraph.edges" format
|
||||
* In this case, clightning knows the channel policy for both nodes
|
||||
*/
|
||||
async function buildFullChannel(clChannelA: any, clChannelB: any): Promise<ILightningApi.Channel> {
|
||||
const lastUpdate = Math.max(clChannelA.last_update ?? 0, clChannelB.last_update ?? 0);
|
||||
|
||||
const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannelA.short_channel_id);
|
||||
const parts = clChannelA.short_channel_id.split('x');
|
||||
const outputIdx = parts[2];
|
||||
|
||||
return {
|
||||
channel_id: clChannelA.short_channel_id,
|
||||
capacity: clChannelA.satoshis,
|
||||
last_update: lastUpdate,
|
||||
node1_policy: convertPolicy(clChannelA),
|
||||
node2_policy: convertPolicy(clChannelB),
|
||||
chan_point: `${tx.txid}:${outputIdx}`,
|
||||
node1_pub: clChannelA.source,
|
||||
node2_pub: clChannelB.source,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert one clightning "getchannels" entry into a full a lnd "describegraph.edges" format
|
||||
* In this case, clightning knows the channel policy of only one node
|
||||
*/
|
||||
async function buildIncompleteChannel(clChannel: any): Promise<ILightningApi.Channel> {
|
||||
const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannel.short_channel_id);
|
||||
const parts = clChannel.short_channel_id.split('x');
|
||||
const outputIdx = parts[2];
|
||||
|
||||
return {
|
||||
channel_id: clChannel.short_channel_id,
|
||||
capacity: clChannel.satoshis,
|
||||
last_update: clChannel.last_update ?? 0,
|
||||
node1_policy: convertPolicy(clChannel),
|
||||
node2_policy: null,
|
||||
chan_point: `${tx.txid}:${outputIdx}`,
|
||||
node1_pub: clChannel.source,
|
||||
node2_pub: clChannel.destination,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a clightning "listnode" response to a lnd channel policy format
|
||||
*/
|
||||
function convertPolicy(clChannel: any): ILightningApi.RoutingPolicy {
|
||||
return {
|
||||
time_lock_delta: 0, // TODO
|
||||
min_htlc: clChannel.htlc_minimum_msat.slice(0, -4),
|
||||
max_htlc_msat: clChannel.htlc_maximum_msat.slice(0, -4),
|
||||
fee_base_msat: clChannel.base_fee_millisatoshi,
|
||||
fee_rate_milli_msat: clChannel.fee_per_millionth,
|
||||
disabled: !clChannel.active,
|
||||
last_update: clChannel.last_update ?? 0,
|
||||
};
|
||||
}
|
||||
@@ -1,7 +1,5 @@
|
||||
import { ILightningApi } from './lightning-api.interface';
|
||||
|
||||
export interface AbstractLightningApi {
|
||||
$getNetworkInfo(): Promise<ILightningApi.NetworkInfo>;
|
||||
$getNetworkGraph(): Promise<ILightningApi.NetworkGraph>;
|
||||
$getInfo(): Promise<ILightningApi.Info>;
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
import config from '../../config';
|
||||
import CLightningClient from './clightning/clightning-client';
|
||||
import { AbstractLightningApi } from './lightning-api-abstract-factory';
|
||||
import LndApi from './lnd/lnd-api';
|
||||
|
||||
function lightningApiFactory(): AbstractLightningApi {
|
||||
switch (config.LIGHTNING.BACKEND) {
|
||||
switch (config.LIGHTNING.ENABLED === true && config.LIGHTNING.BACKEND) {
|
||||
case 'cln':
|
||||
return new CLightningClient(config.CLIGHTNING.SOCKET);
|
||||
case 'lnd':
|
||||
default:
|
||||
return new LndApi();
|
||||
|
||||
@@ -1,71 +1,85 @@
|
||||
export namespace ILightningApi {
|
||||
export interface NetworkInfo {
|
||||
average_channel_size: number;
|
||||
channel_count: number;
|
||||
max_channel_size: number;
|
||||
median_channel_size: number;
|
||||
min_channel_size: number;
|
||||
node_count: number;
|
||||
not_recently_updated_policy_count: number;
|
||||
total_capacity: number;
|
||||
graph_diameter: number;
|
||||
avg_out_degree: number;
|
||||
max_out_degree: number;
|
||||
num_nodes: number;
|
||||
num_channels: number;
|
||||
total_network_capacity: string;
|
||||
avg_channel_size: number;
|
||||
min_channel_size: string;
|
||||
max_channel_size: string;
|
||||
median_channel_size_sat: string;
|
||||
num_zombie_chans: string;
|
||||
}
|
||||
|
||||
export interface NetworkGraph {
|
||||
channels: Channel[];
|
||||
nodes: Node[];
|
||||
edges: Channel[];
|
||||
}
|
||||
|
||||
export interface Channel {
|
||||
id: string;
|
||||
capacity: number;
|
||||
policies: Policy[];
|
||||
transaction_id: string;
|
||||
transaction_vout: number;
|
||||
updated_at?: string;
|
||||
channel_id: string;
|
||||
chan_point: string;
|
||||
last_update: number;
|
||||
node1_pub: string;
|
||||
node2_pub: string;
|
||||
capacity: string;
|
||||
node1_policy: RoutingPolicy | null;
|
||||
node2_policy: RoutingPolicy | null;
|
||||
}
|
||||
|
||||
interface Policy {
|
||||
public_key: string;
|
||||
base_fee_mtokens?: string;
|
||||
cltv_delta?: number;
|
||||
fee_rate?: number;
|
||||
is_disabled?: boolean;
|
||||
max_htlc_mtokens?: string;
|
||||
min_htlc_mtokens?: string;
|
||||
updated_at?: string;
|
||||
export interface RoutingPolicy {
|
||||
time_lock_delta: number;
|
||||
min_htlc: string;
|
||||
fee_base_msat: string;
|
||||
fee_rate_milli_msat: string;
|
||||
disabled: boolean;
|
||||
max_htlc_msat: string;
|
||||
last_update: number;
|
||||
}
|
||||
|
||||
export interface Node {
|
||||
last_update: number;
|
||||
pub_key: string;
|
||||
alias: string;
|
||||
addresses: {
|
||||
network: string;
|
||||
addr: string;
|
||||
}[];
|
||||
color: string;
|
||||
features: Feature[];
|
||||
public_key: string;
|
||||
sockets: string[];
|
||||
updated_at?: string;
|
||||
features: { [key: number]: Feature };
|
||||
}
|
||||
|
||||
export interface Info {
|
||||
chains: string[];
|
||||
color: string;
|
||||
active_channels_count: number;
|
||||
identity_pubkey: string;
|
||||
alias: string;
|
||||
current_block_hash: string;
|
||||
current_block_height: number;
|
||||
features: Feature[];
|
||||
is_synced_to_chain: boolean;
|
||||
is_synced_to_graph: boolean;
|
||||
latest_block_at: string;
|
||||
peers_count: number;
|
||||
pending_channels_count: number;
|
||||
public_key: string;
|
||||
uris: any[];
|
||||
num_pending_channels: number;
|
||||
num_active_channels: number;
|
||||
num_peers: number;
|
||||
block_height: number;
|
||||
block_hash: string;
|
||||
synced_to_chain: boolean;
|
||||
testnet: boolean;
|
||||
uris: string[];
|
||||
best_header_timestamp: string;
|
||||
version: string;
|
||||
num_inactive_channels: number;
|
||||
chains: {
|
||||
chain: string;
|
||||
network: string;
|
||||
}[];
|
||||
color: string;
|
||||
synced_to_graph: boolean;
|
||||
features: { [key: number]: Feature };
|
||||
commit_hash: string;
|
||||
/** Available on LND since v0.15.0-beta */
|
||||
require_htlc_interceptor?: boolean;
|
||||
}
|
||||
|
||||
|
||||
export interface Feature {
|
||||
bit: number;
|
||||
is_known: boolean;
|
||||
name: string;
|
||||
is_required: boolean;
|
||||
type?: string;
|
||||
is_known: boolean;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,44 +1,40 @@
|
||||
import axios, { AxiosRequestConfig } from 'axios';
|
||||
import { Agent } from 'https';
|
||||
import * as fs from 'fs';
|
||||
import { AbstractLightningApi } from '../lightning-api-abstract-factory';
|
||||
import { ILightningApi } from '../lightning-api.interface';
|
||||
import * as fs from 'fs';
|
||||
import { authenticatedLndGrpc, getWalletInfo, getNetworkGraph, getNetworkInfo } from 'lightning';
|
||||
import config from '../../../config';
|
||||
import logger from '../../../logger';
|
||||
|
||||
class LndApi implements AbstractLightningApi {
|
||||
private lnd: any;
|
||||
axiosConfig: AxiosRequestConfig = {};
|
||||
|
||||
constructor() {
|
||||
if (!config.LIGHTNING.ENABLED) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const tls = fs.readFileSync(config.LND.TLS_CERT_PATH).toString('base64');
|
||||
const macaroon = fs.readFileSync(config.LND.MACAROON_PATH).toString('base64');
|
||||
|
||||
const { lnd } = authenticatedLndGrpc({
|
||||
cert: tls,
|
||||
macaroon: macaroon,
|
||||
socket: config.LND.SOCKET,
|
||||
});
|
||||
|
||||
this.lnd = lnd;
|
||||
} catch (e) {
|
||||
logger.err('Could not initiate the LND service handler: ' + (e instanceof Error ? e.message : e));
|
||||
process.exit(1);
|
||||
if (config.LIGHTNING.ENABLED) {
|
||||
this.axiosConfig = {
|
||||
headers: {
|
||||
'Grpc-Metadata-macaroon': fs.readFileSync(config.LND.MACAROON_PATH).toString('hex')
|
||||
},
|
||||
httpsAgent: new Agent({
|
||||
ca: fs.readFileSync(config.LND.TLS_CERT_PATH)
|
||||
}),
|
||||
timeout: 10000
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async $getNetworkInfo(): Promise<ILightningApi.NetworkInfo> {
|
||||
return await getNetworkInfo({ lnd: this.lnd });
|
||||
return axios.get<ILightningApi.NetworkInfo>(config.LND.REST_API_URL + '/v1/graph/info', this.axiosConfig)
|
||||
.then((response) => response.data);
|
||||
}
|
||||
|
||||
async $getInfo(): Promise<ILightningApi.Info> {
|
||||
// @ts-ignore
|
||||
return await getWalletInfo({ lnd: this.lnd });
|
||||
return axios.get<ILightningApi.Info>(config.LND.REST_API_URL + '/v1/getinfo', this.axiosConfig)
|
||||
.then((response) => response.data);
|
||||
}
|
||||
|
||||
async $getNetworkGraph(): Promise<ILightningApi.NetworkGraph> {
|
||||
return await getNetworkGraph({ lnd: this.lnd });
|
||||
return axios.get<ILightningApi.NetworkGraph>(config.LND.REST_API_URL + '/v1/graph', this.axiosConfig)
|
||||
.then((response) => response.data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,10 +31,16 @@ interface IConfig {
|
||||
LIGHTNING: {
|
||||
ENABLED: boolean;
|
||||
BACKEND: 'lnd' | 'cln' | 'ldk';
|
||||
TOPOLOGY_FOLDER: string;
|
||||
STATS_REFRESH_INTERVAL: number;
|
||||
GRAPH_REFRESH_INTERVAL: number;
|
||||
};
|
||||
LND: {
|
||||
TLS_CERT_PATH: string;
|
||||
MACAROON_PATH: string;
|
||||
REST_API_URL: string;
|
||||
};
|
||||
CLIGHTNING: {
|
||||
SOCKET: string;
|
||||
};
|
||||
ELECTRUM: {
|
||||
@@ -102,6 +108,7 @@ interface IConfig {
|
||||
ENABLED: boolean;
|
||||
GEOLITE2_CITY: string;
|
||||
GEOLITE2_ASN: string;
|
||||
GEOIP2_ISP: string;
|
||||
},
|
||||
}
|
||||
|
||||
@@ -176,12 +183,18 @@ const defaults: IConfig = {
|
||||
},
|
||||
'LIGHTNING': {
|
||||
'ENABLED': false,
|
||||
'BACKEND': 'lnd'
|
||||
'BACKEND': 'lnd',
|
||||
'TOPOLOGY_FOLDER': '',
|
||||
'STATS_REFRESH_INTERVAL': 600,
|
||||
'GRAPH_REFRESH_INTERVAL': 600,
|
||||
},
|
||||
'LND': {
|
||||
'TLS_CERT_PATH': '',
|
||||
'MACAROON_PATH': '',
|
||||
'SOCKET': 'localhost:10009',
|
||||
'REST_API_URL': 'https://localhost:8080',
|
||||
},
|
||||
'CLIGHTNING': {
|
||||
'SOCKET': '',
|
||||
},
|
||||
'SOCKS5PROXY': {
|
||||
'ENABLED': false,
|
||||
@@ -206,7 +219,8 @@ const defaults: IConfig = {
|
||||
"MAXMIND": {
|
||||
'ENABLED': false,
|
||||
"GEOLITE2_CITY": "/usr/local/share/GeoIP/GeoLite2-City.mmdb",
|
||||
"GEOLITE2_ASN": "/usr/local/share/GeoIP/GeoLite2-ASN.mmdb"
|
||||
"GEOLITE2_ASN": "/usr/local/share/GeoIP/GeoLite2-ASN.mmdb",
|
||||
"GEOIP2_ISP": "/usr/local/share/GeoIP/GeoIP2-ISP.mmdb"
|
||||
},
|
||||
};
|
||||
|
||||
@@ -222,6 +236,7 @@ class Config implements IConfig {
|
||||
BISQ: IConfig['BISQ'];
|
||||
LIGHTNING: IConfig['LIGHTNING'];
|
||||
LND: IConfig['LND'];
|
||||
CLIGHTNING: IConfig['CLIGHTNING'];
|
||||
SOCKS5PROXY: IConfig['SOCKS5PROXY'];
|
||||
PRICE_DATA_SERVER: IConfig['PRICE_DATA_SERVER'];
|
||||
EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER'];
|
||||
@@ -240,6 +255,7 @@ class Config implements IConfig {
|
||||
this.BISQ = configs.BISQ;
|
||||
this.LIGHTNING = configs.LIGHTNING;
|
||||
this.LND = configs.LND;
|
||||
this.CLIGHTNING = configs.CLIGHTNING;
|
||||
this.SOCKS5PROXY = configs.SOCKS5PROXY;
|
||||
this.PRICE_DATA_SERVER = configs.PRICE_DATA_SERVER;
|
||||
this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import config from './config';
|
||||
import { createPool, Pool, PoolConnection } from 'mysql2/promise';
|
||||
import logger from './logger';
|
||||
import { PoolOptions } from 'mysql2/typings/mysql';
|
||||
import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } from 'mysql2/typings/mysql';
|
||||
|
||||
class DB {
|
||||
constructor() {
|
||||
@@ -28,7 +28,9 @@ import { PoolOptions } from 'mysql2/typings/mysql';
|
||||
}
|
||||
}
|
||||
|
||||
public async query(query, params?) {
|
||||
public async query<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
|
||||
OkPacket[] | ResultSetHeader>(query, params?): Promise<[T, FieldPacket[]]>
|
||||
{
|
||||
this.checkDBFlag();
|
||||
const pool = await this.getPool();
|
||||
return pool.query(query, params);
|
||||
|
||||
@@ -28,12 +28,13 @@ import nodesRoutes from './api/explorer/nodes.routes';
|
||||
import channelsRoutes from './api/explorer/channels.routes';
|
||||
import generalLightningRoutes from './api/explorer/general.routes';
|
||||
import lightningStatsUpdater from './tasks/lightning/stats-updater.service';
|
||||
import nodeSyncService from './tasks/lightning/node-sync.service';
|
||||
import statisticsRoutes from "./api/statistics/statistics.routes";
|
||||
import miningRoutes from "./api/mining/mining-routes";
|
||||
import bisqRoutes from "./api/bisq/bisq.routes";
|
||||
import liquidRoutes from "./api/liquid/liquid.routes";
|
||||
import bitcoinRoutes from "./api/bitcoin/bitcoin.routes";
|
||||
import networkSyncService from './tasks/lightning/network-sync.service';
|
||||
import statisticsRoutes from './api/statistics/statistics.routes';
|
||||
import miningRoutes from './api/mining/mining-routes';
|
||||
import bisqRoutes from './api/bisq/bisq.routes';
|
||||
import liquidRoutes from './api/liquid/liquid.routes';
|
||||
import bitcoinRoutes from './api/bitcoin/bitcoin.routes';
|
||||
import fundingTxFetcher from "./tasks/lightning/sync-tasks/funding-tx-fetcher";
|
||||
|
||||
class Server {
|
||||
private wss: WebSocket.Server | undefined;
|
||||
@@ -136,8 +137,7 @@ class Server {
|
||||
}
|
||||
|
||||
if (config.LIGHTNING.ENABLED) {
|
||||
nodeSyncService.$startService()
|
||||
.then(() => lightningStatsUpdater.$startService());
|
||||
this.$runLightningBackend();
|
||||
}
|
||||
|
||||
this.server.listen(config.MEMPOOL.HTTP_PORT, () => {
|
||||
@@ -183,6 +183,18 @@ class Server {
|
||||
}
|
||||
}
|
||||
|
||||
async $runLightningBackend() {
|
||||
try {
|
||||
await fundingTxFetcher.$init();
|
||||
await networkSyncService.$startService();
|
||||
await lightningStatsUpdater.$startService();
|
||||
} catch(e) {
|
||||
logger.err(`Lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
|
||||
await Common.sleep$(1000 * 60);
|
||||
this.$runLightningBackend();
|
||||
};
|
||||
}
|
||||
|
||||
setUpWebsocketHandling() {
|
||||
if (this.wss) {
|
||||
websocketHandler.setWebsocketServer(this.wss);
|
||||
|
||||
@@ -1,51 +1,43 @@
|
||||
import { chanNumber } from 'bolt07';
|
||||
import DB from '../../database';
|
||||
import logger from '../../logger';
|
||||
import channelsApi from '../../api/explorer/channels.api';
|
||||
import bitcoinClient from '../../api/bitcoin/bitcoin-client';
|
||||
import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory';
|
||||
import config from '../../config';
|
||||
import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface';
|
||||
import lightningApi from '../../api/lightning/lightning-api-factory';
|
||||
import { ILightningApi } from '../../api/lightning/lightning-api.interface';
|
||||
import { $lookupNodeLocation } from './sync-tasks/node-locations';
|
||||
import lightningApi from '../../api/lightning/lightning-api-factory';
|
||||
import nodesApi from '../../api/explorer/nodes.api';
|
||||
import { ResultSetHeader } from 'mysql2';
|
||||
import fundingTxFetcher from './sync-tasks/funding-tx-fetcher';
|
||||
|
||||
class NetworkSyncService {
|
||||
loggerTimer = 0;
|
||||
|
||||
class NodeSyncService {
|
||||
constructor() {}
|
||||
|
||||
public async $startService() {
|
||||
logger.info('Starting node sync service');
|
||||
public async $startService(): Promise<void> {
|
||||
logger.info('Starting lightning network sync service');
|
||||
|
||||
await this.$runUpdater();
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
|
||||
setInterval(async () => {
|
||||
await this.$runUpdater();
|
||||
}, 1000 * 60 * 60);
|
||||
await this.$runTasks();
|
||||
}
|
||||
|
||||
private async $runUpdater() {
|
||||
private async $runTasks(): Promise<void> {
|
||||
try {
|
||||
logger.info(`Updating nodes and channels...`);
|
||||
logger.info(`Updating nodes and channels`);
|
||||
|
||||
const networkGraph = await lightningApi.$getNetworkGraph();
|
||||
|
||||
for (const node of networkGraph.nodes) {
|
||||
await this.$saveNode(node);
|
||||
}
|
||||
logger.info(`Nodes updated.`);
|
||||
|
||||
if (config.MAXMIND.ENABLED) {
|
||||
await $lookupNodeLocation();
|
||||
if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) {
|
||||
logger.info(`LN Network graph is empty, retrying in 10 seconds`);
|
||||
setTimeout(() => { this.$runTasks(); }, 10000);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.$setChannelsInactive();
|
||||
|
||||
for (const channel of networkGraph.channels) {
|
||||
await this.$saveChannel(channel);
|
||||
}
|
||||
logger.info(`Channels updated.`);
|
||||
|
||||
await this.$findInactiveNodesAndChannels();
|
||||
await this.$updateNodesList(networkGraph.nodes);
|
||||
await this.$updateChannelsList(networkGraph.edges);
|
||||
await this.$deactivateChannelsWithoutActiveNodes();
|
||||
await this.$lookUpCreationDateFromChain();
|
||||
await this.$updateNodeFirstSeen();
|
||||
await this.$scanForClosedChannels();
|
||||
@@ -54,70 +46,183 @@ class NodeSyncService {
|
||||
}
|
||||
|
||||
} catch (e) {
|
||||
logger.err('$updateNodes() error: ' + (e instanceof Error ? e.message : e));
|
||||
logger.err('$runTasks() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
|
||||
setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.GRAPH_REFRESH_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the `nodes` table to reflect the current network graph state
|
||||
*/
|
||||
private async $updateNodesList(nodes: ILightningApi.Node[]): Promise<void> {
|
||||
let progress = 0;
|
||||
|
||||
const graphNodesPubkeys: string[] = [];
|
||||
for (const node of nodes) {
|
||||
await nodesApi.$saveNode(node);
|
||||
graphNodesPubkeys.push(node.pub_key);
|
||||
++progress;
|
||||
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Updating node ${progress}/${nodes.length}`);
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
logger.info(`${progress} nodes updated`);
|
||||
|
||||
// If a channel if not present in the graph, mark it as inactive
|
||||
nodesApi.$setNodesInactive(graphNodesPubkeys);
|
||||
|
||||
if (config.MAXMIND.ENABLED) {
|
||||
$lookupNodeLocation();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the `channels` table to reflect the current network graph state
|
||||
*/
|
||||
private async $updateChannelsList(channels: ILightningApi.Channel[]): Promise<void> {
|
||||
try {
|
||||
let progress = 0;
|
||||
|
||||
const graphChannelsIds: string[] = [];
|
||||
for (const channel of channels) {
|
||||
await channelsApi.$saveChannel(channel);
|
||||
graphChannelsIds.push(channel.channel_id);
|
||||
++progress;
|
||||
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Updating channel ${progress}/${channels.length}`);
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`${progress} channels updated`);
|
||||
|
||||
// If a channel if not present in the graph, mark it as inactive
|
||||
channelsApi.$setChannelsInactive(graphChannelsIds);
|
||||
} catch (e) {
|
||||
logger.err(`Cannot update channel list. Reason: ${(e instanceof Error ? e.message : e)}`);
|
||||
}
|
||||
|
||||
setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.STATS_REFRESH_INTERVAL);
|
||||
}
|
||||
|
||||
// This method look up the creation date of the earliest channel of the node
|
||||
// and update the node to that date in order to get the earliest first seen date
|
||||
private async $updateNodeFirstSeen() {
|
||||
private async $updateNodeFirstSeen(): Promise<void> {
|
||||
let progress = 0;
|
||||
let updated = 0;
|
||||
|
||||
try {
|
||||
const [nodes]: any[] = await DB.query(`SELECT nodes.public_key, UNIX_TIMESTAMP(nodes.first_seen) AS first_seen, (SELECT UNIX_TIMESTAMP(created) FROM channels WHERE channels.node1_public_key = nodes.public_key ORDER BY created ASC LIMIT 1) AS created1, (SELECT UNIX_TIMESTAMP(created) FROM channels WHERE channels.node2_public_key = nodes.public_key ORDER BY created ASC LIMIT 1) AS created2 FROM nodes`);
|
||||
const [nodes]: any[] = await DB.query(`
|
||||
SELECT nodes.public_key, UNIX_TIMESTAMP(nodes.first_seen) AS first_seen,
|
||||
(
|
||||
SELECT MIN(UNIX_TIMESTAMP(created))
|
||||
FROM channels
|
||||
WHERE channels.node1_public_key = nodes.public_key
|
||||
) AS created1,
|
||||
(
|
||||
SELECT MIN(UNIX_TIMESTAMP(created))
|
||||
FROM channels
|
||||
WHERE channels.node2_public_key = nodes.public_key
|
||||
) AS created2
|
||||
FROM nodes
|
||||
`);
|
||||
|
||||
for (const node of nodes) {
|
||||
let lowest = 0;
|
||||
if (node.created1) {
|
||||
if (node.created2 && node.created2 < node.created1) {
|
||||
lowest = node.created2;
|
||||
} else {
|
||||
lowest = node.created1;
|
||||
}
|
||||
} else if (node.created2) {
|
||||
lowest = node.created2;
|
||||
}
|
||||
if (lowest && lowest < node.first_seen) {
|
||||
const lowest = Math.min(
|
||||
node.created1 ?? Number.MAX_SAFE_INTEGER,
|
||||
node.created2 ?? Number.MAX_SAFE_INTEGER,
|
||||
node.first_seen ?? Number.MAX_SAFE_INTEGER
|
||||
);
|
||||
if (lowest < node.first_seen) {
|
||||
const query = `UPDATE nodes SET first_seen = FROM_UNIXTIME(?) WHERE public_key = ?`;
|
||||
const params = [lowest, node.public_key];
|
||||
await DB.query(query, params);
|
||||
}
|
||||
++progress;
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Updating node first seen date ${progress}/${nodes.length}`);
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
++updated;
|
||||
}
|
||||
}
|
||||
logger.info(`Node first seen dates scan complete.`);
|
||||
logger.info(`Updated ${updated} node first seen dates`);
|
||||
} catch (e) {
|
||||
logger.err('$updateNodeFirstSeen() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $lookUpCreationDateFromChain() {
|
||||
logger.info(`Running channel creation date lookup...`);
|
||||
private async $lookUpCreationDateFromChain(): Promise<void> {
|
||||
let progress = 0;
|
||||
|
||||
logger.info(`Running channel creation date lookup`);
|
||||
try {
|
||||
const channels = await channelsApi.$getChannelsWithoutCreatedDate();
|
||||
for (const channel of channels) {
|
||||
const transaction = await bitcoinClient.getRawTransaction(channel.transaction_id, 1);
|
||||
await DB.query(`UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.id = ?`, [transaction.blocktime, channel.id]);
|
||||
const transaction = await fundingTxFetcher.$fetchChannelOpenTx(channel.short_id);
|
||||
await DB.query(`
|
||||
UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.id = ?`,
|
||||
[transaction.timestamp, channel.id]
|
||||
);
|
||||
++progress;
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Updating channel creation date ${progress}/${channels.length}`);
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
logger.info(`Channel creation dates scan complete.`);
|
||||
logger.info(`Updated ${channels.length} channels' creation date`);
|
||||
} catch (e) {
|
||||
logger.err('$setCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e));
|
||||
logger.err('$lookUpCreationDateFromChain() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
// Looking for channels whos nodes are inactive
|
||||
private async $findInactiveNodesAndChannels(): Promise<void> {
|
||||
logger.info(`Running inactive channels scan...`);
|
||||
/**
|
||||
* If a channel does not have any active node linked to it, then also
|
||||
* mark that channel as inactive
|
||||
*/
|
||||
private async $deactivateChannelsWithoutActiveNodes(): Promise<void> {
|
||||
logger.info(`Find channels which nodes are offline`);
|
||||
|
||||
try {
|
||||
// @ts-ignore
|
||||
const [channels]: [ILightningApi.Channel[]] = await DB.query(`SELECT channels.id FROM channels WHERE channels.status = 1 AND ((SELECT COUNT(*) FROM nodes WHERE nodes.public_key = channels.node1_public_key) = 0 OR (SELECT COUNT(*) FROM nodes WHERE nodes.public_key = channels.node2_public_key) = 0)`);
|
||||
const result = await DB.query<ResultSetHeader>(`
|
||||
UPDATE channels
|
||||
SET status = 0
|
||||
WHERE channels.status = 1
|
||||
AND (
|
||||
(
|
||||
SELECT COUNT(*)
|
||||
FROM nodes
|
||||
WHERE nodes.public_key = channels.node1_public_key
|
||||
AND nodes.status = 1
|
||||
) = 0
|
||||
OR (
|
||||
SELECT COUNT(*)
|
||||
FROM nodes
|
||||
WHERE nodes.public_key = channels.node2_public_key
|
||||
AND nodes.status = 1
|
||||
) = 0)
|
||||
`);
|
||||
|
||||
for (const channel of channels) {
|
||||
await this.$updateChannelStatus(channel.id, 0);
|
||||
if (result[0].changedRows ?? 0 > 0) {
|
||||
logger.info(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`);
|
||||
} else {
|
||||
logger.debug(`Marked ${result[0].changedRows} channels as inactive because they are not linked to any active node`);
|
||||
}
|
||||
logger.info(`Inactive channels scan complete.`);
|
||||
} catch (e) {
|
||||
logger.err('$findInactiveNodesAndChannels() error: ' + (e instanceof Error ? e.message : e));
|
||||
logger.err('$deactivateChannelsWithoutActiveNodes() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $scanForClosedChannels(): Promise<void> {
|
||||
let progress = 0;
|
||||
|
||||
try {
|
||||
logger.info(`Starting closed channels scan...`);
|
||||
const channels = await channelsApi.$getChannelsByStatus(0);
|
||||
@@ -131,6 +236,13 @@ class NodeSyncService {
|
||||
await DB.query(`UPDATE channels SET closing_transaction_id = ? WHERE id = ?`, [spendingTx.txid, channel.id]);
|
||||
}
|
||||
}
|
||||
|
||||
++progress;
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Checking if channel has been closed ${progress}/${channels.length}`);
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
logger.info(`Closed channels scan complete.`);
|
||||
} catch (e) {
|
||||
@@ -148,6 +260,9 @@ class NodeSyncService {
|
||||
if (!config.ESPLORA.REST_API_URL) {
|
||||
return;
|
||||
}
|
||||
|
||||
let progress = 0;
|
||||
|
||||
try {
|
||||
logger.info(`Started running closed channel forensics...`);
|
||||
const channels = await channelsApi.$getClosedChannelsWithoutReason();
|
||||
@@ -193,6 +308,13 @@ class NodeSyncService {
|
||||
logger.debug('Setting closing reason ' + reason + ' for channel: ' + channel.id + '.');
|
||||
await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]);
|
||||
}
|
||||
|
||||
++progress;
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Updating channel closed channel forensics ${progress}/${channels.length}`);
|
||||
this.loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
logger.info(`Closed channels forensics scan complete.`);
|
||||
} catch (e) {
|
||||
@@ -247,157 +369,6 @@ class NodeSyncService {
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
private async $saveChannel(channel: ILightningApi.Channel): Promise<void> {
|
||||
const fromChannel = chanNumber({ channel: channel.id }).number;
|
||||
|
||||
try {
|
||||
const query = `INSERT INTO channels
|
||||
(
|
||||
id,
|
||||
short_id,
|
||||
capacity,
|
||||
transaction_id,
|
||||
transaction_vout,
|
||||
updated_at,
|
||||
status,
|
||||
node1_public_key,
|
||||
node1_base_fee_mtokens,
|
||||
node1_cltv_delta,
|
||||
node1_fee_rate,
|
||||
node1_is_disabled,
|
||||
node1_max_htlc_mtokens,
|
||||
node1_min_htlc_mtokens,
|
||||
node1_updated_at,
|
||||
node2_public_key,
|
||||
node2_base_fee_mtokens,
|
||||
node2_cltv_delta,
|
||||
node2_fee_rate,
|
||||
node2_is_disabled,
|
||||
node2_max_htlc_mtokens,
|
||||
node2_min_htlc_mtokens,
|
||||
node2_updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
capacity = ?,
|
||||
updated_at = ?,
|
||||
status = 1,
|
||||
node1_public_key = ?,
|
||||
node1_base_fee_mtokens = ?,
|
||||
node1_cltv_delta = ?,
|
||||
node1_fee_rate = ?,
|
||||
node1_is_disabled = ?,
|
||||
node1_max_htlc_mtokens = ?,
|
||||
node1_min_htlc_mtokens = ?,
|
||||
node1_updated_at = ?,
|
||||
node2_public_key = ?,
|
||||
node2_base_fee_mtokens = ?,
|
||||
node2_cltv_delta = ?,
|
||||
node2_fee_rate = ?,
|
||||
node2_is_disabled = ?,
|
||||
node2_max_htlc_mtokens = ?,
|
||||
node2_min_htlc_mtokens = ?,
|
||||
node2_updated_at = ?
|
||||
;`;
|
||||
|
||||
await DB.query(query, [
|
||||
fromChannel,
|
||||
channel.id,
|
||||
channel.capacity,
|
||||
channel.transaction_id,
|
||||
channel.transaction_vout,
|
||||
channel.updated_at ? this.utcDateToMysql(channel.updated_at) : 0,
|
||||
channel.policies[0].public_key,
|
||||
channel.policies[0].base_fee_mtokens,
|
||||
channel.policies[0].cltv_delta,
|
||||
channel.policies[0].fee_rate,
|
||||
channel.policies[0].is_disabled,
|
||||
channel.policies[0].max_htlc_mtokens,
|
||||
channel.policies[0].min_htlc_mtokens,
|
||||
channel.policies[0].updated_at ? this.utcDateToMysql(channel.policies[0].updated_at) : 0,
|
||||
channel.policies[1].public_key,
|
||||
channel.policies[1].base_fee_mtokens,
|
||||
channel.policies[1].cltv_delta,
|
||||
channel.policies[1].fee_rate,
|
||||
channel.policies[1].is_disabled,
|
||||
channel.policies[1].max_htlc_mtokens,
|
||||
channel.policies[1].min_htlc_mtokens,
|
||||
channel.policies[1].updated_at ? this.utcDateToMysql(channel.policies[1].updated_at) : 0,
|
||||
channel.capacity,
|
||||
channel.updated_at ? this.utcDateToMysql(channel.updated_at) : 0,
|
||||
channel.policies[0].public_key,
|
||||
channel.policies[0].base_fee_mtokens,
|
||||
channel.policies[0].cltv_delta,
|
||||
channel.policies[0].fee_rate,
|
||||
channel.policies[0].is_disabled,
|
||||
channel.policies[0].max_htlc_mtokens,
|
||||
channel.policies[0].min_htlc_mtokens,
|
||||
channel.policies[0].updated_at ? this.utcDateToMysql(channel.policies[0].updated_at) : 0,
|
||||
channel.policies[1].public_key,
|
||||
channel.policies[1].base_fee_mtokens,
|
||||
channel.policies[1].cltv_delta,
|
||||
channel.policies[1].fee_rate,
|
||||
channel.policies[1].is_disabled,
|
||||
channel.policies[1].max_htlc_mtokens,
|
||||
channel.policies[1].min_htlc_mtokens,
|
||||
channel.policies[1].updated_at ? this.utcDateToMysql(channel.policies[1].updated_at) : 0,
|
||||
]);
|
||||
} catch (e) {
|
||||
logger.err('$saveChannel() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $updateChannelStatus(channelShortId: string, status: number): Promise<void> {
|
||||
try {
|
||||
await DB.query(`UPDATE channels SET status = ? WHERE id = ?`, [status, channelShortId]);
|
||||
} catch (e) {
|
||||
logger.err('$updateChannelStatus() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $setChannelsInactive(): Promise<void> {
|
||||
try {
|
||||
await DB.query(`UPDATE channels SET status = 0 WHERE status = 1`);
|
||||
} catch (e) {
|
||||
logger.err('$setChannelsInactive() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $saveNode(node: ILightningApi.Node): Promise<void> {
|
||||
try {
|
||||
const updatedAt = node.updated_at ? this.utcDateToMysql(node.updated_at) : '0000-00-00 00:00:00';
|
||||
const sockets = node.sockets.join(',');
|
||||
const query = `INSERT INTO nodes(
|
||||
public_key,
|
||||
first_seen,
|
||||
updated_at,
|
||||
alias,
|
||||
color,
|
||||
sockets
|
||||
)
|
||||
VALUES (?, NOW(), ?, ?, ?, ?) ON DUPLICATE KEY UPDATE updated_at = ?, alias = ?, color = ?, sockets = ?;`;
|
||||
|
||||
await DB.query(query, [
|
||||
node.public_key,
|
||||
updatedAt,
|
||||
node.alias,
|
||||
node.color,
|
||||
sockets,
|
||||
updatedAt,
|
||||
node.alias,
|
||||
node.color,
|
||||
sockets,
|
||||
]);
|
||||
} catch (e) {
|
||||
logger.err('$saveNode() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private utcDateToMysql(dateString: string): string {
|
||||
const d = new Date(Date.parse(dateString));
|
||||
return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0];
|
||||
}
|
||||
}
|
||||
|
||||
export default new NodeSyncService();
|
||||
export default new NetworkSyncService();
|
||||
@@ -1,335 +1,33 @@
|
||||
|
||||
import DB from '../../database';
|
||||
import logger from '../../logger';
|
||||
import lightningApi from '../../api/lightning/lightning-api-factory';
|
||||
import channelsApi from '../../api/explorer/channels.api';
|
||||
import * as net from 'net';
|
||||
import LightningStatsImporter from './sync-tasks/stats-importer';
|
||||
import config from '../../config';
|
||||
import { Common } from '../../api/common';
|
||||
|
||||
class LightningStatsUpdater {
|
||||
hardCodedStartTime = '2018-01-12';
|
||||
|
||||
public async $startService() {
|
||||
public async $startService(): Promise<void> {
|
||||
logger.info('Starting Lightning Stats service');
|
||||
let isInSync = false;
|
||||
let error: any;
|
||||
try {
|
||||
error = null;
|
||||
isInSync = await this.$lightningIsSynced();
|
||||
} catch (e) {
|
||||
error = e;
|
||||
}
|
||||
if (!isInSync) {
|
||||
if (error) {
|
||||
logger.warn('Was not able to fetch Lightning Node status: ' + (error instanceof Error ? error.message : error) + '. Retrying in 1 minute...');
|
||||
} else {
|
||||
logger.notice('The Lightning graph is not yet in sync. Retrying in 1 minute...');
|
||||
}
|
||||
setTimeout(() => this.$startService(), 60 * 1000);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.$populateHistoricalStatistics();
|
||||
await this.$populateHistoricalNodeStatistics();
|
||||
|
||||
setTimeout(() => {
|
||||
this.$runTasks();
|
||||
}, this.timeUntilMidnight());
|
||||
}
|
||||
|
||||
private timeUntilMidnight(): number {
|
||||
const date = new Date();
|
||||
this.setDateMidnight(date);
|
||||
date.setUTCHours(24);
|
||||
return date.getTime() - new Date().getTime();
|
||||
}
|
||||
|
||||
private setDateMidnight(date: Date): void {
|
||||
date.setUTCHours(0);
|
||||
date.setUTCMinutes(0);
|
||||
date.setUTCSeconds(0);
|
||||
date.setUTCMilliseconds(0);
|
||||
}
|
||||
|
||||
private async $lightningIsSynced(): Promise<boolean> {
|
||||
const nodeInfo = await lightningApi.$getInfo();
|
||||
return nodeInfo.is_synced_to_chain && nodeInfo.is_synced_to_graph;
|
||||
await this.$runTasks();
|
||||
LightningStatsImporter.$run();
|
||||
}
|
||||
|
||||
private async $runTasks(): Promise<void> {
|
||||
await this.$logLightningStatsDaily();
|
||||
await this.$logNodeStatsDaily();
|
||||
await this.$logStatsDaily();
|
||||
|
||||
setTimeout(() => {
|
||||
this.$runTasks();
|
||||
}, this.timeUntilMidnight());
|
||||
setTimeout(() => { this.$runTasks(); }, 1000 * config.LIGHTNING.STATS_REFRESH_INTERVAL);
|
||||
}
|
||||
|
||||
private async $logLightningStatsDaily() {
|
||||
try {
|
||||
logger.info(`Running lightning daily stats log...`);
|
||||
|
||||
const networkGraph = await lightningApi.$getNetworkGraph();
|
||||
let total_capacity = 0;
|
||||
for (const channel of networkGraph.channels) {
|
||||
if (channel.capacity) {
|
||||
total_capacity += channel.capacity;
|
||||
}
|
||||
}
|
||||
|
||||
let clearnetNodes = 0;
|
||||
let torNodes = 0;
|
||||
let unannouncedNodes = 0;
|
||||
for (const node of networkGraph.nodes) {
|
||||
let isUnnanounced = true;
|
||||
for (const socket of node.sockets) {
|
||||
const hasOnion = socket.indexOf('.onion') !== -1;
|
||||
if (hasOnion) {
|
||||
torNodes++;
|
||||
isUnnanounced = false;
|
||||
}
|
||||
const hasClearnet = [4, 6].includes(net.isIP(socket.split(':')[0]));
|
||||
if (hasClearnet) {
|
||||
clearnetNodes++;
|
||||
isUnnanounced = false;
|
||||
}
|
||||
}
|
||||
if (isUnnanounced) {
|
||||
unannouncedNodes++;
|
||||
}
|
||||
}
|
||||
|
||||
const channelStats = await channelsApi.$getChannelsStats();
|
||||
|
||||
const query = `INSERT INTO lightning_stats(
|
||||
added,
|
||||
channel_count,
|
||||
node_count,
|
||||
total_capacity,
|
||||
tor_nodes,
|
||||
clearnet_nodes,
|
||||
unannounced_nodes,
|
||||
avg_capacity,
|
||||
avg_fee_rate,
|
||||
avg_base_fee_mtokens,
|
||||
med_capacity,
|
||||
med_fee_rate,
|
||||
med_base_fee_mtokens
|
||||
)
|
||||
VALUES (NOW() - INTERVAL 1 DAY, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
|
||||
|
||||
await DB.query(query, [
|
||||
networkGraph.channels.length,
|
||||
networkGraph.nodes.length,
|
||||
total_capacity,
|
||||
torNodes,
|
||||
clearnetNodes,
|
||||
unannouncedNodes,
|
||||
channelStats.avgCapacity,
|
||||
channelStats.avgFeeRate,
|
||||
channelStats.avgBaseFee,
|
||||
channelStats.medianCapacity,
|
||||
channelStats.medianFeeRate,
|
||||
channelStats.medianBaseFee,
|
||||
]);
|
||||
logger.info(`Lightning daily stats done.`);
|
||||
} catch (e) {
|
||||
logger.err('$logLightningStatsDaily() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $logNodeStatsDaily() {
|
||||
try {
|
||||
logger.info(`Running daily node stats update...`);
|
||||
|
||||
const query = `SELECT nodes.public_key, c1.channels_count_left, c2.channels_count_right, c1.channels_capacity_left, c2.channels_capacity_right FROM nodes LEFT JOIN (SELECT node1_public_key, COUNT(id) AS channels_count_left, SUM(capacity) AS channels_capacity_left FROM channels WHERE channels.status < 2 GROUP BY node1_public_key) c1 ON c1.node1_public_key = nodes.public_key LEFT JOIN (SELECT node2_public_key, COUNT(id) AS channels_count_right, SUM(capacity) AS channels_capacity_right FROM channels WHERE channels.status < 2 GROUP BY node2_public_key) c2 ON c2.node2_public_key = nodes.public_key`;
|
||||
const [nodes]: any = await DB.query(query);
|
||||
|
||||
for (const node of nodes) {
|
||||
await DB.query(
|
||||
`INSERT INTO node_stats(public_key, added, capacity, channels) VALUES (?, NOW() - INTERVAL 1 DAY, ?, ?)`,
|
||||
[node.public_key, (parseInt(node.channels_capacity_left || 0, 10)) + (parseInt(node.channels_capacity_right || 0, 10)),
|
||||
node.channels_count_left + node.channels_count_right]);
|
||||
}
|
||||
logger.info('Daily node stats has updated.');
|
||||
} catch (e) {
|
||||
logger.err('$logNodeStatsDaily() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
// We only run this on first launch
|
||||
private async $populateHistoricalStatistics() {
|
||||
try {
|
||||
const [rows]: any = await DB.query(`SELECT COUNT(*) FROM lightning_stats`);
|
||||
// Only run if table is empty
|
||||
if (rows[0]['COUNT(*)'] > 0) {
|
||||
return;
|
||||
}
|
||||
logger.info(`Running historical stats population...`);
|
||||
|
||||
const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels ORDER BY created ASC`);
|
||||
const [nodes]: any = await DB.query(`SELECT first_seen, sockets FROM nodes ORDER BY first_seen ASC`);
|
||||
|
||||
const date: Date = new Date(this.hardCodedStartTime);
|
||||
const currentDate = new Date();
|
||||
this.setDateMidnight(currentDate);
|
||||
|
||||
while (date < currentDate) {
|
||||
let totalCapacity = 0;
|
||||
let channelsCount = 0;
|
||||
|
||||
for (const channel of channels) {
|
||||
if (new Date(channel.created) > date) {
|
||||
break;
|
||||
}
|
||||
if (channel.closing_date === null || new Date(channel.closing_date) > date) {
|
||||
totalCapacity += channel.capacity;
|
||||
channelsCount++;
|
||||
}
|
||||
}
|
||||
|
||||
let nodeCount = 0;
|
||||
let clearnetNodes = 0;
|
||||
let torNodes = 0;
|
||||
let unannouncedNodes = 0;
|
||||
|
||||
for (const node of nodes) {
|
||||
if (new Date(node.first_seen) > date) {
|
||||
break;
|
||||
}
|
||||
nodeCount++;
|
||||
|
||||
const sockets = node.sockets.split(',');
|
||||
let isUnnanounced = true;
|
||||
for (const socket of sockets) {
|
||||
const hasOnion = socket.indexOf('.onion') !== -1;
|
||||
if (hasOnion) {
|
||||
torNodes++;
|
||||
isUnnanounced = false;
|
||||
}
|
||||
const hasClearnet = [4, 6].includes(net.isIP(socket.substring(0, socket.lastIndexOf(':'))));
|
||||
if (hasClearnet) {
|
||||
clearnetNodes++;
|
||||
isUnnanounced = false;
|
||||
}
|
||||
}
|
||||
if (isUnnanounced) {
|
||||
unannouncedNodes++;
|
||||
}
|
||||
}
|
||||
|
||||
const query = `INSERT INTO lightning_stats(
|
||||
added,
|
||||
channel_count,
|
||||
node_count,
|
||||
total_capacity,
|
||||
tor_nodes,
|
||||
clearnet_nodes,
|
||||
unannounced_nodes,
|
||||
avg_capacity,
|
||||
avg_fee_rate,
|
||||
avg_base_fee_mtokens,
|
||||
med_capacity,
|
||||
med_fee_rate,
|
||||
med_base_fee_mtokens
|
||||
)
|
||||
VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
|
||||
|
||||
const rowTimestamp = date.getTime() / 1000; // Save timestamp for the row insertion down below
|
||||
|
||||
date.setUTCDate(date.getUTCDate() + 1);
|
||||
|
||||
// Last iteration, save channels stats
|
||||
const channelStats = (date >= currentDate ? await channelsApi.$getChannelsStats() : undefined);
|
||||
|
||||
await DB.query(query, [
|
||||
rowTimestamp,
|
||||
channelsCount,
|
||||
nodeCount,
|
||||
totalCapacity,
|
||||
torNodes,
|
||||
clearnetNodes,
|
||||
unannouncedNodes,
|
||||
channelStats?.avgCapacity ?? 0,
|
||||
channelStats?.avgFeeRate ?? 0,
|
||||
channelStats?.avgBaseFee ?? 0,
|
||||
channelStats?.medianCapacity ?? 0,
|
||||
channelStats?.medianFeeRate ?? 0,
|
||||
channelStats?.medianBaseFee ?? 0,
|
||||
]);
|
||||
}
|
||||
|
||||
logger.info('Historical stats populated.');
|
||||
} catch (e) {
|
||||
logger.err('$populateHistoricalData() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
private async $populateHistoricalNodeStatistics() {
|
||||
try {
|
||||
const [rows]: any = await DB.query(`SELECT COUNT(*) FROM node_stats`);
|
||||
// Only run if table is empty
|
||||
if (rows[0]['COUNT(*)'] > 0) {
|
||||
return;
|
||||
}
|
||||
logger.info(`Running historical node stats population...`);
|
||||
|
||||
const [nodes]: any = await DB.query(`SELECT public_key, first_seen, alias FROM nodes ORDER BY first_seen ASC`);
|
||||
|
||||
for (const node of nodes) {
|
||||
const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels WHERE node1_public_key = ? OR node2_public_key = ? ORDER BY created ASC`, [node.public_key, node.public_key]);
|
||||
|
||||
const date: Date = new Date(this.hardCodedStartTime);
|
||||
const currentDate = new Date();
|
||||
this.setDateMidnight(currentDate);
|
||||
|
||||
let lastTotalCapacity = 0;
|
||||
let lastChannelsCount = 0;
|
||||
|
||||
while (date < currentDate) {
|
||||
let totalCapacity = 0;
|
||||
let channelsCount = 0;
|
||||
for (const channel of channels) {
|
||||
if (new Date(channel.created) > date) {
|
||||
break;
|
||||
}
|
||||
if (channel.closing_date !== null && new Date(channel.closing_date) < date) {
|
||||
date.setUTCDate(date.getUTCDate() + 1);
|
||||
continue;
|
||||
}
|
||||
totalCapacity += channel.capacity;
|
||||
channelsCount++;
|
||||
}
|
||||
|
||||
if (lastTotalCapacity === totalCapacity && lastChannelsCount === channelsCount) {
|
||||
date.setUTCDate(date.getUTCDate() + 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
lastTotalCapacity = totalCapacity;
|
||||
lastChannelsCount = channelsCount;
|
||||
|
||||
const query = `INSERT INTO node_stats(
|
||||
public_key,
|
||||
added,
|
||||
capacity,
|
||||
channels
|
||||
)
|
||||
VALUES (?, FROM_UNIXTIME(?), ?, ?)`;
|
||||
|
||||
await DB.query(query, [
|
||||
node.public_key,
|
||||
date.getTime() / 1000,
|
||||
totalCapacity,
|
||||
channelsCount,
|
||||
]);
|
||||
date.setUTCDate(date.getUTCDate() + 1);
|
||||
}
|
||||
logger.debug('Updated node_stats for: ' + node.alias);
|
||||
}
|
||||
logger.info('Historical stats populated.');
|
||||
} catch (e) {
|
||||
logger.err('$populateHistoricalNodeData() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
/**
|
||||
* Update the latest entry for each node every config.LIGHTNING.STATS_REFRESH_INTERVAL seconds
|
||||
*/
|
||||
private async $logStatsDaily(): Promise<void> {
|
||||
const date = new Date();
|
||||
Common.setDateMidnight(date);
|
||||
const networkGraph = await lightningApi.$getNetworkGraph();
|
||||
LightningStatsImporter.computeNetworkStats(date.getTime() / 1000, networkGraph);
|
||||
|
||||
logger.info(`Updated latest network stats`);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
118
backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
Normal file
118
backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
Normal file
@@ -0,0 +1,118 @@
|
||||
import { existsSync, promises } from 'fs';
|
||||
import bitcoinClient from '../../../api/bitcoin/bitcoin-client';
|
||||
import { Common } from '../../../api/common';
|
||||
import config from '../../../config';
|
||||
import logger from '../../../logger';
|
||||
|
||||
const fsPromises = promises;
|
||||
|
||||
const BLOCKS_CACHE_MAX_SIZE = 100;
|
||||
const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json';
|
||||
|
||||
class FundingTxFetcher {
|
||||
private running = false;
|
||||
private blocksCache = {};
|
||||
private channelNewlyProcessed = 0;
|
||||
public fundingTxCache = {};
|
||||
|
||||
async $init(): Promise<void> {
|
||||
// Load funding tx disk cache
|
||||
if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) {
|
||||
try {
|
||||
this.fundingTxCache = JSON.parse(await fsPromises.readFile(CACHE_FILE_NAME, 'utf-8'));
|
||||
} catch (e) {
|
||||
logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`);
|
||||
this.fundingTxCache = {};
|
||||
}
|
||||
logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`);
|
||||
}
|
||||
}
|
||||
|
||||
async $fetchChannelsFundingTxs(channelIds: string[]): Promise<void> {
|
||||
if (this.running) {
|
||||
return;
|
||||
}
|
||||
this.running = true;
|
||||
|
||||
const globalTimer = new Date().getTime() / 1000;
|
||||
let cacheTimer = new Date().getTime() / 1000;
|
||||
let loggerTimer = new Date().getTime() / 1000;
|
||||
let channelProcessed = 0;
|
||||
this.channelNewlyProcessed = 0;
|
||||
for (const channelId of channelIds) {
|
||||
await this.$fetchChannelOpenTx(channelId);
|
||||
++channelProcessed;
|
||||
|
||||
let elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
elapsedSeconds = Math.round((new Date().getTime() / 1000) - globalTimer);
|
||||
logger.info(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` +
|
||||
`(${Math.floor(channelProcessed / channelIds.length * 10000) / 100}%) | ` +
|
||||
`elapsed: ${elapsedSeconds} seconds`
|
||||
);
|
||||
loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
|
||||
elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer);
|
||||
if (elapsedSeconds > 60) {
|
||||
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
|
||||
fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
|
||||
cacheTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channelNewlyProcessed > 0) {
|
||||
logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`);
|
||||
logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`);
|
||||
fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache));
|
||||
}
|
||||
|
||||
this.running = false;
|
||||
}
|
||||
|
||||
public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> {
|
||||
if (channelId.indexOf('x') === -1) {
|
||||
channelId = Common.channelIntegerIdToShortId(channelId);
|
||||
}
|
||||
|
||||
if (this.fundingTxCache[channelId]) {
|
||||
return this.fundingTxCache[channelId];
|
||||
}
|
||||
|
||||
const parts = channelId.split('x');
|
||||
const blockHeight = parts[0];
|
||||
const txIdx = parts[1];
|
||||
const outputIdx = parts[2];
|
||||
|
||||
let block = this.blocksCache[blockHeight];
|
||||
// Fetch it from core
|
||||
if (!block) {
|
||||
const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10));
|
||||
block = await bitcoinClient.getBlock(blockHash, 1);
|
||||
}
|
||||
this.blocksCache[block.height] = block;
|
||||
|
||||
const blocksCacheHashes = Object.keys(this.blocksCache).sort((a, b) => parseInt(b) - parseInt(a)).reverse();
|
||||
if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) {
|
||||
for (let i = 0; i < 10; ++i) {
|
||||
delete this.blocksCache[blocksCacheHashes[i]];
|
||||
}
|
||||
}
|
||||
|
||||
const txid = block.tx[txIdx];
|
||||
const rawTx = await bitcoinClient.getRawTransaction(txid);
|
||||
const tx = await bitcoinClient.decodeRawTransaction(rawTx);
|
||||
|
||||
this.fundingTxCache[channelId] = {
|
||||
timestamp: block.time,
|
||||
txid: txid,
|
||||
value: tx.vout[outputIdx].value,
|
||||
};
|
||||
|
||||
++this.channelNewlyProcessed;
|
||||
|
||||
return this.fundingTxCache[channelId];
|
||||
}
|
||||
}
|
||||
|
||||
export default new FundingTxFetcher;
|
||||
@@ -1,49 +1,76 @@
|
||||
import * as net from 'net';
|
||||
import maxmind, { CityResponse, AsnResponse } from 'maxmind';
|
||||
import maxmind, { CityResponse, AsnResponse, IspResponse } from 'maxmind';
|
||||
import nodesApi from '../../../api/explorer/nodes.api';
|
||||
import config from '../../../config';
|
||||
import DB from '../../../database';
|
||||
import logger from '../../../logger';
|
||||
|
||||
export async function $lookupNodeLocation(): Promise<void> {
|
||||
logger.info(`Running node location updater using Maxmind...`);
|
||||
let loggerTimer = new Date().getTime() / 1000;
|
||||
let progress = 0;
|
||||
|
||||
logger.info(`Running node location updater using Maxmind`);
|
||||
try {
|
||||
const nodes = await nodesApi.$getAllNodes();
|
||||
const lookupCity = await maxmind.open<CityResponse>(config.MAXMIND.GEOLITE2_CITY);
|
||||
const lookupAsn = await maxmind.open<AsnResponse>(config.MAXMIND.GEOLITE2_ASN);
|
||||
const lookupIsp = await maxmind.open<IspResponse>(config.MAXMIND.GEOIP2_ISP);
|
||||
|
||||
for (const node of nodes) {
|
||||
const sockets: string[] = node.sockets.split(',');
|
||||
for (const socket of sockets) {
|
||||
const ip = socket.substring(0, socket.lastIndexOf(':')).replace('[', '').replace(']', '');
|
||||
const hasClearnet = [4, 6].includes(net.isIP(ip));
|
||||
|
||||
if (hasClearnet && ip !== '127.0.1.1' && ip !== '127.0.0.1') {
|
||||
const city = lookupCity.get(ip);
|
||||
const asn = lookupAsn.get(ip);
|
||||
if (city && asn) {
|
||||
const query = `UPDATE nodes SET as_number = ?, city_id = ?, country_id = ?, subdivision_id = ?, longitude = ?, latitude = ?, accuracy_radius = ? WHERE public_key = ?`;
|
||||
const params = [asn.autonomous_system_number, city.city?.geoname_id, city.country?.geoname_id, city.subdivisions ? city.subdivisions[0].geoname_id : null, city.location?.longitude, city.location?.latitude, city.location?.accuracy_radius, node.public_key];
|
||||
const isp = lookupIsp.get(ip);
|
||||
|
||||
if (city && (asn || isp)) {
|
||||
const query = `
|
||||
UPDATE nodes SET
|
||||
as_number = ?,
|
||||
city_id = ?,
|
||||
country_id = ?,
|
||||
subdivision_id = ?,
|
||||
longitude = ?,
|
||||
latitude = ?,
|
||||
accuracy_radius = ?
|
||||
WHERE public_key = ?
|
||||
`;
|
||||
|
||||
const params = [
|
||||
isp?.autonomous_system_number ?? asn?.autonomous_system_number,
|
||||
city.city?.geoname_id,
|
||||
city.country?.geoname_id,
|
||||
city.subdivisions ? city.subdivisions[0].geoname_id : null,
|
||||
city.location?.longitude,
|
||||
city.location?.latitude,
|
||||
city.location?.accuracy_radius,
|
||||
node.public_key
|
||||
];
|
||||
await DB.query(query, params);
|
||||
|
||||
// Store Continent
|
||||
if (city.continent?.geoname_id) {
|
||||
await DB.query(
|
||||
// Store Continent
|
||||
if (city.continent?.geoname_id) {
|
||||
await DB.query(
|
||||
`INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'continent', ?)`,
|
||||
[city.continent?.geoname_id, JSON.stringify(city.continent?.names)]);
|
||||
}
|
||||
}
|
||||
|
||||
// Store Country
|
||||
if (city.country?.geoname_id) {
|
||||
await DB.query(
|
||||
// Store Country
|
||||
if (city.country?.geoname_id) {
|
||||
await DB.query(
|
||||
`INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country', ?)`,
|
||||
[city.country?.geoname_id, JSON.stringify(city.country?.names)]);
|
||||
}
|
||||
}
|
||||
|
||||
// Store Country ISO code
|
||||
if (city.country?.iso_code) {
|
||||
await DB.query(
|
||||
`INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country_iso_code', ?)`,
|
||||
[city.country?.geoname_id, city.country?.iso_code]);
|
||||
`INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'country_iso_code', ?)`,
|
||||
[city.country?.geoname_id, city.country?.iso_code]);
|
||||
}
|
||||
|
||||
// Store Division
|
||||
@@ -61,17 +88,24 @@ export async function $lookupNodeLocation(): Promise<void> {
|
||||
}
|
||||
|
||||
// Store AS name
|
||||
if (asn.autonomous_system_organization) {
|
||||
if (isp?.autonomous_system_organization ?? asn?.autonomous_system_organization) {
|
||||
await DB.query(
|
||||
`INSERT IGNORE INTO geo_names (id, type, names) VALUES (?, 'as_organization', ?)`,
|
||||
[asn.autonomous_system_number, JSON.stringify(asn.autonomous_system_organization)]);
|
||||
[isp?.autonomous_system_number ?? asn?.autonomous_system_number, JSON.stringify(isp?.isp ?? asn?.autonomous_system_organization)]);
|
||||
}
|
||||
}
|
||||
|
||||
++progress;
|
||||
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer);
|
||||
if (elapsedSeconds > 10) {
|
||||
logger.info(`Updating node location data ${progress}/${nodes.length}`);
|
||||
loggerTimer = new Date().getTime() / 1000;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info(`Node location data updated.`);
|
||||
logger.info(`${progress} nodes location data updated`);
|
||||
} catch (e) {
|
||||
logger.err('$lookupNodeLocation() error: ' + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
411
backend/src/tasks/lightning/sync-tasks/stats-importer.ts
Normal file
411
backend/src/tasks/lightning/sync-tasks/stats-importer.ts
Normal file
@@ -0,0 +1,411 @@
|
||||
import DB from '../../../database';
|
||||
import { promises } from 'fs';
|
||||
import { XMLParser } from 'fast-xml-parser';
|
||||
import logger from '../../../logger';
|
||||
import fundingTxFetcher from './funding-tx-fetcher';
|
||||
import config from '../../../config';
|
||||
|
||||
const fsPromises = promises;
|
||||
|
||||
interface Node {
|
||||
id: string;
|
||||
timestamp: number;
|
||||
features: string;
|
||||
rgb_color: string;
|
||||
alias: string;
|
||||
addresses: unknown[];
|
||||
out_degree: number;
|
||||
in_degree: number;
|
||||
}
|
||||
|
||||
interface Channel {
|
||||
channel_id: string;
|
||||
node1_pub: string;
|
||||
node2_pub: string;
|
||||
timestamp: number;
|
||||
features: string;
|
||||
fee_base_msat: number;
|
||||
fee_rate_milli_msat: number;
|
||||
htlc_minimim_msat: number;
|
||||
cltv_expiry_delta: number;
|
||||
htlc_maximum_msat: number;
|
||||
}
|
||||
|
||||
class LightningStatsImporter {
|
||||
topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER;
|
||||
parser = new XMLParser();
|
||||
|
||||
async $run(): Promise<void> {
|
||||
logger.info(`Importing historical lightning stats`);
|
||||
|
||||
const [channels]: any[] = await DB.query('SELECT short_id from channels;');
|
||||
logger.info('Caching funding txs for currently existing channels');
|
||||
await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id));
|
||||
|
||||
await this.$importHistoricalLightningStats();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate LN network stats for one day
|
||||
*/
|
||||
public async computeNetworkStats(timestamp: number, networkGraph): Promise<unknown> {
|
||||
// Node counts and network shares
|
||||
let clearnetNodes = 0;
|
||||
let torNodes = 0;
|
||||
let clearnetTorNodes = 0;
|
||||
let unannouncedNodes = 0;
|
||||
|
||||
for (const node of networkGraph.nodes) {
|
||||
let hasOnion = false;
|
||||
let hasClearnet = false;
|
||||
let isUnnanounced = true;
|
||||
|
||||
for (const socket of (node.addresses ?? [])) {
|
||||
hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network);
|
||||
hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network);
|
||||
}
|
||||
if (hasOnion && hasClearnet) {
|
||||
clearnetTorNodes++;
|
||||
isUnnanounced = false;
|
||||
} else if (hasOnion) {
|
||||
torNodes++;
|
||||
isUnnanounced = false;
|
||||
} else if (hasClearnet) {
|
||||
clearnetNodes++;
|
||||
isUnnanounced = false;
|
||||
}
|
||||
if (isUnnanounced) {
|
||||
unannouncedNodes++;
|
||||
}
|
||||
}
|
||||
|
||||
// Channels and node historical stats
|
||||
const nodeStats = {};
|
||||
let capacity = 0;
|
||||
let avgFeeRate = 0;
|
||||
let avgBaseFee = 0;
|
||||
const capacities: number[] = [];
|
||||
const feeRates: number[] = [];
|
||||
const baseFees: number[] = [];
|
||||
const alreadyCountedChannels = {};
|
||||
|
||||
for (const channel of networkGraph.edges) {
|
||||
let short_id = channel.channel_id;
|
||||
if (short_id.indexOf('/') !== -1) {
|
||||
short_id = short_id.slice(0, -2);
|
||||
}
|
||||
|
||||
const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id);
|
||||
if (!tx) {
|
||||
logger.err(`Unable to fetch funding tx for channel ${short_id}. Capacity and creation date is unknown. Skipping channel.`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!nodeStats[channel.node1_pub]) {
|
||||
nodeStats[channel.node1_pub] = {
|
||||
capacity: 0,
|
||||
channels: 0,
|
||||
};
|
||||
}
|
||||
if (!nodeStats[channel.node2_pub]) {
|
||||
nodeStats[channel.node2_pub] = {
|
||||
capacity: 0,
|
||||
channels: 0,
|
||||
};
|
||||
}
|
||||
|
||||
if (!alreadyCountedChannels[short_id]) {
|
||||
capacity += Math.round(tx.value * 100000000);
|
||||
capacities.push(Math.round(tx.value * 100000000));
|
||||
alreadyCountedChannels[short_id] = true;
|
||||
|
||||
nodeStats[channel.node1_pub].capacity += Math.round(tx.value * 100000000);
|
||||
nodeStats[channel.node1_pub].channels++;
|
||||
nodeStats[channel.node2_pub].capacity += Math.round(tx.value * 100000000);
|
||||
nodeStats[channel.node2_pub].channels++;
|
||||
}
|
||||
|
||||
if (channel.node1_policy !== undefined) { // Coming from the node
|
||||
for (const policy of [channel.node1_policy, channel.node2_policy]) {
|
||||
if (policy && policy.fee_rate_milli_msat < 5000) {
|
||||
avgFeeRate += parseInt(policy.fee_rate_milli_msat, 10);
|
||||
feeRates.push(parseInt(policy.fee_rate_milli_msat, 10));
|
||||
}
|
||||
if (policy && policy.fee_base_msat < 5000) {
|
||||
avgBaseFee += parseInt(policy.fee_base_msat, 10);
|
||||
baseFees.push(parseInt(policy.fee_base_msat, 10));
|
||||
}
|
||||
}
|
||||
} else { // Coming from the historical import
|
||||
if (channel.fee_rate_milli_msat < 5000) {
|
||||
avgFeeRate += parseInt(channel.fee_rate_milli_msat, 10);
|
||||
feeRates.push(parseInt(channel.fee_rate_milli_msat), 10);
|
||||
}
|
||||
if (channel.fee_base_msat < 5000) {
|
||||
avgBaseFee += parseInt(channel.fee_base_msat, 10);
|
||||
baseFees.push(parseInt(channel.fee_base_msat), 10);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
avgFeeRate /= Math.max(networkGraph.edges.length, 1);
|
||||
avgBaseFee /= Math.max(networkGraph.edges.length, 1);
|
||||
const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
|
||||
const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
|
||||
const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
|
||||
const avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
|
||||
|
||||
let query = `INSERT INTO lightning_stats(
|
||||
added,
|
||||
channel_count,
|
||||
node_count,
|
||||
total_capacity,
|
||||
tor_nodes,
|
||||
clearnet_nodes,
|
||||
unannounced_nodes,
|
||||
clearnet_tor_nodes,
|
||||
avg_capacity,
|
||||
avg_fee_rate,
|
||||
avg_base_fee_mtokens,
|
||||
med_capacity,
|
||||
med_fee_rate,
|
||||
med_base_fee_mtokens
|
||||
)
|
||||
VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
added = FROM_UNIXTIME(?),
|
||||
channel_count = ?,
|
||||
node_count = ?,
|
||||
total_capacity = ?,
|
||||
tor_nodes = ?,
|
||||
clearnet_nodes = ?,
|
||||
unannounced_nodes = ?,
|
||||
clearnet_tor_nodes = ?,
|
||||
avg_capacity = ?,
|
||||
avg_fee_rate = ?,
|
||||
avg_base_fee_mtokens = ?,
|
||||
med_capacity = ?,
|
||||
med_fee_rate = ?,
|
||||
med_base_fee_mtokens = ?
|
||||
`;
|
||||
|
||||
await DB.query(query, [
|
||||
timestamp,
|
||||
capacities.length,
|
||||
networkGraph.nodes.length,
|
||||
capacity,
|
||||
torNodes,
|
||||
clearnetNodes,
|
||||
unannouncedNodes,
|
||||
clearnetTorNodes,
|
||||
avgCapacity,
|
||||
avgFeeRate,
|
||||
avgBaseFee,
|
||||
medCapacity,
|
||||
medFeeRate,
|
||||
medBaseFee,
|
||||
timestamp,
|
||||
capacities.length,
|
||||
networkGraph.nodes.length,
|
||||
capacity,
|
||||
torNodes,
|
||||
clearnetNodes,
|
||||
unannouncedNodes,
|
||||
clearnetTorNodes,
|
||||
avgCapacity,
|
||||
avgFeeRate,
|
||||
avgBaseFee,
|
||||
medCapacity,
|
||||
medFeeRate,
|
||||
medBaseFee,
|
||||
]);
|
||||
|
||||
for (const public_key of Object.keys(nodeStats)) {
|
||||
query = `INSERT INTO node_stats(
|
||||
public_key,
|
||||
added,
|
||||
capacity,
|
||||
channels
|
||||
)
|
||||
VALUES (?, FROM_UNIXTIME(?), ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
added = FROM_UNIXTIME(?),
|
||||
capacity = ?,
|
||||
channels = ?
|
||||
`;
|
||||
|
||||
await DB.query(query, [
|
||||
public_key,
|
||||
timestamp,
|
||||
nodeStats[public_key].capacity,
|
||||
nodeStats[public_key].channels,
|
||||
timestamp,
|
||||
nodeStats[public_key].capacity,
|
||||
nodeStats[public_key].channels,
|
||||
]);
|
||||
}
|
||||
|
||||
return {
|
||||
added: timestamp,
|
||||
node_count: networkGraph.nodes.length
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Import topology files LN historical data into the database
|
||||
*/
|
||||
async $importHistoricalLightningStats(): Promise<void> {
|
||||
let latestNodeCount = 1;
|
||||
|
||||
const fileList = await fsPromises.readdir(this.topologiesFolder);
|
||||
// Insert history from the most recent to the oldest
|
||||
// This also put the .json cached files first
|
||||
fileList.sort().reverse();
|
||||
|
||||
const [rows]: any[] = await DB.query(`
|
||||
SELECT UNIX_TIMESTAMP(added) AS added, node_count
|
||||
FROM lightning_stats
|
||||
ORDER BY added DESC
|
||||
`);
|
||||
const existingStatsTimestamps = {};
|
||||
for (const row of rows) {
|
||||
existingStatsTimestamps[row.added] = row;
|
||||
}
|
||||
|
||||
// For logging purpose
|
||||
let processed = 10;
|
||||
let totalProcessed = -1;
|
||||
|
||||
for (const filename of fileList) {
|
||||
processed++;
|
||||
totalProcessed++;
|
||||
|
||||
const timestamp = parseInt(filename.split('_')[1], 10);
|
||||
|
||||
// Stats exist already, don't calculate/insert them
|
||||
if (existingStatsTimestamps[timestamp] !== undefined) {
|
||||
latestNodeCount = existingStatsTimestamps[timestamp].node_count;
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.debug(`Reading ${this.topologiesFolder}/${filename}`);
|
||||
const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
|
||||
|
||||
let graph;
|
||||
if (filename.indexOf('.json') !== -1) {
|
||||
try {
|
||||
graph = JSON.parse(fileContent);
|
||||
} catch (e) {
|
||||
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
graph = this.parseFile(fileContent);
|
||||
if (!graph) {
|
||||
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`);
|
||||
continue;
|
||||
}
|
||||
await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph));
|
||||
}
|
||||
|
||||
if (timestamp > 1556316000) {
|
||||
// "No, the reason most likely is just that I started collection in 2019,
|
||||
// so what I had before that is just the survivors from before, which weren't that many"
|
||||
const diffRatio = graph.nodes.length / latestNodeCount;
|
||||
if (diffRatio < 0.9) {
|
||||
// Ignore drop of more than 90% of the node count as it's probably a missing data point
|
||||
logger.debug(`Nodes count diff ratio threshold reached, ignore the data for this day ${graph.nodes.length} nodes vs ${latestNodeCount}`);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
latestNodeCount = graph.nodes.length;
|
||||
|
||||
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
|
||||
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`);
|
||||
|
||||
if (processed > 10) {
|
||||
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
|
||||
processed = 0;
|
||||
} else {
|
||||
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
|
||||
}
|
||||
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
|
||||
const stat = await this.computeNetworkStats(timestamp, graph);
|
||||
|
||||
existingStatsTimestamps[timestamp] = stat;
|
||||
}
|
||||
|
||||
logger.info(`Lightning network stats historical import completed`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the file content into XML, and return a list of nodes and channels
|
||||
*/
|
||||
private parseFile(fileContent): any {
|
||||
const graph = this.parser.parse(fileContent);
|
||||
if (Object.keys(graph).length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const nodes: Node[] = [];
|
||||
const channels: Channel[] = [];
|
||||
|
||||
// If there is only one entry, the parser does not return an array, so we override this
|
||||
if (!Array.isArray(graph.graphml.graph.node)) {
|
||||
graph.graphml.graph.node = [graph.graphml.graph.node];
|
||||
}
|
||||
if (!Array.isArray(graph.graphml.graph.edge)) {
|
||||
graph.graphml.graph.edge = [graph.graphml.graph.edge];
|
||||
}
|
||||
|
||||
for (const node of graph.graphml.graph.node) {
|
||||
if (!node.data) {
|
||||
continue;
|
||||
}
|
||||
const addresses: unknown[] = [];
|
||||
const sockets = node.data[5].split(',');
|
||||
for (const socket of sockets) {
|
||||
const parts = socket.split('://');
|
||||
addresses.push({
|
||||
network: parts[0],
|
||||
addr: parts[1],
|
||||
});
|
||||
}
|
||||
nodes.push({
|
||||
id: node.data[0],
|
||||
timestamp: node.data[1],
|
||||
features: node.data[2],
|
||||
rgb_color: node.data[3],
|
||||
alias: node.data[4],
|
||||
addresses: addresses,
|
||||
out_degree: node.data[6],
|
||||
in_degree: node.data[7],
|
||||
});
|
||||
}
|
||||
|
||||
for (const channel of graph.graphml.graph.edge) {
|
||||
if (!channel.data) {
|
||||
continue;
|
||||
}
|
||||
channels.push({
|
||||
channel_id: channel.data[0],
|
||||
node1_pub: channel.data[1],
|
||||
node2_pub: channel.data[2],
|
||||
timestamp: channel.data[3],
|
||||
features: channel.data[4],
|
||||
fee_base_msat: channel.data[5],
|
||||
fee_rate_milli_msat: channel.data[6],
|
||||
htlc_minimim_msat: channel.data[7],
|
||||
cltv_expiry_delta: channel.data[8],
|
||||
htlc_maximum_msat: channel.data[9],
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
nodes: nodes,
|
||||
edges: channels,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export default new LightningStatsImporter;
|
||||
Reference in New Issue
Block a user