Create and populate nodes_socket table
This commit is contained in:
		
							parent
							
								
									81ca980ccf
								
							
						
					
					
						commit
						34bd21aa8f
					
				@ -1,5 +1,7 @@
 | 
			
		||||
import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces';
 | 
			
		||||
import config from '../config';
 | 
			
		||||
import { NodeSocket } from '../repositories/NodesSocketsRepository';
 | 
			
		||||
import { isIP } from 'net';
 | 
			
		||||
export class Common {
 | 
			
		||||
  static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ?
 | 
			
		||||
    '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49'
 | 
			
		||||
@ -221,4 +223,35 @@ export class Common {
 | 
			
		||||
    const d = new Date((date || 0) * 1000);
 | 
			
		||||
    return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0];
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static formatSocket(publicKey: string, socket: {network: string, addr: string}): NodeSocket {
 | 
			
		||||
    let network: string | null = null;
 | 
			
		||||
 | 
			
		||||
    if (config.LIGHTNING.BACKEND === 'cln') {
 | 
			
		||||
      network = socket.network;
 | 
			
		||||
    } else if (config.LIGHTNING.BACKEND === 'lnd') {
 | 
			
		||||
      if (socket.addr.indexOf('onion') !== -1) {
 | 
			
		||||
        if (socket.addr.split('.')[0].length >= 56) {
 | 
			
		||||
          network = 'torv3';
 | 
			
		||||
        } else {
 | 
			
		||||
          network = 'torv2';
 | 
			
		||||
        }
 | 
			
		||||
      } else if (socket.addr.indexOf('i2p') !== -1) {
 | 
			
		||||
        network = 'i2p';
 | 
			
		||||
      } else {
 | 
			
		||||
        const ipv = isIP(socket.addr.split(':')[0]);
 | 
			
		||||
        if (ipv === 4) {
 | 
			
		||||
          network = 'ipv4';
 | 
			
		||||
        } else if (ipv === 6) {
 | 
			
		||||
          network = 'ipv6';
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return {
 | 
			
		||||
      publicKey: publicKey,
 | 
			
		||||
      network: network,
 | 
			
		||||
      addr: socket.addr,
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,7 @@ import logger from '../logger';
 | 
			
		||||
import { Common } from './common';
 | 
			
		||||
 | 
			
		||||
class DatabaseMigration {
 | 
			
		||||
  private static currentVersion = 36;
 | 
			
		||||
  private static currentVersion = 37;
 | 
			
		||||
  private queryTimeout = 120000;
 | 
			
		||||
  private statisticsAddedIndexed = false;
 | 
			
		||||
  private uniqueLogs: string[] = [];
 | 
			
		||||
@ -324,6 +324,10 @@ class DatabaseMigration {
 | 
			
		||||
    if (databaseSchemaVersion < 36 && isBitcoin == true) {
 | 
			
		||||
      await this.$executeQuery('ALTER TABLE `nodes` ADD status TINYINT NOT NULL DEFAULT "1"');
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (databaseSchemaVersion < 37 && isBitcoin == true) {
 | 
			
		||||
      await this.$executeQuery(this.getCreateLNNodesSocketsTableQuery(), await this.$checkIfTableExists('nodes_sockets'));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
@ -737,7 +741,7 @@ class DatabaseMigration {
 | 
			
		||||
      names text DEFAULT NULL,
 | 
			
		||||
      UNIQUE KEY id (id,type),
 | 
			
		||||
      KEY id_2 (id)
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getCreateBlocksPricesTableQuery(): string {
 | 
			
		||||
@ -749,6 +753,16 @@ class DatabaseMigration {
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getCreateLNNodesSocketsTableQuery(): string {
 | 
			
		||||
    return `CREATE TABLE IF NOT EXISTS nodes_sockets (
 | 
			
		||||
      public_key varchar(66) NOT NULL,
 | 
			
		||||
      socket varchar(100) NOT NULL,
 | 
			
		||||
      type enum('ipv4', 'ipv6', 'torv2', 'torv3', 'i2p', 'dns') NULL,
 | 
			
		||||
      UNIQUE KEY public_key_socket (public_key, socket),
 | 
			
		||||
      INDEX (public_key)
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $truncateIndexedData(tables: string[]) {
 | 
			
		||||
    const allowedTables = ['blocks', 'hashrates', 'prices'];
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,7 @@ export function convertNode(clNode: any): ILightningApi.Node {
 | 
			
		||||
        network: addr.type,
 | 
			
		||||
        addr: `${addr.address}:${addr.port}`
 | 
			
		||||
      };
 | 
			
		||||
    }),
 | 
			
		||||
    }) ?? [],
 | 
			
		||||
    last_update: clNode?.last_timestamp ?? 0,
 | 
			
		||||
  };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -82,4 +82,4 @@ export namespace ILightningApi {
 | 
			
		||||
    is_required: boolean;
 | 
			
		||||
    is_known: boolean;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
@ -1,4 +1,3 @@
 | 
			
		||||
import transactionUtils from '../api/transaction-utils';
 | 
			
		||||
import DB from '../database';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import { BlockAudit } from '../mempool.interfaces';
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										45
									
								
								backend/src/repositories/NodesSocketsRepository.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								backend/src/repositories/NodesSocketsRepository.ts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,45 @@
 | 
			
		||||
import { ResultSetHeader } from 'mysql2';
 | 
			
		||||
import DB from '../database';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
 | 
			
		||||
export interface NodeSocket {
 | 
			
		||||
  publicKey: string;
 | 
			
		||||
  network: string | null;
 | 
			
		||||
  addr: string;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class NodesSocketsRepository {
 | 
			
		||||
  public async $saveSocket(socket: NodeSocket): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      await DB.query(`
 | 
			
		||||
        INSERT INTO nodes_sockets(public_key, socket, type)
 | 
			
		||||
        VALUE (?, ?, ?)
 | 
			
		||||
      `, [socket.publicKey, socket.addr, socket.network]);
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      if (e.errno !== 1062) { // ER_DUP_ENTRY - Not an issue, just ignore this
 | 
			
		||||
        logger.err(`Cannot save node socket (${[socket.publicKey, socket.addr, socket.network]}) into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
        // We don't throw, not a critical issue if we miss some nodes sockets
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   public async $deleteUnusedSockets(publicKey: string, addresses: string[]): Promise<number> {
 | 
			
		||||
    if (addresses.length === 0) {
 | 
			
		||||
      return 0;
 | 
			
		||||
    }
 | 
			
		||||
    try {
 | 
			
		||||
      const query = `
 | 
			
		||||
        DELETE FROM nodes_sockets
 | 
			
		||||
        WHERE public_key = ?
 | 
			
		||||
        AND socket NOT IN (${addresses.map(id => `"${id}"`).join(',')})
 | 
			
		||||
      `;
 | 
			
		||||
      const [result] = await DB.query<ResultSetHeader>(query, [publicKey]);
 | 
			
		||||
      return result.affectedRows;
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.err(`Cannot delete unused sockets for ${publicKey} from db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
      return 0;
 | 
			
		||||
    }
 | 
			
		||||
   }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default new NodesSocketsRepository();
 | 
			
		||||
@ -10,6 +10,8 @@ import lightningApi from '../../api/lightning/lightning-api-factory';
 | 
			
		||||
import nodesApi from '../../api/explorer/nodes.api';
 | 
			
		||||
import { ResultSetHeader } from 'mysql2';
 | 
			
		||||
import fundingTxFetcher from './sync-tasks/funding-tx-fetcher';
 | 
			
		||||
import NodesSocketsRepository from '../../repositories/NodesSocketsRepository';
 | 
			
		||||
import { Common } from '../../api/common';
 | 
			
		||||
 | 
			
		||||
class NetworkSyncService {
 | 
			
		||||
  loggerTimer = 0;
 | 
			
		||||
@ -58,6 +60,7 @@ class NetworkSyncService {
 | 
			
		||||
  private async $updateNodesList(nodes: ILightningApi.Node[]): Promise<void> {
 | 
			
		||||
    let progress = 0;
 | 
			
		||||
 | 
			
		||||
    let deletedSockets = 0;
 | 
			
		||||
    const graphNodesPubkeys: string[] = [];
 | 
			
		||||
    for (const node of nodes) {
 | 
			
		||||
      await nodesApi.$saveNode(node);
 | 
			
		||||
@ -69,8 +72,15 @@ class NetworkSyncService {
 | 
			
		||||
        logger.info(`Updating node ${progress}/${nodes.length}`);
 | 
			
		||||
        this.loggerTimer = new Date().getTime() / 1000;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const addresses: string[] = [];
 | 
			
		||||
      for (const socket of node.addresses) {
 | 
			
		||||
        await NodesSocketsRepository.$saveSocket(Common.formatSocket(node.pub_key, socket));
 | 
			
		||||
        addresses.push(socket.addr);
 | 
			
		||||
      }
 | 
			
		||||
      deletedSockets += await NodesSocketsRepository.$deleteUnusedSockets(node.pub_key, addresses);
 | 
			
		||||
    }
 | 
			
		||||
    logger.info(`${progress} nodes updated`);
 | 
			
		||||
    logger.info(`${progress} nodes updated. ${deletedSockets} sockets deleted`);
 | 
			
		||||
 | 
			
		||||
    // If a channel if not present in the graph, mark it as inactive
 | 
			
		||||
    nodesApi.$setNodesInactive(graphNodesPubkeys);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user