compact schemas for cpfp tables
This commit is contained in:
parent
7793eaecbc
commit
b50936f001
@ -22,12 +22,10 @@ import poolsParser from './pools-parser';
|
||||
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
|
||||
import BlocksAuditsRepository from '../repositories/BlocksAuditsRepository';
|
||||
import cpfpRepository from '../repositories/CpfpRepository';
|
||||
import transactionRepository from '../repositories/TransactionRepository';
|
||||
import mining from './mining/mining';
|
||||
import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository';
|
||||
import PricesRepository from '../repositories/PricesRepository';
|
||||
import priceUpdater from '../tasks/price-updater';
|
||||
import { Block } from 'bitcoinjs-lib';
|
||||
|
||||
class Blocks {
|
||||
private blocks: BlockExtended[] = [];
|
||||
@ -753,10 +751,8 @@ class Blocks {
|
||||
}
|
||||
|
||||
public async $indexCPFP(hash: string, height: number): Promise<void> {
|
||||
let transactions;
|
||||
|
||||
const block = await bitcoinClient.getBlock(hash, 2);
|
||||
transactions = block.tx.map(tx => {
|
||||
const transactions = block.tx.map(tx => {
|
||||
tx.vsize = tx.weight / 4;
|
||||
tx.fee *= 100_000_000;
|
||||
return tx;
|
||||
@ -775,9 +771,12 @@ class Blocks {
|
||||
});
|
||||
const effectiveFeePerVsize = totalFee / totalVSize;
|
||||
if (cluster.length > 1) {
|
||||
await cpfpRepository.$saveCluster(height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize);
|
||||
for (const tx of cluster) {
|
||||
await transactionRepository.$setCluster(tx.txid, cluster[0].txid);
|
||||
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
|
||||
const equalFee = cluster.reduce((acc, tx) => {
|
||||
return (acc && Math.round(((tx.fee || 0) / tx.vsize) * 100) / 100 === roundedEffectiveFee);
|
||||
}, true);
|
||||
if (!equalFee) {
|
||||
await cpfpRepository.$saveCluster(cluster[0].txid, height, cluster.map(tx => { return { txid: tx.txid, weight: tx.vsize * 4, fee: tx.fee || 0 }; }), effectiveFeePerVsize);
|
||||
}
|
||||
}
|
||||
cluster = [];
|
||||
|
@ -445,6 +445,8 @@ class DatabaseMigration {
|
||||
|
||||
if (databaseSchemaVersion < 50 && isBitcoin === true) {
|
||||
await this.$executeQuery('ALTER TABLE `blocks` DROP COLUMN `cpfp_indexed`');
|
||||
await this.$executeQuery(this.getCreateCompactCPFPTableQuery(), await this.$checkIfTableExists('compact_cpfp_clusters'));
|
||||
await this.$executeQuery(this.getCreateCompactTransactionsTableQuery(), await this.$checkIfTableExists('compact_transactions'));
|
||||
await this.updateToSchemaVersion(50);
|
||||
}
|
||||
}
|
||||
@ -918,6 +920,25 @@ class DatabaseMigration {
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
private getCreateCompactCPFPTableQuery(): string {
|
||||
return `CREATE TABLE IF NOT EXISTS compact_cpfp_clusters (
|
||||
root binary(32) NOT NULL,
|
||||
height int(10) NOT NULL,
|
||||
txs BLOB DEFAULT NULL,
|
||||
fee_rate float unsigned NOT NULL,
|
||||
PRIMARY KEY (root),
|
||||
INDEX (height)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
private getCreateCompactTransactionsTableQuery(): string {
|
||||
return `CREATE TABLE IF NOT EXISTS compact_transactions (
|
||||
txid binary(32) NOT NULL,
|
||||
cluster binary(32) DEFAULT NULL,
|
||||
PRIMARY KEY (txid)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
public async $truncateIndexedData(tables: string[]) {
|
||||
const allowedTables = ['blocks', 'hashrates', 'prices'];
|
||||
|
||||
|
@ -671,7 +671,7 @@ class BlocksRepository {
|
||||
try {
|
||||
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
|
||||
const currentBlockHeight = blockchainInfo.blocks;
|
||||
const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from cpfp_clusters`);
|
||||
const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from compact_cpfp_clusters`);
|
||||
const lastHeight = (lastHeightRows.length && lastHeightRows[0].minHeight != null) ? lastHeightRows[0].minHeight : currentBlockHeight;
|
||||
|
||||
let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, blockchainInfo.blocks);
|
||||
|
@ -1,34 +1,72 @@
|
||||
import cluster, { Cluster } from 'cluster';
|
||||
import { RowDataPacket } from 'mysql2';
|
||||
import DB from '../database';
|
||||
import logger from '../logger';
|
||||
import { Ancestor } from '../mempool.interfaces';
|
||||
import transactionRepository from '../repositories/TransactionRepository';
|
||||
|
||||
class CpfpRepository {
|
||||
public async $saveCluster(height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
|
||||
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
|
||||
if (!txs[0]) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const txsJson = JSON.stringify(txs);
|
||||
const packedTxs = Buffer.from(this.pack(txs));
|
||||
await DB.query(
|
||||
`
|
||||
INSERT INTO cpfp_clusters(root, height, txs, fee_rate)
|
||||
VALUE (?, ?, ?, ?)
|
||||
INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
|
||||
VALUE (UNHEX(?), ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
height = ?,
|
||||
txs = ?,
|
||||
fee_rate = ?
|
||||
`,
|
||||
[txs[0].txid, height, txsJson, effectiveFeePerVsize, height, txsJson, effectiveFeePerVsize, height]
|
||||
[clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
|
||||
);
|
||||
for (const tx of txs) {
|
||||
await transactionRepository.$setCluster(tx.txid, clusterRoot);
|
||||
}
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public async $getCluster(clusterRoot: string): Promise<Cluster> {
|
||||
const [clusterRows]: any = await DB.query(
|
||||
`
|
||||
SELECT *
|
||||
FROM compact_cpfp_clusters
|
||||
WHERE root = UNHEX(?)
|
||||
`,
|
||||
[clusterRoot]
|
||||
);
|
||||
const cluster = clusterRows[0];
|
||||
cluster.txs = this.unpack(cluster.txs);
|
||||
return cluster;
|
||||
}
|
||||
|
||||
public async $deleteClustersFrom(height: number): Promise<void> {
|
||||
logger.info(`Delete newer cpfp clusters from height ${height} from the database`);
|
||||
try {
|
||||
const [rows] = await DB.query(
|
||||
`
|
||||
SELECT txs, height, root from compact_cpfp_clusters
|
||||
WHERE height >= ?
|
||||
`,
|
||||
[height]
|
||||
) as RowDataPacket[][];
|
||||
if (rows?.length) {
|
||||
for (let clusterToDelete of rows) {
|
||||
const txs = this.unpack(clusterToDelete.txs);
|
||||
for (let tx of txs) {
|
||||
await transactionRepository.$removeTransaction(tx.txid);
|
||||
}
|
||||
}
|
||||
}
|
||||
await DB.query(
|
||||
`
|
||||
DELETE from cpfp_clusters
|
||||
DELETE from compact_cpfp_clusters
|
||||
WHERE height >= ?
|
||||
`,
|
||||
[height]
|
||||
@ -38,6 +76,42 @@ class CpfpRepository {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public pack(txs: Ancestor[]): ArrayBuffer {
|
||||
const buf = new ArrayBuffer(44 * txs.length);
|
||||
const view = new DataView(buf);
|
||||
txs.forEach((tx, i) => {
|
||||
const offset = i * 44;
|
||||
for (let x = 0; x < 32; x++) {
|
||||
// store txid in little-endian
|
||||
view.setUint8(offset + (31 - x), parseInt(tx.txid.slice(x * 2, (x * 2) + 2), 16));
|
||||
}
|
||||
view.setUint32(offset + 32, tx.weight);
|
||||
view.setBigUint64(offset + 36, BigInt(Math.round(tx.fee)));
|
||||
});
|
||||
return buf;
|
||||
}
|
||||
|
||||
public unpack(buf: Buffer): Ancestor[] {
|
||||
if (!buf) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
|
||||
const txs: Ancestor[] = [];
|
||||
const view = new DataView(arrayBuffer);
|
||||
for (let offset = 0; offset < arrayBuffer.byteLength; offset += 44) {
|
||||
const txid = Array.from(new Uint8Array(arrayBuffer, offset, 32)).reverse().map(b => b.toString(16).padStart(2, '0')).join('');
|
||||
const weight = view.getUint32(offset + 32);
|
||||
const fee = Number(view.getBigUint64(offset + 36));
|
||||
txs.push({
|
||||
txid,
|
||||
weight,
|
||||
fee
|
||||
});
|
||||
}
|
||||
return txs;
|
||||
}
|
||||
}
|
||||
|
||||
export default new CpfpRepository();
|
@ -1,6 +1,7 @@
|
||||
import DB from '../database';
|
||||
import logger from '../logger';
|
||||
import { Ancestor, CpfpInfo } from '../mempool.interfaces';
|
||||
import cpfpRepository from './CpfpRepository';
|
||||
|
||||
interface CpfpSummary {
|
||||
txid: string;
|
||||
@ -12,20 +13,20 @@ interface CpfpSummary {
|
||||
}
|
||||
|
||||
class TransactionRepository {
|
||||
public async $setCluster(txid: string, cluster: string): Promise<void> {
|
||||
public async $setCluster(txid: string, clusterRoot: string): Promise<void> {
|
||||
try {
|
||||
await DB.query(
|
||||
`
|
||||
INSERT INTO transactions
|
||||
INSERT INTO compact_transactions
|
||||
(
|
||||
txid,
|
||||
cluster
|
||||
)
|
||||
VALUE (?, ?)
|
||||
VALUE (UNHEX(?), UNHEX(?))
|
||||
ON DUPLICATE KEY UPDATE
|
||||
cluster = ?
|
||||
cluster = UNHEX(?)
|
||||
;`,
|
||||
[txid, cluster, cluster]
|
||||
[txid, clusterRoot, clusterRoot]
|
||||
);
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot save transaction cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
@ -35,18 +36,19 @@ class TransactionRepository {
|
||||
|
||||
public async $getCpfpInfo(txid: string): Promise<CpfpInfo | void> {
|
||||
try {
|
||||
let query = `
|
||||
SELECT *
|
||||
FROM transactions
|
||||
LEFT JOIN cpfp_clusters AS cluster ON cluster.root = transactions.cluster
|
||||
WHERE transactions.txid = ?
|
||||
`;
|
||||
const [rows]: any = await DB.query(query, [txid]);
|
||||
if (rows.length) {
|
||||
rows[0].txs = JSON.parse(rows[0].txs) as Ancestor[];
|
||||
if (rows[0]?.txs?.length) {
|
||||
return this.convertCpfp(rows[0]);
|
||||
}
|
||||
const [txRows]: any = await DB.query(
|
||||
`
|
||||
SELECT HEX(txid) as id, HEX(cluster) as root
|
||||
FROM compact_transactions
|
||||
WHERE txid = UNHEX(?)
|
||||
`,
|
||||
[txid]
|
||||
);
|
||||
if (txRows.length && txRows[0].root != null) {
|
||||
const txid = txRows[0].id.toLowerCase();
|
||||
const clusterId = txRows[0].root.toLowerCase();
|
||||
const cluster = await cpfpRepository.$getCluster(clusterId);
|
||||
return this.convertCpfp(txid, cluster);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.err('Cannot get transaction cpfp info from db. Reason: ' + (e instanceof Error ? e.message : e));
|
||||
@ -54,12 +56,23 @@ class TransactionRepository {
|
||||
}
|
||||
}
|
||||
|
||||
private convertCpfp(cpfp: CpfpSummary): CpfpInfo {
|
||||
public async $removeTransaction(txid: string): Promise<void> {
|
||||
await DB.query(
|
||||
`
|
||||
DELETE FROM compact_transactions
|
||||
WHERE txid = UNHEX(?)
|
||||
`,
|
||||
[txid]
|
||||
);
|
||||
}
|
||||
|
||||
private convertCpfp(txid, cluster): CpfpInfo {
|
||||
const descendants: Ancestor[] = [];
|
||||
const ancestors: Ancestor[] = [];
|
||||
let matched = false;
|
||||
for (const tx of cpfp.txs) {
|
||||
if (tx.txid === cpfp.txid) {
|
||||
|
||||
for (const tx of cluster.txs) {
|
||||
if (tx.txid === txid) {
|
||||
matched = true;
|
||||
} else if (!matched) {
|
||||
descendants.push(tx);
|
||||
|
Loading…
x
Reference in New Issue
Block a user