From 7405cf833618762f74d7cc631038af6739c7159c Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 23 Jan 2024 00:44:34 +0000 Subject: [PATCH] Add block classification indexing --- backend/src/api/blocks.ts | 118 +++++++++++++++++- backend/src/api/common.ts | 13 +- backend/src/api/database-migration.ts | 10 +- backend/src/api/websocket-handler.ts | 3 +- backend/src/indexer.ts | 1 + backend/src/logger.ts | 1 + backend/src/replication/AuditReplication.ts | 3 +- backend/src/repositories/BlocksRepository.ts | 4 +- .../repositories/BlocksSummariesRepository.ts | 49 ++++++-- 9 files changed, 187 insertions(+), 15 deletions(-) diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index 646672f58..ba4dfa8bd 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -561,6 +561,115 @@ class Blocks { logger.debug(`Indexing block audit details completed`); } + /** + * [INDEXING] Index transaction classification flags for Goggles + */ + public async $classifyBlocks(): Promise { + // classification requires an esplora backend + if (config.MEMPOOL.BACKEND !== 'esplora') { + return; + } + + const blockchainInfo = await bitcoinClient.getBlockchainInfo(); + const currentBlockHeight = blockchainInfo.blocks; + + const unclassifiedBlocksList = await BlocksSummariesRepository.$getSummariesWithVersion(0); + const unclassifiedTemplatesList = await BlocksSummariesRepository.$getTemplatesWithVersion(0); + + // nothing to do + if (!unclassifiedBlocksList.length && !unclassifiedTemplatesList.length) { + return; + } + + let timer = Date.now(); + let indexedThisRun = 0; + let indexedTotal = 0; + + const minHeight = Math.min( + unclassifiedBlocksList[unclassifiedBlocksList.length - 1].height ?? Infinity, + unclassifiedTemplatesList[unclassifiedTemplatesList.length - 1].height ?? Infinity, + ); + const numToIndex = Math.max( + unclassifiedBlocksList.length, + unclassifiedTemplatesList.length, + ); + + const unclassifiedBlocks = {}; + const unclassifiedTemplates = {}; + for (const block of unclassifiedBlocksList) { + unclassifiedBlocks[block.height] = block.id; + } + for (const template of unclassifiedTemplatesList) { + unclassifiedTemplates[template.height] = template.id; + } + + logger.debug(`Classifying blocks and templates from #${currentBlockHeight} to #${minHeight}`, logger.tags.goggles); + + for (let height = currentBlockHeight; height >= 0; height--) { + let txs: TransactionExtended[] | null = null; + if (unclassifiedBlocks[height]) { + const blockHash = unclassifiedBlocks[height]; + // fetch transactions + txs = (await bitcoinApi.$getTxsForBlock(blockHash)).map(tx => transactionUtils.extendTransaction(tx)); + // add CPFP + const cpfpSummary = Common.calculateCpfp(height, txs, true); + // classify + const { transactions: classifiedTxs } = this.summarizeBlockTransactions(blockHash, cpfpSummary.transactions); + BlocksSummariesRepository.$saveTransactions(height, blockHash, classifiedTxs, 1); + } + if (unclassifiedTemplates[height]) { + // classify template + const blockHash = unclassifiedTemplates[height]; + const template = await BlocksSummariesRepository.$getTemplate(blockHash); + const alreadyClassified = template?.transactions.reduce((classified, tx) => (classified || tx.flags > 0), false); + let classifiedTemplate = template?.transactions || []; + if (!alreadyClassified) { + const templateTxs: (TransactionExtended | TransactionClassified)[] = []; + const blockTxMap: { [txid: string]: TransactionExtended } = {}; + for (const tx of (txs || [])) { + blockTxMap[tx.txid] = tx; + } + for (const templateTx of (template?.transactions || [])) { + let tx: TransactionExtended | null = blockTxMap[templateTx.txid]; + if (!tx) { + try { + tx = await transactionUtils.$getTransactionExtended(templateTx.txid, false, true, false); + } catch (e) { + // transaction probably not found + } + } + templateTxs.push(tx || templateTx); + } + const cpfpSummary = Common.calculateCpfp(height, txs?.filter(tx => tx.effectiveFeePerVsize != null) as TransactionExtended[], true); + // classify + const { transactions: classifiedTxs } = this.summarizeBlockTransactions(blockHash, cpfpSummary.transactions); + const classifiedTxMap: { [txid: string]: TransactionClassified } = {}; + for (const tx of classifiedTxs) { + classifiedTxMap[tx.txid] = tx; + } + classifiedTemplate = classifiedTemplate.map(tx => { + if (classifiedTxMap[tx.txid]) { + tx.flags = classifiedTxMap[tx.txid].flags || 0; + } + return tx; + }); + } + BlocksSummariesRepository.$saveTemplate({ height, template: { id: blockHash, transactions: classifiedTemplate }, version: 1 }); + } + + // timing & logging + indexedThisRun++; + indexedTotal++; + const elapsedSeconds = (Date.now() - timer) / 1000; + if (elapsedSeconds > 5) { + const perSecond = indexedThisRun / elapsedSeconds; + logger.debug(`Classified #${height}: ${indexedTotal} / ${numToIndex} blocks (${perSecond.toFixed(1)}/s)`); + timer = Date.now(); + indexedThisRun = 0; + } + } + } + /** * [INDEXING] Index all blocks metadata for the mining dashboard */ @@ -966,6 +1075,7 @@ class Blocks { let height = blockHeight; let summary: BlockSummary; + let summaryVersion = 0; if (cpfpSummary && !Common.isLiquid()) { summary = { id: hash, @@ -980,10 +1090,12 @@ class Blocks { }; }), }; + summaryVersion = 1; } else { if (config.MEMPOOL.BACKEND === 'esplora') { const txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx)); summary = this.summarizeBlockTransactions(hash, txs); + summaryVersion = 1; } else { // Call Core RPC const block = await bitcoinClient.getBlock(hash, 2); @@ -998,7 +1110,7 @@ class Blocks { // Index the response if needed if (Common.blocksSummariesIndexingEnabled() === true) { - await BlocksSummariesRepository.$saveTransactions(height, hash, summary.transactions); + await BlocksSummariesRepository.$saveTransactions(height, hash, summary.transactions, summaryVersion); } return summary.transactions; @@ -1114,16 +1226,18 @@ class Blocks { if (cleanBlock.fee_amt_percentiles === null) { let summary; + let summaryVersion = 0; if (config.MEMPOOL.BACKEND === 'esplora') { const txs = (await bitcoinApi.$getTxsForBlock(cleanBlock.hash)).map(tx => transactionUtils.extendTransaction(tx)); summary = this.summarizeBlockTransactions(cleanBlock.hash, txs); + summaryVersion = 1; } else { // Call Core RPC const block = await bitcoinClient.getBlock(cleanBlock.hash, 2); summary = this.summarizeBlock(block); } - await BlocksSummariesRepository.$saveTransactions(cleanBlock.height, cleanBlock.hash, summary.transactions); + await BlocksSummariesRepository.$saveTransactions(cleanBlock.height, cleanBlock.hash, summary.transactions, summaryVersion); cleanBlock.fee_amt_percentiles = await BlocksSummariesRepository.$getFeePercentilesByBlockId(cleanBlock.hash); } if (cleanBlock.fee_amt_percentiles !== null) { diff --git a/backend/src/api/common.ts b/backend/src/api/common.ts index 0dccdef84..52fa9042b 100644 --- a/backend/src/api/common.ts +++ b/backend/src/api/common.ts @@ -635,12 +635,12 @@ export class Common { } } - static calculateCpfp(height: number, transactions: TransactionExtended[]): CpfpSummary { + static calculateCpfp(height: number, transactions: TransactionExtended[], saveRelatives: boolean = false): CpfpSummary { const clusters: CpfpCluster[] = []; // list of all cpfp clusters in this block const clusterMap: { [txid: string]: CpfpCluster } = {}; // map transactions to their cpfp cluster let clusterTxs: TransactionExtended[] = []; // working list of elements of the current cluster let ancestors: { [txid: string]: boolean } = {}; // working set of ancestors of the current cluster root - const txMap = {}; + const txMap: { [txid: string]: TransactionExtended } = {}; // initialize the txMap for (const tx of transactions) { txMap[tx.txid] = tx; @@ -710,6 +710,15 @@ export class Common { } } } + if (saveRelatives) { + for (const cluster of clusters) { + cluster.txs.forEach((member, index) => { + txMap[member.txid].descendants = cluster.txs.slice(0, index).reverse(); + txMap[member.txid].ancestors = cluster.txs.slice(index + 1).reverse(); + txMap[member.txid].effectiveFeePerVsize = cluster.effectiveFeePerVsize; + }); + } + } return { transactions, clusters, diff --git a/backend/src/api/database-migration.ts b/backend/src/api/database-migration.ts index 89ef7a7be..e83ad4536 100644 --- a/backend/src/api/database-migration.ts +++ b/backend/src/api/database-migration.ts @@ -7,7 +7,7 @@ import cpfpRepository from '../repositories/CpfpRepository'; import { RowDataPacket } from 'mysql2'; class DatabaseMigration { - private static currentVersion = 66; + private static currentVersion = 67; private queryTimeout = 3600_000; private statisticsAddedIndexed = false; private uniqueLogs: string[] = []; @@ -558,6 +558,14 @@ class DatabaseMigration { await this.$executeQuery('ALTER TABLE `statistics` ADD min_fee FLOAT UNSIGNED DEFAULT NULL'); await this.updateToSchemaVersion(66); } + + if (databaseSchemaVersion < 67) { + await this.$executeQuery('ALTER TABLE `blocks_summaries` ADD version INT NOT NULL DEFAULT 0'); + await this.$executeQuery('ALTER TABLE `blocks_summaries` ADD INDEX `version` (`version`)'); + await this.$executeQuery('ALTER TABLE `blocks_templates` ADD version INT NOT NULL DEFAULT 0'); + await this.$executeQuery('ALTER TABLE `blocks_templates` ADD INDEX `version` (`version`)'); + await this.updateToSchemaVersion(67); + } } /** diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 937d4a7c5..5213a6bee 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -703,7 +703,8 @@ class WebsocketHandler { template: { id: block.id, transactions: stripped, - } + }, + version: 1, }); BlocksAuditsRepository.$saveAudit({ diff --git a/backend/src/indexer.ts b/backend/src/indexer.ts index 2e2f8b037..90b4a59e6 100644 --- a/backend/src/indexer.ts +++ b/backend/src/indexer.ts @@ -185,6 +185,7 @@ class Indexer { await blocks.$generateCPFPDatabase(); await blocks.$generateAuditStats(); await auditReplicator.$sync(); + await blocks.$classifyBlocks(); } catch (e) { this.indexerRunning = false; logger.err(`Indexer failed, trying again in 10 seconds. Reason: ` + (e instanceof Error ? e.message : e)); diff --git a/backend/src/logger.ts b/backend/src/logger.ts index 364c529e7..bbd781df6 100644 --- a/backend/src/logger.ts +++ b/backend/src/logger.ts @@ -35,6 +35,7 @@ class Logger { public tags = { mining: 'Mining', ln: 'Lightning', + goggles: 'Goggles', }; // @ts-ignore diff --git a/backend/src/replication/AuditReplication.ts b/backend/src/replication/AuditReplication.ts index 5de9de0da..503c61613 100644 --- a/backend/src/replication/AuditReplication.ts +++ b/backend/src/replication/AuditReplication.ts @@ -105,7 +105,8 @@ class AuditReplication { template: { id: blockHash, transactions: auditSummary.template || [] - } + }, + version: 1, }); await blocksAuditsRepository.$saveAudit({ hash: blockHash, diff --git a/backend/src/repositories/BlocksRepository.ts b/backend/src/repositories/BlocksRepository.ts index a798c40f8..a2a084265 100644 --- a/backend/src/repositories/BlocksRepository.ts +++ b/backend/src/repositories/BlocksRepository.ts @@ -1040,16 +1040,18 @@ class BlocksRepository { if (extras.feePercentiles === null) { let summary; + let summaryVersion = 0; if (config.MEMPOOL.BACKEND === 'esplora') { const txs = (await bitcoinApi.$getTxsForBlock(dbBlk.id)).map(tx => transactionUtils.extendTransaction(tx)); summary = blocks.summarizeBlockTransactions(dbBlk.id, txs); + summaryVersion = 1; } else { // Call Core RPC const block = await bitcoinClient.getBlock(dbBlk.id, 2); summary = blocks.summarizeBlock(block); } - await BlocksSummariesRepository.$saveTransactions(dbBlk.height, dbBlk.id, summary.transactions); + await BlocksSummariesRepository.$saveTransactions(dbBlk.height, dbBlk.id, summary.transactions, summaryVersion); extras.feePercentiles = await BlocksSummariesRepository.$getFeePercentilesByBlockId(dbBlk.id); } if (extras.feePercentiles !== null) { diff --git a/backend/src/repositories/BlocksSummariesRepository.ts b/backend/src/repositories/BlocksSummariesRepository.ts index a61bf60ed..b440960e0 100644 --- a/backend/src/repositories/BlocksSummariesRepository.ts +++ b/backend/src/repositories/BlocksSummariesRepository.ts @@ -17,30 +17,30 @@ class BlocksSummariesRepository { return undefined; } - public async $saveTransactions(blockHeight: number, blockId: string, transactions: TransactionClassified[]): Promise { + public async $saveTransactions(blockHeight: number, blockId: string, transactions: TransactionClassified[], version: number): Promise { try { const transactionsStr = JSON.stringify(transactions); await DB.query(` INSERT INTO blocks_summaries - SET height = ?, transactions = ?, id = ? + SET height = ?, transactions = ?, id = ?, version = ? ON DUPLICATE KEY UPDATE transactions = ?`, - [blockHeight, transactionsStr, blockId, transactionsStr]); + [blockHeight, transactionsStr, blockId, version, transactionsStr]); } catch (e: any) { logger.debug(`Cannot save block summary transactions for ${blockId}. Reason: ${e instanceof Error ? e.message : e}`); throw e; } } - public async $saveTemplate(params: { height: number, template: BlockSummary}) { + public async $saveTemplate(params: { height: number, template: BlockSummary, version: number}): Promise { const blockId = params.template?.id; try { const transactions = JSON.stringify(params.template?.transactions || []); await DB.query(` - INSERT INTO blocks_templates (id, template) - VALUE (?, ?) + INSERT INTO blocks_templates (id, template, version) + VALUE (?, ?, ?) ON DUPLICATE KEY UPDATE template = ? - `, [blockId, transactions, transactions]); + `, [blockId, transactions, params.version, transactions]); } catch (e: any) { if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart logger.debug(`Cannot save block template for ${blockId} because it has already been indexed, ignoring`); @@ -76,6 +76,41 @@ class BlocksSummariesRepository { return []; } + public async $getSummariesWithVersion(version: number): Promise<{ height: number, id: string }[]> { + try { + const [rows]: any[] = await DB.query(` + SELECT + height, + id + FROM blocks_summaries + WHERE version = ? + ORDER BY height DESC;`, [version]); + return rows; + } catch (e) { + logger.err(`Cannot get block summaries with version. Reason: ` + (e instanceof Error ? e.message : e)); + } + + return []; + } + + public async $getTemplatesWithVersion(version: number): Promise<{ height: number, id: string }[]> { + try { + const [rows]: any[] = await DB.query(` + SELECT + blocks_summaries.height as height, + blocks_templates.id as id + FROM blocks_templates + JOIN blocks_summaries ON blocks_templates.id = blocks_summaries.id + WHERE blocks_templates.version = ? + ORDER BY height DESC;`, [version]); + return rows; + } catch (e) { + logger.err(`Cannot get block summaries with version. Reason: ` + (e instanceof Error ? e.message : e)); + } + + return []; + } + /** * Get the fee percentiles if the block has already been indexed, [] otherwise *