Merge branch 'master' into nymkappa/bugfix/channel-list-pagination

This commit is contained in:
wiz
2022-08-20 20:19:55 +09:00
committed by GitHub
43 changed files with 1479 additions and 422 deletions

View File

@@ -16,7 +16,6 @@
"bitcoinjs-lib": "6.0.2",
"crypto-js": "^4.0.0",
"express": "^4.18.0",
"fast-xml-parser": "^4.0.9",
"maxmind": "^4.3.6",
"mysql2": "2.3.3",
"node-worker-threads-pool": "^1.5.1",
@@ -3136,21 +3135,6 @@
"integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
"dev": true
},
"node_modules/fast-xml-parser": {
"version": "4.0.9",
"resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.0.9.tgz",
"integrity": "sha512-4G8EzDg2Nb1Qurs3f7BpFV4+jpMVsdgLVuG1Uv8O2OHJfVCg7gcA53obuKbmVqzd4Y7YXVBK05oJG7hzGIdyzg==",
"dependencies": {
"strnum": "^1.0.5"
},
"bin": {
"fxparser": "src/cli/cli.js"
},
"funding": {
"type": "paypal",
"url": "https://paypal.me/naturalintelligence"
}
},
"node_modules/fastq": {
"version": "1.13.0",
"resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz",
@@ -5636,11 +5620,6 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/strnum": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz",
"integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA=="
},
"node_modules/supports-color": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
@@ -8556,14 +8535,6 @@
"integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
"dev": true
},
"fast-xml-parser": {
"version": "4.0.9",
"resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.0.9.tgz",
"integrity": "sha512-4G8EzDg2Nb1Qurs3f7BpFV4+jpMVsdgLVuG1Uv8O2OHJfVCg7gcA53obuKbmVqzd4Y7YXVBK05oJG7hzGIdyzg==",
"requires": {
"strnum": "^1.0.5"
}
},
"fastq": {
"version": "1.13.0",
"resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz",
@@ -10398,11 +10369,6 @@
"integrity": "sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==",
"dev": true
},
"strnum": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz",
"integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA=="
},
"supports-color": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",

View File

@@ -38,7 +38,6 @@
"bitcoinjs-lib": "6.0.2",
"crypto-js": "^4.0.0",
"express": "^4.18.0",
"fast-xml-parser": "^4.0.9",
"maxmind": "^4.3.6",
"mysql2": "2.3.3",
"node-worker-threads-pool": "^1.5.1",

View File

