Insert channels from historical data
This commit is contained in:
		
							parent
							
								
									1ef4485a26
								
							
						
					
					
						commit
						e7d99e9653
					
				@ -248,7 +248,6 @@ class DatabaseMigration {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (databaseSchemaVersion < 25 && isBitcoin === true) {
 | 
			
		||||
      await this.$executeQuery(`INSERT INTO state VALUES('last_node_stats', 0, '1970-01-01');`);
 | 
			
		||||
      await this.$executeQuery(this.getCreateLightningStatisticsQuery(), await this.$checkIfTableExists('lightning_stats'));
 | 
			
		||||
      await this.$executeQuery(this.getCreateNodesQuery(), await this.$checkIfTableExists('nodes'));
 | 
			
		||||
      await this.$executeQuery(this.getCreateChannelsQuery(), await this.$checkIfTableExists('channels'));
 | 
			
		||||
 | 
			
		||||
@ -61,9 +61,14 @@ class ChannelsApi {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $getChannelsByStatus(status: number): Promise<any[]> {
 | 
			
		||||
  public async $getChannelsByStatus(status: number | number[]): Promise<any[]> {
 | 
			
		||||
    try {
 | 
			
		||||
      const query = `SELECT * FROM channels WHERE status = ?`;
 | 
			
		||||
      let query: string;
 | 
			
		||||
      if (Array.isArray(status)) {
 | 
			
		||||
        query = `SELECT * FROM channels WHERE status IN (${status.join(',')})`;
 | 
			
		||||
      } else {
 | 
			
		||||
        query = `SELECT * FROM channels WHERE status = ?`;
 | 
			
		||||
      }
 | 
			
		||||
      const [rows]: any = await DB.query(query, [status]);
 | 
			
		||||
      return rows;
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
@ -337,7 +342,7 @@ class ChannelsApi {
 | 
			
		||||
  /**
 | 
			
		||||
   * Save or update a channel present in the graph
 | 
			
		||||
   */
 | 
			
		||||
  public async $saveChannel(channel: ILightningApi.Channel): Promise<void> {
 | 
			
		||||
  public async $saveChannel(channel: ILightningApi.Channel, status = 1): Promise<void> {
 | 
			
		||||
    const [ txid, vout ] = channel.chan_point.split(':');
 | 
			
		||||
 | 
			
		||||
    const policy1: Partial<ILightningApi.RoutingPolicy> = channel.node1_policy || {};
 | 
			
		||||
@ -369,11 +374,11 @@ class ChannelsApi {
 | 
			
		||||
        node2_min_htlc_mtokens,
 | 
			
		||||
        node2_updated_at
 | 
			
		||||
      )
 | 
			
		||||
      VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 | 
			
		||||
      VALUES (?, ?, ?, ?, ?, ?, ${status}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 | 
			
		||||
      ON DUPLICATE KEY UPDATE
 | 
			
		||||
        capacity = ?,
 | 
			
		||||
        updated_at = ?,
 | 
			
		||||
        status = 1,
 | 
			
		||||
        status = ${status},
 | 
			
		||||
        node1_public_key = ?,
 | 
			
		||||
        node1_base_fee_mtokens = ?,
 | 
			
		||||
        node1_cltv_delta = ?,
 | 
			
		||||
 | 
			
		||||
@ -232,8 +232,8 @@ class NetworkSyncService {
 | 
			
		||||
    let progress = 0;
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      logger.info(`Starting closed channels scan...`);
 | 
			
		||||
      const channels = await channelsApi.$getChannelsByStatus(0);
 | 
			
		||||
      logger.info(`Starting closed channels scan`);
 | 
			
		||||
      const channels = await channelsApi.$getChannelsByStatus([0, 1]);
 | 
			
		||||
      for (const channel of channels) {
 | 
			
		||||
        const spendingTx = await bitcoinApi.$getOutspend(channel.transaction_id, channel.transaction_vout);
 | 
			
		||||
        if (spendingTx.spent === true && spendingTx.status?.confirmed === true) {
 | 
			
		||||
 | 
			
		||||
@ -5,6 +5,8 @@ import fundingTxFetcher from './funding-tx-fetcher';
 | 
			
		||||
import config from '../../../config';
 | 
			
		||||
import { ILightningApi } from '../../../api/lightning/lightning-api.interface';
 | 
			
		||||
import { isIP } from 'net';
 | 
			
		||||
import { Common } from '../../../api/common';
 | 
			
		||||
import channelsApi from '../../../api/explorer/channels.api';
 | 
			
		||||
 | 
			
		||||
const fsPromises = promises;
 | 
			
		||||
 | 
			
		||||
@ -22,7 +24,8 @@ class LightningStatsImporter {
 | 
			
		||||
  /**
 | 
			
		||||
   * Generate LN network stats for one day
 | 
			
		||||
   */
 | 
			
		||||
  public async computeNetworkStats(timestamp: number, networkGraph: ILightningApi.NetworkGraph): Promise<unknown> {
 | 
			
		||||
  public async computeNetworkStats(timestamp: number,
 | 
			
		||||
    networkGraph: ILightningApi.NetworkGraph, isHistorical: boolean = false): Promise<unknown> {
 | 
			
		||||
    // Node counts and network shares
 | 
			
		||||
    let clearnetNodes = 0;
 | 
			
		||||
    let torNodes = 0;
 | 
			
		||||
@ -66,18 +69,46 @@ class LightningStatsImporter {
 | 
			
		||||
    const baseFees: number[] = [];
 | 
			
		||||
    const alreadyCountedChannels = {};
 | 
			
		||||
    
 | 
			
		||||
    for (const channel of networkGraph.edges) {
 | 
			
		||||
      let short_id = channel.channel_id;
 | 
			
		||||
      if (short_id.indexOf('/') !== -1) {
 | 
			
		||||
        short_id = short_id.slice(0, -2);
 | 
			
		||||
    const [channelsInDbRaw]: any[] = await DB.query(`SELECT short_id, created FROM channels`);
 | 
			
		||||
    const channelsInDb = {};
 | 
			
		||||
    for (const channel of channelsInDbRaw) {
 | 
			
		||||
      channelsInDb[channel.short_id] = channel;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (const channel of networkGraph.edges) {
 | 
			
		||||
      const short_id = Common.channelIntegerIdToShortId(channel.channel_id);
 | 
			
		||||
 | 
			
		||||
      const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id);
 | 
			
		||||
      if (!tx) {
 | 
			
		||||
        logger.err(`Unable to fetch funding tx for channel ${short_id}. Capacity and creation date is unknown. Skipping channel.`);
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // Channel is already in db, check if we need to update 'created' field
 | 
			
		||||
      if (isHistorical === true) {
 | 
			
		||||
        //@ts-ignore
 | 
			
		||||
        if (channelsInDb[short_id] && channel.timestamp < channel.created) {
 | 
			
		||||
          await DB.query(`
 | 
			
		||||
            UPDATE channels SET created = FROM_UNIXTIME(?) WHERE channels.short_id = ?`,
 | 
			
		||||
            //@ts-ignore
 | 
			
		||||
            [channel.timestamp, short_id]
 | 
			
		||||
          );
 | 
			
		||||
        } else if (!channelsInDb[short_id]) {
 | 
			
		||||
          await channelsApi.$saveChannel({
 | 
			
		||||
            channel_id: short_id,
 | 
			
		||||
            chan_point: `${tx.txid}:${short_id.split('x')[2]}`,
 | 
			
		||||
            //@ts-ignore
 | 
			
		||||
            last_update: channel.timestamp,
 | 
			
		||||
            node1_pub: channel.node1_pub,
 | 
			
		||||
            node2_pub: channel.node2_pub,
 | 
			
		||||
            capacity: (tx.value * 100000000).toString(),
 | 
			
		||||
            node1_policy: null,
 | 
			
		||||
            node2_policy: null,
 | 
			
		||||
          }, 0);
 | 
			
		||||
          channelsInDb[channel.channel_id] = channel;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (!nodeStats[channel.node1_pub]) {
 | 
			
		||||
        nodeStats[channel.node1_pub] = {
 | 
			
		||||
          capacity: 0,
 | 
			
		||||
@ -102,7 +133,7 @@ class LightningStatsImporter {
 | 
			
		||||
        nodeStats[channel.node2_pub].channels++;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (channel.node1_policy !== undefined) { // Coming from the node
 | 
			
		||||
      if (isHistorical === false) { // Coming from the node
 | 
			
		||||
        for (const policy of [channel.node1_policy, channel.node2_policy]) {
 | 
			
		||||
          if (policy && parseInt(policy.fee_rate_milli_msat, 10) < 5000) {
 | 
			
		||||
            avgFeeRate += parseInt(policy.fee_rate_milli_msat, 10);
 | 
			
		||||
@ -113,7 +144,7 @@ class LightningStatsImporter {
 | 
			
		||||
            baseFees.push(parseInt(policy.fee_base_msat, 10));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      } else { // Coming from the historical import
 | 
			
		||||
      } else {
 | 
			
		||||
        // @ts-ignore
 | 
			
		||||
        if (channel.fee_rate_milli_msat < 5000) {
 | 
			
		||||
          // @ts-ignore
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user