Refactor forensics batching, speed up opened channel forensics
This commit is contained in:
		
							parent
							
								
									5bee54a2bf
								
							
						
					
					
						commit
						995acb238d
					
				@ -71,18 +71,18 @@ class ForensicsService {
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      logger.debug(`Started running closed channel forensics...`);
 | 
			
		||||
      let remainingChannels;
 | 
			
		||||
      let allChannels;
 | 
			
		||||
      if (onlyNewChannels) {
 | 
			
		||||
        remainingChannels = await channelsApi.$getClosedChannelsWithoutReason();
 | 
			
		||||
        allChannels = await channelsApi.$getClosedChannelsWithoutReason();
 | 
			
		||||
      } else {
 | 
			
		||||
        remainingChannels = await channelsApi.$getUnresolvedClosedChannels();
 | 
			
		||||
        allChannels = await channelsApi.$getUnresolvedClosedChannels();
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      let progress = 0;
 | 
			
		||||
      const sliceLength = 1000;
 | 
			
		||||
      // process batches of 1000 channels
 | 
			
		||||
      for (let i = 0; i < Math.ceil(remainingChannels.length / sliceLength); i++) {
 | 
			
		||||
        const channels = remainingChannels.slice(i * sliceLength, (i + 1) * sliceLength);
 | 
			
		||||
      for (let i = 0; i < Math.ceil(allChannels.length / sliceLength); i++) {
 | 
			
		||||
        const channels = allChannels.slice(i * sliceLength, (i + 1) * sliceLength);
 | 
			
		||||
 | 
			
		||||
        let allOutspends: IEsploraApi.Outspend[][] = [];
 | 
			
		||||
        const forceClosedChannels: { channel: any, cachedSpends: string[] }[] = [];
 | 
			
		||||
@ -91,31 +91,28 @@ class ForensicsService {
 | 
			
		||||
        try {
 | 
			
		||||
          const outspendTxids = channels.map(channel => channel.closing_transaction_id);
 | 
			
		||||
          allOutspends = await bitcoinApi.$getBatchedOutspendsInternal(outspendTxids);
 | 
			
		||||
          logger.info(`Fetched outspends for ${allOutspends.length} txs from esplora for lightning forensics`);
 | 
			
		||||
          logger.info(`Fetched outspends for ${allOutspends.length} txs from esplora for LN forensics`);
 | 
			
		||||
          await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
 | 
			
		||||
        } catch (e) {
 | 
			
		||||
          logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs/outspends'}. Reason ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
        }
 | 
			
		||||
        // fetch spending transactions in bulk and load into txCache
 | 
			
		||||
        try {
 | 
			
		||||
          const newSpendingTxids: { [txid: string]: boolean } = {};
 | 
			
		||||
          for (const outspends of allOutspends) {
 | 
			
		||||
            for (const outspend of outspends) {
 | 
			
		||||
              if (outspend.spent && outspend.txid) {
 | 
			
		||||
                if (!this.txCache[outspend.txid]) {
 | 
			
		||||
                  newSpendingTxids[outspend.txid] = true;
 | 
			
		||||
                }
 | 
			
		||||
              }
 | 
			
		||||
        const newSpendingTxids: { [txid: string]: boolean } = {};
 | 
			
		||||
        for (const outspends of allOutspends) {
 | 
			
		||||
          for (const outspend of outspends) {
 | 
			
		||||
            if (outspend.spent && outspend.txid) {
 | 
			
		||||
              newSpendingTxids[outspend.txid] = true;
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
          const allOutspendTxs = await bitcoinApi.$getRawTransactions(Object.keys(newSpendingTxids));
 | 
			
		||||
          logger.info(`Fetched ${allOutspendTxs.length} out-spending txs from esplora for lightning forensics`);
 | 
			
		||||
          for (const tx of allOutspendTxs) {
 | 
			
		||||
            this.txCache[tx.txid] = tx;
 | 
			
		||||
          }
 | 
			
		||||
        } catch (e) {
 | 
			
		||||
          logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs'}. Reason ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
        }
 | 
			
		||||
        const allOutspendTxs = await this.fetchTransactions(
 | 
			
		||||
          allOutspends.flatMap(outspends =>
 | 
			
		||||
            outspends
 | 
			
		||||
              .filter(outspend => outspend.spent && outspend.txid)
 | 
			
		||||
              .map(outspend => outspend.txid)
 | 
			
		||||
          )
 | 
			
		||||
        );
 | 
			
		||||
        logger.info(`Fetched ${allOutspendTxs.length} out-spending txs from esplora for LN forensics`);
 | 
			
		||||
 | 
			
		||||
        // process each outspend
 | 
			
		||||
        for (const [index, channel] of channels.entries()) {
 | 
			
		||||
@ -163,21 +160,8 @@ class ForensicsService {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // fetch force-closing transactions in bulk
 | 
			
		||||
        try {
 | 
			
		||||
          const newClosingTxids: { [txid: string]: boolean } = {};
 | 
			
		||||
          for (const { channel } of forceClosedChannels) {
 | 
			
		||||
            if (!this.txCache[channel.closing_transaction_id]) {
 | 
			
		||||
              newClosingTxids[channel.closing_transaction_id] = true;
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
          const closingTxs = await bitcoinApi.$getRawTransactions(Object.keys(newClosingTxids));
 | 
			
		||||
          logger.info(`Fetched ${closingTxs.length} closing txs from esplora for lightning forensics`);
 | 
			
		||||
          for (const tx of closingTxs) {
 | 
			
		||||
            this.txCache[tx.txid] = tx;
 | 
			
		||||
          }
 | 
			
		||||
        } catch (e) {
 | 
			
		||||
          logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs'}. Reason ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
        }
 | 
			
		||||
        const closingTxs = await this.fetchTransactions(forceClosedChannels.map(x => x.channel.closing_transaction_id));
 | 
			
		||||
        logger.info(`Fetched ${closingTxs.length} closing txs from esplora for LN forensics`);
 | 
			
		||||
 | 
			
		||||
        // process channels with no lightning script reasons
 | 
			
		||||
        for (const { channel, cachedSpends } of forceClosedChannels) {
 | 
			
		||||
@ -211,7 +195,7 @@ class ForensicsService {
 | 
			
		||||
        progress += channels.length;
 | 
			
		||||
        const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
 | 
			
		||||
        if (elapsedSeconds > 10) {
 | 
			
		||||
          logger.debug(`Updating channel closed channel forensics ${progress}/${remainingChannels.length}`);
 | 
			
		||||
          logger.debug(`Updating channel closed channel forensics ${progress}/${allChannels.length}`);
 | 
			
		||||
          this.loggerTimer = new Date().getTime() / 1000;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
@ -279,8 +263,11 @@ class ForensicsService {
 | 
			
		||||
      logger.debug(`Started running open channel forensics...`);
 | 
			
		||||
      const channels = await channelsApi.$getChannelsWithoutSourceChecked();
 | 
			
		||||
 | 
			
		||||
      // preload open channel transactions
 | 
			
		||||
      await this.fetchTransactions(channels.map(channel => channel.transaction_id), true);
 | 
			
		||||
 | 
			
		||||
      for (const openChannel of channels) {
 | 
			
		||||
        const openTx = await this.fetchTransaction(openChannel.transaction_id, true);
 | 
			
		||||
        const openTx = this.txCache[openChannel.transaction_id];
 | 
			
		||||
        if (!openTx) {
 | 
			
		||||
          continue;
 | 
			
		||||
        }
 | 
			
		||||
@ -414,17 +401,17 @@ class ForensicsService {
 | 
			
		||||
            };
 | 
			
		||||
          });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // preload outspend transactions
 | 
			
		||||
        await this.fetchTransactions(outspends.filter(o => o.spent && o.txid).map(o => o.txid), true);
 | 
			
		||||
 | 
			
		||||
        for (let i = 0; i < outspends?.length; i++) {
 | 
			
		||||
          const outspend = outspends[i];
 | 
			
		||||
          const output = prevChannel.outputs[i];
 | 
			
		||||
          if (outspend.spent && outspend.txid) {
 | 
			
		||||
            try {
 | 
			
		||||
              const spendingTx = await this.fetchTransaction(outspend.txid, true);
 | 
			
		||||
              if (spendingTx) {
 | 
			
		||||
                output.type = this.findLightningScript(spendingTx.vin[outspend.vin || 0]);
 | 
			
		||||
              }
 | 
			
		||||
            } catch (e) {
 | 
			
		||||
              logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + outspend.txid}. Reason ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
            const spendingTx = this.txCache[outspend.txid];
 | 
			
		||||
            if (spendingTx) {
 | 
			
		||||
              output.type = this.findLightningScript(spendingTx.vin[outspend.vin || 0]);
 | 
			
		||||
            }
 | 
			
		||||
          } else {
 | 
			
		||||
            output.type = 0;
 | 
			
		||||
@ -496,6 +483,29 @@ class ForensicsService {
 | 
			
		||||
    return tx;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // fetches a batch of transactions and adds them to the txCache
 | 
			
		||||
  // the returned list of txs does *not* preserve ordering or number
 | 
			
		||||
  async fetchTransactions(txids, temp: boolean = false): Promise<(IEsploraApi.Transaction | null)[]> {
 | 
			
		||||
    // deduplicate txids
 | 
			
		||||
    const uniqueTxids = [...new Set<string>(txids)];
 | 
			
		||||
    // filter out any transactions we already have in the cache
 | 
			
		||||
    const needToFetch: string[] = uniqueTxids.filter(txid => !this.txCache[txid]);
 | 
			
		||||
    try {
 | 
			
		||||
      const txs = await bitcoinApi.$getRawTransactions(needToFetch);
 | 
			
		||||
      for (const tx of txs) {
 | 
			
		||||
        this.txCache[tx.txid] = tx;
 | 
			
		||||
        if (temp) {
 | 
			
		||||
          this.tempCached.push(tx.txid);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs'}. Reason ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      return [];
 | 
			
		||||
    }
 | 
			
		||||
    return txids.map(txid => this.txCache[txid]);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  clearTempCache(): void {
 | 
			
		||||
    for (const txid of this.tempCached) {
 | 
			
		||||
      delete this.txCache[txid];
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user