From b50936f001406c3465b7350a3d54cf26c517eddb Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 9 Jan 2023 10:24:12 -0600 Subject: [PATCH] compact schemas for cpfp tables --- backend/src/api/blocks.ts | 15 ++-- backend/src/api/database-migration.ts | 21 +++++ backend/src/repositories/BlocksRepository.ts | 2 +- backend/src/repositories/CpfpRepository.ts | 86 +++++++++++++++++-- .../src/repositories/TransactionRepository.ts | 53 +++++++----- 5 files changed, 142 insertions(+), 35 deletions(-) diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index a782dbacc..2b1f88518 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -22,12 +22,10 @@ import poolsParser from './pools-parser'; import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository'; import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository'; import cpfpRepository from '../repositories/CpfpRepository'; -import transactionRepository from '../repositories/TransactionRepository'; import mining from './mining/mining'; import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository'; import PricesRepository from '../repositories/PricesRepository'; import priceUpdater from '../tasks/price-updater'; -import { Block } from 'bitcoinjs-lib'; class Blocks { private blocks: BlockExtended[] = []; @@ -753,10 +751,8 @@ class Blocks { } public async $indexCPFP(hash: string, height: number): Promise { - let transactions; - const block = await bitcoinClient.getBlock(hash, 2); - transactions = block.tx.map(tx => { + const transactions = block.tx.map(tx => { tx.vsize = tx.weight / 4; tx.fee *= 100_000_000; return tx; @@ -775,9 +771,12 @@ class Blocks { }); const effectiveFeePerVsize = totalFee / totalVSize; if (cluster.length > 1) { - await cpfpRepository.$saveCluster(height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize); - for (const tx of cluster) { - await transactionRepository.$setCluster(tx.txid, cluster[0].txid); + const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100; + const equalFee = cluster.reduce((acc, tx) => { + return (acc && Math.round(((tx.fee || 0) / tx.vsize) * 100) / 100 === roundedEffectiveFee); + }, true); + if (!equalFee) { + await cpfpRepository.$saveCluster(cluster[0].txid, height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize); } } cluster = []; diff --git a/backend/src/api/database-migration.ts b/backend/src/api/database-migration.ts index 68087fdb5..6d7901ffa 100644 --- a/backend/src/api/database-migration.ts +++ b/backend/src/api/database-migration.ts @@ -445,6 +445,8 @@ class DatabaseMigration { if (databaseSchemaVersion < 50 && isBitcoin === true) { await this.$executeQuery('ALTER TABLE `blocks` DROP COLUMN `cpfp_indexed`'); + await this.$executeQuery(this.getCreateCompactCPFPTableQuery(), await this.$checkIfTableExists('compact_cpfp_clusters')); + await this.$executeQuery(this.getCreateCompactTransactionsTableQuery(), await this.$checkIfTableExists('compact_transactions')); await this.updateToSchemaVersion(50); } } @@ -918,6 +920,25 @@ class DatabaseMigration { ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; } + private getCreateCompactCPFPTableQuery(): string { + return `CREATE TABLE IF NOT EXISTS compact_cpfp_clusters ( + root binary(32) NOT NULL, + height int(10) NOT NULL, + txs BLOB DEFAULT NULL, + fee_rate float unsigned NOT NULL, + PRIMARY KEY (root), + INDEX (height) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; + } + + private getCreateCompactTransactionsTableQuery(): string { + return `CREATE TABLE IF NOT EXISTS compact_transactions ( + txid binary(32) NOT NULL, + cluster binary(32) DEFAULT NULL, + PRIMARY KEY (txid) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`; + } + public async $truncateIndexedData(tables: string[]) { const allowedTables = ['blocks', 'hashrates', 'prices']; diff --git a/backend/src/repositories/BlocksRepository.ts b/backend/src/repositories/BlocksRepository.ts index 5c52a8ba0..6cd5d2785 100644 --- a/backend/src/repositories/BlocksRepository.ts +++ b/backend/src/repositories/BlocksRepository.ts @@ -671,7 +671,7 @@ class BlocksRepository { try { const blockchainInfo = await bitcoinClient.getBlockchainInfo(); const currentBlockHeight = blockchainInfo.blocks; - const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from cpfp_clusters`); + const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from compact_cpfp_clusters`); const lastHeight = (lastHeightRows.length && lastHeightRows[0].minHeight != null) ? lastHeightRows[0].minHeight : currentBlockHeight; let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, blockchainInfo.blocks); diff --git a/backend/src/repositories/CpfpRepository.ts b/backend/src/repositories/CpfpRepository.ts index 563e6ede1..a5f17540a 100644 --- a/backend/src/repositories/CpfpRepository.ts +++ b/backend/src/repositories/CpfpRepository.ts @@ -1,34 +1,72 @@ +import cluster, { Cluster } from 'cluster'; +import { RowDataPacket } from 'mysql2'; import DB from '../database'; import logger from '../logger'; import { Ancestor } from '../mempool.interfaces'; +import transactionRepository from '../repositories/TransactionRepository'; class CpfpRepository { - public async $saveCluster(height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise { + public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise { + if (!txs[0]) { + return; + } try { - const txsJson = JSON.stringify(txs); + const packedTxs = Buffer.from(this.pack(txs)); await DB.query( ` - INSERT INTO cpfp_clusters(root, height, txs, fee_rate) - VALUE (?, ?, ?, ?) + INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate) + VALUE (UNHEX(?), ?, ?, ?) ON DUPLICATE KEY UPDATE height = ?, txs = ?, fee_rate = ? `, - [txs[0].txid, height, txsJson, effectiveFeePerVsize, height, txsJson, effectiveFeePerVsize, height] + [clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize] ); + for (const tx of txs) { + await transactionRepository.$setCluster(tx.txid, clusterRoot); + } } catch (e: any) { logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e)); throw e; } } + public async $getCluster(clusterRoot: string): Promise { + const [clusterRows]: any = await DB.query( + ` + SELECT * + FROM compact_cpfp_clusters + WHERE root = UNHEX(?) + `, + [clusterRoot] + ); + const cluster = clusterRows[0]; + cluster.txs = this.unpack(cluster.txs); + return cluster; + } + public async $deleteClustersFrom(height: number): Promise { logger.info(`Delete newer cpfp clusters from height ${height} from the database`); try { + const [rows] = await DB.query( + ` + SELECT txs, height, root from compact_cpfp_clusters + WHERE height >= ? + `, + [height] + ) as RowDataPacket[][]; + if (rows?.length) { + for (let clusterToDelete of rows) { + const txs = this.unpack(clusterToDelete.txs); + for (let tx of txs) { + await transactionRepository.$removeTransaction(tx.txid); + } + } + } await DB.query( ` - DELETE from cpfp_clusters + DELETE from compact_cpfp_clusters WHERE height >= ? `, [height] @@ -38,6 +76,42 @@ class CpfpRepository { throw e; } } + + public pack(txs: Ancestor[]): ArrayBuffer { + const buf = new ArrayBuffer(44 * txs.length); + const view = new DataView(buf); + txs.forEach((tx, i) => { + const offset = i * 44; + for (let x = 0; x < 32; x++) { + // store txid in little-endian + view.setUint8(offset + (31 - x), parseInt(tx.txid.slice(x * 2, (x * 2) + 2), 16)); + } + view.setUint32(offset + 32, tx.weight); + view.setBigUint64(offset + 36, BigInt(Math.round(tx.fee))); + }); + return buf; + } + + public unpack(buf: Buffer): Ancestor[] { + if (!buf) { + return []; + } + + const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength); + const txs: Ancestor[] = []; + const view = new DataView(arrayBuffer); + for (let offset = 0; offset < arrayBuffer.byteLength; offset += 44) { + const txid = Array.from(new Uint8Array(arrayBuffer, offset, 32)).reverse().map(b => b.toString(16).padStart(2, '0')).join(''); + const weight = view.getUint32(offset + 32); + const fee = Number(view.getBigUint64(offset + 36)); + txs.push({ + txid, + weight, + fee + }); + } + return txs; + } } export default new CpfpRepository(); \ No newline at end of file diff --git a/backend/src/repositories/TransactionRepository.ts b/backend/src/repositories/TransactionRepository.ts index 74debb833..2d05f0e14 100644 --- a/backend/src/repositories/TransactionRepository.ts +++ b/backend/src/repositories/TransactionRepository.ts @@ -1,6 +1,7 @@ import DB from '../database'; import logger from '../logger'; import { Ancestor, CpfpInfo } from '../mempool.interfaces'; +import cpfpRepository from './CpfpRepository'; interface CpfpSummary { txid: string; @@ -12,20 +13,20 @@ interface CpfpSummary { } class TransactionRepository { - public async $setCluster(txid: string, cluster: string): Promise { + public async $setCluster(txid: string, clusterRoot: string): Promise { try { await DB.query( ` - INSERT INTO transactions + INSERT INTO compact_transactions ( txid, cluster ) - VALUE (?, ?) + VALUE (UNHEX(?), UNHEX(?)) ON DUPLICATE KEY UPDATE - cluster = ? + cluster = UNHEX(?) ;`, - [txid, cluster, cluster] + [txid, clusterRoot, clusterRoot] ); } catch (e: any) { logger.err(`Cannot save transaction cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e)); @@ -35,18 +36,19 @@ class TransactionRepository { public async $getCpfpInfo(txid: string): Promise { try { - let query = ` - SELECT * - FROM transactions - LEFT JOIN cpfp_clusters AS cluster ON cluster.root = transactions.cluster - WHERE transactions.txid = ? - `; - const [rows]: any = await DB.query(query, [txid]); - if (rows.length) { - rows[0].txs = JSON.parse(rows[0].txs) as Ancestor[]; - if (rows[0]?.txs?.length) { - return this.convertCpfp(rows[0]); - } + const [txRows]: any = await DB.query( + ` + SELECT HEX(txid) as id, HEX(cluster) as root + FROM compact_transactions + WHERE txid = UNHEX(?) + `, + [txid] + ); + if (txRows.length && txRows[0].root != null) { + const txid = txRows[0].id.toLowerCase(); + const clusterId = txRows[0].root.toLowerCase(); + const cluster = await cpfpRepository.$getCluster(clusterId); + return this.convertCpfp(txid, cluster); } } catch (e) { logger.err('Cannot get transaction cpfp info from db. Reason: ' + (e instanceof Error ? e.message : e)); @@ -54,12 +56,23 @@ class TransactionRepository { } } - private convertCpfp(cpfp: CpfpSummary): CpfpInfo { + public async $removeTransaction(txid: string): Promise { + await DB.query( + ` + DELETE FROM compact_transactions + WHERE txid = UNHEX(?) + `, + [txid] + ); + } + + private convertCpfp(txid, cluster): CpfpInfo { const descendants: Ancestor[] = []; const ancestors: Ancestor[] = []; let matched = false; - for (const tx of cpfp.txs) { - if (tx.txid === cpfp.txid) { + + for (const tx of cluster.txs) { + if (tx.txid === txid) { matched = true; } else if (!matched) { descendants.push(tx);