Merge pull request #2294 from mempool/nymkappa/feature/split-node-sockets
Create and populate nodes_socket table
This commit is contained in:
		
						commit
						0f4b11e455
					
				| @ -1,5 +1,7 @@ | ||||
| import { CpfpInfo, TransactionExtended, TransactionStripped } from '../mempool.interfaces'; | ||||
| import config from '../config'; | ||||
| import { NodeSocket } from '../repositories/NodesSocketsRepository'; | ||||
| import { isIP } from 'net'; | ||||
| export class Common { | ||||
|   static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? | ||||
|     '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' | ||||
| @ -221,4 +223,35 @@ export class Common { | ||||
|     const d = new Date((date || 0) * 1000); | ||||
|     return d.toISOString().split('T')[0] + ' ' + d.toTimeString().split(' ')[0]; | ||||
|   } | ||||
| 
 | ||||
|   static formatSocket(publicKey: string, socket: {network: string, addr: string}): NodeSocket { | ||||
|     let network: string | null = null; | ||||
| 
 | ||||
|     if (config.LIGHTNING.BACKEND === 'cln') { | ||||
|       network = socket.network; | ||||
|     } else if (config.LIGHTNING.BACKEND === 'lnd') { | ||||
|       if (socket.addr.indexOf('onion') !== -1) { | ||||
|         if (socket.addr.split('.')[0].length >= 56) { | ||||
|           network = 'torv3'; | ||||
|         } else { | ||||
|           network = 'torv2'; | ||||
|         } | ||||
|       } else if (socket.addr.indexOf('i2p') !== -1) { | ||||
|         network = 'i2p'; | ||||
|       } else { | ||||
|         const ipv = isIP(socket.addr.split(':')[0]); | ||||
|         if (ipv === 4) { | ||||
|           network = 'ipv4'; | ||||
|         } else if (ipv === 6) { | ||||
|           network = 'ipv6'; | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     return { | ||||
|       publicKey: publicKey, | ||||
|       network: network, | ||||
|       addr: socket.addr, | ||||
|     }; | ||||
|   } | ||||
| } | ||||
|  | ||||
| @ -4,7 +4,7 @@ import logger from '../logger'; | ||||
| import { Common } from './common'; | ||||
| 
 | ||||
| class DatabaseMigration { | ||||
|   private static currentVersion = 36; | ||||
|   private static currentVersion = 37; | ||||
|   private queryTimeout = 120000; | ||||
|   private statisticsAddedIndexed = false; | ||||
|   private uniqueLogs: string[] = []; | ||||
| @ -324,6 +324,10 @@ class DatabaseMigration { | ||||
|     if (databaseSchemaVersion < 36 && isBitcoin == true) { | ||||
|       await this.$executeQuery('ALTER TABLE `nodes` ADD status TINYINT NOT NULL DEFAULT "1"'); | ||||
|     } | ||||
| 
 | ||||
|     if (databaseSchemaVersion < 37 && isBitcoin == true) { | ||||
|       await this.$executeQuery(this.getCreateLNNodesSocketsTableQuery(), await this.$checkIfTableExists('nodes_sockets')); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
| @ -737,7 +741,7 @@ class DatabaseMigration { | ||||
|       names text DEFAULT NULL, | ||||
|       UNIQUE KEY id (id,type), | ||||
|       KEY id_2 (id) | ||||
|     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;` | ||||
|     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | ||||
|   } | ||||
| 
 | ||||
|   private getCreateBlocksPricesTableQuery(): string { | ||||
| @ -749,6 +753,16 @@ class DatabaseMigration { | ||||
|     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | ||||
|   } | ||||
| 
 | ||||
|   private getCreateLNNodesSocketsTableQuery(): string { | ||||
|     return `CREATE TABLE IF NOT EXISTS nodes_sockets (
 | ||||
|       public_key varchar(66) NOT NULL, | ||||
|       socket varchar(100) NOT NULL, | ||||
|       type enum('ipv4', 'ipv6', 'torv2', 'torv3', 'i2p', 'dns', 'websocket') NULL, | ||||
|       UNIQUE KEY public_key_socket (public_key, socket), | ||||
|       INDEX (public_key) | ||||
|     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | ||||
|   } | ||||
| 
 | ||||
|   public async $truncateIndexedData(tables: string[]) { | ||||
|     const allowedTables = ['blocks', 'hashrates', 'prices']; | ||||
| 
 | ||||
|  | ||||
| @ -17,7 +17,7 @@ export function convertNode(clNode: any): ILightningApi.Node { | ||||
|         network: addr.type, | ||||
|         addr: `${addr.address}:${addr.port}` | ||||
|       }; | ||||
|     }), | ||||
|     }) ?? [], | ||||
|     last_update: clNode?.last_timestamp ?? 0, | ||||
|   }; | ||||
| } | ||||
|  | ||||
| @ -1,4 +1,3 @@ | ||||
| import transactionUtils from '../api/transaction-utils'; | ||||
| import DB from '../database'; | ||||
| import logger from '../logger'; | ||||
| import { BlockAudit } from '../mempool.interfaces'; | ||||
|  | ||||
							
								
								
									
										45
									
								
								backend/src/repositories/NodesSocketsRepository.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								backend/src/repositories/NodesSocketsRepository.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,45 @@ | ||||
| import { ResultSetHeader } from 'mysql2'; | ||||
| import DB from '../database'; | ||||
| import logger from '../logger'; | ||||
| 
 | ||||
| export interface NodeSocket { | ||||
|   publicKey: string; | ||||
|   network: string | null; | ||||
|   addr: string; | ||||
| } | ||||
| 
 | ||||
| class NodesSocketsRepository { | ||||
|   public async $saveSocket(socket: NodeSocket): Promise<void> { | ||||
|     try { | ||||
|       await DB.query(` | ||||
|         INSERT INTO nodes_sockets(public_key, socket, type) | ||||
|         VALUE (?, ?, ?) | ||||
|       `, [socket.publicKey, socket.addr, socket.network]);
 | ||||
|     } catch (e: any) { | ||||
|       if (e.errno !== 1062) { // ER_DUP_ENTRY - Not an issue, just ignore this
 | ||||
|         logger.err(`Cannot save node socket (${[socket.publicKey, socket.addr, socket.network]}) into db. Reason: ` + (e instanceof Error ? e.message : e)); | ||||
|         // We don't throw, not a critical issue if we miss some nodes sockets
 | ||||
|       } | ||||
|     } | ||||
|    } | ||||
| 
 | ||||
|    public async $deleteUnusedSockets(publicKey: string, addresses: string[]): Promise<number> { | ||||
|     if (addresses.length === 0) { | ||||
|       return 0; | ||||
|     } | ||||
|     try { | ||||
|       const query = ` | ||||
|         DELETE FROM nodes_sockets | ||||
|         WHERE public_key = ? | ||||
|         AND socket NOT IN (${addresses.map(id => `"${id}"`).join(',')}) | ||||
|       `;
 | ||||
|       const [result] = await DB.query<ResultSetHeader>(query, [publicKey]); | ||||
|       return result.affectedRows; | ||||
|     } catch (e) { | ||||
|       logger.err(`Cannot delete unused sockets for ${publicKey} from db. Reason: ` + (e instanceof Error ? e.message : e)); | ||||
|       return 0; | ||||
|     } | ||||
|    } | ||||
| } | ||||
| 
 | ||||
| export default new NodesSocketsRepository(); | ||||
| @ -10,6 +10,8 @@ import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||
| import nodesApi from '../../api/explorer/nodes.api'; | ||||
| import { ResultSetHeader } from 'mysql2'; | ||||
| import fundingTxFetcher from './sync-tasks/funding-tx-fetcher'; | ||||
| import NodesSocketsRepository from '../../repositories/NodesSocketsRepository'; | ||||
| import { Common } from '../../api/common'; | ||||
| 
 | ||||
| class NetworkSyncService { | ||||
|   loggerTimer = 0; | ||||
| @ -58,6 +60,7 @@ class NetworkSyncService { | ||||
|   private async $updateNodesList(nodes: ILightningApi.Node[]): Promise<void> { | ||||
|     let progress = 0; | ||||
| 
 | ||||
|     let deletedSockets = 0; | ||||
|     const graphNodesPubkeys: string[] = []; | ||||
|     for (const node of nodes) { | ||||
|       await nodesApi.$saveNode(node); | ||||
| @ -69,8 +72,15 @@ class NetworkSyncService { | ||||
|         logger.info(`Updating node ${progress}/${nodes.length}`); | ||||
|         this.loggerTimer = new Date().getTime() / 1000; | ||||
|       } | ||||
| 
 | ||||
|       const addresses: string[] = []; | ||||
|       for (const socket of node.addresses) { | ||||
|         await NodesSocketsRepository.$saveSocket(Common.formatSocket(node.pub_key, socket)); | ||||
|         addresses.push(socket.addr); | ||||
|       } | ||||
|     logger.info(`${progress} nodes updated`); | ||||
|       deletedSockets += await NodesSocketsRepository.$deleteUnusedSockets(node.pub_key, addresses); | ||||
|     } | ||||
|     logger.info(`${progress} nodes updated. ${deletedSockets} sockets deleted`); | ||||
| 
 | ||||
|     // If a channel if not present in the graph, mark it as inactive
 | ||||
|     nodesApi.$setNodesInactive(graphNodesPubkeys); | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user