Add block classification indexing

This commit is contained in:
Mononaut 2024-01-23 00:44:34 +00:00
parent 0c9c79c86c
commit 7405cf8336
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
9 changed files with 187 additions and 15 deletions

View File

@ -561,6 +561,115 @@ class Blocks {
logger.debug(`Indexing block audit details completed`); logger.debug(`Indexing block audit details completed`);
} }
/**
* [INDEXING] Index transaction classification flags for Goggles
*/
public async $classifyBlocks(): Promise<void> {
// classification requires an esplora backend
if (config.MEMPOOL.BACKEND !== 'esplora') {
return;
}
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
const currentBlockHeight = blockchainInfo.blocks;
const unclassifiedBlocksList = await BlocksSummariesRepository.$getSummariesWithVersion(0);
const unclassifiedTemplatesList = await BlocksSummariesRepository.$getTemplatesWithVersion(0);
// nothing to do
if (!unclassifiedBlocksList.length && !unclassifiedTemplatesList.length) {
return;
}
let timer = Date.now();
let indexedThisRun = 0;
let indexedTotal = 0;
const minHeight = Math.min(
unclassifiedBlocksList[unclassifiedBlocksList.length - 1].height ?? Infinity,
unclassifiedTemplatesList[unclassifiedTemplatesList.length - 1].height ?? Infinity,
);
const numToIndex = Math.max(
unclassifiedBlocksList.length,
unclassifiedTemplatesList.length,
);
const unclassifiedBlocks = {};
const unclassifiedTemplates = {};
for (const block of unclassifiedBlocksList) {
unclassifiedBlocks[block.height] = block.id;
}
for (const template of unclassifiedTemplatesList) {
unclassifiedTemplates[template.height] = template.id;
}
logger.debug(`Classifying blocks and templates from #${currentBlockHeight} to #${minHeight}`, logger.tags.goggles);
for (let height = currentBlockHeight; height >= 0; height--) {
let txs: TransactionExtended[] | null = null;
if (unclassifiedBlocks[height]) {
const blockHash = unclassifiedBlocks[height];
// fetch transactions
txs = (await bitcoinApi.$getTxsForBlock(blockHash)).map(tx => transactionUtils.extendTransaction(tx));
// add CPFP
const cpfpSummary = Common.calculateCpfp(height, txs, true);
// classify
const { transactions: classifiedTxs } = this.summarizeBlockTransactions(blockHash, cpfpSummary.transactions);
BlocksSummariesRepository.$saveTransactions(height, blockHash, classifiedTxs, 1);
}
if (unclassifiedTemplates[height]) {
// classify template
const blockHash = unclassifiedTemplates[height];
const template = await BlocksSummariesRepository.$getTemplate(blockHash);
const alreadyClassified = template?.transactions.reduce((classified, tx) => (classified || tx.flags > 0), false);
let classifiedTemplate = template?.transactions || [];
if (!alreadyClassified) {
const templateTxs: (TransactionExtended | TransactionClassified)[] = [];
const blockTxMap: { [txid: string]: TransactionExtended } = {};
for (const tx of (txs || [])) {
blockTxMap[tx.txid] = tx;
}
for (const templateTx of (template?.transactions || [])) {
let tx: TransactionExtended | null = blockTxMap[templateTx.txid];
if (!tx) {
try {
tx = await transactionUtils.$getTransactionExtended(templateTx.txid, false, true, false);
} catch (e) {
// transaction probably not found
}
}
templateTxs.push(tx || templateTx);
}
const cpfpSummary = Common.calculateCpfp(height, txs?.filter(tx => tx.effectiveFeePerVsize != null) as TransactionExtended[], true);
// classify
const { transactions: classifiedTxs } = this.summarizeBlockTransactions(blockHash, cpfpSummary.transactions);
const classifiedTxMap: { [txid: string]: TransactionClassified } = {};
for (const tx of classifiedTxs) {
classifiedTxMap[tx.txid] = tx;
}
classifiedTemplate = classifiedTemplate.map(tx => {
if (classifiedTxMap[tx.txid]) {
tx.flags = classifiedTxMap[tx.txid].flags || 0;
}
return tx;
});
}
BlocksSummariesRepository.$saveTemplate({ height, template: { id: blockHash, transactions: classifiedTemplate }, version: 1 });
}
// timing & logging
indexedThisRun++;
indexedTotal++;
const elapsedSeconds = (Date.now() - timer) / 1000;
if (elapsedSeconds > 5) {
const perSecond = indexedThisRun / elapsedSeconds;
logger.debug(`Classified #${height}: ${indexedTotal} / ${numToIndex} blocks (${perSecond.toFixed(1)}/s)`);
timer = Date.now();
indexedThisRun = 0;
}
}
}
/** /**
* [INDEXING] Index all blocks metadata for the mining dashboard * [INDEXING] Index all blocks metadata for the mining dashboard
*/ */
@ -966,6 +1075,7 @@ class Blocks {
let height = blockHeight; let height = blockHeight;
let summary: BlockSummary; let summary: BlockSummary;
let summaryVersion = 0;
if (cpfpSummary && !Common.isLiquid()) { if (cpfpSummary && !Common.isLiquid()) {
summary = { summary = {
id: hash, id: hash,
@ -980,10 +1090,12 @@ class Blocks {
}; };
}), }),
}; };
summaryVersion = 1;
} else { } else {
if (config.MEMPOOL.BACKEND === 'esplora') { if (config.MEMPOOL.BACKEND === 'esplora') {
const txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx)); const txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx));
summary = this.summarizeBlockTransactions(hash, txs); summary = this.summarizeBlockTransactions(hash, txs);
summaryVersion = 1;
} else { } else {
// Call Core RPC // Call Core RPC
const block = await bitcoinClient.getBlock(hash, 2); const block = await bitcoinClient.getBlock(hash, 2);
@ -998,7 +1110,7 @@ class Blocks {
// Index the response if needed // Index the response if needed
if (Common.blocksSummariesIndexingEnabled() === true) { if (Common.blocksSummariesIndexingEnabled() === true) {
await BlocksSummariesRepository.$saveTransactions(height, hash, summary.transactions); await BlocksSummariesRepository.$saveTransactions(height, hash, summary.transactions, summaryVersion);
} }
return summary.transactions; return summary.transactions;
@ -1114,16 +1226,18 @@ class Blocks {
if (cleanBlock.fee_amt_percentiles === null) { if (cleanBlock.fee_amt_percentiles === null) {
let summary; let summary;
let summaryVersion = 0;
if (config.MEMPOOL.BACKEND === 'esplora') { if (config.MEMPOOL.BACKEND === 'esplora') {
const txs = (await bitcoinApi.$getTxsForBlock(cleanBlock.hash)).map(tx => transactionUtils.extendTransaction(tx)); const txs = (await bitcoinApi.$getTxsForBlock(cleanBlock.hash)).map(tx => transactionUtils.extendTransaction(tx));
summary = this.summarizeBlockTransactions(cleanBlock.hash, txs); summary = this.summarizeBlockTransactions(cleanBlock.hash, txs);
summaryVersion = 1;
} else { } else {
// Call Core RPC // Call Core RPC
const block = await bitcoinClient.getBlock(cleanBlock.hash, 2); const block = await bitcoinClient.getBlock(cleanBlock.hash, 2);
summary = this.summarizeBlock(block); summary = this.summarizeBlock(block);
} }
await BlocksSummariesRepository.$saveTransactions(cleanBlock.height, cleanBlock.hash, summary.transactions); await BlocksSummariesRepository.$saveTransactions(cleanBlock.height, cleanBlock.hash, summary.transactions, summaryVersion);
cleanBlock.fee_amt_percentiles = await BlocksSummariesRepository.$getFeePercentilesByBlockId(cleanBlock.hash); cleanBlock.fee_amt_percentiles = await BlocksSummariesRepository.$getFeePercentilesByBlockId(cleanBlock.hash);
} }
if (cleanBlock.fee_amt_percentiles !== null) { if (cleanBlock.fee_amt_percentiles !== null) {

View File

@ -635,12 +635,12 @@ export class Common {
} }
} }
static calculateCpfp(height: number, transactions: TransactionExtended[]): CpfpSummary { static calculateCpfp(height: number, transactions: TransactionExtended[], saveRelatives: boolean = false): CpfpSummary {
const clusters: CpfpCluster[] = []; // list of all cpfp clusters in this block const clusters: CpfpCluster[] = []; // list of all cpfp clusters in this block
const clusterMap: { [txid: string]: CpfpCluster } = {}; // map transactions to their cpfp cluster const clusterMap: { [txid: string]: CpfpCluster } = {}; // map transactions to their cpfp cluster
let clusterTxs: TransactionExtended[] = []; // working list of elements of the current cluster let clusterTxs: TransactionExtended[] = []; // working list of elements of the current cluster
let ancestors: { [txid: string]: boolean } = {}; // working set of ancestors of the current cluster root let ancestors: { [txid: string]: boolean } = {}; // working set of ancestors of the current cluster root
const txMap = {}; const txMap: { [txid: string]: TransactionExtended } = {};
// initialize the txMap // initialize the txMap
for (const tx of transactions) { for (const tx of transactions) {
txMap[tx.txid] = tx; txMap[tx.txid] = tx;
@ -710,6 +710,15 @@ export class Common {
} }
} }
} }
if (saveRelatives) {
for (const cluster of clusters) {
cluster.txs.forEach((member, index) => {
txMap[member.txid].descendants = cluster.txs.slice(0, index).reverse();
txMap[member.txid].ancestors = cluster.txs.slice(index + 1).reverse();
txMap[member.txid].effectiveFeePerVsize = cluster.effectiveFeePerVsize;
});
}
}
return { return {
transactions, transactions,
clusters, clusters,

View File

@ -7,7 +7,7 @@ import cpfpRepository from '../repositories/CpfpRepository';
import { RowDataPacket } from 'mysql2'; import { RowDataPacket } from 'mysql2';
class DatabaseMigration { class DatabaseMigration {
private static currentVersion = 66; private static currentVersion = 67;
private queryTimeout = 3600_000; private queryTimeout = 3600_000;
private statisticsAddedIndexed = false; private statisticsAddedIndexed = false;
private uniqueLogs: string[] = []; private uniqueLogs: string[] = [];
@ -558,6 +558,14 @@ class DatabaseMigration {
await this.$executeQuery('ALTER TABLE `statistics` ADD min_fee FLOAT UNSIGNED DEFAULT NULL'); await this.$executeQuery('ALTER TABLE `statistics` ADD min_fee FLOAT UNSIGNED DEFAULT NULL');
await this.updateToSchemaVersion(66); await this.updateToSchemaVersion(66);
} }
if (databaseSchemaVersion < 67) {
await this.$executeQuery('ALTER TABLE `blocks_summaries` ADD version INT NOT NULL DEFAULT 0');
await this.$executeQuery('ALTER TABLE `blocks_summaries` ADD INDEX `version` (`version`)');
await this.$executeQuery('ALTER TABLE `blocks_templates` ADD version INT NOT NULL DEFAULT 0');
await this.$executeQuery('ALTER TABLE `blocks_templates` ADD INDEX `version` (`version`)');
await this.updateToSchemaVersion(67);
}
} }
/** /**

View File

@ -703,7 +703,8 @@ class WebsocketHandler {
template: { template: {
id: block.id, id: block.id,
transactions: stripped, transactions: stripped,
} },
version: 1,
}); });
BlocksAuditsRepository.$saveAudit({ BlocksAuditsRepository.$saveAudit({

View File

@ -185,6 +185,7 @@ class Indexer {
await blocks.$generateCPFPDatabase(); await blocks.$generateCPFPDatabase();
await blocks.$generateAuditStats(); await blocks.$generateAuditStats();
await auditReplicator.$sync(); await auditReplicator.$sync();
await blocks.$classifyBlocks();
} catch (e) { } catch (e) {
this.indexerRunning = false; this.indexerRunning = false;
logger.err(`Indexer failed, trying again in 10 seconds. Reason: ` + (e instanceof Error ? e.message : e)); logger.err(`Indexer failed, trying again in 10 seconds. Reason: ` + (e instanceof Error ? e.message : e));

View File

@ -35,6 +35,7 @@ class Logger {
public tags = { public tags = {
mining: 'Mining', mining: 'Mining',
ln: 'Lightning', ln: 'Lightning',
goggles: 'Goggles',
}; };
// @ts-ignore // @ts-ignore

View File

@ -105,7 +105,8 @@ class AuditReplication {
template: { template: {
id: blockHash, id: blockHash,
transactions: auditSummary.template || [] transactions: auditSummary.template || []
} },
version: 1,
}); });
await blocksAuditsRepository.$saveAudit({ await blocksAuditsRepository.$saveAudit({
hash: blockHash, hash: blockHash,

View File

@ -1040,16 +1040,18 @@ class BlocksRepository {
if (extras.feePercentiles === null) { if (extras.feePercentiles === null) {
let summary; let summary;
let summaryVersion = 0;
if (config.MEMPOOL.BACKEND === 'esplora') { if (config.MEMPOOL.BACKEND === 'esplora') {
const txs = (await bitcoinApi.$getTxsForBlock(dbBlk.id)).map(tx => transactionUtils.extendTransaction(tx)); const txs = (await bitcoinApi.$getTxsForBlock(dbBlk.id)).map(tx => transactionUtils.extendTransaction(tx));
summary = blocks.summarizeBlockTransactions(dbBlk.id, txs); summary = blocks.summarizeBlockTransactions(dbBlk.id, txs);
summaryVersion = 1;
} else { } else {
// Call Core RPC // Call Core RPC
const block = await bitcoinClient.getBlock(dbBlk.id, 2); const block = await bitcoinClient.getBlock(dbBlk.id, 2);
summary = blocks.summarizeBlock(block); summary = blocks.summarizeBlock(block);
} }
await BlocksSummariesRepository.$saveTransactions(dbBlk.height, dbBlk.id, summary.transactions); await BlocksSummariesRepository.$saveTransactions(dbBlk.height, dbBlk.id, summary.transactions, summaryVersion);
extras.feePercentiles = await BlocksSummariesRepository.$getFeePercentilesByBlockId(dbBlk.id); extras.feePercentiles = await BlocksSummariesRepository.$getFeePercentilesByBlockId(dbBlk.id);
} }
if (extras.feePercentiles !== null) { if (extras.feePercentiles !== null) {

View File

@ -17,30 +17,30 @@ class BlocksSummariesRepository {
return undefined; return undefined;
} }
public async $saveTransactions(blockHeight: number, blockId: string, transactions: TransactionClassified[]): Promise<void> { public async $saveTransactions(blockHeight: number, blockId: string, transactions: TransactionClassified[], version: number): Promise<void> {
try { try {
const transactionsStr = JSON.stringify(transactions); const transactionsStr = JSON.stringify(transactions);
await DB.query(` await DB.query(`
INSERT INTO blocks_summaries INSERT INTO blocks_summaries
SET height = ?, transactions = ?, id = ? SET height = ?, transactions = ?, id = ?, version = ?
ON DUPLICATE KEY UPDATE transactions = ?`, ON DUPLICATE KEY UPDATE transactions = ?`,
[blockHeight, transactionsStr, blockId, transactionsStr]); [blockHeight, transactionsStr, blockId, version, transactionsStr]);
} catch (e: any) { } catch (e: any) {
logger.debug(`Cannot save block summary transactions for ${blockId}. Reason: ${e instanceof Error ? e.message : e}`); logger.debug(`Cannot save block summary transactions for ${blockId}. Reason: ${e instanceof Error ? e.message : e}`);
throw e; throw e;
} }
} }
public async $saveTemplate(params: { height: number, template: BlockSummary}) { public async $saveTemplate(params: { height: number, template: BlockSummary, version: number}): Promise<void> {
const blockId = params.template?.id; const blockId = params.template?.id;
try { try {
const transactions = JSON.stringify(params.template?.transactions || []); const transactions = JSON.stringify(params.template?.transactions || []);
await DB.query(` await DB.query(`
INSERT INTO blocks_templates (id, template) INSERT INTO blocks_templates (id, template, version)
VALUE (?, ?) VALUE (?, ?, ?)
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
template = ? template = ?
`, [blockId, transactions, transactions]); `, [blockId, transactions, params.version, transactions]);
} catch (e: any) { } catch (e: any) {
if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart
logger.debug(`Cannot save block template for ${blockId} because it has already been indexed, ignoring`); logger.debug(`Cannot save block template for ${blockId} because it has already been indexed, ignoring`);
@ -76,6 +76,41 @@ class BlocksSummariesRepository {
return []; return [];
} }
public async $getSummariesWithVersion(version: number): Promise<{ height: number, id: string }[]> {
try {
const [rows]: any[] = await DB.query(`
SELECT
height,
id
FROM blocks_summaries
WHERE version = ?
ORDER BY height DESC;`, [version]);
return rows;
} catch (e) {
logger.err(`Cannot get block summaries with version. Reason: ` + (e instanceof Error ? e.message : e));
}
return [];
}
public async $getTemplatesWithVersion(version: number): Promise<{ height: number, id: string }[]> {
try {
const [rows]: any[] = await DB.query(`
SELECT
blocks_summaries.height as height,
blocks_templates.id as id
FROM blocks_templates
JOIN blocks_summaries ON blocks_templates.id = blocks_summaries.id
WHERE blocks_templates.version = ?
ORDER BY height DESC;`, [version]);
return rows;
} catch (e) {
logger.err(`Cannot get block summaries with version. Reason: ` + (e instanceof Error ? e.message : e));
}
return [];
}
/** /**
* Get the fee percentiles if the block has already been indexed, [] otherwise * Get the fee percentiles if the block has already been indexed, [] otherwise
* *