Make cpfp db save operations atomic
This commit is contained in:
		
							parent
							
								
									b15a89f676
								
							
						
					
					
						commit
						2448090546
					
				@ -1050,9 +1050,13 @@ class Blocks {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $saveCpfp(hash: string, height: number, cpfpSummary: CpfpSummary): Promise<void> {
 | 
			
		||||
    const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters);
 | 
			
		||||
    if (!result) {
 | 
			
		||||
      await cpfpRepository.$insertProgressMarker(height);
 | 
			
		||||
    try {
 | 
			
		||||
      const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters);
 | 
			
		||||
      if (!result) {
 | 
			
		||||
        await cpfpRepository.$insertProgressMarker(height);
 | 
			
		||||
      }
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      // not a fatal error, we'll try again next time the indexer runs
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -30,7 +30,7 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async query<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
 | 
			
		||||
    OkPacket[] | ResultSetHeader>(query, params?): Promise<[T, FieldPacket[]]>
 | 
			
		||||
    OkPacket[] | ResultSetHeader>(query, params?, connection?: PoolConnection): Promise<[T, FieldPacket[]]>
 | 
			
		||||
  {
 | 
			
		||||
    this.checkDBFlag();
 | 
			
		||||
    let hardTimeout;
 | 
			
		||||
@ -45,7 +45,9 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
 | 
			
		||||
          reject(new Error(`DB query failed to return, reject or time out within ${hardTimeout / 1000}s - ${query?.sql?.slice(0, 160) || (typeof(query) === 'string' || query instanceof String ? query?.slice(0, 160) : 'unknown query')}`));
 | 
			
		||||
        }, hardTimeout);
 | 
			
		||||
 | 
			
		||||
        this.getPool().then(pool => {
 | 
			
		||||
        // Use a specific connection if provided, otherwise delegate to the pool
 | 
			
		||||
        const connectionPromise = connection ? Promise.resolve(connection) : this.getPool();
 | 
			
		||||
        connectionPromise.then((pool: PoolConnection | Pool) => {
 | 
			
		||||
          return pool.query(query, params) as Promise<[T, FieldPacket[]]>;
 | 
			
		||||
        }).then(result => {
 | 
			
		||||
          resolve(result);
 | 
			
		||||
@ -61,6 +63,33 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $atomicQuery<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
 | 
			
		||||
    OkPacket[] | ResultSetHeader>(queries: { query, params }[]): Promise<[T, FieldPacket[]][]>
 | 
			
		||||
  {
 | 
			
		||||
    const pool = await this.getPool();
 | 
			
		||||
    const connection = await pool.getConnection();
 | 
			
		||||
    try {
 | 
			
		||||
      await connection.beginTransaction();
 | 
			
		||||
 | 
			
		||||
      const results: [T, FieldPacket[]][]  = [];
 | 
			
		||||
      for (const query of queries) {
 | 
			
		||||
        const result = await this.query(query.query, query.params, connection) as [T, FieldPacket[]];
 | 
			
		||||
        results.push(result);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      await connection.commit();
 | 
			
		||||
 | 
			
		||||
      return results;
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      logger.err('Could not complete db transaction, rolling back: ' + (e instanceof Error ? e.message : e));
 | 
			
		||||
      connection.rollback();
 | 
			
		||||
      connection.release();
 | 
			
		||||
      throw e;
 | 
			
		||||
    } finally {
 | 
			
		||||
      connection.release();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async checkDbConnection() {
 | 
			
		||||
    this.checkDBFlag();
 | 
			
		||||
    try {
 | 
			
		||||
 | 
			
		||||
@ -5,52 +5,10 @@ import { Ancestor, CpfpCluster } from '../mempool.interfaces';
 | 
			
		||||
import transactionRepository from '../repositories/TransactionRepository';
 | 
			
		||||
 | 
			
		||||
class CpfpRepository {
 | 
			
		||||
  public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<boolean> {
 | 
			
		||||
    if (!txs[0]) {
 | 
			
		||||
      return false;
 | 
			
		||||
    }
 | 
			
		||||
    // skip clusters of transactions with the same fees
 | 
			
		||||
    const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
 | 
			
		||||
    const equalFee = txs.length > 1 && txs.reduce((acc, tx) => {
 | 
			
		||||
      return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
 | 
			
		||||
    }, true);
 | 
			
		||||
    if (equalFee) {
 | 
			
		||||
      return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      const packedTxs = Buffer.from(this.pack(txs));
 | 
			
		||||
      await DB.query(
 | 
			
		||||
        `
 | 
			
		||||
          INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
 | 
			
		||||
          VALUE (UNHEX(?), ?, ?, ?)
 | 
			
		||||
          ON DUPLICATE KEY UPDATE
 | 
			
		||||
            height = ?,
 | 
			
		||||
            txs = ?,
 | 
			
		||||
            fee_rate = ?
 | 
			
		||||
        `,
 | 
			
		||||
        [clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
 | 
			
		||||
      );
 | 
			
		||||
      const maxChunk = 10;
 | 
			
		||||
      let chunkIndex = 0;
 | 
			
		||||
      while (chunkIndex < txs.length) {
 | 
			
		||||
        const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => {
 | 
			
		||||
          return { txid: tx.txid, cluster: clusterRoot };
 | 
			
		||||
        });
 | 
			
		||||
        await transactionRepository.$batchSetCluster(chunk);
 | 
			
		||||
        chunkIndex += maxChunk;
 | 
			
		||||
      }
 | 
			
		||||
      return true;
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
      throw e;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $batchSaveClusters(clusters: { root: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number }[]): Promise<boolean> {
 | 
			
		||||
    try {
 | 
			
		||||
      const clusterValues: any[] = [];
 | 
			
		||||
      const txs: any[] = [];
 | 
			
		||||
      const clusterValues: [string, number, Buffer, number][] = [];
 | 
			
		||||
      const txs: { txid: string, cluster: string }[] = [];
 | 
			
		||||
 | 
			
		||||
      for (const cluster of clusters) {
 | 
			
		||||
        if (cluster.txs?.length) {
 | 
			
		||||
@ -76,6 +34,8 @@ class CpfpRepository {
 | 
			
		||||
        return false;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const queries: { query, params }[] = [];
 | 
			
		||||
 | 
			
		||||
      const maxChunk = 100;
 | 
			
		||||
      let chunkIndex = 0;
 | 
			
		||||
      // insert clusters in batches of up to 100 rows
 | 
			
		||||
@ -89,10 +49,10 @@ class CpfpRepository {
 | 
			
		||||
          return (' (UNHEX(?), ?, ?, ?)');
 | 
			
		||||
        }) + ';';
 | 
			
		||||
        const values = chunk.flat();
 | 
			
		||||
        await DB.query(
 | 
			
		||||
        queries.push({
 | 
			
		||||
          query,
 | 
			
		||||
          values
 | 
			
		||||
        );
 | 
			
		||||
          params: values,
 | 
			
		||||
        });
 | 
			
		||||
        chunkIndex += maxChunk;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -100,10 +60,12 @@ class CpfpRepository {
 | 
			
		||||
      // insert transactions in batches of up to 100 rows
 | 
			
		||||
      while (chunkIndex < txs.length) {
 | 
			
		||||
        const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
 | 
			
		||||
        await transactionRepository.$batchSetCluster(chunk);
 | 
			
		||||
        queries.push(transactionRepository.buildBatchSetQuery(chunk));
 | 
			
		||||
        chunkIndex += maxChunk;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      await DB.$atomicQuery(queries);
 | 
			
		||||
 | 
			
		||||
      return true;
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
 | 
			
		||||
@ -25,9 +25,8 @@ class TransactionRepository {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $batchSetCluster(txs): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      let query = `
 | 
			
		||||
  public buildBatchSetQuery(txs: { txid: string, cluster: string }[]): { query, params } {
 | 
			
		||||
    let query = `
 | 
			
		||||
          INSERT IGNORE INTO compact_transactions
 | 
			
		||||
          (
 | 
			
		||||
            txid,
 | 
			
		||||
@ -35,13 +34,22 @@ class TransactionRepository {
 | 
			
		||||
          )
 | 
			
		||||
          VALUES
 | 
			
		||||
      `;
 | 
			
		||||
      query += txs.map(tx => {
 | 
			
		||||
        return (' (UNHEX(?), UNHEX(?))');
 | 
			
		||||
      }) + ';';
 | 
			
		||||
      const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
 | 
			
		||||
    query += txs.map(tx => {
 | 
			
		||||
      return (' (UNHEX(?), UNHEX(?))');
 | 
			
		||||
    }) + ';';
 | 
			
		||||
    const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
 | 
			
		||||
    return {
 | 
			
		||||
      query,
 | 
			
		||||
      params: values,
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async $batchSetCluster(txs): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      const query = this.buildBatchSetQuery(txs);
 | 
			
		||||
      await DB.query(
 | 
			
		||||
        query,
 | 
			
		||||
        values
 | 
			
		||||
        query.query,
 | 
			
		||||
        query.params,
 | 
			
		||||
      );
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user