Merge pull request #4637 from mempool/mononaut/summaries-indexing-error
More robust error handling and logging during summaries indexing
This commit is contained in:
		
						commit
						b44ec76130
					
				| @ -2,7 +2,7 @@ import config from '../config'; | |||||||
| import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; | import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory'; | ||||||
| import logger from '../logger'; | import logger from '../logger'; | ||||||
| import memPool from './mempool'; | import memPool from './mempool'; | ||||||
| import { BlockExtended, BlockExtension, BlockSummary, PoolTag, TransactionExtended, TransactionMinerInfo, CpfpSummary, MempoolTransactionExtended, TransactionClassified } from '../mempool.interfaces'; | import { BlockExtended, BlockExtension, BlockSummary, PoolTag, TransactionExtended, TransactionMinerInfo, CpfpSummary, MempoolTransactionExtended, TransactionClassified, BlockAudit } from '../mempool.interfaces'; | ||||||
| import { Common } from './common'; | import { Common } from './common'; | ||||||
| import diskCache from './disk-cache'; | import diskCache from './disk-cache'; | ||||||
| import transactionUtils from './transaction-utils'; | import transactionUtils from './transaction-utils'; | ||||||
| @ -451,7 +451,9 @@ class Blocks { | |||||||
|         if (config.MEMPOOL.BACKEND === 'esplora') { |         if (config.MEMPOOL.BACKEND === 'esplora') { | ||||||
|           const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx)); |           const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx)); | ||||||
|           const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs); |           const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs); | ||||||
|  |           if (cpfpSummary) { | ||||||
|             await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary
 |             await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary
 | ||||||
|  |           } | ||||||
|         } else { |         } else { | ||||||
|           await this.$getStrippedBlockTransactions(block.hash, true, true); // This will index the block summary
 |           await this.$getStrippedBlockTransactions(block.hash, true, true); // This will index the block summary
 | ||||||
|         } |         } | ||||||
| @ -995,11 +997,11 @@ class Blocks { | |||||||
|     return state; |     return state; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private updateTimerProgress(state, msg) { |   private updateTimerProgress(state, msg): void { | ||||||
|     state.progress = msg; |     state.progress = msg; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private clearTimer(state) { |   private clearTimer(state): void { | ||||||
|     if (state.timer) { |     if (state.timer) { | ||||||
|       clearTimeout(state.timer); |       clearTimeout(state.timer); | ||||||
|     } |     } | ||||||
| @ -1088,13 +1090,19 @@ class Blocks { | |||||||
|       summary = { |       summary = { | ||||||
|         id: hash, |         id: hash, | ||||||
|         transactions: cpfpSummary.transactions.map(tx => { |         transactions: cpfpSummary.transactions.map(tx => { | ||||||
|  |           let flags: number = 0; | ||||||
|  |           try { | ||||||
|  |             flags = tx.flags || Common.getTransactionFlags(tx); | ||||||
|  |           } catch (e) { | ||||||
|  |             logger.warn('Failed to classify transaction: ' + (e instanceof Error ? e.message : e)); | ||||||
|  |           } | ||||||
|           return { |           return { | ||||||
|             txid: tx.txid, |             txid: tx.txid, | ||||||
|             fee: tx.fee || 0, |             fee: tx.fee || 0, | ||||||
|             vsize: tx.vsize, |             vsize: tx.vsize, | ||||||
|             value: Math.round(tx.vout.reduce((acc, vout) => acc + (vout.value ? vout.value : 0), 0)), |             value: Math.round(tx.vout.reduce((acc, vout) => acc + (vout.value ? vout.value : 0), 0)), | ||||||
|             rate: tx.effectiveFeePerVsize, |             rate: tx.effectiveFeePerVsize, | ||||||
|             flags: tx.flags || Common.getTransactionFlags(tx), |             flags: flags, | ||||||
|           }; |           }; | ||||||
|         }), |         }), | ||||||
|       }; |       }; | ||||||
| @ -1284,7 +1292,7 @@ class Blocks { | |||||||
|     return blocks; |     return blocks; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   public async $getBlockAuditSummary(hash: string): Promise<any> { |   public async $getBlockAuditSummary(hash: string): Promise<BlockAudit | null> { | ||||||
|     if (['mainnet', 'testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) { |     if (['mainnet', 'testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) { | ||||||
|       return BlocksAuditsRepository.$getBlockAudit(hash); |       return BlocksAuditsRepository.$getBlockAudit(hash); | ||||||
|     } else { |     } else { | ||||||
| @ -1304,7 +1312,7 @@ class Blocks { | |||||||
|     return this.currentBlockHeight; |     return this.currentBlockHeight; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   public async $indexCPFP(hash: string, height: number, txs?: TransactionExtended[]): Promise<CpfpSummary> { |   public async $indexCPFP(hash: string, height: number, txs?: TransactionExtended[]): Promise<CpfpSummary | null> { | ||||||
|     let transactions = txs; |     let transactions = txs; | ||||||
|     if (!transactions) { |     if (!transactions) { | ||||||
|       if (config.MEMPOOL.BACKEND === 'esplora') { |       if (config.MEMPOOL.BACKEND === 'esplora') { | ||||||
| @ -1319,6 +1327,7 @@ class Blocks { | |||||||
|       } |       } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     if (transactions?.length != null) { | ||||||
|       const summary = Common.calculateCpfp(height, transactions as TransactionExtended[]); |       const summary = Common.calculateCpfp(height, transactions as TransactionExtended[]); | ||||||
| 
 | 
 | ||||||
|       await this.$saveCpfp(hash, height, summary); |       await this.$saveCpfp(hash, height, summary); | ||||||
| @ -1327,6 +1336,10 @@ class Blocks { | |||||||
|       await blocksRepository.$saveEffectiveFeeStats(hash, effectiveFeeStats); |       await blocksRepository.$saveEffectiveFeeStats(hash, effectiveFeeStats); | ||||||
| 
 | 
 | ||||||
|       return summary; |       return summary; | ||||||
|  |     } else { | ||||||
|  |       logger.err(`Cannot index CPFP for block ${height} - missing transaction data`); | ||||||
|  |       return null; | ||||||
|  |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   public async $saveCpfp(hash: string, height: number, cpfpSummary: CpfpSummary): Promise<void> { |   public async $saveCpfp(hash: string, height: number, cpfpSummary: CpfpSummary): Promise<void> { | ||||||
|  | |||||||
| @ -6,6 +6,7 @@ import { NodeSocket } from '../repositories/NodesSocketsRepository'; | |||||||
| import { isIP } from 'net'; | import { isIP } from 'net'; | ||||||
| import transactionUtils from './transaction-utils'; | import transactionUtils from './transaction-utils'; | ||||||
| import { isPoint } from '../utils/secp256k1'; | import { isPoint } from '../utils/secp256k1'; | ||||||
|  | import logger from '../logger'; | ||||||
| export class Common { | export class Common { | ||||||
|   static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? |   static nativeAssetId = config.MEMPOOL.NETWORK === 'liquidtestnet' ? | ||||||
|     '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' |     '144c654344aa716d6f3abcc1ca90e5641e4e2a7f633bc09fe3baf64585819a49' | ||||||
| @ -261,6 +262,9 @@ export class Common { | |||||||
|         case 'v0_p2wpkh': flags |= TransactionFlags.p2wpkh; break; |         case 'v0_p2wpkh': flags |= TransactionFlags.p2wpkh; break; | ||||||
|         case 'v0_p2wsh': flags |= TransactionFlags.p2wsh; break; |         case 'v0_p2wsh': flags |= TransactionFlags.p2wsh; break; | ||||||
|         case 'v1_p2tr': { |         case 'v1_p2tr': { | ||||||
|  |           if (!vin.witness?.length) { | ||||||
|  |             throw new Error('Taproot input missing witness data'); | ||||||
|  |           } | ||||||
|           flags |= TransactionFlags.p2tr; |           flags |= TransactionFlags.p2tr; | ||||||
|           // in taproot, if the last witness item begins with 0x50, it's an annex
 |           // in taproot, if the last witness item begins with 0x50, it's an annex
 | ||||||
|           const hasAnnex = vin.witness?.[vin.witness.length - 1].startsWith('50'); |           const hasAnnex = vin.witness?.[vin.witness.length - 1].startsWith('50'); | ||||||
| @ -301,7 +305,7 @@ export class Common { | |||||||
|         case 'p2pk': { |         case 'p2pk': { | ||||||
|           flags |= TransactionFlags.p2pk; |           flags |= TransactionFlags.p2pk; | ||||||
|           // detect fake pubkey (i.e. not a valid DER point on the secp256k1 curve)
 |           // detect fake pubkey (i.e. not a valid DER point on the secp256k1 curve)
 | ||||||
|           hasFakePubkey = hasFakePubkey || !isPoint(vout.scriptpubkey.slice(2, -2)); |           hasFakePubkey = hasFakePubkey || !isPoint(vout.scriptpubkey?.slice(2, -2)); | ||||||
|         } break; |         } break; | ||||||
|         case 'multisig': { |         case 'multisig': { | ||||||
|           flags |= TransactionFlags.p2ms; |           flags |= TransactionFlags.p2ms; | ||||||
| @ -348,7 +352,12 @@ export class Common { | |||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   static classifyTransaction(tx: TransactionExtended): TransactionClassified { |   static classifyTransaction(tx: TransactionExtended): TransactionClassified { | ||||||
|     const flags = Common.getTransactionFlags(tx); |     let flags = 0; | ||||||
|  |     try { | ||||||
|  |       flags = Common.getTransactionFlags(tx); | ||||||
|  |     } catch (e) { | ||||||
|  |       logger.warn('Failed to add classification flags to transaction: ' + (e instanceof Error ? e.message : e)); | ||||||
|  |     } | ||||||
|     tx.flags = flags; |     tx.flags = flags; | ||||||
|     return { |     return { | ||||||
|       ...Common.stripTransaction(tx), |       ...Common.stripTransaction(tx), | ||||||
|  | |||||||
| @ -59,7 +59,7 @@ class BlocksAuditRepositories { | |||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   public async $getBlockAudit(hash: string): Promise<any> { |   public async $getBlockAudit(hash: string): Promise<BlockAudit | null> { | ||||||
|     try { |     try { | ||||||
|       const [rows]: any[] = await DB.query( |       const [rows]: any[] = await DB.query( | ||||||
|         `SELECT blocks_audits.height, blocks_audits.hash as id, UNIX_TIMESTAMP(blocks_audits.time) as timestamp,
 |         `SELECT blocks_audits.height, blocks_audits.hash as id, UNIX_TIMESTAMP(blocks_audits.time) as timestamp,
 | ||||||
|  | |||||||
| @ -5,7 +5,7 @@ import logger from '../logger'; | |||||||
| import { Common } from '../api/common'; | import { Common } from '../api/common'; | ||||||
| import PoolsRepository from './PoolsRepository'; | import PoolsRepository from './PoolsRepository'; | ||||||
| import HashratesRepository from './HashratesRepository'; | import HashratesRepository from './HashratesRepository'; | ||||||
| import { escape } from 'mysql2'; | import { RowDataPacket, escape } from 'mysql2'; | ||||||
| import BlocksSummariesRepository from './BlocksSummariesRepository'; | import BlocksSummariesRepository from './BlocksSummariesRepository'; | ||||||
| import DifficultyAdjustmentsRepository from './DifficultyAdjustmentsRepository'; | import DifficultyAdjustmentsRepository from './DifficultyAdjustmentsRepository'; | ||||||
| import bitcoinClient from '../api/bitcoin/bitcoin-client'; | import bitcoinClient from '../api/bitcoin/bitcoin-client'; | ||||||
| @ -802,10 +802,10 @@ class BlocksRepository { | |||||||
|   /** |   /** | ||||||
|    * Get a list of blocks that have been indexed |    * Get a list of blocks that have been indexed | ||||||
|    */ |    */ | ||||||
|   public async $getIndexedBlocks(): Promise<any[]> { |   public async $getIndexedBlocks(): Promise<{ height: number, hash: string }[]> { | ||||||
|     try { |     try { | ||||||
|       const [rows]: any = await DB.query(`SELECT height, hash FROM blocks ORDER BY height DESC`); |       const [rows] = await DB.query(`SELECT height, hash FROM blocks ORDER BY height DESC`) as RowDataPacket[][]; | ||||||
|       return rows; |       return rows as { height: number, hash: string }[]; | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
|       logger.err('Cannot generate block size and weight history. Reason: ' + (e instanceof Error ? e.message : e)); |       logger.err('Cannot generate block size and weight history. Reason: ' + (e instanceof Error ? e.message : e)); | ||||||
|       throw e; |       throw e; | ||||||
| @ -815,7 +815,7 @@ class BlocksRepository { | |||||||
|   /** |   /** | ||||||
|    * Get a list of blocks that have not had CPFP data indexed |    * Get a list of blocks that have not had CPFP data indexed | ||||||
|    */ |    */ | ||||||
|    public async $getCPFPUnindexedBlocks(): Promise<any[]> { |    public async $getCPFPUnindexedBlocks(): Promise<number[]> { | ||||||
|     try { |     try { | ||||||
|       const blockchainInfo = await bitcoinClient.getBlockchainInfo(); |       const blockchainInfo = await bitcoinClient.getBlockchainInfo(); | ||||||
|       const currentBlockHeight = blockchainInfo.blocks; |       const currentBlockHeight = blockchainInfo.blocks; | ||||||
| @ -825,13 +825,13 @@ class BlocksRepository { | |||||||
|       } |       } | ||||||
|       const minHeight = Math.max(0, currentBlockHeight - indexingBlockAmount + 1); |       const minHeight = Math.max(0, currentBlockHeight - indexingBlockAmount + 1); | ||||||
| 
 | 
 | ||||||
|       const [rows]: any[] = await DB.query(` |       const [rows] = await DB.query(` | ||||||
|         SELECT height |         SELECT height | ||||||
|         FROM compact_cpfp_clusters |         FROM compact_cpfp_clusters | ||||||
|         WHERE height <= ? AND height >= ? |         WHERE height <= ? AND height >= ? | ||||||
|         GROUP BY height |         GROUP BY height | ||||||
|         ORDER BY height DESC; |         ORDER BY height DESC; | ||||||
|       `, [currentBlockHeight, minHeight]);
 |       `, [currentBlockHeight, minHeight]) as RowDataPacket[][];
 | ||||||
| 
 | 
 | ||||||
|       const indexedHeights = {}; |       const indexedHeights = {}; | ||||||
|       rows.forEach((row) => { indexedHeights[row.height] = true; }); |       rows.forEach((row) => { indexedHeights[row.height] = true; }); | ||||||
|  | |||||||
| @ -1,3 +1,4 @@ | |||||||
|  | import { RowDataPacket } from 'mysql2'; | ||||||
| import DB from '../database'; | import DB from '../database'; | ||||||
| import logger from '../logger'; | import logger from '../logger'; | ||||||
| import { BlockSummary, TransactionClassified } from '../mempool.interfaces'; | import { BlockSummary, TransactionClassified } from '../mempool.interfaces'; | ||||||
| @ -69,7 +70,7 @@ class BlocksSummariesRepository { | |||||||
| 
 | 
 | ||||||
|   public async $getIndexedSummariesId(): Promise<string[]> { |   public async $getIndexedSummariesId(): Promise<string[]> { | ||||||
|     try { |     try { | ||||||
|       const [rows]: any[] = await DB.query(`SELECT id from blocks_summaries`); |       const [rows] = await DB.query(`SELECT id from blocks_summaries`) as RowDataPacket[][]; | ||||||
|       return rows.map(row => row.id); |       return rows.map(row => row.id); | ||||||
|     } catch (e) { |     } catch (e) { | ||||||
|       logger.err(`Cannot get block summaries id list. Reason: ` + (e instanceof Error ? e.message : e)); |       logger.err(`Cannot get block summaries id list. Reason: ` + (e instanceof Error ? e.message : e)); | ||||||
|  | |||||||
| @ -31,6 +31,9 @@ const curveP = BigInt(`0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF | |||||||
|  * @returns {boolean} true if the point is on the SECP256K1 curve |  * @returns {boolean} true if the point is on the SECP256K1 curve | ||||||
|  */ |  */ | ||||||
| export function isPoint(pointHex: string): boolean { | export function isPoint(pointHex: string): boolean { | ||||||
|  |   if (!pointHex?.length) { | ||||||
|  |     return false; | ||||||
|  |   } | ||||||
|   if ( |   if ( | ||||||
|     !( |     !( | ||||||
|       // is uncompressed
 |       // is uncompressed
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user