Merge branch 'master' into nymkappa/feature/improve-location

This commit is contained in:
wiz
2022-08-21 18:51:06 +09:00
committed by GitHub
49 changed files with 1457 additions and 297 deletions

View File

@@ -207,6 +207,10 @@ export class Common {
/** Decodes a channel id returned by lnd as uint64 to a short channel id */
static channelIntegerIdToShortId(id: string): string {
if (id.indexOf('/') !== -1) {
id = id.slice(0, -2);
}
if (id.indexOf('x') !== -1) { // Already a short id
return id;
}

View File

@@ -4,7 +4,7 @@ import logger from '../logger';
import { Common } from './common';
class DatabaseMigration {
private static currentVersion = 37;
private static currentVersion = 38;
private queryTimeout = 120000;
private statisticsAddedIndexed = false;
private uniqueLogs: string[] = [];
@@ -248,7 +248,6 @@ class DatabaseMigration {
}
if (databaseSchemaVersion < 25 && isBitcoin === true) {
await this.$executeQuery(`INSERT INTO state VALUES('last_node_stats', 0, '1970-01-01');`);
await this.$executeQuery(this.getCreateLightningStatisticsQuery(), await this.$checkIfTableExists('lightning_stats'));
await this.$executeQuery(this.getCreateNodesQuery(), await this.$checkIfTableExists('nodes'));
await this.$executeQuery(this.getCreateChannelsQuery(), await this.$checkIfTableExists('channels'));
@@ -328,6 +327,16 @@ class DatabaseMigration {
if (databaseSchemaVersion < 37 && isBitcoin == true) {
await this.$executeQuery(this.getCreateLNNodesSocketsTableQuery(), await this.$checkIfTableExists('nodes_sockets'));
}
if (databaseSchemaVersion < 38 && isBitcoin == true) {
if (config.LIGHTNING.ENABLED) {
this.uniqueLog(logger.notice, `'lightning_stats' and 'node_stats' tables have been truncated.`);
}
await this.$executeQuery(`TRUNCATE lightning_stats`);
await this.$executeQuery(`TRUNCATE node_stats`);
await this.$executeQuery('ALTER TABLE `lightning_stats` CHANGE `added` `added` timestamp NULL');
await this.$executeQuery('ALTER TABLE `node_stats` CHANGE `added` `added` timestamp NULL');
}
}
/**

View File

@@ -61,9 +61,14 @@ class ChannelsApi {
}
}
public async $getChannelsByStatus(status: number): Promise<any[]> {
public async $getChannelsByStatus(status: number | number[]): Promise<any[]> {
try {
const query = `SELECT * FROM channels WHERE status = ?`;
let query: string;
if (Array.isArray(status)) {
query = `SELECT * FROM channels WHERE status IN (${status.join(',')})`;
} else {
query = `SELECT * FROM channels WHERE status = ?`;
}
const [rows]: any = await DB.query(query, [status]);
return rows;
} catch (e) {
@@ -218,23 +223,25 @@ class ChannelsApi {
// Channels originating from node
let query = `
SELECT node2.alias, node2.public_key, channels.status, channels.node1_fee_rate,
SELECT COALESCE(node2.alias, SUBSTRING(node2_public_key, 0, 20)) AS alias, COALESCE(node2.public_key, node2_public_key) AS public_key,
channels.status, channels.node1_fee_rate,
channels.capacity, channels.short_id, channels.id
FROM channels
JOIN nodes AS node2 ON node2.public_key = channels.node2_public_key
LEFT JOIN nodes AS node2 ON node2.public_key = channels.node2_public_key
WHERE node1_public_key = ? AND channels.status ${channelStatusFilter}
`;
const [channelsFromNode]: any = await DB.query(query, [public_key, index, length]);
const [channelsFromNode]: any = await DB.query(query, [public_key]);
// Channels incoming to node
query = `
SELECT node1.alias, node1.public_key, channels.status, channels.node2_fee_rate,
SELECT COALESCE(node1.alias, SUBSTRING(node1_public_key, 0, 20)) AS alias, COALESCE(node1.public_key, node1_public_key) AS public_key,
channels.status, channels.node2_fee_rate,
channels.capacity, channels.short_id, channels.id
FROM channels
JOIN nodes AS node1 ON node1.public_key = channels.node1_public_key
LEFT JOIN nodes AS node1 ON node1.public_key = channels.node1_public_key
WHERE node2_public_key = ? AND channels.status ${channelStatusFilter}
`;
const [channelsToNode]: any = await DB.query(query, [public_key, index, length]);
const [channelsToNode]: any = await DB.query(query, [public_key]);
let allChannels = channelsFromNode.concat(channelsToNode);
allChannels.sort((a, b) => {
@@ -337,7 +344,7 @@ class ChannelsApi {
/**
* Save or update a channel present in the graph
*/
public async $saveChannel(channel: ILightningApi.Channel): Promise<void> {
public async $saveChannel(channel: ILightningApi.Channel, status = 1): Promise<void> {
const [ txid, vout ] = channel.chan_point.split(':');
const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {};
@@ -369,11 +376,11 @@ class ChannelsApi {
node2_min_htlc_mtokens,
node2_updated_at
)
VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ${status}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
capacity = ?,
updated_at = ?,
status = 1,
status = ${status},
node1_public_key = ?,
node1_base_fee_mtokens = ?,
node1_cltv_delta = ?,

View File

@@ -2,6 +2,7 @@ import logger from '../../logger';
import DB from '../../database';
import { ResultSetHeader } from 'mysql2';
import { ILightningApi } from '../lightning/lightning-api.interface';
import { ITopNodesPerCapacity, ITopNodesPerChannels } from '../../mempool.interfaces';
class NodesApi {
public async $getNode(public_key: string): Promise<any> {
@@ -9,10 +10,10 @@ class NodesApi {
// General info
let query = `
SELECT public_key, alias, UNIX_TIMESTAMP(first_seen) AS first_seen,
UNIX_TIMESTAMP(updated_at) AS updated_at, color, sockets as sockets,
as_number, city_id, country_id, subdivision_id, longitude, latitude,
geo_names_iso.names as iso_code, geo_names_as.names as as_organization, geo_names_city.names as city,
geo_names_country.names as country, geo_names_subdivision.names as subdivision
UNIX_TIMESTAMP(updated_at) AS updated_at, color, sockets as sockets,
as_number, city_id, country_id, subdivision_id, longitude, latitude,
geo_names_iso.names as iso_code, geo_names_as.names as as_organization, geo_names_city.names as city,
geo_names_country.names as country, geo_names_subdivision.names as subdivision
FROM nodes
LEFT JOIN geo_names geo_names_as on geo_names_as.id = as_number
LEFT JOIN geo_names geo_names_city on geo_names_city.id = city_id
@@ -112,20 +113,46 @@ class NodesApi {
}
}
public async $getTopCapacityNodes(): Promise<any> {
public async $getTopCapacityNodes(full: boolean): Promise<ITopNodesPerCapacity[]> {
try {
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
const latestDate = rows[0].maxAdded;
const query = `
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias, node_stats.capacity, node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY capacity DESC
LIMIT 10;
`;
[rows] = await DB.query(query);
let query: string;
if (full === false) {
query = `
SELECT nodes.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias,
node_stats.capacity
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY capacity DESC
LIMIT 100
`;
[rows] = await DB.query(query);
} else {
query = `
SELECT node_stats.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(node_stats.public_key, 1, 20), alias) as alias,
CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity,
CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
UNIX_TIMESTAMP(nodes.first_seen) as firstSeen, UNIX_TIMESTAMP(nodes.updated_at) as updatedAt,
geo_names_city.names as city, geo_names_country.names as country
FROM node_stats
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
LEFT JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY capacity DESC
LIMIT 100
`;
[rows] = await DB.query(query);
for (let i = 0; i < rows.length; ++i) {
rows[i].country = JSON.parse(rows[i].country);
rows[i].city = JSON.parse(rows[i].city);
}
}
return rows;
} catch (e) {
@@ -134,20 +161,94 @@ class NodesApi {
}
}
public async $getTopChannelsNodes(): Promise<any> {
public async $getTopChannelsNodes(full: boolean): Promise<ITopNodesPerChannels[]> {
try {
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
const latestDate = rows[0].maxAdded;
const query = `
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias, node_stats.capacity, node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY channels DESC
LIMIT 10;
`;
[rows] = await DB.query(query);
let query: string;
if (full === false) {
query = `
SELECT nodes.public_key as publicKey, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias,
node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY channels DESC
LIMIT 100;
`;
[rows] = await DB.query(query);
} else {
query = `
SELECT node_stats.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(node_stats.public_key, 1, 20), alias) as alias,
CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity,
UNIX_TIMESTAMP(nodes.first_seen) as firstSeen, UNIX_TIMESTAMP(nodes.updated_at) as updatedAt,
geo_names_city.names as city, geo_names_country.names as country
FROM node_stats
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
LEFT JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY channels DESC
LIMIT 100
`;
[rows] = await DB.query(query);
for (let i = 0; i < rows.length; ++i) {
rows[i].country = JSON.parse(rows[i].country);
rows[i].city = JSON.parse(rows[i].city);
}
}
return rows;
} catch (e) {
logger.err('$getTopChannelsNodes error: ' + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $getOldestNodes(full: boolean): Promise<ITopNodesPerChannels[]> {
try {
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
const latestDate = rows[0].maxAdded;
let query: string;
if (full === false) {
query = `
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias,
node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY first_seen
LIMIT 100;
`;
[rows] = await DB.query(query);
} else {
query = `
SELECT node_stats.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(node_stats.public_key, 1, 20), alias) as alias,
CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity,
UNIX_TIMESTAMP(nodes.first_seen) as firstSeen, UNIX_TIMESTAMP(nodes.updated_at) as updatedAt,
geo_names_city.names as city, geo_names_country.names as country
FROM node_stats
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
LEFT JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY first_seen
LIMIT 100
`;
[rows] = await DB.query(query);
for (let i = 0; i < rows.length; ++i) {
rows[i].country = JSON.parse(rows[i].country);
rows[i].city = JSON.parse(rows[i].city);
}
}
return rows;
} catch (e) {

View File

@@ -2,6 +2,7 @@ import config from '../../config';
import { Application, Request, Response } from 'express';
import nodesApi from './nodes.api';
import DB from '../../database';
import { INodesRanking } from '../../mempool.interfaces';
class NodesRoutes {
constructor() { }
@@ -10,10 +11,13 @@ class NodesRoutes {
app
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/country/:country', this.$getNodesPerCountry)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/search/:search', this.$searchNode)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/top', this.$getTopNodes)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp-ranking', this.$getISPRanking)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp/:isp', this.$getNodesPerISP)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/countries', this.$getNodesCountries)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings', this.$getNodesRanking)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings/capacity', this.$getTopNodesByCapacity)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings/channels', this.$getTopNodesByChannels)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings/age', this.$getOldestNodes)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/:public_key/statistics', this.$getHistoricalNodeStats)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/:public_key', this.$getNode)
;
@@ -56,11 +60,14 @@ class NodesRoutes {
}
}
private async $getTopNodes(req: Request, res: Response) {
private async $getNodesRanking(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getTopCapacityNodes();
const topChannelsNodes = await nodesApi.$getTopChannelsNodes();
res.json({
const topCapacityNodes = await nodesApi.$getTopCapacityNodes(false);
const topChannelsNodes = await nodesApi.$getTopChannelsNodes(false);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(<INodesRanking>{
topByCapacity: topCapacityNodes,
topByChannels: topChannelsNodes,
});
@@ -69,6 +76,42 @@ class NodesRoutes {
}
}
private async $getTopNodesByCapacity(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getTopCapacityNodes(true);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(topCapacityNodes);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async $getTopNodesByChannels(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getTopChannelsNodes(true);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(topCapacityNodes);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async $getOldestNodes(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getOldestNodes(true);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(topCapacityNodes);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async $getISPRanking(req: Request, res: Response): Promise<void> {
try {
const nodesPerAs = await nodesApi.$getNodesISPRanking();

View File

@@ -189,7 +189,7 @@ class Server {
await networkSyncService.$startService();
await lightningStatsUpdater.$startService();
} catch(e) {
logger.err(`Lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.err(`Nodejs lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
await Common.sleep$(1000 * 60);
this.$runLightningBackend();
};

View File

@@ -251,3 +251,41 @@ export interface RewardStats {
totalFee: number;
totalTx: number;
}
export interface ITopNodesPerChannels {
publicKey: string,
alias: string,
channels?: number,
capacity: number,
firstSeen?: number,
updatedAt?: number,
city?: any,
country?: any,
}
export interface ITopNodesPerCapacity {
publicKey: string,
alias: string,
capacity: number,
channels?: number,
firstSeen?: number,
updatedAt?: number,
city?: any,
country?: any,
}
export interface INodesRanking {
topByCapacity: ITopNodesPerCapacity[];
topByChannels: ITopNodesPerChannels[];
}
export interface IOldestNodes {
publicKey: string,
alias: string,
firstSeen: number,
channels?: number,
capacity: number,
updatedAt?: number,
city?: any,
country?: any,
}

View File

@@ -232,8 +232,8 @@ class NetworkSyncService {
let progress = 0;
try {
logger.info(`Starting closed channels scan...`);
const channels = await channelsApi.$getChannelsByStatus(0);
logger.info(`Starting closed channels scan`);
const channels = await channelsApi.$getChannelsByStatus([0, 1]);
for (const channel of channels) {
const spendingTx = await bitcoinApi.$getOutspend(channel.transaction_id, channel.transaction_vout);
if (spendingTx.spent === true && spendingTx.status?.confirmed === true) {

View File

@@ -71,9 +71,7 @@ class FundingTxFetcher {
}
public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> {
if (channelId.indexOf('x') === -1) {
channelId = Common.channelIntegerIdToShortId(channelId);
}
channelId = Common.channelIntegerIdToShortId(channelId);
if (this.fundingTxCache[channelId]) {
return this.fundingTxCache[channelId];

View File

@@ -5,33 +5,11 @@ import fundingTxFetcher from './funding-tx-fetcher';
import config from '../../../config';
import { ILightningApi } from '../../../api/lightning/lightning-api.interface';
import { isIP } from 'net';
import { Common } from '../../../api/common';
import channelsApi from '../../../api/explorer/channels.api';
const fsPromises = promises;
interface Node {
id: string;
timestamp: number;
features: string;
rgb_color: string;
alias: string;
addresses: unknown[];
out_degree: number;
in_degree: number;
}
interface Channel {
channel_id: string;
node1_pub: string;
node2_pub: string;
timestamp: number;
features: string;
fee_base_msat: number;
fee_rate_milli_msat: number;
htlc_minimim_msat: number;
cltv_expiry_delta: number;
htlc_maximum_msat: number;
}
class LightningStatsImporter {
topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER;
@@ -46,7 +24,8 @@ class LightningStatsImporter {
/**
* Generate LN network stats for one day
*/
public async computeNetworkStats(timestamp: number, networkGraph: ILightningApi.NetworkGraph): Promise<unknown> {
public async computeNetworkStats(timestamp: number,
networkGraph: ILightningApi.NetworkGraph, isHistorical: boolean = false): Promise<unknown> {
// Node counts and network shares
let clearnetNodes = 0;
let torNodes = 0;
@@ -59,11 +38,11 @@ class LightningStatsImporter {
let isUnnanounced = true;
for (const socket of (node.addresses ?? [])) {
if (!socket.network?.length || !socket.addr?.length) {
if (!socket.network?.length && !socket.addr?.length) {
continue;
}
hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network) || socket.addr.indexOf('onion') !== -1;
hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network) || [4, 6].includes(isIP(socket.addr.split(':')[0]));
hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network) || socket.addr.indexOf('onion') !== -1 || socket.addr.indexOf('torv2') !== -1 || socket.addr.indexOf('torv3') !== -1;
hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network) || [4, 6].includes(isIP(socket.addr.split(':')[0])) || socket.addr.indexOf('ipv4') !== -1 || socket.addr.indexOf('ipv6') !== -1;;
}
if (hasOnion && hasClearnet) {
clearnetTorNodes++;
@@ -90,11 +69,14 @@ class LightningStatsImporter {
const baseFees: number[] = [];
const alreadyCountedChannels = {};
const [channelsInDbRaw]: any[] = await DB.query(`SELECT short_id, created FROM channels`);
const channelsInDb = {};
for (const channel of channelsInDbRaw) {
channelsInDb[channel.short_id] = channel;
}
for (const channel of networkGraph.edges) {
let short_id = channel.channel_id;
if (short_id.indexOf('/') !== -1) {
short_id = short_id.slice(0, -2);
}
const short_id = Common.channelIntegerIdToShortId(channel.channel_id);
const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id);
if (!tx) {
@@ -102,6 +84,31 @@ class LightningStatsImporter {
continue;
}
// Channel is already in db, check if we need to update 'created' field
if (isHistorical === true) {
//@ts-ignore
if (channelsInDb[short_id] && channel.timestamp < channel.created) {
await DB.query(`
UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.short_id = ?`,
//@ts-ignore
[channel.timestamp, short_id]
);
} else if (!channelsInDb[short_id]) {
await channelsApi.$saveChannel({
channel_id: short_id,
chan_point: `${tx.txid}:${short_id.split('x')[2]}`,
//@ts-ignore
last_update: channel.timestamp,
node1_pub: channel.node1_pub,
node2_pub: channel.node2_pub,
capacity: (tx.value * 100000000).toString(),
node1_policy: null,
node2_policy: null,
}, 0);
channelsInDb[channel.channel_id] = channel;
}
}
if (!nodeStats[channel.node1_pub]) {
nodeStats[channel.node1_pub] = {
capacity: 0,
@@ -126,7 +133,7 @@ class LightningStatsImporter {
nodeStats[channel.node2_pub].channels++;
}
if (channel.node1_policy !== undefined) { // Coming from the node
if (isHistorical === false) { // Coming from the node
for (const policy of [channel.node1_policy, channel.node2_policy]) {
if (policy && parseInt(policy.fee_rate_milli_msat, 10) < 5000) {
avgFeeRate += parseInt(policy.fee_rate_milli_msat, 10);
@@ -137,30 +144,42 @@ class LightningStatsImporter {
baseFees.push(parseInt(policy.fee_base_msat, 10));
}
}
} else { // Coming from the historical import
} else {
// @ts-ignore
if (channel.fee_rate_milli_msat < 5000) {
if (channel.node1_policy.fee_rate_milli_msat < 5000) {
// @ts-ignore
avgFeeRate += parseInt(channel.fee_rate_milli_msat, 10);
avgFeeRate += parseInt(channel.node1_policy.fee_rate_milli_msat, 10);
// @ts-ignore
feeRates.push(parseInt(channel.fee_rate_milli_msat), 10);
feeRates.push(parseInt(channel.node1_policy.fee_rate_milli_msat), 10);
}
// @ts-ignore
if (channel.fee_base_msat < 5000) {
if (channel.node1_policy.fee_base_msat < 5000) {
// @ts-ignore
avgBaseFee += parseInt(channel.fee_base_msat, 10);
avgBaseFee += parseInt(channel.node1_policy.fee_base_msat, 10);
// @ts-ignore
baseFees.push(parseInt(channel.fee_base_msat), 10);
baseFees.push(parseInt(channel.node1_policy.fee_base_msat), 10);
}
}
}
let medCapacity = 0;
let medFeeRate = 0;
let medBaseFee = 0;
let avgCapacity = 0;
avgFeeRate /= Math.max(networkGraph.edges.length, 1);
avgBaseFee /= Math.max(networkGraph.edges.length, 1);
const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
const avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
if (capacities.length > 0) {
medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
}
if (feeRates.length > 0) {
medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
}
if (baseFees.length > 0) {
medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
}
let query = `INSERT INTO lightning_stats(
added,
@@ -262,83 +281,154 @@ class LightningStatsImporter {
* Import topology files LN historical data into the database
*/
async $importHistoricalLightningStats(): Promise<void> {
const fileList = await fsPromises.readdir(this.topologiesFolder);
// Insert history from the most recent to the oldest
// This also put the .json cached files first
fileList.sort().reverse();
const [rows]: any[] = await DB.query(`
SELECT UNIX_TIMESTAMP(added) AS added, node_count
FROM lightning_stats
ORDER BY added DESC
`);
const existingStatsTimestamps = {};
for (const row of rows) {
existingStatsTimestamps[row.added] = row;
}
// For logging purpose
let processed = 10;
let totalProcessed = 0;
let logStarted = false;
for (const filename of fileList) {
processed++;
const timestamp = parseInt(filename.split('_')[1], 10);
// Stats exist already, don't calculate/insert them
if (existingStatsTimestamps[timestamp] !== undefined) {
continue;
}
if (filename.indexOf('.topology') === -1) {
continue;
}
logger.debug(`Reading ${this.topologiesFolder}/${filename}`);
let fileContent = '';
try {
let fileList: string[] = [];
try {
fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
} catch (e: any) {
if (e.errno == -1) { // EISDIR - Ignore directorie
fileList = await fsPromises.readdir(this.topologiesFolder);
} catch (e) {
logger.err(`Unable to open topology folder at ${this.topologiesFolder}`);
throw e;
}
// Insert history from the most recent to the oldest
// This also put the .json cached files first
fileList.sort().reverse();
const [rows]: any[] = await DB.query(`
SELECT UNIX_TIMESTAMP(added) AS added, node_count
FROM lightning_stats
ORDER BY added DESC
`);
const existingStatsTimestamps = {};
for (const row of rows) {
existingStatsTimestamps[row.added] = row;
}
// For logging purpose
let processed = 10;
let totalProcessed = 0;
let logStarted = false;
for (const filename of fileList) {
processed++;
const timestamp = parseInt(filename.split('_')[1], 10);
// Stats exist already, don't calculate/insert them
if (existingStatsTimestamps[timestamp] !== undefined) {
totalProcessed++;
continue;
}
if (filename.indexOf('topology_') === -1) {
totalProcessed++;
continue;
}
logger.debug(`Reading ${this.topologiesFolder}/${filename}`);
let fileContent = '';
try {
fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
} catch (e: any) {
if (e.errno == -1) { // EISDIR - Ignore directorie
continue;
}
logger.err(`Unable to open ${this.topologiesFolder}/${filename}`);
continue;
}
let graph;
try {
graph = JSON.parse(fileContent);
graph = await this.cleanupTopology(graph);
} catch (e) {
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`);
continue;
}
if (!logStarted) {
logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`);
logStarted = true;
}
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`);
totalProcessed++;
if (processed > 10) {
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
processed = 0;
} else {
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
}
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
const stat = await this.computeNetworkStats(timestamp, graph, true);
existingStatsTimestamps[timestamp] = stat;
}
let graph;
try {
graph = JSON.parse(fileContent);
} catch (e) {
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`);
if (totalProcessed > 0) {
logger.info(`Lightning network stats historical import completed`);
}
} catch (e) {
logger.err(`Lightning network stats historical failed. Reason: ${e instanceof Error ? e.message : e}`);
}
}
async cleanupTopology(graph) {
const newGraph = {
nodes: <ILightningApi.Node[]>[],
edges: <ILightningApi.Channel[]>[],
};
for (const node of graph.nodes) {
const addressesParts = (node.addresses ?? '').split(',');
const addresses: any[] = [];
for (const address of addressesParts) {
addresses.push({
network: '',
addr: address
});
}
newGraph.nodes.push({
last_update: node.timestamp ?? 0,
pub_key: node.id ?? null,
alias: node.alias ?? null,
addresses: addresses,
color: node.rgb_color ?? null,
features: {},
});
}
for (const adjacency of graph.adjacency) {
if (adjacency.length === 0) {
continue;
}
if (!logStarted) {
logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`);
logStarted = true;
}
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`);
totalProcessed++;
if (processed > 10) {
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
processed = 0;
} else {
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
for (const edge of adjacency) {
newGraph.edges.push({
channel_id: edge.scid,
chan_point: '',
last_update: edge.timestamp,
node1_pub: edge.source ?? null,
node2_pub: edge.destination ?? null,
capacity: '0', // Will be fetch later
node1_policy: {
time_lock_delta: edge.cltv_expiry_delta,
min_htlc: edge.htlc_minimim_msat,
fee_base_msat: edge.fee_base_msat,
fee_rate_milli_msat: edge.fee_proportional_millionths,
max_htlc_msat: edge.htlc_maximum_msat,
last_update: edge.timestamp,
disabled: false,
},
node2_policy: null,
});
}
}
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
const stat = await this.computeNetworkStats(timestamp, graph);
existingStatsTimestamps[timestamp] = stat;
}
if (totalProcessed > 0) {
logger.info(`Lightning network stats historical import completed`);
}
return newGraph;
}
}