Merge branch 'master' into nymkappa/bugfix/top-nodes-queries
This commit is contained in:
		
						commit
						76600af698
					
				
							
								
								
									
										38
									
								
								backend/package-lock.json
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										38
									
								
								backend/package-lock.json
									
									
									
										generated
									
									
									
								
							| @ -31,6 +31,7 @@ | ||||
|         "@typescript-eslint/parser": "^5.30.5", | ||||
|         "eslint": "^8.19.0", | ||||
|         "eslint-config-prettier": "^8.5.0", | ||||
|         "fast-xml-parser": "^4.0.9", | ||||
|         "prettier": "^2.7.1" | ||||
|       } | ||||
|     }, | ||||
| @ -1496,6 +1497,22 @@ | ||||
|       "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", | ||||
|       "dev": true | ||||
|     }, | ||||
|     "node_modules/fast-xml-parser": { | ||||
|       "version": "4.0.9", | ||||
|       "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.0.9.tgz", | ||||
|       "integrity": "sha512-4G8EzDg2Nb1Qurs3f7BpFV4+jpMVsdgLVuG1Uv8O2OHJfVCg7gcA53obuKbmVqzd4Y7YXVBK05oJG7hzGIdyzg==", | ||||
|       "dev": true, | ||||
|       "dependencies": { | ||||
|         "strnum": "^1.0.5" | ||||
|       }, | ||||
|       "bin": { | ||||
|         "fxparser": "src/cli/cli.js" | ||||
|       }, | ||||
|       "funding": { | ||||
|         "type": "paypal", | ||||
|         "url": "https://paypal.me/naturalintelligence" | ||||
|       } | ||||
|     }, | ||||
|     "node_modules/fastq": { | ||||
|       "version": "1.13.0", | ||||
|       "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", | ||||
| @ -2665,6 +2682,12 @@ | ||||
|         "url": "https://github.com/sponsors/sindresorhus" | ||||
|       } | ||||
|     }, | ||||
|     "node_modules/strnum": { | ||||
|       "version": "1.0.5", | ||||
|       "resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz", | ||||
|       "integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==", | ||||
|       "dev": true | ||||
|     }, | ||||
|     "node_modules/text-table": { | ||||
|       "version": "0.2.0", | ||||
|       "resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", | ||||
| @ -3973,6 +3996,15 @@ | ||||
|       "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", | ||||
|       "dev": true | ||||
|     }, | ||||
|     "fast-xml-parser": { | ||||
|       "version": "4.0.9", | ||||
|       "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.0.9.tgz", | ||||
|       "integrity": "sha512-4G8EzDg2Nb1Qurs3f7BpFV4+jpMVsdgLVuG1Uv8O2OHJfVCg7gcA53obuKbmVqzd4Y7YXVBK05oJG7hzGIdyzg==", | ||||
|       "dev": true, | ||||
|       "requires": { | ||||
|         "strnum": "^1.0.5" | ||||
|       } | ||||
|     }, | ||||
|     "fastq": { | ||||
|       "version": "1.13.0", | ||||
|       "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", | ||||
| @ -4817,6 +4849,12 @@ | ||||
|       "integrity": "sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==", | ||||
|       "dev": true | ||||
|     }, | ||||
|     "strnum": { | ||||
|       "version": "1.0.5", | ||||
|       "resolved": "https://registry.npmjs.org/strnum/-/strnum-1.0.5.tgz", | ||||
|       "integrity": "sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==", | ||||
|       "dev": true | ||||
|     }, | ||||
|     "text-table": { | ||||
|       "version": "0.2.0", | ||||
|       "resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", | ||||
|  | ||||
| @ -37,6 +37,7 @@ | ||||
|     "bitcoinjs-lib": "6.0.1", | ||||
|     "crypto-js": "^4.0.0", | ||||
|     "express": "^4.18.0", | ||||
|     "fast-xml-parser": "^4.0.9", | ||||
|     "maxmind": "^4.3.6", | ||||
|     "mysql2": "2.3.3", | ||||
|     "node-worker-threads-pool": "^1.5.1", | ||||
|  | ||||
| @ -4,7 +4,7 @@ import logger from '../logger'; | ||||
| import { Common } from './common'; | ||||
| 
 | ||||
| class DatabaseMigration { | ||||
|   private static currentVersion = 33; | ||||
|   private static currentVersion = 34; | ||||
|   private queryTimeout = 120000; | ||||
|   private statisticsAddedIndexed = false; | ||||
|   private uniqueLogs: string[] = []; | ||||
| @ -311,6 +311,10 @@ class DatabaseMigration { | ||||
|     if (databaseSchemaVersion < 33 && isBitcoin == true) { | ||||
|       await this.$executeQuery('ALTER TABLE `geo_names` CHANGE `type` `type` enum("city","country","division","continent","as_organization", "country_iso_code") NOT NULL'); | ||||
|     } | ||||
| 
 | ||||
|     if (databaseSchemaVersion < 34 && isBitcoin == true) { | ||||
|       await this.$executeQuery('ALTER TABLE `lightning_stats` ADD clearnet_tor_nodes int(11) NOT NULL DEFAULT "0"'); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|  | ||||
							
								
								
									
										263
									
								
								backend/src/api/lightning/clightning/clightning-client.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										263
									
								
								backend/src/api/lightning/clightning/clightning-client.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,263 @@ | ||||
| // Imported from https://github.com/shesek/lightning-client-js
 | ||||
| 
 | ||||
| 'use strict'; | ||||
| 
 | ||||
| const methods = [ | ||||
|   'addgossip', | ||||
|   'autocleaninvoice', | ||||
|   'check', | ||||
|   'checkmessage', | ||||
|   'close', | ||||
|   'connect', | ||||
|   'createinvoice', | ||||
|   'createinvoicerequest', | ||||
|   'createoffer', | ||||
|   'createonion', | ||||
|   'decode', | ||||
|   'decodepay', | ||||
|   'delexpiredinvoice', | ||||
|   'delinvoice', | ||||
|   'delpay', | ||||
|   'dev-listaddrs', | ||||
|   'dev-rescan-outputs', | ||||
|   'disableoffer', | ||||
|   'disconnect', | ||||
|   'estimatefees', | ||||
|   'feerates', | ||||
|   'fetchinvoice', | ||||
|   'fundchannel', | ||||
|   'fundchannel_cancel', | ||||
|   'fundchannel_complete', | ||||
|   'fundchannel_start', | ||||
|   'fundpsbt', | ||||
|   'getchaininfo', | ||||
|   'getinfo', | ||||
|   'getlog', | ||||
|   'getrawblockbyheight', | ||||
|   'getroute', | ||||
|   'getsharedsecret', | ||||
|   'getutxout', | ||||
|   'help', | ||||
|   'invoice', | ||||
|   'keysend', | ||||
|   'legacypay', | ||||
|   'listchannels', | ||||
|   'listconfigs', | ||||
|   'listforwards', | ||||
|   'listfunds', | ||||
|   'listinvoices', | ||||
|   'listnodes', | ||||
|   'listoffers', | ||||
|   'listpays', | ||||
|   'listpeers', | ||||
|   'listsendpays', | ||||
|   'listtransactions', | ||||
|   'multifundchannel', | ||||
|   'multiwithdraw', | ||||
|   'newaddr', | ||||
|   'notifications', | ||||
|   'offer', | ||||
|   'offerout', | ||||
|   'openchannel_abort', | ||||
|   'openchannel_bump', | ||||
|   'openchannel_init', | ||||
|   'openchannel_signed', | ||||
|   'openchannel_update', | ||||
|   'pay', | ||||
|   'payersign', | ||||
|   'paystatus', | ||||
|   'ping', | ||||
|   'plugin', | ||||
|   'reserveinputs', | ||||
|   'sendinvoice', | ||||
|   'sendonion', | ||||
|   'sendonionmessage', | ||||
|   'sendpay', | ||||
|   'sendpsbt', | ||||
|   'sendrawtransaction', | ||||
|   'setchannelfee', | ||||
|   'signmessage', | ||||
|   'signpsbt', | ||||
|   'stop', | ||||
|   'txdiscard', | ||||
|   'txprepare', | ||||
|   'txsend', | ||||
|   'unreserveinputs', | ||||
|   'utxopsbt', | ||||
|   'waitanyinvoice', | ||||
|   'waitblockheight', | ||||
|   'waitinvoice', | ||||
|   'waitsendpay', | ||||
|   'withdraw' | ||||
| ]; | ||||
| 
 | ||||
| 
 | ||||
| import EventEmitter from 'events'; | ||||
| import { existsSync, statSync } from 'fs'; | ||||
| import { createConnection, Socket } from 'net'; | ||||
| import { homedir } from 'os'; | ||||
| import path from 'path'; | ||||
| import { createInterface, Interface } from 'readline'; | ||||
| import logger from '../../../logger'; | ||||
| import { AbstractLightningApi } from '../lightning-api-abstract-factory'; | ||||
| import { ILightningApi } from '../lightning-api.interface'; | ||||
| import { convertAndmergeBidirectionalChannels, convertNode } from './clightning-convert'; | ||||
| 
 | ||||
| class LightningError extends Error { | ||||
|   type: string = 'lightning'; | ||||
|   message: string = 'lightning-client error'; | ||||
| 
 | ||||
|   constructor(error) { | ||||
|     super(); | ||||
|     this.type = error.type; | ||||
|     this.message = error.message; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| const defaultRpcPath = path.join(homedir(), '.lightning') | ||||
|   , fStat = (...p) => statSync(path.join(...p)) | ||||
|   , fExists = (...p) => existsSync(path.join(...p)) | ||||
| 
 | ||||
| export default class CLightningClient extends EventEmitter implements AbstractLightningApi { | ||||
|   private rpcPath: string; | ||||
|   private reconnectWait: number; | ||||
|   private reconnectTimeout; | ||||
|   private reqcount: number; | ||||
|   private client: Socket; | ||||
|   private rl: Interface; | ||||
|   private clientConnectionPromise: Promise<unknown>; | ||||
| 
 | ||||
|   constructor(rpcPath = defaultRpcPath) { | ||||
|     if (!path.isAbsolute(rpcPath)) { | ||||
|       throw new Error('The rpcPath must be an absolute path'); | ||||
|     } | ||||
| 
 | ||||
|     if (!fExists(rpcPath) || !fStat(rpcPath).isSocket()) { | ||||
|       // network directory provided, use the lightning-rpc within in
 | ||||
|       if (fExists(rpcPath, 'lightning-rpc')) { | ||||
|         rpcPath = path.join(rpcPath, 'lightning-rpc'); | ||||
|       } | ||||
| 
 | ||||
|       // main data directory provided, default to using the bitcoin mainnet subdirectory
 | ||||
|       // to be removed in v0.2.0
 | ||||
|       else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) { | ||||
|         logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`) | ||||
|         logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`) | ||||
|         rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc') | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     logger.debug(`[CLightningClient] Connecting to ${rpcPath}`); | ||||
| 
 | ||||
|     super(); | ||||
|     this.rpcPath = rpcPath; | ||||
|     this.reconnectWait = 0.5; | ||||
|     this.reconnectTimeout = null; | ||||
|     this.reqcount = 0; | ||||
| 
 | ||||
|     const _self = this; | ||||
| 
 | ||||
|     this.client = createConnection(rpcPath); | ||||
|     this.rl = createInterface({ input: this.client }) | ||||
| 
 | ||||
|     this.clientConnectionPromise = new Promise<void>(resolve => { | ||||
|       _self.client.on('connect', () => { | ||||
|         logger.info(`[CLightningClient] Lightning client connected`); | ||||
|         _self.reconnectWait = 1; | ||||
|         resolve(); | ||||
|       }); | ||||
| 
 | ||||
|       _self.client.on('end', () => { | ||||
|         logger.err('[CLightningClient] Lightning client connection closed, reconnecting'); | ||||
|         _self.increaseWaitTime(); | ||||
|         _self.reconnect(); | ||||
|       }); | ||||
| 
 | ||||
|       _self.client.on('error', error => { | ||||
|         logger.err(`[CLightningClient] Lightning client connection error: ${error}`); | ||||
|         _self.emit('error', error); | ||||
|         _self.increaseWaitTime(); | ||||
|         _self.reconnect(); | ||||
|       }); | ||||
|     }); | ||||
| 
 | ||||
|     this.rl.on('line', line => { | ||||
|       line = line.trim(); | ||||
|       if (!line) { | ||||
|         return; | ||||
|       } | ||||
|       const data = JSON.parse(line); | ||||
|       // logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`);
 | ||||
|       _self.emit('res:' + data.id, data); | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
|   increaseWaitTime(): void { | ||||
|     if (this.reconnectWait >= 16) { | ||||
|       this.reconnectWait = 16; | ||||
|     } else { | ||||
|       this.reconnectWait *= 2; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   reconnect(): void { | ||||
|     const _self = this; | ||||
| 
 | ||||
|     if (this.reconnectTimeout) { | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     this.reconnectTimeout = setTimeout(() => { | ||||
|       logger.debug('[CLightningClient] Trying to reconnect...'); | ||||
| 
 | ||||
|       _self.client.connect(_self.rpcPath); | ||||
|       _self.reconnectTimeout = null; | ||||
|     }, this.reconnectWait * 1000); | ||||
|   } | ||||
| 
 | ||||
|   call(method, args = []): Promise<any> { | ||||
|     const _self = this; | ||||
| 
 | ||||
|     const callInt = ++this.reqcount; | ||||
|     const sendObj = { | ||||
|       jsonrpc: '2.0', | ||||
|       method, | ||||
|       params: args, | ||||
|       id: '' + callInt | ||||
|     }; | ||||
| 
 | ||||
|     // logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`);
 | ||||
| 
 | ||||
|     // Wait for the client to connect
 | ||||
|     return this.clientConnectionPromise | ||||
|       .then(() => new Promise((resolve, reject) => { | ||||
|         // Wait for a response
 | ||||
|         this.once('res:' + callInt, res => res.error == null | ||||
|           ? resolve(res.result) | ||||
|           : reject(new LightningError(res.error)) | ||||
|         ); | ||||
| 
 | ||||
|         // Send the command
 | ||||
|         _self.client.write(JSON.stringify(sendObj)); | ||||
|       })); | ||||
|   } | ||||
| 
 | ||||
|   async $getNetworkGraph(): Promise<ILightningApi.NetworkGraph> { | ||||
|     const listnodes: any[] = await this.call('listnodes'); | ||||
|     const listchannels: any[] = await this.call('listchannels'); | ||||
|     const channelsList = await convertAndmergeBidirectionalChannels(listchannels['channels']); | ||||
| 
 | ||||
|     return { | ||||
|       nodes: listnodes['nodes'].map(node => convertNode(node)), | ||||
|       edges: channelsList, | ||||
|     }; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| const protify = s => s.replace(/-([a-z])/g, m => m[1].toUpperCase()); | ||||
| 
 | ||||
| methods.forEach(k => { | ||||
|   CLightningClient.prototype[protify(k)] = function (...args: any) { | ||||
|     return this.call(k, args); | ||||
|   }; | ||||
| }); | ||||
							
								
								
									
										112
									
								
								backend/src/api/lightning/clightning/clightning-convert.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										112
									
								
								backend/src/api/lightning/clightning/clightning-convert.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,112 @@ | ||||
| import { ILightningApi } from '../lightning-api.interface'; | ||||
| import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher'; | ||||
| 
 | ||||
| /** | ||||
|  * Convert a clightning "listnode" entry to a lnd node entry | ||||
|  */ | ||||
| export function convertNode(clNode: any): ILightningApi.Node { | ||||
|   return { | ||||
|     alias: clNode.alias ?? '', | ||||
|     color: `#${clNode.color ?? ''}`, | ||||
|     features: [], // TODO parse and return clNode.feature
 | ||||
|     pub_key: clNode.nodeid, | ||||
|     addresses: clNode.addresses?.map((addr) => { | ||||
|       return { | ||||
|         network: addr.type, | ||||
|         addr: `${addr.address}:${addr.port}` | ||||
|       }; | ||||
|     }), | ||||
|     last_update: clNode?.last_timestamp ?? 0, | ||||
|   }; | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * Convert clightning "listchannels" response to lnd "describegraph.edges" format | ||||
|  */ | ||||
|  export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise<ILightningApi.Channel[]> { | ||||
|   const consolidatedChannelList: ILightningApi.Channel[] = []; | ||||
|   const clChannelsDict = {}; | ||||
|   const clChannelsDictCount = {}; | ||||
| 
 | ||||
|   for (const clChannel of clChannels) {     | ||||
|     if (!clChannelsDict[clChannel.short_channel_id]) { | ||||
|       clChannelsDict[clChannel.short_channel_id] = clChannel; | ||||
|       clChannelsDictCount[clChannel.short_channel_id] = 1; | ||||
|     } else { | ||||
|       consolidatedChannelList.push( | ||||
|         await buildFullChannel(clChannel, clChannelsDict[clChannel.short_channel_id]) | ||||
|       ); | ||||
|       delete clChannelsDict[clChannel.short_channel_id]; | ||||
|       clChannelsDictCount[clChannel.short_channel_id]++; | ||||
|     } | ||||
|   } | ||||
|   for (const short_channel_id of Object.keys(clChannelsDict)) { | ||||
|     consolidatedChannelList.push(await buildIncompleteChannel(clChannelsDict[short_channel_id])); | ||||
|   } | ||||
| 
 | ||||
|   return consolidatedChannelList; | ||||
| } | ||||
| 
 | ||||
| export function convertChannelId(channelId): string { | ||||
|   const s = channelId.split('x').map(part => parseInt(part)); | ||||
|   return BigInt((s[0] << 40) | (s[1] << 16) | s[2]).toString(); | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * Convert two clightning "getchannels" entries into a full a lnd "describegraph.edges" format | ||||
|  * In this case, clightning knows the channel policy for both nodes | ||||
|  */ | ||||
| async function buildFullChannel(clChannelA: any, clChannelB: any): Promise<ILightningApi.Channel> { | ||||
|   const lastUpdate = Math.max(clChannelA.last_update ?? 0, clChannelB.last_update ?? 0); | ||||
| 
 | ||||
|   const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannelA.short_channel_id); | ||||
|   const parts = clChannelA.short_channel_id.split('x'); | ||||
|   const outputIdx = parts[2]; | ||||
| 
 | ||||
|   return { | ||||
|     channel_id: clChannelA.short_channel_id, | ||||
|     capacity: clChannelA.satoshis, | ||||
|     last_update: lastUpdate, | ||||
|     node1_policy: convertPolicy(clChannelA), | ||||
|     node2_policy: convertPolicy(clChannelB), | ||||
|     chan_point: `${tx.txid}:${outputIdx}`, | ||||
|     node1_pub: clChannelA.source, | ||||
|     node2_pub: clChannelB.source, | ||||
|   }; | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * Convert one clightning "getchannels" entry into a full a lnd "describegraph.edges" format | ||||
|  * In this case, clightning knows the channel policy of only one node | ||||
|  */ | ||||
|  async function buildIncompleteChannel(clChannel: any): Promise<ILightningApi.Channel> { | ||||
|   const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannel.short_channel_id); | ||||
|   const parts = clChannel.short_channel_id.split('x'); | ||||
|   const outputIdx = parts[2]; | ||||
| 
 | ||||
|   return { | ||||
|     channel_id: clChannel.short_channel_id, | ||||
|     capacity: clChannel.satoshis, | ||||
|     last_update: clChannel.last_update ?? 0, | ||||
|     node1_policy: convertPolicy(clChannel), | ||||
|     node2_policy: null, | ||||
|     chan_point: `${tx.txid}:${outputIdx}`, | ||||
|     node1_pub: clChannel.source, | ||||
|     node2_pub: clChannel.destination, | ||||
|   }; | ||||
| } | ||||
| 
 | ||||
| /** | ||||
|  * Convert a clightning "listnode" response to a lnd channel policy format | ||||
|  */ | ||||
|  function convertPolicy(clChannel: any): ILightningApi.RoutingPolicy { | ||||
|   return { | ||||
|     time_lock_delta: 0, // TODO
 | ||||
|     min_htlc: clChannel.htlc_minimum_msat.slice(0, -4), | ||||
|     max_htlc_msat: clChannel.htlc_maximum_msat.slice(0, -4), | ||||
|     fee_base_msat: clChannel.base_fee_millisatoshi, | ||||
|     fee_rate_milli_msat: clChannel.fee_per_millionth, | ||||
|     disabled: !clChannel.active, | ||||
|     last_update: clChannel.last_update ?? 0, | ||||
|   }; | ||||
| } | ||||
| @ -1,7 +1,5 @@ | ||||
| import { ILightningApi } from './lightning-api.interface'; | ||||
| 
 | ||||
| export interface AbstractLightningApi { | ||||
|   $getNetworkInfo(): Promise<ILightningApi.NetworkInfo>; | ||||
|   $getNetworkGraph(): Promise<ILightningApi.NetworkGraph>; | ||||
|   $getInfo(): Promise<ILightningApi.Info>; | ||||
| } | ||||
|  | ||||
| @ -1,9 +1,12 @@ | ||||
| import config from '../../config'; | ||||
| import CLightningClient from './clightning/clightning-client'; | ||||
| import { AbstractLightningApi } from './lightning-api-abstract-factory'; | ||||
| import LndApi from './lnd/lnd-api'; | ||||
| 
 | ||||
| function lightningApiFactory(): AbstractLightningApi { | ||||
|   switch (config.LIGHTNING.BACKEND) { | ||||
|   switch (config.LIGHTNING.ENABLED === true && config.LIGHTNING.BACKEND) { | ||||
|     case 'cln': | ||||
|       return new CLightningClient(config.CLIGHTNING.SOCKET); | ||||
|     case 'lnd': | ||||
|     default: | ||||
|       return new LndApi(); | ||||
|  | ||||
| @ -31,12 +31,16 @@ interface IConfig { | ||||
|   LIGHTNING: { | ||||
|     ENABLED: boolean; | ||||
|     BACKEND: 'lnd' | 'cln' | 'ldk'; | ||||
|     TOPOLOGY_FOLDER: string; | ||||
|   }; | ||||
|   LND: { | ||||
|     TLS_CERT_PATH: string; | ||||
|     MACAROON_PATH: string; | ||||
|     REST_API_URL: string; | ||||
|   }; | ||||
|   CLIGHTNING: { | ||||
|     SOCKET: string; | ||||
|   }; | ||||
|   ELECTRUM: { | ||||
|     HOST: string; | ||||
|     PORT: number; | ||||
| @ -177,13 +181,17 @@ const defaults: IConfig = { | ||||
|   }, | ||||
|   'LIGHTNING': { | ||||
|     'ENABLED': false, | ||||
|     'BACKEND': 'lnd' | ||||
|     'BACKEND': 'lnd', | ||||
|     'TOPOLOGY_FOLDER': '', | ||||
|   }, | ||||
|   'LND': { | ||||
|     'TLS_CERT_PATH': '', | ||||
|     'MACAROON_PATH': '', | ||||
|     'REST_API_URL': 'https://localhost:8080', | ||||
|   }, | ||||
|   'CLIGHTNING': { | ||||
|     'SOCKET': '', | ||||
|   }, | ||||
|   'SOCKS5PROXY': { | ||||
|     'ENABLED': false, | ||||
|     'USE_ONION': true, | ||||
| @ -224,6 +232,7 @@ class Config implements IConfig { | ||||
|   BISQ: IConfig['BISQ']; | ||||
|   LIGHTNING: IConfig['LIGHTNING']; | ||||
|   LND: IConfig['LND']; | ||||
|   CLIGHTNING: IConfig['CLIGHTNING']; | ||||
|   SOCKS5PROXY: IConfig['SOCKS5PROXY']; | ||||
|   PRICE_DATA_SERVER: IConfig['PRICE_DATA_SERVER']; | ||||
|   EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER']; | ||||
| @ -242,6 +251,7 @@ class Config implements IConfig { | ||||
|     this.BISQ = configs.BISQ; | ||||
|     this.LIGHTNING = configs.LIGHTNING; | ||||
|     this.LND = configs.LND; | ||||
|     this.CLIGHTNING = configs.CLIGHTNING; | ||||
|     this.SOCKS5PROXY = configs.SOCKS5PROXY; | ||||
|     this.PRICE_DATA_SERVER = configs.PRICE_DATA_SERVER; | ||||
|     this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER; | ||||
|  | ||||
| @ -28,12 +28,13 @@ import nodesRoutes from './api/explorer/nodes.routes'; | ||||
| import channelsRoutes from './api/explorer/channels.routes'; | ||||
| import generalLightningRoutes from './api/explorer/general.routes'; | ||||
| import lightningStatsUpdater from './tasks/lightning/stats-updater.service'; | ||||
| import nodeSyncService from './tasks/lightning/node-sync.service'; | ||||
| import statisticsRoutes from "./api/statistics/statistics.routes"; | ||||
| import miningRoutes from "./api/mining/mining-routes"; | ||||
| import bisqRoutes from "./api/bisq/bisq.routes"; | ||||
| import liquidRoutes from "./api/liquid/liquid.routes"; | ||||
| import bitcoinRoutes from "./api/bitcoin/bitcoin.routes"; | ||||
| import networkSyncService from './tasks/lightning/network-sync.service'; | ||||
| import statisticsRoutes from './api/statistics/statistics.routes'; | ||||
| import miningRoutes from './api/mining/mining-routes'; | ||||
| import bisqRoutes from './api/bisq/bisq.routes'; | ||||
| import liquidRoutes from './api/liquid/liquid.routes'; | ||||
| import bitcoinRoutes from './api/bitcoin/bitcoin.routes'; | ||||
| import fundingTxFetcher from "./tasks/lightning/sync-tasks/funding-tx-fetcher"; | ||||
| 
 | ||||
| class Server { | ||||
|   private wss: WebSocket.Server | undefined; | ||||
| @ -136,8 +137,9 @@ class Server { | ||||
|     } | ||||
| 
 | ||||
|     if (config.LIGHTNING.ENABLED) { | ||||
|       nodeSyncService.$startService() | ||||
|         .then(() => lightningStatsUpdater.$startService()); | ||||
|       fundingTxFetcher.$init() | ||||
|       .then(() => networkSyncService.$startService()) | ||||
|       .then(() => lightningStatsUpdater.$startService()); | ||||
|     } | ||||
| 
 | ||||
|     this.server.listen(config.MEMPOOL.HTTP_PORT, () => { | ||||
|  | ||||
| @ -5,11 +5,12 @@ import bitcoinClient from '../../api/bitcoin/bitcoin-client'; | ||||
| import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory'; | ||||
| import config from '../../config'; | ||||
| import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface'; | ||||
| import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||
| import { ILightningApi } from '../../api/lightning/lightning-api.interface'; | ||||
| import { $lookupNodeLocation } from './sync-tasks/node-locations'; | ||||
| import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||
| import { convertChannelId } from '../../api/lightning/clightning/clightning-convert'; | ||||
| 
 | ||||
| class NodeSyncService { | ||||
| class NetworkSyncService { | ||||
|   constructor() {} | ||||
| 
 | ||||
|   public async $startService() { | ||||
| @ -27,6 +28,11 @@ class NodeSyncService { | ||||
|       logger.info(`Updating nodes and channels...`); | ||||
| 
 | ||||
|       const networkGraph = await lightningApi.$getNetworkGraph(); | ||||
|       if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) { | ||||
|         logger.info(`LN Network graph is empty, retrying in 10 seconds`); | ||||
|         setTimeout(this.$runUpdater, 10000); | ||||
|         return; | ||||
|       } | ||||
| 
 | ||||
|       for (const node of networkGraph.nodes) { | ||||
|         await this.$saveNode(node); | ||||
| @ -320,7 +326,7 @@ class NodeSyncService { | ||||
|         ;`;
 | ||||
| 
 | ||||
|       await DB.query(query, [ | ||||
|         channel.channel_id, | ||||
|         this.toIntegerId(channel.channel_id), | ||||
|         this.toShortId(channel.channel_id), | ||||
|         channel.capacity, | ||||
|         txid, | ||||
| @ -375,6 +381,10 @@ class NodeSyncService { | ||||
|   } | ||||
| 
 | ||||
|   private async $setChannelsInactive(graphChannelsIds: string[]): Promise<void> { | ||||
|     if (graphChannelsIds.length === 0) { | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     try { | ||||
|       await DB.query(` | ||||
|         UPDATE channels | ||||
| @ -391,8 +401,7 @@ class NodeSyncService { | ||||
| 
 | ||||
|   private async $saveNode(node: ILightningApi.Node): Promise<void> { | ||||
|     try { | ||||
|       const updatedAt = this.utcDateToMysql(node.last_update); | ||||
|       const sockets = node.addresses.map(a => a.addr).join(','); | ||||
|       const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? ''; | ||||
|       const query = `INSERT INTO nodes(
 | ||||
|           public_key, | ||||
|           first_seen, | ||||
| @ -401,15 +410,16 @@ class NodeSyncService { | ||||
|           color, | ||||
|           sockets | ||||
|         ) | ||||
|         VALUES (?, NOW(), ?, ?, ?, ?) ON DUPLICATE KEY UPDATE updated_at = ?, alias = ?, color = ?, sockets = ?;`;
 | ||||
|         VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?) | ||||
|         ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?`;
 | ||||
| 
 | ||||
|       await DB.query(query, [ | ||||
|         node.pub_key, | ||||
|         updatedAt, | ||||
|         node.last_update, | ||||
|         node.alias, | ||||
|         node.color, | ||||
|         sockets, | ||||
|         updatedAt, | ||||
|         node.last_update, | ||||
|         node.alias, | ||||
|         node.color, | ||||
|         sockets, | ||||
| @ -419,8 +429,19 @@ class NodeSyncService { | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private toIntegerId(id: string): string { | ||||
|     if (config.LIGHTNING.BACKEND === 'lnd') { | ||||
|       return id; | ||||
|     } | ||||
|     return convertChannelId(id); | ||||
|   } | ||||
| 
 | ||||
|   /** Decodes a channel id returned by lnd as uint64 to a short channel id */ | ||||
|   private toShortId(id: string): string { | ||||
|     if (config.LIGHTNING.BACKEND === 'cln') { | ||||
|       return id; | ||||
|     } | ||||
| 
 | ||||
|     const n = BigInt(id); | ||||
|     return [ | ||||
|       n >> 40n, // nth block
 | ||||
| @ -435,4 +456,4 @@ class NodeSyncService { | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| export default new NodeSyncService(); | ||||
| export default new NetworkSyncService(); | ||||
| @ -1,35 +1,14 @@ | ||||
| 
 | ||||
| import DB from '../../database'; | ||||
| import logger from '../../logger'; | ||||
| import lightningApi from '../../api/lightning/lightning-api-factory'; | ||||
| import channelsApi from '../../api/explorer/channels.api'; | ||||
| import * as net from 'net'; | ||||
| import LightningStatsImporter from './sync-tasks/stats-importer'; | ||||
| 
 | ||||
| class LightningStatsUpdater { | ||||
|   hardCodedStartTime = '2018-01-12'; | ||||
| 
 | ||||
|   public async $startService() { | ||||
|   public async $startService(): Promise<void> { | ||||
|     logger.info('Starting Lightning Stats service'); | ||||
|     let isInSync = false; | ||||
|     let error: any; | ||||
|     try { | ||||
|       error = null; | ||||
|       isInSync = await this.$lightningIsSynced(); | ||||
|     } catch (e) { | ||||
|       error = e; | ||||
|     } | ||||
|     if (!isInSync) { | ||||
|       if (error) { | ||||
|         logger.warn('Was not able to fetch Lightning Node status: ' + (error instanceof Error ? error.message : error) + '. Retrying in 1 minute...'); | ||||
|       } else { | ||||
|         logger.notice('The Lightning graph is not yet in sync. Retrying in 1 minute...'); | ||||
|       } | ||||
|       setTimeout(() => this.$startService(), 60 * 1000); | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     await this.$populateHistoricalStatistics(); | ||||
|     await this.$populateHistoricalNodeStatistics(); | ||||
|     LightningStatsImporter.$run(); | ||||
| 
 | ||||
|     setTimeout(() => { | ||||
|       this.$runTasks(); | ||||
| @ -50,298 +29,22 @@ class LightningStatsUpdater { | ||||
|     date.setUTCMilliseconds(0); | ||||
|   } | ||||
| 
 | ||||
|   private async $lightningIsSynced(): Promise<boolean> { | ||||
|     const nodeInfo = await lightningApi.$getInfo(); | ||||
|     return nodeInfo.synced_to_chain && nodeInfo.synced_to_graph; | ||||
|   } | ||||
| 
 | ||||
|   private async $runTasks(): Promise<void> { | ||||
|     await this.$logLightningStatsDaily(); | ||||
|     await this.$logNodeStatsDaily(); | ||||
|     await this.$logStatsDaily(); | ||||
| 
 | ||||
|     setTimeout(() => { | ||||
|       this.$runTasks(); | ||||
|     }, this.timeUntilMidnight()); | ||||
|   } | ||||
| 
 | ||||
|   private async $logLightningStatsDaily() { | ||||
|     try { | ||||
|       logger.info(`Running lightning daily stats log...`); | ||||
|   private async $logStatsDaily(): Promise<void> { | ||||
|     const date = new Date(); | ||||
|     this.setDateMidnight(date); | ||||
|     date.setUTCHours(24); | ||||
| 
 | ||||
|       const networkGraph = await lightningApi.$getNetworkGraph(); | ||||
|       let total_capacity = 0; | ||||
|       for (const channel of networkGraph.edges) { | ||||
|         if (channel.capacity) { | ||||
|           total_capacity += parseInt(channel.capacity); | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       let clearnetNodes = 0; | ||||
|       let torNodes = 0; | ||||
|       let unannouncedNodes = 0; | ||||
|       for (const node of networkGraph.nodes) { | ||||
|         for (const socket of node.addresses) { | ||||
|           const hasOnion = socket.addr.indexOf('.onion') !== -1; | ||||
|           if (hasOnion) { | ||||
|             torNodes++; | ||||
|           } | ||||
|           const hasClearnet = [4, 6].includes(net.isIP(socket.addr.split(':')[0])); | ||||
|           if (hasClearnet) { | ||||
|             clearnetNodes++; | ||||
|           } | ||||
|         } | ||||
|         if (node.addresses.length === 0) { | ||||
|           unannouncedNodes++; | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       const channelStats = await channelsApi.$getChannelsStats(); | ||||
| 
 | ||||
|       const query = `INSERT INTO lightning_stats(
 | ||||
|           added, | ||||
|           channel_count, | ||||
|           node_count, | ||||
|           total_capacity, | ||||
|           tor_nodes, | ||||
|           clearnet_nodes, | ||||
|           unannounced_nodes, | ||||
|           avg_capacity, | ||||
|           avg_fee_rate, | ||||
|           avg_base_fee_mtokens, | ||||
|           med_capacity, | ||||
|           med_fee_rate, | ||||
|           med_base_fee_mtokens | ||||
|         ) | ||||
|         VALUES (NOW() - INTERVAL 1 DAY, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
 | ||||
| 
 | ||||
|       await DB.query(query, [ | ||||
|         networkGraph.edges.length, | ||||
|         networkGraph.nodes.length, | ||||
|         total_capacity, | ||||
|         torNodes, | ||||
|         clearnetNodes, | ||||
|         unannouncedNodes, | ||||
|         channelStats.avgCapacity, | ||||
|         channelStats.avgFeeRate, | ||||
|         channelStats.avgBaseFee, | ||||
|         channelStats.medianCapacity, | ||||
|         channelStats.medianFeeRate, | ||||
|         channelStats.medianBaseFee, | ||||
|       ]); | ||||
|       logger.info(`Lightning daily stats done.`); | ||||
|     } catch (e) { | ||||
|       logger.err('$logLightningStatsDaily() error: ' + (e instanceof Error ? e.message : e)); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $logNodeStatsDaily() { | ||||
|     try { | ||||
|       logger.info(`Running daily node stats update...`); | ||||
| 
 | ||||
|       const query = ` | ||||
|         SELECT nodes.public_key, c1.channels_count_left, c2.channels_count_right, c1.channels_capacity_left, | ||||
|           c2.channels_capacity_right | ||||
|         FROM nodes | ||||
|         LEFT JOIN ( | ||||
|           SELECT node1_public_key, COUNT(id) AS channels_count_left, SUM(capacity) AS channels_capacity_left | ||||
|           FROM channels | ||||
|           WHERE channels.status = 1 | ||||
|           GROUP BY node1_public_key | ||||
|         ) c1 ON c1.node1_public_key = nodes.public_key | ||||
|         LEFT JOIN ( | ||||
|           SELECT node2_public_key, COUNT(id) AS channels_count_right, SUM(capacity) AS channels_capacity_right | ||||
|           FROM channels WHERE channels.status = 1 GROUP BY node2_public_key | ||||
|         ) c2 ON c2.node2_public_key = nodes.public_key | ||||
|       `;
 | ||||
|        | ||||
|       const [nodes]: any = await DB.query(query); | ||||
| 
 | ||||
|       for (const node of nodes) { | ||||
|         await DB.query( | ||||
|           `INSERT INTO node_stats(public_key, added, capacity, channels) VALUES (?, NOW() - INTERVAL 1 DAY, ?, ?)`, | ||||
|           [node.public_key, (parseInt(node.channels_capacity_left || 0, 10)) + (parseInt(node.channels_capacity_right || 0, 10)), | ||||
|             node.channels_count_left + node.channels_count_right]); | ||||
|       } | ||||
|       logger.info('Daily node stats has updated.'); | ||||
|     } catch (e) { | ||||
|       logger.err('$logNodeStatsDaily() error: ' + (e instanceof Error ? e.message : e)); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   // We only run this on first launch
 | ||||
|   private async $populateHistoricalStatistics() { | ||||
|     try { | ||||
|       const [rows]: any = await DB.query(`SELECT COUNT(*) FROM lightning_stats`); | ||||
|       // Only run if table is empty
 | ||||
|       if (rows[0]['COUNT(*)'] > 0) { | ||||
|         return; | ||||
|       } | ||||
|       logger.info(`Running historical stats population...`); | ||||
| 
 | ||||
|       const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels ORDER BY created ASC`); | ||||
|       const [nodes]: any = await DB.query(`SELECT first_seen, sockets FROM nodes ORDER BY first_seen ASC`); | ||||
| 
 | ||||
|       const date: Date = new Date(this.hardCodedStartTime); | ||||
|       const currentDate = new Date(); | ||||
|       this.setDateMidnight(currentDate); | ||||
| 
 | ||||
|       while (date < currentDate) { | ||||
|         let totalCapacity = 0; | ||||
|         let channelsCount = 0; | ||||
| 
 | ||||
|         for (const channel of channels) { | ||||
|           if (new Date(channel.created) > date) { | ||||
|             break; | ||||
|           } | ||||
|           if (channel.closing_date === null || new Date(channel.closing_date) > date) { | ||||
|             totalCapacity += channel.capacity; | ||||
|             channelsCount++; | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|         let nodeCount = 0; | ||||
|         let clearnetNodes = 0; | ||||
|         let torNodes = 0; | ||||
|         let unannouncedNodes = 0; | ||||
| 
 | ||||
|         for (const node of nodes) { | ||||
|           if (new Date(node.first_seen) > date) { | ||||
|             break; | ||||
|           } | ||||
|           nodeCount++; | ||||
| 
 | ||||
|           const sockets = node.sockets.split(','); | ||||
|           let isUnnanounced = true; | ||||
|           for (const socket of sockets) { | ||||
|             const hasOnion = socket.indexOf('.onion') !== -1; | ||||
|             if (hasOnion) { | ||||
|               torNodes++; | ||||
|               isUnnanounced = false; | ||||
|             } | ||||
|             const hasClearnet = [4, 6].includes(net.isIP(socket.substring(0, socket.lastIndexOf(':')))); | ||||
|             if (hasClearnet) { | ||||
|               clearnetNodes++; | ||||
|               isUnnanounced = false; | ||||
|             } | ||||
|           } | ||||
|           if (isUnnanounced) { | ||||
|             unannouncedNodes++; | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|         const query = `INSERT INTO lightning_stats(
 | ||||
|           added, | ||||
|           channel_count, | ||||
|           node_count, | ||||
|           total_capacity, | ||||
|           tor_nodes, | ||||
|           clearnet_nodes, | ||||
|           unannounced_nodes, | ||||
|           avg_capacity, | ||||
|           avg_fee_rate, | ||||
|           avg_base_fee_mtokens, | ||||
|           med_capacity, | ||||
|           med_fee_rate, | ||||
|           med_base_fee_mtokens | ||||
|         ) | ||||
|         VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
 | ||||
| 
 | ||||
|         const rowTimestamp = date.getTime() / 1000; // Save timestamp for the row insertion down below
 | ||||
| 
 | ||||
|         date.setUTCDate(date.getUTCDate() + 1); | ||||
| 
 | ||||
|         // Last iteration, save channels stats
 | ||||
|         const channelStats = (date >= currentDate ? await channelsApi.$getChannelsStats() : undefined); | ||||
| 
 | ||||
|         await DB.query(query, [ | ||||
|           rowTimestamp, | ||||
|           channelsCount, | ||||
|           nodeCount, | ||||
|           totalCapacity, | ||||
|           torNodes, | ||||
|           clearnetNodes, | ||||
|           unannouncedNodes, | ||||
|           channelStats?.avgCapacity ?? 0, | ||||
|           channelStats?.avgFeeRate ?? 0, | ||||
|           channelStats?.avgBaseFee ?? 0, | ||||
|           channelStats?.medianCapacity ?? 0, | ||||
|           channelStats?.medianFeeRate ?? 0, | ||||
|           channelStats?.medianBaseFee ?? 0, | ||||
|           ]); | ||||
|       } | ||||
| 
 | ||||
|       logger.info('Historical stats populated.'); | ||||
|     } catch (e) { | ||||
|       logger.err('$populateHistoricalData() error: ' + (e instanceof Error ? e.message : e)); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $populateHistoricalNodeStatistics() { | ||||
|     try { | ||||
|       const [rows]: any = await DB.query(`SELECT COUNT(*) FROM node_stats`); | ||||
|       // Only run if table is empty
 | ||||
|       if (rows[0]['COUNT(*)'] > 0) { | ||||
|         return; | ||||
|       } | ||||
|       logger.info(`Running historical node stats population...`); | ||||
| 
 | ||||
|       const [nodes]: any = await DB.query(`SELECT public_key, first_seen, alias FROM nodes ORDER BY first_seen ASC`); | ||||
| 
 | ||||
|       for (const node of nodes) { | ||||
|         const [channels]: any = await DB.query(`SELECT capacity, created, closing_date FROM channels WHERE node1_public_key = ? OR node2_public_key = ? ORDER BY created ASC`, [node.public_key, node.public_key]); | ||||
| 
 | ||||
|         const date: Date = new Date(this.hardCodedStartTime); | ||||
|         const currentDate = new Date(); | ||||
|         this.setDateMidnight(currentDate); | ||||
| 
 | ||||
|         let lastTotalCapacity = 0; | ||||
|         let lastChannelsCount = 0; | ||||
| 
 | ||||
|         while (date < currentDate) { | ||||
|           let totalCapacity = 0; | ||||
|           let channelsCount = 0; | ||||
|           for (const channel of channels) { | ||||
|             if (new Date(channel.created) > date) { | ||||
|               break; | ||||
|             } | ||||
|             if (channel.closing_date !== null && new Date(channel.closing_date) < date) { | ||||
|               date.setUTCDate(date.getUTCDate() + 1); | ||||
|               continue; | ||||
|             } | ||||
|             totalCapacity += channel.capacity; | ||||
|             channelsCount++; | ||||
|           } | ||||
| 
 | ||||
|           if (lastTotalCapacity === totalCapacity && lastChannelsCount === channelsCount) { | ||||
|             date.setUTCDate(date.getUTCDate() + 1); | ||||
|             continue; | ||||
|           } | ||||
| 
 | ||||
|           lastTotalCapacity = totalCapacity; | ||||
|           lastChannelsCount = channelsCount; | ||||
| 
 | ||||
|           const query = `INSERT INTO node_stats(
 | ||||
|             public_key, | ||||
|             added, | ||||
|             capacity, | ||||
|             channels | ||||
|           ) | ||||
|           VALUES (?, FROM_UNIXTIME(?), ?, ?)`;
 | ||||
| 
 | ||||
|           await DB.query(query, [ | ||||
|             node.public_key, | ||||
|             date.getTime() / 1000, | ||||
|             totalCapacity, | ||||
|             channelsCount, | ||||
|           ]); | ||||
|           date.setUTCDate(date.getUTCDate() + 1); | ||||
|         } | ||||
|         logger.debug('Updated node_stats for: ' + node.alias); | ||||
|       } | ||||
|       logger.info('Historical stats populated.'); | ||||
|     } catch (e) { | ||||
|       logger.err('$populateHistoricalNodeData() error: ' + (e instanceof Error ? e.message : e)); | ||||
|     } | ||||
|     logger.info(`Running lightning daily stats log...`); | ||||
|     const networkGraph = await lightningApi.$getNetworkGraph(); | ||||
|     LightningStatsImporter.computeNetworkStats(date.getTime(), networkGraph); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										113
									
								
								backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								backend/src/tasks/lightning/sync-tasks/funding-tx-fetcher.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,113 @@ | ||||
| import { existsSync, promises } from 'fs'; | ||||
| import bitcoinClient from '../../../api/bitcoin/bitcoin-client'; | ||||
| import config from '../../../config'; | ||||
| import logger from '../../../logger'; | ||||
| 
 | ||||
| const fsPromises = promises; | ||||
| 
 | ||||
| const BLOCKS_CACHE_MAX_SIZE = 100;   | ||||
| const CACHE_FILE_NAME = config.MEMPOOL.CACHE_DIR + '/ln-funding-txs-cache.json'; | ||||
| 
 | ||||
| class FundingTxFetcher { | ||||
|   private running = false; | ||||
|   private blocksCache = {}; | ||||
|   private channelNewlyProcessed = 0; | ||||
|   public fundingTxCache = {}; | ||||
| 
 | ||||
|   async $init(): Promise<void> { | ||||
|     // Load funding tx disk cache
 | ||||
|     if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) { | ||||
|       try { | ||||
|         this.fundingTxCache = JSON.parse(await fsPromises.readFile(CACHE_FILE_NAME, 'utf-8')); | ||||
|       } catch (e) { | ||||
|         logger.err(`Unable to parse channels funding txs disk cache. Starting from scratch`); | ||||
|         this.fundingTxCache = {}; | ||||
|       } | ||||
|       logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   async $fetchChannelsFundingTxs(channelIds: string[]): Promise<void> { | ||||
|     if (this.running) { | ||||
|       return; | ||||
|     } | ||||
|     this.running = true; | ||||
|      | ||||
|     const globalTimer = new Date().getTime() / 1000; | ||||
|     let cacheTimer = new Date().getTime() / 1000; | ||||
|     let loggerTimer = new Date().getTime() / 1000; | ||||
|     let channelProcessed = 0; | ||||
|     this.channelNewlyProcessed = 0; | ||||
|     for (const channelId of channelIds) { | ||||
|       await this.$fetchChannelOpenTx(channelId); | ||||
|       ++channelProcessed; | ||||
| 
 | ||||
|       let elapsedSeconds = Math.round((new Date().getTime() / 1000) - loggerTimer); | ||||
|       if (elapsedSeconds > 10) { | ||||
|         elapsedSeconds = Math.round((new Date().getTime() / 1000) - globalTimer); | ||||
|         logger.debug(`Indexing channels funding tx ${channelProcessed + 1} of ${channelIds.length} ` + | ||||
|           `(${Math.floor(channelProcessed / channelIds.length * 10000) / 100}%) | ` + | ||||
|           `elapsed: ${elapsedSeconds} seconds` | ||||
|         ); | ||||
|         loggerTimer = new Date().getTime() / 1000; | ||||
|       } | ||||
| 
 | ||||
|       elapsedSeconds = Math.round((new Date().getTime() / 1000) - cacheTimer); | ||||
|       if (elapsedSeconds > 60) { | ||||
|         logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); | ||||
|         fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); | ||||
|         cacheTimer = new Date().getTime() / 1000; | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     if (this.channelNewlyProcessed > 0) { | ||||
|       logger.info(`Indexed ${this.channelNewlyProcessed} additional channels funding tx`); | ||||
|       logger.debug(`Saving ${Object.keys(this.fundingTxCache).length} funding txs cache into disk`); | ||||
|       fsPromises.writeFile(CACHE_FILE_NAME, JSON.stringify(this.fundingTxCache)); | ||||
|     } | ||||
| 
 | ||||
|     this.running = false; | ||||
|   } | ||||
|    | ||||
|   public async $fetchChannelOpenTx(channelId: string): Promise<any> { | ||||
|     if (this.fundingTxCache[channelId]) { | ||||
|       return this.fundingTxCache[channelId]; | ||||
|     } | ||||
| 
 | ||||
|     const parts = channelId.split('x'); | ||||
|     const blockHeight = parts[0]; | ||||
|     const txIdx = parts[1]; | ||||
|     const outputIdx = parts[2]; | ||||
| 
 | ||||
|     let block = this.blocksCache[blockHeight]; | ||||
|     // Fetch it from core
 | ||||
|     if (!block) { | ||||
|       const blockHash = await bitcoinClient.getBlockHash(parseInt(blockHeight, 10)); | ||||
|       block = await bitcoinClient.getBlock(blockHash, 1); | ||||
|     } | ||||
|     this.blocksCache[block.height] = block; | ||||
| 
 | ||||
|     const blocksCacheHashes = Object.keys(this.blocksCache).sort((a, b) => parseInt(b) - parseInt(a)).reverse(); | ||||
|     if (blocksCacheHashes.length > BLOCKS_CACHE_MAX_SIZE) { | ||||
|       for (let i = 0; i < 10; ++i) { | ||||
|         delete this.blocksCache[blocksCacheHashes[i]]; | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     const txid = block.tx[txIdx]; | ||||
|     const rawTx = await bitcoinClient.getRawTransaction(txid); | ||||
|     const tx = await bitcoinClient.decodeRawTransaction(rawTx); | ||||
| 
 | ||||
|     this.fundingTxCache[channelId] = { | ||||
|       timestamp: block.time, | ||||
|       txid: txid, | ||||
|       value: tx.vout[outputIdx].value, | ||||
|     }; | ||||
| 
 | ||||
|     ++this.channelNewlyProcessed; | ||||
| 
 | ||||
|     return this.fundingTxCache[channelId]; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| export default new FundingTxFetcher; | ||||
							
								
								
									
										330
									
								
								backend/src/tasks/lightning/sync-tasks/stats-importer.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										330
									
								
								backend/src/tasks/lightning/sync-tasks/stats-importer.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,330 @@ | ||||
| import DB from '../../../database'; | ||||
| import { promises } from 'fs'; | ||||
| import { XMLParser } from 'fast-xml-parser'; | ||||
| import logger from '../../../logger'; | ||||
| import fundingTxFetcher from './funding-tx-fetcher'; | ||||
| import config from '../../../config'; | ||||
| 
 | ||||
| const fsPromises = promises; | ||||
| 
 | ||||
| interface Node { | ||||
|   id: string; | ||||
|   timestamp: number; | ||||
|   features: string; | ||||
|   rgb_color: string; | ||||
|   alias: string; | ||||
|   addresses: string; | ||||
|   out_degree: number; | ||||
|   in_degree: number; | ||||
| } | ||||
| 
 | ||||
| interface Channel { | ||||
|   scid: string; | ||||
|   source: string; | ||||
|   destination: string; | ||||
|   timestamp: number; | ||||
|   features: string; | ||||
|   fee_base_msat: number; | ||||
|   fee_proportional_millionths: number; | ||||
|   htlc_minimim_msat: number; | ||||
|   cltv_expiry_delta: number; | ||||
|   htlc_maximum_msat: number; | ||||
| } | ||||
| 
 | ||||
| class LightningStatsImporter { | ||||
|   topologiesFolder = config.LIGHTNING.TOPOLOGY_FOLDER; | ||||
|   parser = new XMLParser(); | ||||
| 
 | ||||
|   async $run(): Promise<void> { | ||||
|     logger.info(`Importing historical lightning stats`); | ||||
| 
 | ||||
|     const [channels]: any[] = await DB.query('SELECT short_id from channels;'); | ||||
|     logger.info('Caching funding txs for currently existing channels'); | ||||
|     await fundingTxFetcher.$fetchChannelsFundingTxs(channels.map(channel => channel.short_id)); | ||||
|      | ||||
|     await this.$importHistoricalLightningStats(); | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|    * Generate LN network stats for one day | ||||
|    */ | ||||
|   public async computeNetworkStats(timestamp: number, networkGraph): Promise<unknown> { | ||||
|     // Node counts and network shares
 | ||||
|     let clearnetNodes = 0; | ||||
|     let torNodes = 0; | ||||
|     let clearnetTorNodes = 0; | ||||
|     let unannouncedNodes = 0; | ||||
| 
 | ||||
|     for (const node of networkGraph.nodes) { | ||||
|       let hasOnion = false; | ||||
|       let hasClearnet = false; | ||||
|       let isUnnanounced = true; | ||||
| 
 | ||||
|       const sockets = node.addresses.split(','); | ||||
|       for (const socket of sockets) { | ||||
|         hasOnion = hasOnion || (socket.indexOf('torv3://') !== -1); | ||||
|         hasClearnet = hasClearnet || (socket.indexOf('ipv4://') !== -1 || socket.indexOf('ipv6://') !== -1); | ||||
|       } | ||||
|       if (hasOnion && hasClearnet) { | ||||
|         clearnetTorNodes++; | ||||
|         isUnnanounced = false; | ||||
|       } else if (hasOnion) { | ||||
|         torNodes++; | ||||
|         isUnnanounced = false; | ||||
|       } else if (hasClearnet) { | ||||
|         clearnetNodes++; | ||||
|         isUnnanounced = false; | ||||
|       } | ||||
|       if (isUnnanounced) { | ||||
|         unannouncedNodes++; | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     // Channels and node historical stats
 | ||||
|     const nodeStats = {}; | ||||
|     let capacity = 0; | ||||
|     let avgFeeRate = 0; | ||||
|     let avgBaseFee = 0; | ||||
|     const capacities: number[] = []; | ||||
|     const feeRates: number[] = []; | ||||
|     const baseFees: number[] = []; | ||||
|     const alreadyCountedChannels = {}; | ||||
|      | ||||
|     for (const channel of networkGraph.channels) { | ||||
|       const short_id = channel.scid.slice(0, -2); | ||||
| 
 | ||||
|       const tx = await fundingTxFetcher.$fetchChannelOpenTx(short_id); | ||||
|       if (!tx) { | ||||
|         logger.err(`Unable to fetch funding tx for channel ${short_id}. Capacity and creation date is unknown. Skipping channel.`); | ||||
|         continue; | ||||
|       } | ||||
| 
 | ||||
|       if (!nodeStats[channel.source]) { | ||||
|         nodeStats[channel.source] = { | ||||
|           capacity: 0, | ||||
|           channels: 0, | ||||
|         }; | ||||
|       } | ||||
|       if (!nodeStats[channel.destination]) { | ||||
|         nodeStats[channel.destination] = { | ||||
|           capacity: 0, | ||||
|           channels: 0, | ||||
|         }; | ||||
|       } | ||||
|        | ||||
|       nodeStats[channel.source].capacity += Math.round(tx.value * 100000000); | ||||
|       nodeStats[channel.source].channels++; | ||||
|       nodeStats[channel.destination].capacity += Math.round(tx.value * 100000000); | ||||
|       nodeStats[channel.destination].channels++; | ||||
| 
 | ||||
|       if (!alreadyCountedChannels[short_id]) { | ||||
|         capacity += Math.round(tx.value * 100000000); | ||||
|         capacities.push(Math.round(tx.value * 100000000)); | ||||
|         alreadyCountedChannels[short_id] = true; | ||||
|       } | ||||
| 
 | ||||
|       if (channel.fee_proportional_millionths < 5000) { | ||||
|         avgFeeRate += channel.fee_proportional_millionths; | ||||
|         feeRates.push(channel.fee_proportional_millionths); | ||||
|       } | ||||
| 
 | ||||
|       if (channel.fee_base_msat < 5000) { | ||||
|         avgBaseFee += channel.fee_base_msat;       | ||||
|         baseFees.push(channel.fee_base_msat); | ||||
|       } | ||||
|     } | ||||
|      | ||||
|     avgFeeRate /= networkGraph.channels.length; | ||||
|     avgBaseFee /= networkGraph.channels.length; | ||||
|     const medCapacity = capacities.sort((a, b) => b - a)[Math.round(capacities.length / 2 - 1)]; | ||||
|     const medFeeRate = feeRates.sort((a, b) => b - a)[Math.round(feeRates.length / 2 - 1)]; | ||||
|     const medBaseFee = baseFees.sort((a, b) => b - a)[Math.round(baseFees.length / 2 - 1)]; | ||||
|     const avgCapacity = Math.round(capacity / capacities.length); | ||||
|      | ||||
|     let query = `INSERT INTO lightning_stats(
 | ||||
|       added, | ||||
|       channel_count, | ||||
|       node_count, | ||||
|       total_capacity, | ||||
|       tor_nodes, | ||||
|       clearnet_nodes, | ||||
|       unannounced_nodes, | ||||
|       clearnet_tor_nodes, | ||||
|       avg_capacity, | ||||
|       avg_fee_rate, | ||||
|       avg_base_fee_mtokens, | ||||
|       med_capacity, | ||||
|       med_fee_rate, | ||||
|       med_base_fee_mtokens | ||||
|     ) | ||||
|     VALUES (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
 | ||||
| 
 | ||||
|     await DB.query(query, [ | ||||
|       timestamp, | ||||
|       capacities.length, | ||||
|       networkGraph.nodes.length, | ||||
|       capacity, | ||||
|       torNodes, | ||||
|       clearnetNodes, | ||||
|       unannouncedNodes, | ||||
|       clearnetTorNodes, | ||||
|       avgCapacity, | ||||
|       avgFeeRate, | ||||
|       avgBaseFee, | ||||
|       medCapacity, | ||||
|       medFeeRate, | ||||
|       medBaseFee, | ||||
|     ]); | ||||
| 
 | ||||
|     for (const public_key of Object.keys(nodeStats)) { | ||||
|       query = `INSERT INTO node_stats(
 | ||||
|         public_key, | ||||
|         added, | ||||
|         capacity, | ||||
|         channels | ||||
|       ) | ||||
|       VALUES (?, FROM_UNIXTIME(?), ?, ?)`;
 | ||||
|      | ||||
|       await DB.query(query, [ | ||||
|         public_key, | ||||
|         timestamp, | ||||
|         nodeStats[public_key].capacity, | ||||
|         nodeStats[public_key].channels, | ||||
|       ]); | ||||
|     } | ||||
| 
 | ||||
|     return { | ||||
|       added: timestamp, | ||||
|       node_count: networkGraph.nodes.length | ||||
|     }; | ||||
|   } | ||||
| 
 | ||||
|   async $importHistoricalLightningStats(): Promise<void> { | ||||
|     let latestNodeCount = 1; | ||||
| 
 | ||||
|     const fileList = await fsPromises.readdir(this.topologiesFolder); | ||||
|     fileList.sort().reverse(); | ||||
| 
 | ||||
|     const [rows]: any[] = await DB.query('SELECT UNIX_TIMESTAMP(added) as added, node_count FROM lightning_stats'); | ||||
|     const existingStatsTimestamps = {}; | ||||
|     for (const row of rows) { | ||||
|       existingStatsTimestamps[row.added] = rows[0]; | ||||
|     } | ||||
| 
 | ||||
|     for (const filename of fileList) { | ||||
|       const timestamp = parseInt(filename.split('_')[1], 10); | ||||
| 
 | ||||
|       // Stats exist already, don't calculate/insert them
 | ||||
|       if (existingStatsTimestamps[timestamp] !== undefined) { | ||||
|         latestNodeCount = existingStatsTimestamps[timestamp].node_count; | ||||
|         continue; | ||||
|       } | ||||
| 
 | ||||
|       logger.debug(`Processing ${this.topologiesFolder}/${filename}`); | ||||
|       const fileContent = await fsPromises.readFile(`${this.topologiesFolder}/${filename}`, 'utf8'); | ||||
| 
 | ||||
|       let graph; | ||||
|       if (filename.indexOf('.json') !== -1) { | ||||
|         try { | ||||
|           graph = JSON.parse(fileContent); | ||||
|         } catch (e) { | ||||
|           logger.debug(`Invalid topology file, cannot parse the content`); | ||||
|         } | ||||
|       } else { | ||||
|         graph = this.parseFile(fileContent); | ||||
|         if (!graph) { | ||||
|           logger.debug(`Invalid topology file, cannot parse the content`); | ||||
|           continue; | ||||
|         } | ||||
|         await fsPromises.writeFile(`${this.topologiesFolder}/${filename}.json`, JSON.stringify(graph)); | ||||
|       } | ||||
| 
 | ||||
|       if (timestamp > 1556316000) { | ||||
|         // "No, the reason most likely is just that I started collection in 2019,
 | ||||
|         // so what I had before that is just the survivors from before, which weren't that many"
 | ||||
|         const diffRatio = graph.nodes.length / latestNodeCount; | ||||
|         if (diffRatio < 0.9) { | ||||
|           // Ignore drop of more than 90% of the node count as it's probably a missing data point
 | ||||
|           continue; | ||||
|         } | ||||
|       } | ||||
|       latestNodeCount = graph.nodes.length; | ||||
|        | ||||
|       const datestr = `${new Date(timestamp * 1000).toUTCString()} (${timestamp})`; | ||||
|       logger.debug(`${datestr}: Found ${graph.nodes.length} nodes and ${graph.channels.length} channels`); | ||||
| 
 | ||||
|       // Cache funding txs
 | ||||
|       logger.debug(`Caching funding txs for ${datestr}`); | ||||
|       await fundingTxFetcher.$fetchChannelsFundingTxs(graph.channels.map(channel => channel.scid.slice(0, -2))); | ||||
| 
 | ||||
|       logger.debug(`Generating LN network stats for ${datestr}`); | ||||
|       const stat = await this.computeNetworkStats(timestamp, graph); | ||||
| 
 | ||||
|       existingStatsTimestamps[timestamp] = stat; | ||||
|     } | ||||
| 
 | ||||
|     logger.info(`Lightning network stats historical import completed`); | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|    * Parse the file content into XML, and return a list of nodes and channels | ||||
|    */ | ||||
|   private parseFile(fileContent): any { | ||||
|     const graph = this.parser.parse(fileContent); | ||||
|     if (Object.keys(graph).length === 0) { | ||||
|       return null; | ||||
|     } | ||||
| 
 | ||||
|     const nodes: Node[] = []; | ||||
|     const channels: Channel[] = []; | ||||
| 
 | ||||
|     // If there is only one entry, the parser does not return an array, so we override this
 | ||||
|     if (!Array.isArray(graph.graphml.graph.node)) { | ||||
|       graph.graphml.graph.node = [graph.graphml.graph.node]; | ||||
|     } | ||||
|     if (!Array.isArray(graph.graphml.graph.edge)) { | ||||
|       graph.graphml.graph.edge = [graph.graphml.graph.edge]; | ||||
|     } | ||||
| 
 | ||||
|     for (const node of graph.graphml.graph.node) { | ||||
|       if (!node.data) { | ||||
|         continue; | ||||
|       } | ||||
|       nodes.push({ | ||||
|         id: node.data[0], | ||||
|         timestamp: node.data[1], | ||||
|         features: node.data[2], | ||||
|         rgb_color: node.data[3], | ||||
|         alias: node.data[4], | ||||
|         addresses: node.data[5], | ||||
|         out_degree: node.data[6], | ||||
|         in_degree: node.data[7], | ||||
|       }); | ||||
|     } | ||||
| 
 | ||||
|     for (const channel of graph.graphml.graph.edge) { | ||||
|       if (!channel.data) { | ||||
|         continue; | ||||
|       } | ||||
|       channels.push({ | ||||
|         scid: channel.data[0], | ||||
|         source: channel.data[1], | ||||
|         destination: channel.data[2], | ||||
|         timestamp: channel.data[3], | ||||
|         features: channel.data[4], | ||||
|         fee_base_msat: channel.data[5], | ||||
|         fee_proportional_millionths: channel.data[6], | ||||
|         htlc_minimim_msat: channel.data[7], | ||||
|         cltv_expiry_delta: channel.data[8], | ||||
|         htlc_maximum_msat: channel.data[9], | ||||
|       }); | ||||
|     } | ||||
| 
 | ||||
|     return { | ||||
|       nodes: nodes, | ||||
|       channels: channels, | ||||
|     }; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| export default new LightningStatsImporter; | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user