batch db inserts for cpfp data
This commit is contained in:
		
							parent
							
								
									fcd047f302
								
							
						
					
					
						commit
						8de3fd0988
					
				@ -760,6 +760,8 @@ class Blocks {
 | 
				
			|||||||
      return tx;
 | 
					      return tx;
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    const clusters: any[] = [];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let cluster: TransactionStripped[] = [];
 | 
					    let cluster: TransactionStripped[] = [];
 | 
				
			||||||
    let ancestors: { [txid: string]: boolean } = {};
 | 
					    let ancestors: { [txid: string]: boolean } = {};
 | 
				
			||||||
    for (let i = transactions.length - 1; i >= 0; i--) {
 | 
					    for (let i = transactions.length - 1; i >= 0; i--) {
 | 
				
			||||||
@ -773,13 +775,12 @@ class Blocks {
 | 
				
			|||||||
        });
 | 
					        });
 | 
				
			||||||
        const effectiveFeePerVsize = totalFee / totalVSize;
 | 
					        const effectiveFeePerVsize = totalFee / totalVSize;
 | 
				
			||||||
        if (cluster.length > 1) {
 | 
					        if (cluster.length > 1) {
 | 
				
			||||||
          const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
 | 
					          clusters.push({
 | 
				
			||||||
          const equalFee = cluster.reduce((acc, tx) => {
 | 
					            root: cluster[0].txid,
 | 
				
			||||||
            return (acc && Math.round(((tx.fee || 0) / tx.vsize) * 100) / 100 === roundedEffectiveFee);
 | 
					            height,
 | 
				
			||||||
          }, true);
 | 
					            txs: cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }),
 | 
				
			||||||
          if (!equalFee) {
 | 
					            effectiveFeePerVsize,
 | 
				
			||||||
            await cpfpRepository.$saveCluster(cluster[0].txid, height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize);
 | 
					          });
 | 
				
			||||||
          }
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        cluster = [];
 | 
					        cluster = [];
 | 
				
			||||||
        ancestors = {};
 | 
					        ancestors = {};
 | 
				
			||||||