@@ -207,6 +207,10 @@ export class Common {
/** Decodes a channel id returned by lnd as uint64 to a short channel id */
static channelIntegerIdToShortId(id: string): string {
if (id.indexOf('/') !== -1) {
id = id.slice(0, -2);
}
if (id.indexOf('x') !== -1) { // Already a short id
return id;
}

View File

@@ -4,7 +4,7 @@ import logger from '../logger';
import { Common } from './common';
class DatabaseMigration {
private static currentVersion = 37;
private static currentVersion = 38;
private queryTimeout = 120000;
private statisticsAddedIndexed = false;
private uniqueLogs: string[] = [];
@@ -248,7 +248,6 @@ class DatabaseMigration {
}
if (databaseSchemaVersion < 25 && isBitcoin === true) {
await this.$executeQuery(`INSERT INTO state VALUES('last_node_stats', 0, '1970-01-01');`);
await this.$executeQuery(this.getCreateLightningStatisticsQuery(), await this.$checkIfTableExists('lightning_stats'));
await this.$executeQuery(this.getCreateNodesQuery(), await this.$checkIfTableExists('nodes'));
await this.$executeQuery(this.getCreateChannelsQuery(), await this.$checkIfTableExists('channels'));
@@ -328,6 +327,16 @@ class DatabaseMigration {
if (databaseSchemaVersion < 37 && isBitcoin == true) {
await this.$executeQuery(this.getCreateLNNodesSocketsTableQuery(), await this.$checkIfTableExists('nodes_sockets'));
}
if (databaseSchemaVersion < 38 && isBitcoin == true) {
if (config.LIGHTNING.ENABLED) {
this.uniqueLog(logger.notice, `'lightning_stats' and 'node_stats' tables have been truncated.`);
}
await this.$executeQuery(`TRUNCATE lightning_stats`);
await this.$executeQuery(`TRUNCATE node_stats`);
await this.$executeQuery('ALTER TABLE `lightning_stats` CHANGE `added` `added` timestamp NULL');
await this.$executeQuery('ALTER TABLE `node_stats` CHANGE `added` `added` timestamp NULL');
}
}
/**

View File

@@ -61,9 +61,14 @@ class ChannelsApi {
}
}
public async $getChannelsByStatus(status: number): Promise<any[]> {
public async $getChannelsByStatus(status: number | number[]): Promise<any[]> {
try {
const query = `SELECT * FROM channels WHERE status = ?`;
let query: string;
if (Array.isArray(status)) {
query = `SELECT * FROM channels WHERE status IN (${status.join(',')})`;
} else {
query = `SELECT * FROM channels WHERE status = ?`;
}
const [rows]: any = await DB.query(query, [status]);
return rows;
} catch (e) {
@@ -339,7 +344,7 @@ class ChannelsApi {
/**
* Save or update a channel present in the graph
*/
public async $saveChannel(channel: ILightningApi.Channel): Promise<void> {
public async $saveChannel(channel: ILightningApi.Channel, status = 1): Promise<void> {
const [ txid, vout ] = channel.chan_point.split(':');
const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {};
@@ -371,11 +376,11 @@ class ChannelsApi {
node2_min_htlc_mtokens,
node2_updated_at
)
VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ${status}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
capacity = ?,
updated_at = ?,
status = 1,
status = ${status},
node1_public_key = ?,
node1_base_fee_mtokens = ?,
node1_cltv_delta = ?,

View File

@@ -2,6 +2,7 @@ import logger from '../../logger';
import DB from '../../database';
import { ResultSetHeader } from 'mysql2';
import { ILightningApi } from '../lightning/lightning-api.interface';
import { ITopNodesPerCapacity, ITopNodesPerChannels } from '../../mempool.interfaces';
class NodesApi {
public async $getNode(public_key: string): Promise<any> {
@@ -9,10 +10,10 @@ class NodesApi {
// General info
let query = `
SELECT public_key, alias, UNIX_TIMESTAMP(first_seen) AS first_seen,
UNIX_TIMESTAMP(updated_at) AS updated_at, color, sockets as sockets,
as_number, city_id, country_id, subdivision_id, longitude, latitude,
geo_names_iso.names as iso_code, geo_names_as.names as as_organization, geo_names_city.names as city,
geo_names_country.names as country, geo_names_subdivision.names as subdivision
UNIX_TIMESTAMP(updated_at) AS updated_at, color, sockets as sockets,
as_number, city_id, country_id, subdivision_id, longitude, latitude,
geo_names_iso.names as iso_code, geo_names_as.names as as_organization, geo_names_city.names as city,
geo_names_country.names as country, geo_names_subdivision.names as subdivision
FROM nodes
LEFT JOIN geo_names geo_names_as on geo_names_as.id = as_number
LEFT JOIN geo_names geo_names_city on geo_names_city.id = city_id
@@ -112,20 +113,46 @@ class NodesApi {
}
}
public async $getTopCapacityNodes(): Promise<any> {
public async $getTopCapacityNodes(full: boolean): Promise<ITopNodesPerCapacity[]> {
try {
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
const latestDate = rows[0].maxAdded;
const query = `
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias, node_stats.capacity, node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY capacity DESC
LIMIT 10;
`;
[rows] = await DB.query(query);
let query: string;
if (full === false) {
query = `
SELECT nodes.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias,
node_stats.capacity
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY capacity DESC
LIMIT 100
`;
[rows] = await DB.query(query);
} else {
query = `
SELECT node_stats.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(node_stats.public_key, 1, 20), alias) as alias,
CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity,
CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
UNIX_TIMESTAMP(nodes.first_seen) as firstSeen, UNIX_TIMESTAMP(nodes.updated_at) as updatedAt,
geo_names_city.names as city, geo_names_country.names as country
FROM node_stats
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
LEFT JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY capacity DESC
LIMIT 100
`;
[rows] = await DB.query(query);
for (let i = 0; i < rows.length; ++i) {
rows[i].country = JSON.parse(rows[i].country);
rows[i].city = JSON.parse(rows[i].city);
}
}
return rows;
} catch (e) {
@@ -134,20 +161,94 @@ class NodesApi {
}
}
public async $getTopChannelsNodes(): Promise<any> {
public async $getTopChannelsNodes(full: boolean): Promise<ITopNodesPerChannels[]> {
try {
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
const latestDate = rows[0].maxAdded;
const query = `
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias, node_stats.capacity, node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY channels DESC
LIMIT 10;
`;
[rows] = await DB.query(query);
let query: string;
if (full === false) {
query = `
SELECT nodes.public_key as publicKey, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias,
node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY channels DESC
LIMIT 100;
`;
[rows] = await DB.query(query);
} else {
query = `
SELECT node_stats.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(node_stats.public_key, 1, 20), alias) as alias,
CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity,
UNIX_TIMESTAMP(nodes.first_seen) as firstSeen, UNIX_TIMESTAMP(nodes.updated_at) as updatedAt,
geo_names_city.names as city, geo_names_country.names as country
FROM node_stats
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
LEFT JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY channels DESC
LIMIT 100
`;
[rows] = await DB.query(query);
for (let i = 0; i < rows.length; ++i) {
rows[i].country = JSON.parse(rows[i].country);
rows[i].city = JSON.parse(rows[i].city);
}
}
return rows;
} catch (e) {
logger.err('$getTopChannelsNodes error: ' + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $getOldestNodes(full: boolean): Promise<ITopNodesPerChannels[]> {
try {
let [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(MAX(added)) as maxAdded FROM node_stats');
const latestDate = rows[0].maxAdded;
let query: string;
if (full === false) {
query = `
SELECT nodes.public_key, IF(nodes.alias = '', SUBSTRING(nodes.public_key, 1, 20), alias) as alias,
node_stats.channels
FROM node_stats
JOIN nodes ON nodes.public_key = node_stats.public_key
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY first_seen
LIMIT 100;
`;
[rows] = await DB.query(query);
} else {
query = `
SELECT node_stats.public_key AS publicKey, IF(nodes.alias = '', SUBSTRING(node_stats.public_key, 1, 20), alias) as alias,
CAST(COALESCE(node_stats.channels, 0) as INT) as channels,
CAST(COALESCE(node_stats.capacity, 0) as INT) as capacity,
UNIX_TIMESTAMP(nodes.first_seen) as firstSeen, UNIX_TIMESTAMP(nodes.updated_at) as updatedAt,
geo_names_city.names as city, geo_names_country.names as country
FROM node_stats
RIGHT JOIN nodes ON nodes.public_key = node_stats.public_key
LEFT JOIN geo_names geo_names_country ON geo_names_country.id = nodes.country_id AND geo_names_country.type = 'country'
LEFT JOIN geo_names geo_names_city ON geo_names_city.id = nodes.city_id AND geo_names_city.type = 'city'
WHERE added = FROM_UNIXTIME(${latestDate})
ORDER BY first_seen
LIMIT 100
`;
[rows] = await DB.query(query);
for (let i = 0; i < rows.length; ++i) {
rows[i].country = JSON.parse(rows[i].country);
rows[i].city = JSON.parse(rows[i].city);
}
}
return rows;
} catch (e) {
@@ -168,64 +269,115 @@ class NodesApi {
}
}
public async $getNodesISPRanking(groupBy: string, showTor: boolean) {
public async $getNodesISPRanking() {
try {
const orderBy = groupBy === 'capacity' ? `CAST(SUM(capacity) as INT)` : `COUNT(DISTINCT nodes.public_key)`;
// Clearnet
let query = `SELECT GROUP_CONCAT(DISTINCT(nodes.as_number)) as ispId, geo_names.names as names,
COUNT(DISTINCT nodes.public_key) as nodesCount, CAST(SUM(capacity) as INT) as capacity
FROM nodes
JOIN geo_names ON geo_names.id = nodes.as_number
JOIN channels ON channels.node1_public_key = nodes.public_key OR channels.node2_public_key = nodes.public_key
GROUP BY geo_names.names
ORDER BY ${orderBy} DESC
`;
const [nodesCountPerAS]: any = await DB.query(query);
let query = '';
let total = 0;
const nodesPerAs: any[] = [];
// List all channels and the two linked ISP
query = `
SELECT short_id, capacity,
channels.node1_public_key AS node1PublicKey, isp1.names AS isp1, isp1.id as isp1ID,
channels.node2_public_key AS node2PublicKey, isp2.names AS isp2, isp2.id as isp2ID
FROM channels
JOIN nodes node1 ON node1.public_key = channels.node1_public_key
JOIN nodes node2 ON node2.public_key = channels.node2_public_key
JOIN geo_names isp1 ON isp1.id = node1.as_number
JOIN geo_names isp2 ON isp2.id = node2.as_number
WHERE channels.status = 1
ORDER BY short_id DESC
`;
const [channelsIsp]: any = await DB.query(query);
for (const asGroup of nodesCountPerAS) {
if (groupBy === 'capacity') {
total += asGroup.capacity;
} else {
total += asGroup.nodesCount;
// Sum channels capacity and node count per ISP
const ispList = {};
for (const channel of channelsIsp) {
const isp1 = JSON.parse(channel.isp1);
const isp2 = JSON.parse(channel.isp2);
if (!ispList[isp1]) {
ispList[isp1] = {
id: channel.isp1ID,
capacity: 0,
channels: 0,
nodes: {},
};
}
if (!ispList[isp2]) {
ispList[isp2] = {
id: channel.isp2ID,
capacity: 0,
channels: 0,
nodes: {},
};
}
ispList[isp1].capacity += channel.capacity;
ispList[isp1].channels += 1;
ispList[isp1].nodes[channel.node1PublicKey] = true;
ispList[isp2].capacity += channel.capacity;
ispList[isp2].channels += 1;
ispList[isp2].nodes[channel.node2PublicKey] = true;
}
// Tor
if (showTor) {
query = `SELECT COUNT(DISTINCT nodes.public_key) as nodesCount, CAST(SUM(capacity) as INT) as capacity
FROM nodes
JOIN channels ON channels.node1_public_key = nodes.public_key OR channels.node2_public_key = nodes.public_key
ORDER BY ${orderBy} DESC
`;
const [nodesCountTor]: any = await DB.query(query);
total += groupBy === 'capacity' ? nodesCountTor[0].capacity : nodesCountTor[0].nodesCount;
nodesPerAs.push({
ispId: null,
name: 'Tor',
count: nodesCountTor[0].nodesCount,
share: Math.floor((groupBy === 'capacity' ? nodesCountTor[0].capacity : nodesCountTor[0].nodesCount) / total * 10000) / 100,
capacity: nodesCountTor[0].capacity,
});
const ispRanking: any[] = [];
for (const isp of Object.keys(ispList)) {
ispRanking.push([
ispList[isp].id,
isp,
ispList[isp].capacity,
ispList[isp].channels,
Object.keys(ispList[isp].nodes).length,
]);
}
for (const as of nodesCountPerAS) {
nodesPerAs.push({
ispId: as.ispId,
name: JSON.parse(as.names),
count: as.nodesCount,
share: Math.floor((groupBy === 'capacity' ? as.capacity : as.nodesCount) / total * 10000) / 100,
capacity: as.capacity,
});
}
// Total active channels capacity
query = `SELECT SUM(capacity) AS capacity FROM channels WHERE status = 1`;
const [totalCapacity]: any = await DB.query(query);
return nodesPerAs;
// Get the total capacity of all channels which have at least one node on clearnet
query = `
SELECT SUM(capacity) as capacity
FROM (
SELECT capacity, GROUP_CONCAT(socket1.type, socket2.type) as networks
FROM channels
JOIN nodes_sockets socket1 ON node1_public_key = socket1.public_key
JOIN nodes_sockets socket2 ON node2_public_key = socket2.public_key
AND channels.status = 1
GROUP BY short_id
) channels_tmp
WHERE channels_tmp.networks LIKE '%ipv%'
`;
const [clearnetCapacity]: any = await DB.query(query);
// Get the total capacity of all channels which have both nodes on Tor
query = `
SELECT SUM(capacity) as capacity
FROM (
SELECT capacity, GROUP_CONCAT(socket1.type, socket2.type) as networks
FROM channels
JOIN nodes_sockets socket1 ON node1_public_key = socket1.public_key
JOIN nodes_sockets socket2 ON node2_public_key = socket2.public_key
AND channels.status = 1
GROUP BY short_id
) channels_tmp
WHERE channels_tmp.networks NOT LIKE '%ipv%' AND
channels_tmp.networks NOT LIKE '%dns%' AND
channels_tmp.networks NOT LIKE '%websocket%'
`;
const [torCapacity]: any = await DB.query(query);
const clearnetCapacityValue = parseInt(clearnetCapacity[0].capacity, 10);
const torCapacityValue = parseInt(torCapacity[0].capacity, 10);
const unknownCapacityValue = parseInt(totalCapacity[0].capacity) - clearnetCapacityValue - torCapacityValue;
return {
clearnetCapacity: clearnetCapacityValue,
torCapacity: torCapacityValue,
unknownCapacity: unknownCapacityValue,
ispRanking: ispRanking,
};
} catch (e) {
logger.err(`Cannot get nodes grouped by AS. Reason: ${e instanceof Error ? e.message : e}`);
logger.err(`Cannot get LN ISP ranking. Reason: ${e instanceof Error ? e.message : e}`);
throw e;
}
}

View File

@@ -2,6 +2,7 @@ import config from '../../config';
import { Application, Request, Response } from 'express';
import nodesApi from './nodes.api';
import DB from '../../database';
import { INodesRanking } from '../../mempool.interfaces';
class NodesRoutes {
constructor() { }
@@ -10,10 +11,13 @@ class NodesRoutes {
app
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/country/:country', this.$getNodesPerCountry)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/search/:search', this.$searchNode)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/top', this.$getTopNodes)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp-ranking', this.$getISPRanking)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/isp/:isp', this.$getNodesPerISP)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/countries', this.$getNodesCountries)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings', this.$getNodesRanking)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings/capacity', this.$getTopNodesByCapacity)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings/channels', this.$getTopNodesByChannels)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/rankings/age', this.$getOldestNodes)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/:public_key/statistics', this.$getHistoricalNodeStats)
.get(config.MEMPOOL.API_URL_PREFIX + 'lightning/nodes/:public_key', this.$getNode)
;
@@ -56,11 +60,14 @@ class NodesRoutes {
}
}
private async $getTopNodes(req: Request, res: Response) {
private async $getNodesRanking(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getTopCapacityNodes();
const topChannelsNodes = await nodesApi.$getTopChannelsNodes();
res.json({
const topCapacityNodes = await nodesApi.$getTopCapacityNodes(false);
const topChannelsNodes = await nodesApi.$getTopChannelsNodes(false);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(<INodesRanking>{
topByCapacity: topCapacityNodes,
topByChannels: topChannelsNodes,
});
@@ -69,17 +76,45 @@ class NodesRoutes {
}
}
private async $getTopNodesByCapacity(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getTopCapacityNodes(true);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(topCapacityNodes);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async $getTopNodesByChannels(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getTopChannelsNodes(true);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(topCapacityNodes);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async $getOldestNodes(req: Request, res: Response): Promise<void> {
try {
const topCapacityNodes = await nodesApi.$getOldestNodes(true);
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
res.json(topCapacityNodes);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}
}
private async $getISPRanking(req: Request, res: Response): Promise<void> {
try {
const groupBy = req.query.groupBy as string;
const showTor = req.query.showTor as string === 'true' ? true : false;
if (!['capacity', 'node-count'].includes(groupBy)) {
res.status(400).send(`groupBy must be one of 'capacity' or 'node-count'`);
return;
}
const nodesPerAs = await nodesApi.$getNodesISPRanking(groupBy, showTor);
const nodesPerAs = await nodesApi.$getNodesISPRanking();
res.header('Pragma', 'public');
res.header('Cache-control', 'public');

View File

@@ -189,7 +189,7 @@ class Server {
await networkSyncService.$startService();
await lightningStatsUpdater.$startService();
} catch(e) {
logger.err(`Lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.err(`Nodejs lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
await Common.sleep$(1000 * 60);
this.$runLightningBackend();
};

View File

@@ -251,3 +251,41 @@ export interface RewardStats {
totalFee: number;
totalTx: number;
}
export interface ITopNodesPerChannels {
publicKey: string,
alias: string,
channels?: number,
capacity: number,
firstSeen?: number,
updatedAt?: number,
city?: any,
country?: any,
}
export interface ITopNodesPerCapacity {
publicKey: string,
alias: string,
capacity: number,
channels?: number,
firstSeen?: number,
updatedAt?: number,
city?: any,
country?: any,
}
export interface INodesRanking {
topByCapacity: ITopNodesPerCapacity[];
topByChannels: ITopNodesPerChannels[];
}
export interface IOldestNodes {
publicKey: string,
alias: string,
firstSeen: number,
channels?: number,
capacity: number,
updatedAt?: number,
city?: any,
country?: any,
}

View File

@@ -232,8 +232,8 @@ class NetworkSyncService {
let progress = 0;
try {
logger.info(`Starting closed channels scan...`);
const channels = await channelsApi.$getChannelsByStatus(0);
logger.info(`Starting closed channels scan`);
const channels = await channelsApi.$getChannelsByStatus([0, 1]);
for (const channel of channels) {
const spendingTx = await bitcoinApi.$getOutspend(channel.transaction_id, channel.transaction_vout);
if (spendingTx.spent === true && spendingTx.status?.confirmed === true) {

View File

@@ -71,9 +71,7 @@ class FundingTxFetcher {
}
public async $fetchChannelOpenTx(channelId: string): Promise<{timestamp: number, txid: string, value: number}> {
if (channelId.indexOf('x') === -1) {
channelId = Common.channelIntegerIdToShortId(channelId);
}
channelId = Common.channelIntegerIdToShortId(channelId);
if (this.fundingTxCache[channelId]) {
return this.fundingTxCache[channelId];

View File

@@ -1,45 +1,19 @@
import DB from '../../../database';
import { promises } from 'fs';
import { XMLParser } from 'fast-xml-parser';
import logger from '../../../logger';
import fundingTxFetcher from './funding-tx-fetcher';
import config from '../../../config';
import { ILightningApi } from '../../../api/lightning/lightning-api.interface';
import { isIP } from 'net';
import { Common } from '../../../api/common';
import channelsApi from '../../../api/explorer/channels.api';
const fsPromises = promises;
interface Node {
id: string;
timestamp: number;
features: string;
rgb_color: string;
alias: string;
addresses: unknown[];
out_degree: number;
in_degree: number;
}
interface Channel {
channel_id: string;
node1_pub: string;
node2_pub: string;
timestamp: number;
features: string;
fee_base_msat: number;
fee_rate_milli_msat: number;
htlc_minimim_msat: number;
cltv_expiry_delta: number;
htlc_maximum_msat: number;
}
class LightningStatsImporter {
topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER;
parser = new XMLParser();
async $run(): Promise<void> {
logger.info(`Importing historical lightning stats`);
const [channels]: any[] = await DB.query('SELECT short_id from channels;');
logger.info('Caching funding txs for currently existing channels');
await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id));
@@ -50,7 +24,8 @@ class LightningStatsImporter {
/**
* Generate LN network stats for one day
*/
public async computeNetworkStats(timestamp: number, networkGraph: ILightningApi.NetworkGraph): Promise<unknown> {
public async computeNetworkStats(timestamp: number,
networkGraph: ILightningApi.NetworkGraph, isHistorical: boolean = false): Promise<unknown> {
// Node counts and network shares
let clearnetNodes = 0;
let torNodes = 0;
@@ -63,8 +38,11 @@ class LightningStatsImporter {
let isUnnanounced = true;
for (const socket of (node.addresses ?? [])) {
hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network) || socket.addr.indexOf('onion') !== -1;
hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network) || [4, 6].includes(isIP(socket.addr.split(':')[0]));
if (!socket.network?.length && !socket.addr?.length) {
continue;
}
hasOnion = hasOnion || ['torv2', 'torv3'].includes(socket.network) || socket.addr.indexOf('onion') !== -1 || socket.addr.indexOf('torv2') !== -1 || socket.addr.indexOf('torv3') !== -1;
hasClearnet = hasClearnet || ['ipv4', 'ipv6'].includes(socket.network) || [4, 6].includes(isIP(socket.addr.split(':')[0])) || socket.addr.indexOf('ipv4') !== -1 || socket.addr.indexOf('ipv6') !== -1;;
}
if (hasOnion && hasClearnet) {
clearnetTorNodes++;
@@ -91,11 +69,14 @@ class LightningStatsImporter {
const baseFees: number[] = [];
const alreadyCountedChannels = {};
const [channelsInDbRaw]: any[] = await DB.query(`SELECT short_id, created FROM channels`);
const channelsInDb = {};
for (const channel of channelsInDbRaw) {
channelsInDb[channel.short_id] = channel;
}
for (const channel of networkGraph.edges) {
let short_id = channel.channel_id;
if (short_id.indexOf('/') !== -1) {
short_id = short_id.slice(0, -2);
}
const short_id = Common.channelIntegerIdToShortId(channel.channel_id);
const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id);
if (!tx) {
@@ -103,6 +84,31 @@ class LightningStatsImporter {
continue;
}
// Channel is already in db, check if we need to update 'created' field
if (isHistorical === true) {
//@ts-ignore
if (channelsInDb[short_id] && channel.timestamp < channel.created) {
await DB.query(`
UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.short_id = ?`,
//@ts-ignore
[channel.timestamp, short_id]
);
} else if (!channelsInDb[short_id]) {
await channelsApi.$saveChannel({
channel_id: short_id,
chan_point: `${tx.txid}:${short_id.split('x')[2]}`,
//@ts-ignore
last_update: channel.timestamp,
node1_pub: channel.node1_pub,
node2_pub: channel.node2_pub,
capacity: (tx.value * 100000000).toString(),
node1_policy: null,
node2_policy: null,
}, 0);
channelsInDb[channel.channel_id] = channel;
}
}
if (!nodeStats[channel.node1_pub]) {
nodeStats[channel.node1_pub] = {
capacity: 0,
@@ -127,7 +133,7 @@ class LightningStatsImporter {
nodeStats[channel.node2_pub].channels++;
}
if (channel.node1_policy !== undefined) { // Coming from the node
if (isHistorical === false) { // Coming from the node
for (const policy of [channel.node1_policy, channel.node2_policy]) {
if (policy && parseInt(policy.fee_rate_milli_msat, 10) < 5000) {
avgFeeRate += parseInt(policy.fee_rate_milli_msat, 10);
@@ -138,30 +144,42 @@ class LightningStatsImporter {
baseFees.push(parseInt(policy.fee_base_msat, 10));
}
}
} else { // Coming from the historical import
} else {
// @ts-ignore
if (channel.fee_rate_milli_msat < 5000) {
if (channel.node1_policy.fee_rate_milli_msat < 5000) {
// @ts-ignore
avgFeeRate += parseInt(channel.fee_rate_milli_msat, 10);
avgFeeRate += parseInt(channel.node1_policy.fee_rate_milli_msat, 10);
// @ts-ignore
feeRates.push(parseInt(channel.fee_rate_milli_msat), 10);
feeRates.push(parseInt(channel.node1_policy.fee_rate_milli_msat), 10);
}
// @ts-ignore
if (channel.fee_base_msat < 5000) {
if (channel.node1_policy.fee_base_msat < 5000) {
// @ts-ignore
avgBaseFee += parseInt(channel.fee_base_msat, 10);
avgBaseFee += parseInt(channel.node1_policy.fee_base_msat, 10);
// @ts-ignore
baseFees.push(parseInt(channel.fee_base_msat), 10);
baseFees.push(parseInt(channel.node1_policy.fee_base_msat), 10);
}
}
}
let medCapacity = 0;
let medFeeRate = 0;
let medBaseFee = 0;
let avgCapacity = 0;
avgFeeRate /= Math.max(networkGraph.edges.length, 1);
avgBaseFee /= Math.max(networkGraph.edges.length, 1);
const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
const avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
if (capacities.length > 0) {
medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
}
if (feeRates.length > 0) {
medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
}
if (baseFees.length > 0) {
medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
}
let query = `INSERT INTO lightning_stats(
added,
@@ -263,156 +281,154 @@ class LightningStatsImporter {
* Import topology files LN historical data into the database
*/
async $importHistoricalLightningStats(): Promise<void> {
let latestNodeCount = 1;
try {
let fileList: string[] = [];
try {
fileList = await fsPromises.readdir(this.topologiesFolder);
} catch (e) {
logger.err(`Unable to open topology folder at ${this.topologiesFolder}`);
throw e;
}
// Insert history from the most recent to the oldest
// This also put the .json cached files first
fileList.sort().reverse();
const fileList = await fsPromises.readdir(this.topologiesFolder);
// Insert history from the most recent to the oldest
// This also put the .json cached files first
fileList.sort().reverse();
const [rows]: any[] = await DB.query(`
SELECT UNIX_TIMESTAMP(added) AS added, node_count
FROM lightning_stats
ORDER BY added DESC
`);
const existingStatsTimestamps = {};
for (const row of rows) {
existingStatsTimestamps[row.added] = row;
}
// For logging purpose
let processed = 10;
let totalProcessed = -1;
for (const filename of fileList) {
processed++;
totalProcessed++;
const timestamp = parseInt(filename.split('_')[1], 10);
// Stats exist already, don't calculate/insert them
if (existingStatsTimestamps[timestamp] !== undefined) {
latestNodeCount = existingStatsTimestamps[timestamp].node_count;
continue;
const [rows]: any[] = await DB.query(`
SELECT UNIX_TIMESTAMP(added) AS added, node_count
FROM lightning_stats
ORDER BY added DESC
`);
const existingStatsTimestamps = {};
for (const row of rows) {
existingStatsTimestamps[row.added] = row;
}
logger.debug(`Reading ${this.topologiesFolder}/${filename}`);
const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
// For logging purpose
let processed = 10;
let totalProcessed = 0;
let logStarted = false;
let graph;
if (filename.indexOf('.json') !== -1) {
for (const filename of fileList) {
processed++;
const timestamp = parseInt(filename.split('_')[1], 10);
// Stats exist already, don't calculate/insert them
if (existingStatsTimestamps[timestamp] !== undefined) {
totalProcessed++;
continue;
}
if (filename.indexOf('topology_') === -1) {
totalProcessed++;
continue;
}
logger.debug(`Reading ${this.topologiesFolder}/${filename}`);
let fileContent = '';
try {
fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8');
} catch (e: any) {
if (e.errno == -1) { // EISDIR - Ignore directorie
continue;
}
logger.err(`Unable to open ${this.topologiesFolder}/${filename}`);
continue;
}
let graph;
try {
graph = JSON.parse(fileContent);
graph = await this.cleanupTopology(graph);
} catch (e) {
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`);
continue;
}
} else {
graph = this.parseFile(fileContent);
if (!graph) {
logger.debug(`Invalid topology file ${this.topologiesFolder}/${filename}, cannot parse the content`);
continue;
if (!logStarted) {
logger.info(`Founds a topology file that we did not import. Importing historical lightning stats now.`);
logStarted = true;
}
await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph));
}
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`);
if (timestamp > 1556316000) {
// "No, the reason most likely is just that I started collection in 2019,
// so what I had before that is just the survivors from before, which weren't that many"
const diffRatio = graph.nodes.length / latestNodeCount;
if (diffRatio < 0.9) {
// Ignore drop of more than 90% of the node count as it's probably a missing data point
logger.debug(`Nodes count diff ratio threshold reached, ignore the data for this day ${graph.nodes.length} nodes vs ${latestNodeCount}`);
continue;
totalProcessed++;
if (processed > 10) {
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
processed = 0;
} else {
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
}
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
const stat = await this.computeNetworkStats(timestamp, graph, true);
existingStatsTimestamps[timestamp] = stat;
}
latestNodeCount = graph.nodes.length;
const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`;
logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.edges.length} channels`);
if (processed > 10) {
logger.info(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
processed = 0;
} else {
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
if (totalProcessed > 0) {
logger.info(`Lightning network stats historical import completed`);
}
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
const stat = await this.computeNetworkStats(timestamp, graph);
existingStatsTimestamps[timestamp] = stat;
} catch (e) {
logger.err(`Lightning network stats historical failed. Reason: ${e instanceof Error ? e.message : e}`);
}
logger.info(`Lightning network stats historical import completed`);
}
/**
* Parse the file content into XML, and return a list of nodes and channels
*/
private parseFile(fileContent): any {
const graph = this.parser.parse(fileContent);
if (Object.keys(graph).length === 0) {
return null;
}
async cleanupTopology(graph) {
const newGraph = {
nodes: <ILightningApi.Node[]>[],
edges: <ILightningApi.Channel[]>[],
};
const nodes: Node[] = [];
const channels: Channel[] = [];
// If there is only one entry, the parser does not return an array, so we override this
if (!Array.isArray(graph.graphml.graph.node)) {
graph.graphml.graph.node = [graph.graphml.graph.node];
}
if (!Array.isArray(graph.graphml.graph.edge)) {
graph.graphml.graph.edge = [graph.graphml.graph.edge];
}
for (const node of graph.graphml.graph.node) {
if (!node.data) {
continue;
}
const addresses: unknown[] = [];
const sockets = node.data[5].split(',');
for (const socket of sockets) {
const parts = socket.split('://');
for (const node of graph.nodes) {
const addressesParts = (node.addresses ?? '').split(',');
const addresses: any[] = [];
for (const address of addressesParts) {
addresses.push({
network: parts[0],
addr: parts[1],
network: '',
addr: address
});
}
nodes.push({
id: node.data[0],
timestamp: node.data[1],
features: node.data[2],
rgb_color: node.data[3],
alias: node.data[4],
newGraph.nodes.push({
last_update: node.timestamp ?? 0,
pub_key: node.id ?? null,
alias: node.alias ?? null,
addresses: addresses,
out_degree: node.data[6],
in_degree: node.data[7],
color: node.rgb_color ?? null,
features: {},
});
}
for (const channel of graph.graphml.graph.edge) {
if (!channel.data) {
for (const adjacency of graph.adjacency) {
if (adjacency.length === 0) {
continue;
} else {
for (const edge of adjacency) {
newGraph.edges.push({
channel_id: edge.scid,
chan_point: '',
last_update: edge.timestamp,
node1_pub: edge.source ?? null,
node2_pub: edge.destination ?? null,
capacity: '0', // Will be fetch later
node1_policy: {
time_lock_delta: edge.cltv_expiry_delta,
min_htlc: edge.htlc_minimim_msat,
fee_base_msat: edge.fee_base_msat,
fee_rate_milli_msat: edge.fee_proportional_millionths,
max_htlc_msat: edge.htlc_maximum_msat,
last_update: edge.timestamp,
disabled: false,
},
node2_policy: null,
});
}
}
channels.push({
channel_id: channel.data[0],
node1_pub: channel.data[1],
node2_pub: channel.data[2],
timestamp: channel.data[3],
features: channel.data[4],
fee_base_msat: channel.data[5],
fee_rate_milli_msat: channel.data[6],
htlc_minimim_msat: channel.data[7],
cltv_expiry_delta: channel.data[8],
htlc_maximum_msat: channel.data[9],
});
}
return {
nodes: nodes,
edges: channels,
};
return newGraph;
}
}