compact schemas for cpfp tables
This commit is contained in:
		
							parent
							
								
									7cc11e622c
								
							
						
					
					
						commit
						c68d9d8a13
					
				@ -22,12 +22,10 @@ import poolsParser from './pools-parser';
 | 
			
		||||
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
 | 
			
		||||
import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
 | 
			
		||||
import cpfpRepository from '../repositories/CpfpRepository';
 | 
			
		||||
import transactionRepository from '../repositories/TransactionRepository';
 | 
			
		||||
import mining from './mining/mining';
 | 
			
		||||
import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository';
 | 
			
		||||
import PricesRepository from '../repositories/PricesRepository';
 | 
			
		||||
import priceUpdater from '../tasks/price-updater';
 | 
			
		||||
import { Block } from 'bitcoinjs-lib';
 | 
			
		||||
 | 
			
		||||
class Blocks {
 | 
			
		||||
  private blocks: BlockExtended[] = [];
 | 
			
		||||
@ -753,10 +751,8 @@ class Blocks {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $indexCPFP(hash: string, height: number): Promise<void> {
 | 
			
		||||
    let transactions;
 | 
			
		||||
 
 | 
			
		||||
    const block = await bitcoinClient.getBlock(hash, 2);
 | 
			
		||||
    transactions = block.tx.map(tx => {
 | 
			
		||||
    const transactions = block.tx.map(tx => {
 | 
			
		||||
      tx.vsize = tx.weight / 4;
 | 
			
		||||
      tx.fee *= 100_000_000;
 | 
			
		||||
      return tx;
 | 
			
		||||
@ -775,9 +771,12 @@ class Blocks {
 | 
			
		||||
        });
 | 
			
		||||
        const effectiveFeePerVsize = totalFee / totalVSize;
 | 
			
		||||
        if (cluster.length > 1) {
 | 
			
		||||
          await cpfpRepository.$saveCluster(height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize);
 | 
			
		||||
          for (const tx of cluster) {
 | 
			
		||||
            await transactionRepository.$setCluster(tx.txid, cluster[0].txid);
 | 
			
		||||
          const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
 | 
			
		||||
          const equalFee = cluster.reduce((acc, tx) => {
 | 
			
		||||
            return (acc && Math.round(((tx.fee || 0) / tx.vsize) * 100) / 100 === roundedEffectiveFee);
 | 
			
		||||
          }, true);
 | 
			
		||||
          if (!equalFee) {
 | 
			
		||||
            await cpfpRepository.$saveCluster(cluster[0].txid, height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        cluster = [];
 | 
			
		||||
 | 
			
		||||
@ -445,6 +445,8 @@ class DatabaseMigration {
 | 
			
		||||
 | 
			
		||||
    if (databaseSchemaVersion < 50 && isBitcoin === true) {
 | 
			
		||||
      await this.$executeQuery('ALTER TABLE `blocks` DROP COLUMN `cpfp_indexed`');
 | 
			
		||||
      await this.$executeQuery(this.getCreateCompactCPFPTableQuery(), await this.$checkIfTableExists('compact_cpfp_clusters'));
 | 
			
		||||
      await this.$executeQuery(this.getCreateCompactTransactionsTableQuery(), await this.$checkIfTableExists('compact_transactions'));
 | 
			
		||||
      await this.updateToSchemaVersion(50);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -918,6 +920,25 @@ class DatabaseMigration {
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getCreateCompactCPFPTableQuery(): string {
 | 
			
		||||
    return `CREATE TABLE IF NOT EXISTS compact_cpfp_clusters (
 | 
			
		||||
      root binary(32) NOT NULL,
 | 
			
		||||
      height int(10) NOT NULL,
 | 
			
		||||
      txs BLOB DEFAULT NULL,
 | 
			
		||||
      fee_rate float unsigned NOT NULL,
 | 
			
		||||
      PRIMARY KEY (root),
 | 
			
		||||
      INDEX (height)
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getCreateCompactTransactionsTableQuery(): string {
 | 
			
		||||
    return `CREATE TABLE IF NOT EXISTS compact_transactions (
 | 
			
		||||
      txid binary(32) NOT NULL,
 | 
			
		||||
      cluster binary(32) DEFAULT NULL,
 | 
			
		||||
      PRIMARY KEY (txid)
 | 
			
		||||
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $truncateIndexedData(tables: string[]) {
 | 
			
		||||
    const allowedTables = ['blocks', 'hashrates', 'prices'];
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -671,7 +671,7 @@ class BlocksRepository {
 | 
			
		||||
    try {
 | 
			
		||||
      const blockchainInfo = await bitcoinClient.getBlockchainInfo();
 | 
			
		||||
      const currentBlockHeight = blockchainInfo.blocks;
 | 
			
		||||
      const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from cpfp_clusters`);
 | 
			
		||||
      const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from compact_cpfp_clusters`);
 | 
			
		||||
      const lastHeight = (lastHeightRows.length && lastHeightRows[0].minHeight != null) ? lastHeightRows[0].minHeight : currentBlockHeight;
 | 
			
		||||
 | 
			
		||||
      let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, blockchainInfo.blocks);
 | 
			
		||||
 | 
			
		||||
@ -1,34 +1,72 @@
 | 
			
		||||
import cluster, { Cluster } from 'cluster';
 | 
			
		||||
import { RowDataPacket } from 'mysql2';
 | 
			
		||||
import DB from '../database';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import { Ancestor } from '../mempool.interfaces';
 | 
			
		||||
import transactionRepository from '../repositories/TransactionRepository';
 | 
			
		||||
 | 
			
		||||
class CpfpRepository {
 | 
			
		||||
  public async $saveCluster(height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
 | 
			
		||||
  public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
 | 
			
		||||
    if (!txs[0]) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    try {
 | 
			
		||||
      const txsJson = JSON.stringify(txs);
 | 
			
		||||
      const packedTxs = Buffer.from(this.pack(txs));
 | 
			
		||||
      await DB.query(
 | 
			
		||||
        `
 | 
			
		||||
          INSERT INTO cpfp_clusters(root, height, txs, fee_rate)
 | 
			
		||||
          VALUE (?, ?, ?, ?)
 | 
			
		||||
          INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
 | 
			
		||||
          VALUE (UNHEX(?), ?, ?, ?)
 | 
			
		||||
          ON DUPLICATE KEY UPDATE
 | 
			
		||||
            height = ?,
 | 
			
		||||
            txs = ?,
 | 
			
		||||
            fee_rate = ?
 | 
			
		||||
        `,
 | 
			
		||||
        [txs[0].txid, height, txsJson, effectiveFeePerVsize, height, txsJson, effectiveFeePerVsize, height]
 | 
			
		||||
        [clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
 | 
			
		||||
      );
 | 
			
		||||
      for (const tx of txs) {
 | 
			
		||||
        await transactionRepository.$setCluster(tx.txid, clusterRoot);
 | 
			
		||||
      }
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
      throw e;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $getCluster(clusterRoot: string): Promise<Cluster> {
 | 
			
		||||
    const [clusterRows]: any = await DB.query(
 | 
			
		||||
      `
 | 
			
		||||
        SELECT *
 | 
			
		||||
        FROM compact_cpfp_clusters
 | 
			
		||||
        WHERE root = UNHEX(?)
 | 
			
		||||
      `,
 | 
			
		||||
      [clusterRoot]
 | 
			
		||||
    );
 | 
			
		||||
    const cluster = clusterRows[0];
 | 
			
		||||
    cluster.txs = this.unpack(cluster.txs);
 | 
			
		||||
    return cluster;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $deleteClustersFrom(height: number): Promise<void> {
 | 
			
		||||
    logger.info(`Delete newer cpfp clusters from height ${height} from the database`);
 | 
			
		||||
    try {
 | 
			
		||||
      const [rows] = await DB.query(
 | 
			
		||||
        `
 | 
			
		||||
          SELECT txs, height, root from compact_cpfp_clusters
 | 
			
		||||
          WHERE height >= ?
 | 
			
		||||
        `,
 | 
			
		||||
        [height]
 | 
			
		||||
      ) as RowDataPacket[][];
 | 
			
		||||
      if (rows?.length) {
 | 
			
		||||
        for (let clusterToDelete of rows) {
 | 
			
		||||
          const txs = this.unpack(clusterToDelete.txs);
 | 
			
		||||
          for (let tx of txs) {
 | 
			
		||||
            await transactionRepository.$removeTransaction(tx.txid);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      await DB.query(
 | 
			
		||||
        `
 | 
			
		||||
          DELETE from cpfp_clusters
 | 
			
		||||
          DELETE from compact_cpfp_clusters
 | 
			
		||||
          WHERE height >= ?
 | 
			
		||||
        `,
 | 
			
		||||
        [height]
 | 
			
		||||
@ -38,6 +76,42 @@ class CpfpRepository {
 | 
			
		||||
      throw e;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public pack(txs: Ancestor[]): ArrayBuffer {
 | 
			
		||||
    const buf = new ArrayBuffer(44 * txs.length);
 | 
			
		||||
    const view = new DataView(buf);
 | 
			
		||||
    txs.forEach((tx, i) => {
 | 
			
		||||
      const offset = i * 44;
 | 
			
		||||
      for (let x = 0; x < 32; x++) {
 | 
			
		||||
        // store txid in little-endian
 | 
			
		||||
        view.setUint8(offset + (31 - x), parseInt(tx.txid.slice(x * 2, (x * 2) + 2), 16));
 | 
			
		||||
      }
 | 
			
		||||
      view.setUint32(offset + 32, tx.weight);
 | 
			
		||||
      view.setBigUint64(offset + 36, BigInt(Math.round(tx.fee)));
 | 
			
		||||
    });
 | 
			
		||||
    return buf;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public unpack(buf: Buffer): Ancestor[] {
 | 
			
		||||
    if (!buf) {
 | 
			
		||||
      return [];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
 | 
			
		||||
    const txs: Ancestor[] = [];
 | 
			
		||||
    const view = new DataView(arrayBuffer);
 | 
			
		||||
    for (let offset = 0; offset < arrayBuffer.byteLength; offset += 44) {
 | 
			
		||||
      const txid = Array.from(new Uint8Array(arrayBuffer, offset, 32)).reverse().map(b => b.toString(16).padStart(2, '0')).join('');
 | 
			
		||||
      const weight = view.getUint32(offset + 32);
 | 
			
		||||
      const fee = Number(view.getBigUint64(offset + 36));
 | 
			
		||||
      txs.push({
 | 
			
		||||
        txid,
 | 
			
		||||
        weight,
 | 
			
		||||
        fee
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
    return txs;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export default new CpfpRepository();
 | 
			
		||||
@ -1,6 +1,7 @@
 | 
			
		||||
import DB from '../database';
 | 
			
		||||
import logger from '../logger';
 | 
			
		||||
import { Ancestor, CpfpInfo } from '../mempool.interfaces';
 | 
			
		||||
import cpfpRepository from './CpfpRepository';
 | 
			
		||||
 | 
			
		||||
interface CpfpSummary {
 | 
			
		||||
  txid: string;
 | 
			
		||||
@ -12,20 +13,20 @@ interface CpfpSummary {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class TransactionRepository {
 | 
			
		||||
  public async $setCluster(txid: string, cluster: string): Promise<void> {
 | 
			
		||||
  public async $setCluster(txid: string, clusterRoot: string): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      await DB.query(
 | 
			
		||||
        `
 | 
			
		||||
          INSERT INTO transactions
 | 
			
		||||
          INSERT INTO compact_transactions
 | 
			
		||||
          (
 | 
			
		||||
            txid,
 | 
			
		||||
            cluster
 | 
			
		||||
          )
 | 
			
		||||
          VALUE (?, ?)
 | 
			
		||||
          VALUE (UNHEX(?), UNHEX(?))
 | 
			
		||||
          ON DUPLICATE KEY UPDATE
 | 
			
		||||
            cluster = ?
 | 
			
		||||
            cluster = UNHEX(?)
 | 
			
		||||
        ;`,
 | 
			
		||||
        [txid, cluster, cluster]
 | 
			
		||||
        [txid, clusterRoot, clusterRoot]
 | 
			
		||||
      );
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      logger.err(`Cannot save transaction cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
@ -35,18 +36,19 @@ class TransactionRepository {
 | 
			
		||||
 | 
			
		||||
  public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
 | 
			
		||||
    try {
 | 
			
		||||
      let query = `
 | 
			
		||||
        SELECT *
 | 
			
		||||
        FROM transactions
 | 
			
		||||
        LEFT JOIN cpfp_clusters AS cluster ON cluster.root = transactions.cluster
 | 
			
		||||
        WHERE transactions.txid = ?
 | 
			
		||||
      `;
 | 
			
		||||
      const [rows]: any = await DB.query(query, [txid]);
 | 
			
		||||
      if (rows.length) {
 | 
			
		||||
        rows[0].txs = JSON.parse(rows[0].txs) as Ancestor[];
 | 
			
		||||
        if (rows[0]?.txs?.length) {
 | 
			
		||||
          return this.convertCpfp(rows[0]);
 | 
			
		||||
        }
 | 
			
		||||
      const [txRows]: any = await DB.query(
 | 
			
		||||
        `
 | 
			
		||||
          SELECT HEX(txid) as id, HEX(cluster) as root
 | 
			
		||||
          FROM compact_transactions
 | 
			
		||||
          WHERE txid = UNHEX(?)
 | 
			
		||||
        `,
 | 
			
		||||
        [txid]
 | 
			
		||||
      );
 | 
			
		||||
      if (txRows.length && txRows[0].root != null) {
 | 
			
		||||
        const txid = txRows[0].id.toLowerCase();
 | 
			
		||||
        const clusterId = txRows[0].root.toLowerCase();
 | 
			
		||||
        const cluster = await cpfpRepository.$getCluster(clusterId);
 | 
			
		||||
        return this.convertCpfp(txid, cluster);
 | 
			
		||||
      }
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.err('Cannot get transaction cpfp info from db. Reason: ' + (e instanceof Error ? e.message : e));
 | 
			
		||||
@ -54,12 +56,23 @@ class TransactionRepository {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private convertCpfp(cpfp: CpfpSummary): CpfpInfo {
 | 
			
		||||
  public async $removeTransaction(txid: string): Promise<void> {
 | 
			
		||||
    await DB.query(
 | 
			
		||||
      `
 | 
			
		||||
        DELETE FROM compact_transactions
 | 
			
		||||
        WHERE txid = UNHEX(?)
 | 
			
		||||
      `,
 | 
			
		||||
      [txid]
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private convertCpfp(txid, cluster): CpfpInfo {
 | 
			
		||||
    const descendants: Ancestor[] = [];
 | 
			
		||||
    const ancestors: Ancestor[] = [];
 | 
			
		||||
    let matched = false;
 | 
			
		||||
    for (const tx of cpfp.txs) {
 | 
			
		||||
      if (tx.txid === cpfp.txid) {
 | 
			
		||||
 | 
			
		||||
    for (const tx of cluster.txs) {
 | 
			
		||||
      if (tx.txid === txid) {
 | 
			
		||||
        matched = true;
 | 
			
		||||
      } else if (!matched) {
 | 
			
		||||
        descendants.push(tx);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user