@ -789,6 +790,7 @@ class Blocks {
 | 
				
			|||||||
        ancestors[vin.txid] = true;
 | 
					        ancestors[vin.txid] = true;
 | 
				
			||||||
      });
 | 
					      });
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    await cpfpRepository.$batchSaveClusters(clusters);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -10,6 +10,15 @@ class CpfpRepository {
 | 
				
			|||||||
    if (!txs[0]) {
 | 
					    if (!txs[0]) {
 | 
				
			||||||
      return;
 | 
					      return;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    // skip clusters of transactions with the same fees
 | 
				
			||||||
 | 
					    const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
 | 
				
			||||||
 | 
					    const equalFee = txs.reduce((acc, tx) => {
 | 
				
			||||||
 | 
					      return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
 | 
				
			||||||
 | 
					    }, true);
 | 
				
			||||||
 | 
					    if (equalFee) {
 | 
				
			||||||
 | 
					      return;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try {
 | 
					    try {
 | 
				
			||||||
      const packedTxs = Buffer.from(this.pack(txs));
 | 
					      const packedTxs = Buffer.from(this.pack(txs));
 | 
				
			||||||
      await DB.query(
 | 
					      await DB.query(
 | 
				
			||||||
@ -23,8 +32,14 @@ class CpfpRepository {
 | 
				
			|||||||
        `,
 | 
					        `,
 | 
				
			||||||
        [clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
 | 
					        [clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
 | 
				
			||||||
      );
 | 
					      );
 | 
				
			||||||
      for (const tx of txs) {
 | 
					      const maxChunk = 10;
 | 
				
			||||||
        await transactionRepository.$setCluster(tx.txid, clusterRoot);
 | 
					      let chunkIndex = 0;
 | 
				
			||||||
 | 
					      while (chunkIndex < txs.length) {
 | 
				
			||||||
 | 
					        const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => {
 | 
				
			||||||
 | 
					          return { txid: tx.txid, cluster: clusterRoot };
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					        await transactionRepository.$batchSetCluster(chunk);
 | 
				
			||||||
 | 
					        chunkIndex += maxChunk;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } catch (e: any) {
 | 
					    } catch (e: any) {
 | 
				
			||||||
      logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
					      logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
				
			||||||
@ -32,6 +47,69 @@ class CpfpRepository {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public async $batchSaveClusters(clusters: { root: string, height: number, txs: any, effectiveFeePerVsize: number}[]): Promise<void> {
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      const clusterValues: any[] = [];
 | 
				
			||||||
 | 
					      const txs: any[] = [];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      for (const cluster of clusters) {
 | 
				
			||||||
 | 
					        if (cluster.txs?.length > 1) {
 | 
				
			||||||
 | 
					          const roundedEffectiveFee = Math.round(cluster.effectiveFeePerVsize * 100) / 100;
 | 
				
			||||||
 | 
					          const equalFee = cluster.txs.reduce((acc, tx) => {
 | 
				
			||||||
 | 
					            return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
 | 
				
			||||||
 | 
					          }, true);
 | 
				
			||||||
 | 
					          if (!equalFee) {
 | 
				
			||||||
 | 
					            clusterValues.push([
 | 
				
			||||||
 | 
					              cluster.root,
 | 
				
			||||||
 | 
					              cluster.height,
 | 
				
			||||||
 | 
					              Buffer.from(this.pack(cluster.txs)),
 | 
				
			||||||
 | 
					              cluster.effectiveFeePerVsize
 | 
				
			||||||
 | 
					            ]);
 | 
				
			||||||
 | 
					            for (const tx of cluster.txs) {
 | 
				
			||||||
 | 
					              txs.push({ txid: tx.txid, cluster: cluster.root });
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      if (!clusterValues.length) {
 | 
				
			||||||
 | 
					        return;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      const maxChunk = 100;
 | 
				
			||||||
 | 
					      let chunkIndex = 0;
 | 
				
			||||||
 | 
					      // insert transactions in batches of up to 100 rows
 | 
				
			||||||
 | 
					      while (chunkIndex < txs.length) {
 | 
				
			||||||
 | 
					        const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
 | 
				
			||||||
 | 
					        await transactionRepository.$batchSetCluster(chunk);
 | 
				
			||||||
 | 
					        chunkIndex += maxChunk;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      chunkIndex = 0;
 | 
				
			||||||
 | 
					      // insert clusters in batches of up to 100 rows
 | 
				
			||||||
 | 
					      while (chunkIndex < clusterValues.length) {
 | 
				
			||||||
 | 
					        const chunk = clusterValues.slice(chunkIndex, chunkIndex + maxChunk);
 | 
				
			||||||
 | 
					        let query = `
 | 
				
			||||||
 | 
					            INSERT IGNORE INTO compact_cpfp_clusters(root, height, txs, fee_rate)
 | 
				
			||||||
 | 
					            VALUES
 | 
				
			||||||
 | 
					        `;
 | 
				
			||||||
 | 
					        query += chunk.map(chunk => {
 | 
				
			||||||
 | 
					          return (' (UNHEX(?), ?, ?, ?)');
 | 
				
			||||||
 | 
					        }) + ';';
 | 
				
			||||||
 | 
					        const values = chunk.flat();
 | 
				
			||||||
 | 
					        await DB.query(
 | 
				
			||||||
 | 
					          query,
 | 
				
			||||||
 | 
					          values
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					        chunkIndex += maxChunk;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      return;
 | 
				
			||||||
 | 
					    } catch (e: any) {
 | 
				
			||||||
 | 
					      logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
				
			||||||
 | 
					      throw e;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public async $getCluster(clusterRoot: string): Promise<Cluster> {
 | 
					  public async $getCluster(clusterRoot: string): Promise<Cluster> {
 | 
				
			||||||
    const [clusterRows]: any = await DB.query(
 | 
					    const [clusterRows]: any = await DB.query(
 | 
				
			||||||
      `
 | 
					      `
 | 
				
			||||||
 | 
				
			|||||||
@ -34,6 +34,30 @@ class TransactionRepository {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public async $batchSetCluster(txs): Promise<void> {
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      let query = `
 | 
				
			||||||
 | 
					          INSERT IGNORE INTO compact_transactions
 | 
				
			||||||
 | 
					          (
 | 
				
			||||||
 | 
					            txid,
 | 
				
			||||||
 | 
					            cluster
 | 
				
			||||||
 | 
					          )
 | 
				
			||||||
 | 
					          VALUES
 | 
				
			||||||
 | 
					      `;
 | 
				
			||||||
 | 
					      query += txs.map(tx => {
 | 
				
			||||||
 | 
					        return (' (UNHEX(?), UNHEX(?))');
 | 
				
			||||||
 | 
					      }) + ';';
 | 
				
			||||||
 | 
					      const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
 | 
				
			||||||
 | 
					      await DB.query(
 | 
				
			||||||
 | 
					        query,
 | 
				
			||||||
 | 
					        values
 | 
				
			||||||
 | 
					      );
 | 
				
			||||||
 | 
					    } catch (e: any) {
 | 
				
			||||||
 | 
					      logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
				
			||||||
 | 
					      throw e;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
 | 
					  public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
 | 
				
			||||||
    try {
 | 
					    try {
 | 
				
			||||||
      const [txRows]: any = await DB.query(
 | 
					      const [txRows]: any = await DB.query(
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user