Merge pull request #2311 from mempool/nymkappa/feature/closed-channel-import

Import historical channels
This commit is contained in:
wiz 2022-08-20 19:09:16 +09:00 committed by GitHub
commit dd2c226354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 27 deletions

View File

@ -248,7 +248,6 @@ class DatabaseMigration {
}
if (databaseSchemaVersion < 25 && isBitcoin === true) {
await this.$executeQuery(`INSERT INTO state VALUES('last_node_stats', 0, '1970-01-01');`);
await this.$executeQuery(this.getCreateLightningStatisticsQuery(), await this.$checkIfTableExists('lightning_stats'));
await this.$executeQuery(this.getCreateNodesQuery(), await this.$checkIfTableExists('nodes'));
await this.$executeQuery(this.getCreateChannelsQuery(), await this.$checkIfTableExists('channels'));

View File

@ -61,9 +61,14 @@ class ChannelsApi {
}
}
public async $getChannelsByStatus(status: number): Promise<any[]> {
public async $getChannelsByStatus(status: number | number[]): Promise<any[]> {
try {
const query = `SELECT * FROM channels WHERE status = ?`;
let query: string;
if (Array.isArray(status)) {
query = `SELECT * FROM channels WHERE status IN (${status.join(',')})`;
} else {
query = `SELECT * FROM channels WHERE status = ?`;
}
const [rows]: any = await DB.query(query, [status]);
return rows;
} catch (e) {
@ -337,7 +342,7 @@ class ChannelsApi {
/**
* Save or update a channel present in the graph
*/
public async $saveChannel(channel: ILightningApi.Channel): Promise<void> {
public async $saveChannel(channel: ILightningApi.Channel, status = 1): Promise<void> {
const [ txid, vout ] = channel.chan_point.split(':');
const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {};
@ -369,11 +374,11 @@ class ChannelsApi {
node2_min_htlc_mtokens,
node2_updated_at
)
VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ${status}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
capacity = ?,
updated_at = ?,
status = 1,
status = ${status},
node1_public_key = ?,
node1_base_fee_mtokens = ?,
node1_cltv_delta = ?,

View File

@ -189,7 +189,7 @@ class Server {
await networkSyncService.$startService();
await lightningStatsUpdater.$startService();
} catch(e) {
logger.err(`Lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
logger.err(`Nodejs lightning backend crashed. Restarting in 1 minute. Reason: ${(e instanceof Error ? e.message : e)}`);
await Common.sleep$(1000 * 60);
this.$runLightningBackend();
};

View File

@ -232,8 +232,8 @@ class NetworkSyncService {
let progress = 0;
try {
logger.info(`Starting closed channels scan...`);
const channels = await channelsApi.$getChannelsByStatus(0);
logger.info(`Starting closed channels scan`);
const channels = await channelsApi.$getChannelsByStatus([0, 1]);
for (const channel of channels) {
const spendingTx = await bitcoinApi.$getOutspend(channel.transaction_id, channel.transaction_vout);
if (spendingTx.spent === true && spendingTx.status?.confirmed === true) {

View File

@ -5,6 +5,8 @@ import fundingTxFetcher from './funding-tx-fetcher';
import config from '../../../config';
import { ILightningApi } from '../../../api/lightning/lightning-api.interface';
import { isIP } from 'net';
import { Common } from '../../../api/common';
import channelsApi from '../../../api/explorer/channels.api';
const fsPromises = promises;
@ -22,7 +24,8 @@ class LightningStatsImporter {
/**
* Generate LN network stats for one day
*/
public async computeNetworkStats(timestamp: number, networkGraph: ILightningApi.NetworkGraph): Promise<unknown> {
public async computeNetworkStats(timestamp: number,
networkGraph: ILightningApi.NetworkGraph, isHistorical: boolean = false): Promise<unknown> {
// Node counts and network shares
let clearnetNodes = 0;
let torNodes = 0;
@ -66,11 +69,14 @@ class LightningStatsImporter {
const baseFees: number[] = [];
const alreadyCountedChannels = {};
const [channelsInDbRaw]: any[] = await DB.query(`SELECT short_id, created FROM channels`);
const channelsInDb = {};
for (const channel of channelsInDbRaw) {
channelsInDb[channel.short_id] = channel;
}
for (const channel of networkGraph.edges) {
let short_id = channel.channel_id;
if (short_id.indexOf('/') !== -1) {
short_id = short_id.slice(0, -2);
}
const short_id = Common.channelIntegerIdToShortId(channel.channel_id);
const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id);
if (!tx) {
@ -78,6 +84,31 @@ class LightningStatsImporter {
continue;
}
// Channel is already in db, check if we need to update 'created' field
if (isHistorical === true) {
//@ts-ignore
if (channelsInDb[short_id] && channel.timestamp < channel.created) {
await DB.query(`
UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.short_id = ?`,
//@ts-ignore
[channel.timestamp, short_id]
);
} else if (!channelsInDb[short_id]) {
await channelsApi.$saveChannel({
channel_id: short_id,
chan_point: `${tx.txid}:${short_id.split('x')[2]}`,
//@ts-ignore
last_update: channel.timestamp,
node1_pub: channel.node1_pub,
node2_pub: channel.node2_pub,
capacity: (tx.value * 100000000).toString(),
node1_policy: null,
node2_policy: null,
}, 0);
channelsInDb[channel.channel_id] = channel;
}
}
if (!nodeStats[channel.node1_pub]) {
nodeStats[channel.node1_pub] = {
capacity: 0,
@ -102,7 +133,7 @@ class LightningStatsImporter {
nodeStats[channel.node2_pub].channels++;
}
if (channel.node1_policy !== undefined) { // Coming from the node
if (isHistorical === false) { // Coming from the node
for (const policy of [channel.node1_policy, channel.node2_policy]) {
if (policy && parseInt(policy.fee_rate_milli_msat, 10) < 5000) {
avgFeeRate += parseInt(policy.fee_rate_milli_msat, 10);
@ -113,30 +144,42 @@ class LightningStatsImporter {
baseFees.push(parseInt(policy.fee_base_msat, 10));
}
}
} else { // Coming from the historical import
} else {
// @ts-ignore
if (channel.fee_rate_milli_msat < 5000) {
if (channel.node1_policy.fee_rate_milli_msat < 5000) {
// @ts-ignore
avgFeeRate += parseInt(channel.fee_rate_milli_msat, 10);
avgFeeRate += parseInt(channel.node1_policy.fee_rate_milli_msat, 10);
// @ts-ignore
feeRates.push(parseInt(channel.fee_rate_milli_msat), 10);
feeRates.push(parseInt(channel.node1_policy.fee_rate_milli_msat), 10);
}
// @ts-ignore
if (channel.fee_base_msat < 5000) {
if (channel.node1_policy.fee_base_msat < 5000) {
// @ts-ignore
avgBaseFee += parseInt(channel.fee_base_msat, 10);
avgBaseFee += parseInt(channel.node1_policy.fee_base_msat, 10);
// @ts-ignore
baseFees.push(parseInt(channel.fee_base_msat), 10);
baseFees.push(parseInt(channel.node1_policy.fee_base_msat), 10);
}
}
}
let medCapacity = 0;
let medFeeRate = 0;
let medBaseFee = 0;
let avgCapacity = 0;
avgFeeRate /= Math.max(networkGraph.edges.length, 1);
avgBaseFee /= Math.max(networkGraph.edges.length, 1);
const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
const avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
if (capacities.length > 0) {
medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)];
avgCapacity = Math.round(capacity / Math.max(capacities.length, 1));
}
if (feeRates.length > 0) {
medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)];
}
if (baseFees.length > 0) {
medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)];
}
let query = `INSERT INTO lightning_stats(
added,
@ -319,7 +362,7 @@ class LightningStatsImporter {
logger.debug(`Generating LN network stats for ${datestr}. Processed ${totalProcessed}/${fileList.length} files`);
}
await fundingTxFetcher.$fetchChannelsFundingTxs(graph.edges.map(channel => channel.channel_id.slice(0, -2)));
const stat = await this.computeNetworkStats(timestamp, graph);
const stat = await this.computeNetworkStats(timestamp, graph, true);
existingStatsTimestamps[timestamp] = stat;
}