Merge pull request #2939 from mempool/mononaut/cpfp-optimizations
CPFP Optimizations
This commit is contained in:
commit
01a46344b9
@ -22,12 +22,10 @@ import poolsParser from './pools-parser';
|
|||||||
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
|
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
|
||||||
import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
|
import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
|
||||||
import cpfpRepository from '../repositories/CpfpRepository';
|
import cpfpRepository from '../repositories/CpfpRepository';
|
||||||
import transactionRepository from '../repositories/TransactionRepository';
|
|
||||||
import mining from './mining/mining';
|
import mining from './mining/mining';
|
||||||
import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository';
|
import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository';
|
||||||
import PricesRepository from '../repositories/PricesRepository';
|
import PricesRepository from '../repositories/PricesRepository';
|
||||||
import priceUpdater from '../tasks/price-updater';
|
import priceUpdater from '../tasks/price-updater';
|
||||||
import { Block } from 'bitcoinjs-lib';
|
|
||||||
|
|
||||||
class Blocks {
|
class Blocks {
|
||||||
private blocks: BlockExtended[] = [];
|
private blocks: BlockExtended[] = [];
|
||||||
@ -340,9 +338,10 @@ class Blocks {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// Get all indexed block hash
|
// Get all indexed block hash
|
||||||
const unindexedBlocks = await blocksRepository.$getCPFPUnindexedBlocks();
|
const unindexedBlockHeights = await blocksRepository.$getCPFPUnindexedBlocks();
|
||||||
|
logger.info(`Indexing cpfp data for ${unindexedBlockHeights.length} blocks`);
|
||||||
|
|
||||||
if (!unindexedBlocks?.length) {
|
if (!unindexedBlockHeights?.length) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -351,30 +350,26 @@ class Blocks {
|
|||||||
let countThisRun = 0;
|
let countThisRun = 0;
|
||||||
let timer = new Date().getTime() / 1000;
|
let timer = new Date().getTime() / 1000;
|
||||||
const startedAt = new Date().getTime() / 1000;
|
const startedAt = new Date().getTime() / 1000;
|
||||||
|
for (const height of unindexedBlockHeights) {
|
||||||
for (const block of unindexedBlocks) {
|
|
||||||
// Logging
|
// Logging
|
||||||
|
const hash = await bitcoinApi.$getBlockHash(height);
|
||||||
const elapsedSeconds = Math.max(1, new Date().getTime() / 1000 - timer);
|
const elapsedSeconds = Math.max(1, new Date().getTime() / 1000 - timer);
|
||||||
if (elapsedSeconds > 5) {
|
if (elapsedSeconds > 5) {
|
||||||
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
|
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
|
||||||
const blockPerSeconds = Math.max(1, countThisRun / elapsedSeconds);
|
const blockPerSeconds = (countThisRun / elapsedSeconds);
|
||||||
const progress = Math.round(count / unindexedBlocks.length * 10000) / 100;
|
const progress = Math.round(count / unindexedBlockHeights.length * 10000) / 100;
|
||||||
logger.debug(`Indexing cpfp clusters for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${count}/${unindexedBlocks.length} (${progress}%) | elapsed: ${runningFor} seconds`);
|
logger.debug(`Indexing cpfp clusters for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${count}/${unindexedBlockHeights.length} (${progress}%) | elapsed: ${runningFor} seconds`);
|
||||||
timer = new Date().getTime() / 1000;
|
timer = new Date().getTime() / 1000;
|
||||||
countThisRun = 0;
|
countThisRun = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.$indexCPFP(block.hash, block.height); // Calculate and save CPFP data for transactions in this block
|
await this.$indexCPFP(hash, height); // Calculate and save CPFP data for transactions in this block
|
||||||
|
|
||||||
// Logging
|
// Logging
|
||||||
count++;
|
count++;
|
||||||
countThisRun++;
|
countThisRun++;
|
||||||
}
|
}
|
||||||
if (count > 0) {
|
|
||||||
logger.notice(`CPFP indexing completed: indexed ${count} blocks`);
|
logger.notice(`CPFP indexing completed: indexed ${count} blocks`);
|
||||||
} else {
|
|
||||||
logger.debug(`CPFP indexing completed: indexed ${count} blocks`);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err(`CPFP indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
|
logger.err(`CPFP indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
|
||||||
throw e;
|
throw e;
|
||||||
@ -752,33 +747,14 @@ class Blocks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async $indexCPFP(hash: string, height: number): Promise<void> {
|
public async $indexCPFP(hash: string, height: number): Promise<void> {
|
||||||
let transactions;
|
|
||||||
if (Common.blocksSummariesIndexingEnabled()) {
|
|
||||||
transactions = await this.$getStrippedBlockTransactions(hash);
|
|
||||||
const rawBlock = await bitcoinApi.$getRawBlock(hash);
|
|
||||||
const block = Block.fromBuffer(rawBlock);
|
|
||||||
const txMap = {};
|
|
||||||
for (const tx of block.transactions || []) {
|
|
||||||
txMap[tx.getId()] = tx;
|
|
||||||
}
|
|
||||||
for (const tx of transactions) {
|
|
||||||
// convert from bitcoinjs to esplora vin format
|
|
||||||
if (txMap[tx.txid]?.ins) {
|
|
||||||
tx.vin = txMap[tx.txid].ins.map(vin => {
|
|
||||||
return {
|
|
||||||
txid: vin.hash.slice().reverse().toString('hex')
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
const block = await bitcoinClient.getBlock(hash, 2);
|
const block = await bitcoinClient.getBlock(hash, 2);
|
||||||
transactions = block.tx.map(tx => {
|
const transactions = block.tx.map(tx => {
|
||||||
tx.vsize = tx.weight / 4;
|
tx.vsize = tx.weight / 4;
|
||||||
tx.fee *= 100_000_000;
|
tx.fee *= 100_000_000;
|
||||||
return tx;
|
return tx;
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
const clusters: any[] = [];
|
||||||
|
|
||||||
let cluster: TransactionStripped[] = [];
|
let cluster: TransactionStripped[] = [];
|
||||||
let ancestors: { [txid: string]: boolean } = {};
|
let ancestors: { [txid: string]: boolean } = {};
|
||||||
@ -793,10 +769,12 @@ class Blocks {
|
|||||||
});
|
});
|
||||||
const effectiveFeePerVsize = totalFee / totalVSize;
|
const effectiveFeePerVsize = totalFee / totalVSize;
|
||||||
if (cluster.length > 1) {
|
if (cluster.length > 1) {
|
||||||
await cpfpRepository.$saveCluster(height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize);
|
clusters.push({
|
||||||
for (const tx of cluster) {
|
root: cluster[0].txid,
|
||||||
await transactionRepository.$setCluster(tx.txid, cluster[0].txid);
|
height,
|
||||||
}
|
txs: cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }),
|
||||||
|
effectiveFeePerVsize,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
cluster = [];
|
cluster = [];
|
||||||
ancestors = {};
|
ancestors = {};
|
||||||
@ -806,7 +784,10 @@ class Blocks {
|
|||||||
ancestors[vin.txid] = true;
|
ancestors[vin.txid] = true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
await blocksRepository.$setCPFPIndexed(hash);
|
const result = await cpfpRepository.$batchSaveClusters(clusters);
|
||||||
|
if (!result) {
|
||||||
|
await cpfpRepository.$insertProgressMarker(height);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,9 +2,12 @@ import config from '../config';
|
|||||||
import DB from '../database';
|
import DB from '../database';
|
||||||
import logger from '../logger';
|
import logger from '../logger';
|
||||||
import { Common } from './common';
|
import { Common } from './common';
|
||||||
|
import blocksRepository from '../repositories/BlocksRepository';
|
||||||
|
import cpfpRepository from '../repositories/CpfpRepository';
|
||||||
|
import { RowDataPacket } from 'mysql2';
|
||||||
|
|
||||||
class DatabaseMigration {
|
class DatabaseMigration {
|
||||||
private static currentVersion = 49;
|
private static currentVersion = 52;
|
||||||
private queryTimeout = 3600_000;
|
private queryTimeout = 3600_000;
|
||||||
private statisticsAddedIndexed = false;
|
private statisticsAddedIndexed = false;
|
||||||
private uniqueLogs: string[] = [];
|
private uniqueLogs: string[] = [];
|
||||||
@ -442,6 +445,29 @@ class DatabaseMigration {
|
|||||||
await this.$executeQuery('TRUNCATE TABLE `blocks_audits`');
|
await this.$executeQuery('TRUNCATE TABLE `blocks_audits`');
|
||||||
await this.updateToSchemaVersion(49);
|
await this.updateToSchemaVersion(49);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (databaseSchemaVersion < 50) {
|
||||||
|
await this.$executeQuery('ALTER TABLE `blocks` DROP COLUMN `cpfp_indexed`');
|
||||||
|
await this.updateToSchemaVersion(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (databaseSchemaVersion < 51) {
|
||||||
|
await this.$executeQuery('ALTER TABLE `cpfp_clusters` ADD INDEX `height` (`height`)');
|
||||||
|
await this.updateToSchemaVersion(51);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (databaseSchemaVersion < 52) {
|
||||||
|
await this.$executeQuery(this.getCreateCompactCPFPTableQuery(), await this.$checkIfTableExists('compact_cpfp_clusters'));
|
||||||
|
await this.$executeQuery(this.getCreateCompactTransactionsTableQuery(), await this.$checkIfTableExists('compact_transactions'));
|
||||||
|
try {
|
||||||
|
await this.$convertCompactCpfpTables();
|
||||||
|
await this.$executeQuery('DROP TABLE IF EXISTS `cpfp_clusters`');
|
||||||
|
await this.$executeQuery('DROP TABLE IF EXISTS `transactions`');
|
||||||
|
await this.updateToSchemaVersion(52);
|
||||||
|
} catch(e) {
|
||||||
|
logger.warn('' + (e instanceof Error ? e.message : e));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -913,6 +939,25 @@ class DatabaseMigration {
|
|||||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private getCreateCompactCPFPTableQuery(): string {
|
||||||
|
return `CREATE TABLE IF NOT EXISTS compact_cpfp_clusters (
|
||||||
|
root binary(32) NOT NULL,
|
||||||
|
height int(10) NOT NULL,
|
||||||
|
txs BLOB DEFAULT NULL,
|
||||||
|
fee_rate float unsigned,
|
||||||
|
PRIMARY KEY (root),
|
||||||
|
INDEX (height)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||||
|
}
|
||||||
|
|
||||||
|
private getCreateCompactTransactionsTableQuery(): string {
|
||||||
|
return `CREATE TABLE IF NOT EXISTS compact_transactions (
|
||||||
|
txid binary(32) NOT NULL,
|
||||||
|
cluster binary(32) DEFAULT NULL,
|
||||||
|
PRIMARY KEY (txid)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||||
|
}
|
||||||
|
|
||||||
public async $truncateIndexedData(tables: string[]) {
|
public async $truncateIndexedData(tables: string[]) {
|
||||||
const allowedTables = ['blocks', 'hashrates', 'prices'];
|
const allowedTables = ['blocks', 'hashrates', 'prices'];
|
||||||
|
|
||||||
@ -933,6 +978,49 @@ class DatabaseMigration {
|
|||||||
logger.warn(`Unable to erase indexed data`);
|
logger.warn(`Unable to erase indexed data`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async $convertCompactCpfpTables(): Promise<void> {
|
||||||
|
try {
|
||||||
|
const batchSize = 250;
|
||||||
|
const maxHeight = await blocksRepository.$mostRecentBlockHeight() || 0;
|
||||||
|
const [minHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from cpfp_clusters`);
|
||||||
|
const minHeight = (minHeightRows.length && minHeightRows[0].minHeight != null) ? minHeightRows[0].minHeight : maxHeight;
|
||||||
|
let height = maxHeight;
|
||||||
|
|
||||||
|
// Logging
|
||||||
|
let timer = new Date().getTime() / 1000;
|
||||||
|
const startedAt = new Date().getTime() / 1000;
|
||||||
|
|
||||||
|
while (height > minHeight) {
|
||||||
|
const [rows] = await DB.query(
|
||||||
|
`
|
||||||
|
SELECT * from cpfp_clusters
|
||||||
|
WHERE height <= ? AND height > ?
|
||||||
|
ORDER BY height
|
||||||
|
`,
|
||||||
|
[height, height - batchSize]
|
||||||
|
) as RowDataPacket[][];
|
||||||
|
if (rows?.length) {
|
||||||
|
await cpfpRepository.$batchSaveClusters(rows.map(row => {
|
||||||
|
return {
|
||||||
|
root: row.root,
|
||||||
|
height: row.height,
|
||||||
|
txs: JSON.parse(row.txs),
|
||||||
|
effectiveFeePerVsize: row.fee_rate,
|
||||||
|
};
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
const elapsed = new Date().getTime() / 1000 - timer;
|
||||||
|
const runningFor = new Date().getTime() / 1000 - startedAt;
|
||||||
|
logger.debug(`Migrated cpfp data from block ${height} to ${height - batchSize} in ${elapsed.toFixed(2)} seconds | total elapsed: ${runningFor.toFixed(2)} seconds`);
|
||||||
|
timer = new Date().getTime() / 1000;
|
||||||
|
height -= batchSize;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(`Failed to migrate cpfp transaction data`);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default new DatabaseMigration();
|
export default new DatabaseMigration();
|
||||||
|
@ -8,6 +8,8 @@ import HashratesRepository from './HashratesRepository';
|
|||||||
import { escape } from 'mysql2';
|
import { escape } from 'mysql2';
|
||||||
import BlocksSummariesRepository from './BlocksSummariesRepository';
|
import BlocksSummariesRepository from './BlocksSummariesRepository';
|
||||||
import DifficultyAdjustmentsRepository from './DifficultyAdjustmentsRepository';
|
import DifficultyAdjustmentsRepository from './DifficultyAdjustmentsRepository';
|
||||||
|
import bitcoinClient from '../api/bitcoin/bitcoin-client';
|
||||||
|
import config from '../config';
|
||||||
|
|
||||||
class BlocksRepository {
|
class BlocksRepository {
|
||||||
/**
|
/**
|
||||||
@ -667,16 +669,32 @@ class BlocksRepository {
|
|||||||
*/
|
*/
|
||||||
public async $getCPFPUnindexedBlocks(): Promise<any[]> {
|
public async $getCPFPUnindexedBlocks(): Promise<any[]> {
|
||||||
try {
|
try {
|
||||||
const [rows]: any = await DB.query(`SELECT height, hash FROM blocks WHERE cpfp_indexed = 0 ORDER BY height DESC`);
|
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
|
||||||
return rows;
|
const currentBlockHeight = blockchainInfo.blocks;
|
||||||
|
let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, currentBlockHeight);
|
||||||
|
if (indexingBlockAmount <= -1) {
|
||||||
|
indexingBlockAmount = currentBlockHeight + 1;
|
||||||
|
}
|
||||||
|
const minHeight = Math.max(0, currentBlockHeight - indexingBlockAmount + 1);
|
||||||
|
|
||||||
|
const [rows]: any[] = await DB.query(`
|
||||||
|
SELECT height
|
||||||
|
FROM compact_cpfp_clusters
|
||||||
|
WHERE height <= ? AND height >= ?
|
||||||
|
ORDER BY height DESC;
|
||||||
|
`, [currentBlockHeight, minHeight]);
|
||||||
|
|
||||||
|
const indexedHeights = {};
|
||||||
|
rows.forEach((row) => { indexedHeights[row.height] = true; });
|
||||||
|
const allHeights: number[] = Array.from(Array(currentBlockHeight - minHeight + 1).keys(), n => n + minHeight).reverse();
|
||||||
|
const unindexedHeights = allHeights.filter(x => !indexedHeights[x]);
|
||||||
|
|
||||||
|
return unindexedHeights;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err('Cannot fetch CPFP unindexed blocks. Reason: ' + (e instanceof Error ? e.message : e));
|
logger.err('Cannot fetch CPFP unindexed blocks. Reason: ' + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
return [];
|
||||||
|
|
||||||
public async $setCPFPIndexed(hash: string): Promise<void> {
|
|
||||||
await DB.query(`UPDATE blocks SET cpfp_indexed = 1 WHERE hash = ?`, [hash]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1,34 +1,151 @@
|
|||||||
|
import cluster, { Cluster } from 'cluster';
|
||||||
|
import { RowDataPacket } from 'mysql2';
|
||||||
import DB from '../database';
|
import DB from '../database';
|
||||||
import logger from '../logger';
|
import logger from '../logger';
|
||||||
import { Ancestor } from '../mempool.interfaces';
|
import { Ancestor } from '../mempool.interfaces';
|
||||||
|
import transactionRepository from '../repositories/TransactionRepository';
|
||||||
|
|
||||||
class CpfpRepository {
|
class CpfpRepository {
|
||||||
public async $saveCluster(height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
|
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<boolean> {
|
||||||
|
if (!txs[0]) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// skip clusters of transactions with the same fees
|
||||||
|
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
|
||||||
|
const equalFee = txs.reduce((acc, tx) => {
|
||||||
|
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
|
||||||
|
}, true);
|
||||||
|
if (equalFee) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const txsJson = JSON.stringify(txs);
|
const packedTxs = Buffer.from(this.pack(txs));
|
||||||
await DB.query(
|
await DB.query(
|
||||||
`
|
`
|
||||||
INSERT INTO cpfp_clusters(root, height, txs, fee_rate)
|
INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
|
||||||
VALUE (?, ?, ?, ?)
|
VALUE (UNHEX(?), ?, ?, ?)
|
||||||
ON DUPLICATE KEY UPDATE
|
ON DUPLICATE KEY UPDATE
|
||||||
height = ?,
|
height = ?,
|
||||||
txs = ?,
|
txs = ?,
|
||||||
fee_rate = ?
|
fee_rate = ?
|
||||||
`,
|
`,
|
||||||
[txs[0].txid, height, txsJson, effectiveFeePerVsize, height, txsJson, effectiveFeePerVsize, height]
|
[clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
|
||||||
);
|
);
|
||||||
|
const maxChunk = 10;
|
||||||
|
let chunkIndex = 0;
|
||||||
|
while (chunkIndex < txs.length) {
|
||||||
|
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => {
|
||||||
|
return { txid: tx.txid, cluster: clusterRoot };
|
||||||
|
});
|
||||||
|
await transactionRepository.$batchSetCluster(chunk);
|
||||||
|
chunkIndex += maxChunk;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async $batchSaveClusters(clusters: { root: string, height: number, txs: any, effectiveFeePerVsize: number}[]): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
const clusterValues: any[] = [];
|
||||||
|
const txs: any[] = [];
|
||||||
|
|
||||||
|
for (const cluster of clusters) {
|
||||||
|
if (cluster.txs?.length > 1) {
|
||||||
|
const roundedEffectiveFee = Math.round(cluster.effectiveFeePerVsize * 100) / 100;
|
||||||
|
const equalFee = cluster.txs.reduce((acc, tx) => {
|
||||||
|
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
|
||||||
|
}, true);
|
||||||
|
if (!equalFee) {
|
||||||
|
clusterValues.push([
|
||||||
|
cluster.root,
|
||||||
|
cluster.height,
|
||||||
|
Buffer.from(this.pack(cluster.txs)),
|
||||||
|
cluster.effectiveFeePerVsize
|
||||||
|
]);
|
||||||
|
for (const tx of cluster.txs) {
|
||||||
|
txs.push({ txid: tx.txid, cluster: cluster.root });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!clusterValues.length) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxChunk = 100;
|
||||||
|
let chunkIndex = 0;
|
||||||
|
// insert transactions in batches of up to 100 rows
|
||||||
|
while (chunkIndex < txs.length) {
|
||||||
|
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
|
||||||
|
await transactionRepository.$batchSetCluster(chunk);
|
||||||
|
chunkIndex += maxChunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
chunkIndex = 0;
|
||||||
|
// insert clusters in batches of up to 100 rows
|
||||||
|
while (chunkIndex < clusterValues.length) {
|
||||||
|
const chunk = clusterValues.slice(chunkIndex, chunkIndex + maxChunk);
|
||||||
|
let query = `
|
||||||
|
INSERT IGNORE INTO compact_cpfp_clusters(root, height, txs, fee_rate)
|
||||||
|
VALUES
|
||||||
|
`;
|
||||||
|
query += chunk.map(chunk => {
|
||||||
|
return (' (UNHEX(?), ?, ?, ?)');
|
||||||
|
}) + ';';
|
||||||
|
const values = chunk.flat();
|
||||||
|
await DB.query(
|
||||||
|
query,
|
||||||
|
values
|
||||||
|
);
|
||||||
|
chunkIndex += maxChunk;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} catch (e: any) {
|
||||||
|
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async $getCluster(clusterRoot: string): Promise<Cluster> {
|
||||||
|
const [clusterRows]: any = await DB.query(
|
||||||
|
`
|
||||||
|
SELECT *
|
||||||
|
FROM compact_cpfp_clusters
|
||||||
|
WHERE root = UNHEX(?)
|
||||||
|
`,
|
||||||
|
[clusterRoot]
|
||||||
|
);
|
||||||
|
const cluster = clusterRows[0];
|
||||||
|
cluster.txs = this.unpack(cluster.txs);
|
||||||
|
return cluster;
|
||||||
|
}
|
||||||
|
|
||||||
public async $deleteClustersFrom(height: number): Promise<void> {
|
public async $deleteClustersFrom(height: number): Promise<void> {
|
||||||
logger.info(`Delete newer cpfp clusters from height ${height} from the database`);
|
logger.info(`Delete newer cpfp clusters from height ${height} from the database`);
|
||||||
try {
|
try {
|
||||||
|
const [rows] = await DB.query(
|
||||||
|
`
|
||||||
|
SELECT txs, height, root from compact_cpfp_clusters
|
||||||
|
WHERE height >= ?
|
||||||
|
`,
|
||||||
|
[height]
|
||||||
|
) as RowDataPacket[][];
|
||||||
|
if (rows?.length) {
|
||||||
|
for (let clusterToDelete of rows) {
|
||||||
|
const txs = this.unpack(clusterToDelete.txs);
|
||||||
|
for (let tx of txs) {
|
||||||
|
await transactionRepository.$removeTransaction(tx.txid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
await DB.query(
|
await DB.query(
|
||||||
`
|
`
|
||||||
DELETE from cpfp_clusters
|
DELETE from compact_cpfp_clusters
|
||||||
WHERE height >= ?
|
WHERE height >= ?
|
||||||
`,
|
`,
|
||||||
[height]
|
[height]
|
||||||
@ -38,6 +155,70 @@ class CpfpRepository {
|
|||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// insert a dummy row to mark that we've indexed as far as this block
|
||||||
|
public async $insertProgressMarker(height: number): Promise<void> {
|
||||||
|
try {
|
||||||
|
const [rows]: any = await DB.query(
|
||||||
|
`
|
||||||
|
SELECT root
|
||||||
|
FROM compact_cpfp_clusters
|
||||||
|
WHERE height = ?
|
||||||
|
`,
|
||||||
|
[height]
|
||||||
|
);
|
||||||
|
if (!rows?.length) {
|
||||||
|
const rootBuffer = Buffer.alloc(32);
|
||||||
|
rootBuffer.writeInt32LE(height);
|
||||||
|
await DB.query(
|
||||||
|
`
|
||||||
|
INSERT INTO compact_cpfp_clusters(root, height, fee_rate)
|
||||||
|
VALUE (?, ?, ?)
|
||||||
|
`,
|
||||||
|
[rootBuffer, height, 0]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (e: any) {
|
||||||
|
logger.err(`Cannot insert cpfp progress marker. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public pack(txs: Ancestor[]): ArrayBuffer {
|
||||||
|
const buf = new ArrayBuffer(44 * txs.length);
|
||||||
|
const view = new DataView(buf);
|
||||||
|
txs.forEach((tx, i) => {
|
||||||
|
const offset = i * 44;
|
||||||
|
for (let x = 0; x < 32; x++) {
|
||||||
|
// store txid in little-endian
|
||||||
|
view.setUint8(offset + (31 - x), parseInt(tx.txid.slice(x * 2, (x * 2) + 2), 16));
|
||||||
|
}
|
||||||
|
view.setUint32(offset + 32, tx.weight);
|
||||||
|
view.setBigUint64(offset + 36, BigInt(Math.round(tx.fee)));
|
||||||
|
});
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public unpack(buf: Buffer): Ancestor[] {
|
||||||
|
if (!buf) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
|
||||||
|
const txs: Ancestor[] = [];
|
||||||
|
const view = new DataView(arrayBuffer);
|
||||||
|
for (let offset = 0; offset < arrayBuffer.byteLength; offset += 44) {
|
||||||
|
const txid = Array.from(new Uint8Array(arrayBuffer, offset, 32)).reverse().map(b => b.toString(16).padStart(2, '0')).join('');
|
||||||
|
const weight = view.getUint32(offset + 32);
|
||||||
|
const fee = Number(view.getBigUint64(offset + 36));
|
||||||
|
txs.push({
|
||||||
|
txid,
|
||||||
|
weight,
|
||||||
|
fee
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return txs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default new CpfpRepository();
|
export default new CpfpRepository();
|
@ -1,6 +1,7 @@
|
|||||||
import DB from '../database';
|
import DB from '../database';
|
||||||
import logger from '../logger';
|
import logger from '../logger';
|
||||||
import { Ancestor, CpfpInfo } from '../mempool.interfaces';
|
import { Ancestor, CpfpInfo } from '../mempool.interfaces';
|
||||||
|
import cpfpRepository from './CpfpRepository';
|
||||||
|
|
||||||
interface CpfpSummary {
|
interface CpfpSummary {
|
||||||
txid: string;
|
txid: string;
|
||||||
@ -12,20 +13,20 @@ interface CpfpSummary {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class TransactionRepository {
|
class TransactionRepository {
|
||||||
public async $setCluster(txid: string, cluster: string): Promise<void> {
|
public async $setCluster(txid: string, clusterRoot: string): Promise<void> {
|
||||||
try {
|
try {
|
||||||
await DB.query(
|
await DB.query(
|
||||||
`
|
`
|
||||||
INSERT INTO transactions
|
INSERT INTO compact_transactions
|
||||||
(
|
(
|
||||||
txid,
|
txid,
|
||||||
cluster
|
cluster
|
||||||
)
|
)
|
||||||
VALUE (?, ?)
|
VALUE (UNHEX(?), UNHEX(?))
|
||||||
ON DUPLICATE KEY UPDATE
|
ON DUPLICATE KEY UPDATE
|
||||||
cluster = ?
|
cluster = UNHEX(?)
|
||||||
;`,
|
;`,
|
||||||
[txid, cluster, cluster]
|
[txid, clusterRoot, clusterRoot]
|
||||||
);
|
);
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
logger.err(`Cannot save transaction cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
logger.err(`Cannot save transaction cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
@ -33,33 +34,69 @@ class TransactionRepository {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
|
public async $batchSetCluster(txs): Promise<void> {
|
||||||
try {
|
try {
|
||||||
let query = `
|
let query = `
|
||||||
SELECT *
|
INSERT IGNORE INTO compact_transactions
|
||||||
FROM transactions
|
(
|
||||||
LEFT JOIN cpfp_clusters AS cluster ON cluster.root = transactions.cluster
|
txid,
|
||||||
WHERE transactions.txid = ?
|
cluster
|
||||||
|
)
|
||||||
|
VALUES
|
||||||
`;
|
`;
|
||||||
const [rows]: any = await DB.query(query, [txid]);
|
query += txs.map(tx => {
|
||||||
if (rows.length) {
|
return (' (UNHEX(?), UNHEX(?))');
|
||||||
rows[0].txs = JSON.parse(rows[0].txs) as Ancestor[];
|
}) + ';';
|
||||||
if (rows[0]?.txs?.length) {
|
const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
|
||||||
return this.convertCpfp(rows[0]);
|
await DB.query(
|
||||||
|
query,
|
||||||
|
values
|
||||||
|
);
|
||||||
|
} catch (e: any) {
|
||||||
|
logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
|
||||||
|
try {
|
||||||
|
const [txRows]: any = await DB.query(
|
||||||
|
`
|
||||||
|
SELECT HEX(txid) as id, HEX(cluster) as root
|
||||||
|
FROM compact_transactions
|
||||||
|
WHERE txid = UNHEX(?)
|
||||||
|
`,
|
||||||
|
[txid]
|
||||||
|
);
|
||||||
|
if (txRows.length && txRows[0].root != null) {
|
||||||
|
const txid = txRows[0].id.toLowerCase();
|
||||||
|
const clusterId = txRows[0].root.toLowerCase();
|
||||||
|
const cluster = await cpfpRepository.$getCluster(clusterId);
|
||||||
|
return this.convertCpfp(txid, cluster);
|
||||||
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err('Cannot get transaction cpfp info from db. Reason: ' + (e instanceof Error ? e.message : e));
|
logger.err('Cannot get transaction cpfp info from db. Reason: ' + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private convertCpfp(cpfp: CpfpSummary): CpfpInfo {
|
public async $removeTransaction(txid: string): Promise<void> {
|
||||||
|
await DB.query(
|
||||||
|
`
|
||||||
|
DELETE FROM compact_transactions
|
||||||
|
WHERE txid = UNHEX(?)
|
||||||
|
`,
|
||||||
|
[txid]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private convertCpfp(txid, cluster): CpfpInfo {
|
||||||
const descendants: Ancestor[] = [];
|
const descendants: Ancestor[] = [];
|
||||||
const ancestors: Ancestor[] = [];
|
const ancestors: Ancestor[] = [];
|
||||||
let matched = false;
|
let matched = false;
|
||||||
for (const tx of cpfp.txs) {
|
|
||||||
if (tx.txid === cpfp.txid) {
|
for (const tx of cluster.txs) {
|
||||||
|
if (tx.txid === txid) {
|
||||||
matched = true;
|
matched = true;
|
||||||
} else if (!matched) {
|
} else if (!matched) {
|
||||||
descendants.push(tx);
|
descendants.push(tx);
|
||||||
@ -70,7 +107,6 @@ class TransactionRepository {
|
|||||||
return {
|
return {
|
||||||
descendants,
|
descendants,
|
||||||
ancestors,
|
ancestors,
|
||||||
effectiveFeePerVsize: cpfp.fee_rate
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,26 +131,20 @@ export class TransactionComponent implements OnInit, AfterViewInit, OnDestroy {
|
|||||||
this.cpfpInfo = null;
|
this.cpfpInfo = null;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (cpfpInfo.effectiveFeePerVsize) {
|
// merge ancestors/descendants
|
||||||
this.tx.effectiveFeePerVsize = cpfpInfo.effectiveFeePerVsize;
|
const relatives = [...(cpfpInfo.ancestors || []), ...(cpfpInfo.descendants || [])];
|
||||||
} else {
|
if (cpfpInfo.bestDescendant && !cpfpInfo.descendants?.length) {
|
||||||
const lowerFeeParents = cpfpInfo.ancestors.filter(
|
relatives.push(cpfpInfo.bestDescendant);
|
||||||
(parent) => parent.fee / (parent.weight / 4) < this.tx.feePerVsize
|
}
|
||||||
);
|
|
||||||
let totalWeight =
|
let totalWeight =
|
||||||
this.tx.weight +
|
this.tx.weight +
|
||||||
lowerFeeParents.reduce((prev, val) => prev + val.weight, 0);
|
relatives.reduce((prev, val) => prev + val.weight, 0);
|
||||||
let totalFees =
|
let totalFees =
|
||||||
this.tx.fee +
|
this.tx.fee +
|
||||||
lowerFeeParents.reduce((prev, val) => prev + val.fee, 0);
|
relatives.reduce((prev, val) => prev + val.fee, 0);
|
||||||
|
|
||||||
if (cpfpInfo?.bestDescendant) {
|
|
||||||
totalWeight += cpfpInfo?.bestDescendant.weight;
|
|
||||||
totalFees += cpfpInfo?.bestDescendant.fee;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.tx.effectiveFeePerVsize = totalFees / (totalWeight / 4);
|
this.tx.effectiveFeePerVsize = totalFees / (totalWeight / 4);
|
||||||
}
|
|
||||||
if (!this.tx.status.confirmed) {
|
if (!this.tx.status.confirmed) {
|
||||||
this.stateService.markBlock$.next({
|
this.stateService.markBlock$.next({
|
||||||
txFeePerVSize: this.tx.effectiveFeePerVsize,
|
txFeePerVSize: this.tx.effectiveFeePerVsize,
|
||||||
|
@ -24,7 +24,6 @@ export interface CpfpInfo {
|
|||||||
ancestors: Ancestor[];
|
ancestors: Ancestor[];
|
||||||
descendants?: Ancestor[];
|
descendants?: Ancestor[];
|
||||||
bestDescendant?: BestDescendant | null;
|
bestDescendant?: BestDescendant | null;
|
||||||
effectiveFeePerVsize?: number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DifficultyAdjustment {
|
export interface DifficultyAdjustment {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user