diff --git a/backend/src/api/database-migration.ts b/backend/src/api/database-migration.ts index 7a148af5a..5768cd68d 100644 --- a/backend/src/api/database-migration.ts +++ b/backend/src/api/database-migration.ts @@ -695,7 +695,7 @@ class DatabaseMigration { if (databaseSchemaVersion < 81) { await this.$executeQuery('ALTER TABLE `blocks_audits` ADD version INT NOT NULL DEFAULT 0'); await this.$executeQuery('ALTER TABLE `blocks_audits` ADD INDEX `version` (`version`)'); - await this.$executeQuery('ALTER TABLE `blocks_audits` ADD seen_txs JSON DEFAULT "[]"'); + await this.$executeQuery('ALTER TABLE `blocks_audits` ADD unseen_txs JSON DEFAULT "[]"'); await this.updateToSchemaVersion(81); } } diff --git a/backend/src/indexer.ts b/backend/src/indexer.ts index 0dd1090b8..dfd7f1317 100644 --- a/backend/src/indexer.ts +++ b/backend/src/indexer.ts @@ -10,6 +10,7 @@ import config from './config'; import auditReplicator from './replication/AuditReplication'; import statisticsReplicator from './replication/StatisticsReplication'; import AccelerationRepository from './repositories/AccelerationRepository'; +import BlocksAuditsRepository from './repositories/BlocksAuditsRepository'; export interface CoreIndex { name: string; @@ -192,6 +193,7 @@ class Indexer { await auditReplicator.$sync(); await statisticsReplicator.$sync(); await AccelerationRepository.$indexPastAccelerations(); + await BlocksAuditsRepository.$migrateAuditsV0toV1(); // do not wait for classify blocks to finish blocks.$classifyBlocks(); } catch (e) { diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 2dd0f17dd..ed1b3b445 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -29,9 +29,11 @@ export interface PoolStats extends PoolInfo { } export interface BlockAudit { + version: number, time: number, height: number, hash: string, + unseenTxs: string[], missingTxs: string[], freshTxs: string[], sigopTxs: string[], diff --git a/backend/src/repositories/BlocksAuditsRepository.ts b/backend/src/repositories/BlocksAuditsRepository.ts index 1e0d28689..cddd535de 100644 --- a/backend/src/repositories/BlocksAuditsRepository.ts +++ b/backend/src/repositories/BlocksAuditsRepository.ts @@ -1,13 +1,24 @@ -import blocks from '../api/blocks'; import DB from '../database'; import logger from '../logger'; -import { BlockAudit, AuditScore, TransactionAudit } from '../mempool.interfaces'; +import bitcoinApi from '../api/bitcoin/bitcoin-api-factory'; +import { BlockAudit, AuditScore, TransactionAudit, TransactionStripped } from '../mempool.interfaces'; + +interface MigrationAudit { + version: number, + height: number, + id: string, + timestamp: number, + prioritizedTxs: string[], + acceleratedTxs: string[], + template: TransactionStripped[], + transactions: TransactionStripped[], +} class BlocksAuditRepositories { public async $saveAudit(audit: BlockAudit): Promise { try { - await DB.query(`INSERT INTO blocks_audits(time, height, hash, missing_txs, added_txs, prioritized_txs, fresh_txs, sigop_txs, fullrbf_txs, accelerated_txs, match_rate, expected_fees, expected_weight) - VALUE (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [audit.time, audit.height, audit.hash, JSON.stringify(audit.missingTxs), + await DB.query(`INSERT INTO blocks_audits(version, time, height, hash, seen_txs, missing_txs, added_txs, prioritized_txs, fresh_txs, sigop_txs, fullrbf_txs, accelerated_txs, match_rate, expected_fees, expected_weight) + VALUE (?, FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [audit.version, audit.time, audit.height, audit.hash, JSON.stringify(audit.missingTxs), JSON.stringify(audit.addedTxs), JSON.stringify(audit.prioritizedTxs), JSON.stringify(audit.freshTxs), JSON.stringify(audit.sigopTxs), JSON.stringify(audit.fullrbfTxs), JSON.stringify(audit.acceleratedTxs), audit.matchRate, audit.expectedFees, audit.expectedWeight]); } catch (e: any) { if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart @@ -62,24 +73,30 @@ class BlocksAuditRepositories { public async $getBlockAudit(hash: string): Promise { try { const [rows]: any[] = await DB.query( - `SELECT blocks_audits.height, blocks_audits.hash as id, UNIX_TIMESTAMP(blocks_audits.time) as timestamp, - template, - missing_txs as missingTxs, - added_txs as addedTxs, - prioritized_txs as prioritizedTxs, - fresh_txs as freshTxs, - sigop_txs as sigopTxs, - fullrbf_txs as fullrbfTxs, - accelerated_txs as acceleratedTxs, - match_rate as matchRate, - expected_fees as expectedFees, - expected_weight as expectedWeight + `SELECT + blocks_audits.version, + blocks_audits.height, + blocks_audits.hash as id, + UNIX_TIMESTAMP(blocks_audits.time) as timestamp, + template, + unseen_txs as unseenTxs, + missing_txs as missingTxs, + added_txs as addedTxs, + prioritized_txs as prioritizedTxs, + fresh_txs as freshTxs, + sigop_txs as sigopTxs, + fullrbf_txs as fullrbfTxs, + accelerated_txs as acceleratedTxs, + match_rate as matchRate, + expected_fees as expectedFees, + expected_weight as expectedWeight FROM blocks_audits JOIN blocks_templates ON blocks_templates.id = blocks_audits.hash WHERE blocks_audits.hash = ? `, [hash]); if (rows.length) { + rows[0].unseenTxs = JSON.parse(rows[0].unseenTxs); rows[0].missingTxs = JSON.parse(rows[0].missingTxs); rows[0].addedTxs = JSON.parse(rows[0].addedTxs); rows[0].prioritizedTxs = JSON.parse(rows[0].prioritizedTxs); @@ -101,7 +118,7 @@ class BlocksAuditRepositories { public async $getBlockTxAudit(hash: string, txid: string): Promise { try { const blockAudit = await this.$getBlockAudit(hash); - + if (blockAudit) { const isAdded = blockAudit.addedTxs.includes(txid); const isPrioritized = blockAudit.prioritizedTxs.includes(txid); @@ -124,7 +141,7 @@ class BlocksAuditRepositories { conflict: isConflict, accelerated: isAccelerated, firstSeen, - } + }; } return null; } catch (e: any) { @@ -186,6 +203,79 @@ class BlocksAuditRepositories { throw e; } } + + /** + * [INDEXING] Migrate audits from v0 to v1 + */ + public async $migrateAuditsV0toV1(): Promise { + try { + const [toMigrate]: MigrationAudit[][] = await DB.query( + `SELECT + blocks_audits.height as height, + blocks_audits.hash as id, + UNIX_TIMESTAMP(blocks_audits.time) as timestamp, + blocks_summaries.transactions as transactions, + blocks_templates.template as template, + blocks_audits.prioritized_txs as prioritizedTxs, + blocks_audits.accelerated_txs as acceleratedTxs + FROM blocks_audits + JOIN blocks_summaries ON blocks_summaries.id = blocks_audits.hash + JOIN blocks_templates ON blocks_templates.id = blocks_audits.hash + WHERE blocks_audits.version = 0 + AND blocks_summaries.version = 2 + ORDER BY blocks_audits.height DESC + `) as any[]; + + logger.info(`migrating ${toMigrate.length} audits to version 1`); + + for (const audit of toMigrate) { + // unpack JSON-serialized transaction lists + audit.transactions = JSON.parse((audit.transactions as any as string) || '[]'); + audit.template = JSON.parse((audit.transactions as any as string) || '[]'); + + // we know transactions in the template, or marked "prioritized" or "accelerated" + // were seen in our mempool before the block was mined. + const isSeen = new Set(); + for (const tx of audit.template) { + isSeen.add(tx.txid); + } + for (const txid of audit.prioritizedTxs) { + isSeen.add(txid); + } + for (const txid of audit.acceleratedTxs) { + isSeen.add(txid); + } + const unseenTxs = audit.transactions.slice(0).map(tx => tx.txid).filter(txid => !isSeen.has(txid)); + + // identify "prioritized" transactions + const prioritizedTxs: string[] = []; + let lastEffectiveRate = 0; + // Iterate over the mined template from bottom to top (excluding the coinbase) + // Transactions should appear in ascending order of mining priority. + for (let i = audit.transactions.length - 1; i > 0; i--) { + const blockTx = audit.transactions[i]; + // If a tx has a lower in-band effective fee rate than the previous tx, + // it must have been prioritized out-of-band (in order to have a higher mining priority) + // so exclude from the analysis. + if ((blockTx.rate || 0) < lastEffectiveRate) { + prioritizedTxs.push(blockTx.txid); + } else { + lastEffectiveRate = blockTx.rate || 0; + } + } + + // Update audit in the database + await DB.query(` + UPDATE blocks_audits SET + unseen_txs = ?, + prioritized_txs = ? + WHERE hash = ? + `, [JSON.stringify(unseenTxs), JSON.stringify(prioritizedTxs), audit.id]); + } + } catch (e: any) { + logger.err(`Error while migrating audits from v0 to v1. Will try again later. Reason: ` + (e instanceof Error ? e.message : e)); + } + } } export default new BlocksAuditRepositories();