diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index a6c922f03..94e1414a4 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -1042,9 +1042,13 @@ class Blocks { } public async $saveCpfp(hash: string, height: number, cpfpSummary: CpfpSummary): Promise { - const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters); - if (!result) { - await cpfpRepository.$insertProgressMarker(height); + try { + const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters); + if (!result) { + await cpfpRepository.$insertProgressMarker(height); + } + } catch (e) { + // not a fatal error, we'll try again next time the indexer runs } } } diff --git a/backend/src/database.ts b/backend/src/database.ts index 070774c92..6ad545fda 100644 --- a/backend/src/database.ts +++ b/backend/src/database.ts @@ -30,7 +30,7 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr } public async query(query, params?): Promise<[T, FieldPacket[]]> + OkPacket[] | ResultSetHeader>(query, params?, connection?: PoolConnection): Promise<[T, FieldPacket[]]> { this.checkDBFlag(); let hardTimeout; @@ -45,7 +45,9 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr reject(new Error(`DB query failed to return, reject or time out within ${hardTimeout / 1000}s - ${query?.sql?.slice(0, 160) || (typeof(query) === 'string' || query instanceof String ? query?.slice(0, 160) : 'unknown query')}`)); }, hardTimeout); - this.getPool().then(pool => { + // Use a specific connection if provided, otherwise delegate to the pool + const connectionPromise = connection ? 
Promise.resolve(connection) : this.getPool(); + connectionPromise.then((pool: PoolConnection | Pool) => { return pool.query(query, params) as Promise<[T, FieldPacket[]]>; }).then(result => { resolve(result); @@ -61,6 +63,32 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr } } + public async $atomicQuery(queries: { query, params }[]): Promise<[T, FieldPacket[]][]> + { + const pool = await this.getPool(); + const connection = await pool.getConnection(); + try { + await connection.beginTransaction(); + + const results: [T, FieldPacket[]][] = []; + for (const query of queries) { + const result = await this.query(query.query, query.params, connection) as [T, FieldPacket[]]; + results.push(result); + } + + await connection.commit(); + + return results; + } catch (e) { + logger.err('Could not complete db transaction, rolling back: ' + (e instanceof Error ? e.message : e)); + await connection.rollback(); + throw e; + } finally { + connection.release(); + } + } + public async checkDbConnection() { this.checkDBFlag(); try { diff --git a/backend/src/repositories/CpfpRepository.ts b/backend/src/repositories/CpfpRepository.ts index 90ba2ac80..b33ff1e4a 100644 --- a/backend/src/repositories/CpfpRepository.ts +++ b/backend/src/repositories/CpfpRepository.ts @@ -5,52 +5,10 @@ import { Ancestor, CpfpCluster } from '../mempool.interfaces'; import transactionRepository from '../repositories/TransactionRepository'; class CpfpRepository { - public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise { - if (!txs[0]) { - return false; - } - // skip clusters of transactions with the same fees - const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100; - const equalFee = txs.length > 1 && txs.reduce((acc, tx) => { - return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee); - }, true); - if (equalFee) { - return false; - } 
- - try { - const packedTxs = Buffer.from(this.pack(txs)); - await DB.query( - ` - INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate) - VALUE (UNHEX(?), ?, ?, ?) - ON DUPLICATE KEY UPDATE - height = ?, - txs = ?, - fee_rate = ? - `, - [clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize] - ); - const maxChunk = 10; - let chunkIndex = 0; - while (chunkIndex < txs.length) { - const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => { - return { txid: tx.txid, cluster: clusterRoot }; - }); - await transactionRepository.$batchSetCluster(chunk); - chunkIndex += maxChunk; - } - return true; - } catch (e: any) { - logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e)); - throw e; - } - } - public async $batchSaveClusters(clusters: { root: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number }[]): Promise { try { - const clusterValues: any[] = []; - const txs: any[] = []; + const clusterValues: [string, number, Buffer, number][] = []; + const txs: { txid: string, cluster: string }[] = []; for (const cluster of clusters) { if (cluster.txs?.length) { @@ -76,6 +34,8 @@ class CpfpRepository { return false; } + const queries: { query, params }[] = []; + const maxChunk = 100; let chunkIndex = 0; // insert clusters in batches of up to 100 rows @@ -89,10 +49,10 @@ class CpfpRepository { return (' (UNHEX(?), ?, ?, ?)'); }) + ';'; const values = chunk.flat(); - await DB.query( + queries.push({ query, - values - ); + params: values, + }); chunkIndex += maxChunk; } @@ -100,10 +60,12 @@ class CpfpRepository { // insert transactions in batches of up to 100 rows while (chunkIndex < txs.length) { const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk); - await transactionRepository.$batchSetCluster(chunk); + queries.push(transactionRepository.buildBatchSetQuery(chunk)); chunkIndex += maxChunk; } + await DB.$atomicQuery(queries); + return true; } catch (e: any) { 
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e)); diff --git a/backend/src/repositories/TransactionRepository.ts b/backend/src/repositories/TransactionRepository.ts index bde95df9b..b5067f790 100644 --- a/backend/src/repositories/TransactionRepository.ts +++ b/backend/src/repositories/TransactionRepository.ts @@ -25,9 +25,8 @@ class TransactionRepository { } } - public async $batchSetCluster(txs): Promise { - try { - let query = ` + public buildBatchSetQuery(txs: { txid: string, cluster: string }[]): { query, params } { + let query = ` INSERT IGNORE INTO compact_transactions ( txid, @@ -35,13 +34,22 @@ class TransactionRepository { ) VALUES `; - query += txs.map(tx => { - return (' (UNHEX(?), UNHEX(?))'); - }) + ';'; - const values = txs.map(tx => [tx.txid, tx.cluster]).flat(); + query += txs.map(tx => { + return (' (UNHEX(?), UNHEX(?))'); + }) + ';'; + const values = txs.map(tx => [tx.txid, tx.cluster]).flat(); + return { + query, + params: values, + }; + } + + public async $batchSetCluster(txs): Promise { + try { + const query = this.buildBatchSetQuery(txs); await DB.query( - query, - values + query.query, + query.params, ); } catch (e: any) { logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));