handle gaps in indexed CPFP data
This commit is contained in:
parent
8de3fd0988
commit
f0d3bb87c6
@ -338,10 +338,10 @@ class Blocks {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// Get all indexed block hash
|
// Get all indexed block hash
|
||||||
const unindexedBlocks = await blocksRepository.$getCPFPUnindexedBlocks();
|
const unindexedBlockHeights = await blocksRepository.$getCPFPUnindexedBlocks();
|
||||||
logger.info(`Indexing cpfp data for ${unindexedBlocks.length} blocks`);
|
logger.info(`Indexing cpfp data for ${unindexedBlockHeights.length} blocks`);
|
||||||
|
|
||||||
if (!unindexedBlocks?.length) {
|
if (!unindexedBlockHeights?.length) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,32 +350,26 @@ class Blocks {
|
|||||||
let countThisRun = 0;
|
let countThisRun = 0;
|
||||||
let timer = new Date().getTime() / 1000;
|
let timer = new Date().getTime() / 1000;
|
||||||
const startedAt = new Date().getTime() / 1000;
|
const startedAt = new Date().getTime() / 1000;
|
||||||
let lastHeight;
|
for (const height of unindexedBlockHeights) {
|
||||||
for (const block of unindexedBlocks) {
|
|
||||||
// Logging
|
// Logging
|
||||||
|
const hash = await bitcoinApi.$getBlockHash(height);
|
||||||
const elapsedSeconds = Math.max(1, new Date().getTime() / 1000 - timer);
|
const elapsedSeconds = Math.max(1, new Date().getTime() / 1000 - timer);
|
||||||
if (elapsedSeconds > 5) {
|
if (elapsedSeconds > 5) {
|
||||||
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
|
const runningFor = Math.max(1, Math.round((new Date().getTime() / 1000) - startedAt));
|
||||||
const blockPerSeconds = (countThisRun / elapsedSeconds);
|
const blockPerSeconds = (countThisRun / elapsedSeconds);
|
||||||
const progress = Math.round(count / unindexedBlocks.length * 10000) / 100;
|
const progress = Math.round(count / unindexedBlockHeights.length * 10000) / 100;
|
||||||
logger.debug(`Indexing cpfp clusters for #${block.height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${count}/${unindexedBlocks.length} (${progress}%) | elapsed: ${runningFor} seconds`);
|
logger.debug(`Indexing cpfp clusters for #${height} | ~${blockPerSeconds.toFixed(2)} blocks/sec | total: ${count}/${unindexedBlockHeights.length} (${progress}%) | elapsed: ${runningFor} seconds`);
|
||||||
timer = new Date().getTime() / 1000;
|
timer = new Date().getTime() / 1000;
|
||||||
countThisRun = 0;
|
countThisRun = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.$indexCPFP(block.hash, block.height); // Calculate and save CPFP data for transactions in this block
|
await this.$indexCPFP(hash, height); // Calculate and save CPFP data for transactions in this block
|
||||||
|
|
||||||
lastHeight = block.height;
|
|
||||||
// Logging
|
// Logging
|
||||||
count++;
|
count++;
|
||||||
countThisRun++;
|
countThisRun++;
|
||||||
}
|
}
|
||||||
if (count > 0) {
|
logger.notice(`CPFP indexing completed: indexed ${count} blocks`);
|
||||||
await cpfpRepository.$insertProgressMarker(lastHeight);
|
|
||||||
logger.notice(`CPFP indexing completed: indexed ${count} blocks`);
|
|
||||||
} else {
|
|
||||||
logger.debug(`CPFP indexing completed: indexed ${count} blocks`);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err(`CPFP indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
|
logger.err(`CPFP indexing failed. Trying again in 10 seconds. Reason: ${(e instanceof Error ? e.message : e)}`);
|
||||||
throw e;
|
throw e;
|
||||||
@ -790,7 +784,10 @@ class Blocks {
|
|||||||
ancestors[vin.txid] = true;
|
ancestors[vin.txid] = true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
await cpfpRepository.$batchSaveClusters(clusters);
|
const result = await cpfpRepository.$batchSaveClusters(clusters);
|
||||||
|
if (!result) {
|
||||||
|
await cpfpRepository.$insertProgressMarker(height);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -671,19 +671,25 @@ class BlocksRepository {
|
|||||||
try {
|
try {
|
||||||
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
|
const blockchainInfo = await bitcoinClient.getBlockchainInfo();
|
||||||
const currentBlockHeight = blockchainInfo.blocks;
|
const currentBlockHeight = blockchainInfo.blocks;
|
||||||
const [lastHeightRows]: any = await DB.query(`SELECT MIN(height) AS minHeight from compact_cpfp_clusters`);
|
let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, currentBlockHeight);
|
||||||
const lastHeight = (lastHeightRows.length && lastHeightRows[0].minHeight != null) ? lastHeightRows[0].minHeight : currentBlockHeight;
|
|
||||||
|
|
||||||
let indexingBlockAmount = Math.min(config.MEMPOOL.INDEXING_BLOCKS_AMOUNT, blockchainInfo.blocks);
|
|
||||||
if (indexingBlockAmount <= -1) {
|
if (indexingBlockAmount <= -1) {
|
||||||
indexingBlockAmount = currentBlockHeight + 1;
|
indexingBlockAmount = currentBlockHeight + 1;
|
||||||
}
|
}
|
||||||
const firstHeight = Math.max(0, currentBlockHeight - indexingBlockAmount + 1);
|
const minHeight = Math.max(0, currentBlockHeight - indexingBlockAmount + 1);
|
||||||
|
|
||||||
if (firstHeight < lastHeight) {
|
const [rows]: any[] = await DB.query(`
|
||||||
const [rows]: any = await DB.query(`SELECT height, hash FROM blocks WHERE height BETWEEN ? AND ? ORDER BY height DESC`, [firstHeight, lastHeight]);
|
SELECT height
|
||||||
return rows;
|
FROM compact_cpfp_clusters
|
||||||
}
|
WHERE height <= ? AND height >= ?
|
||||||
|
ORDER BY height DESC;
|
||||||
|
`, [currentBlockHeight, minHeight]);
|
||||||
|
|
||||||
|
const indexedHeights = {};
|
||||||
|
rows.forEach((row) => { indexedHeights[row.height] = true; });
|
||||||
|
const allHeights: number[] = Array.from(Array(currentBlockHeight - minHeight + 1).keys(), n => n + minHeight).reverse();
|
||||||
|
const unindexedHeights = allHeights.filter(x => !indexedHeights[x]);
|
||||||
|
|
||||||
|
return unindexedHeights;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err('Cannot fetch CPFP unindexed blocks. Reason: ' + (e instanceof Error ? e.message : e));
|
logger.err('Cannot fetch CPFP unindexed blocks. Reason: ' + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -6,9 +6,9 @@ import { Ancestor } from '../mempool.interfaces';
|
|||||||
import transactionRepository from '../repositories/TransactionRepository';
|
import transactionRepository from '../repositories/TransactionRepository';
|
||||||
|
|
||||||
class CpfpRepository {
|
class CpfpRepository {
|
||||||
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<void> {
|
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<boolean> {
|
||||||
if (!txs[0]) {
|
if (!txs[0]) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
// skip clusters of transactions with the same fees
|
// skip clusters of transactions with the same fees
|
||||||
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
|
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
|
||||||
@ -16,7 +16,7 @@ class CpfpRepository {
|
|||||||
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
|
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
|
||||||
}, true);
|
}, true);
|
||||||
if (equalFee) {
|
if (equalFee) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -41,13 +41,14 @@ class CpfpRepository {
|
|||||||
await transactionRepository.$batchSetCluster(chunk);
|
await transactionRepository.$batchSetCluster(chunk);
|
||||||
chunkIndex += maxChunk;
|
chunkIndex += maxChunk;
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async $batchSaveClusters(clusters: { root: string, height: number, txs: any, effectiveFeePerVsize: number}[]): Promise<void> {
|
public async $batchSaveClusters(clusters: { root: string, height: number, txs: any, effectiveFeePerVsize: number}[]): Promise<boolean> {
|
||||||
try {
|
try {
|
||||||
const clusterValues: any[] = [];
|
const clusterValues: any[] = [];
|
||||||
const txs: any[] = [];
|
const txs: any[] = [];
|
||||||
@ -73,7 +74,7 @@ class CpfpRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!clusterValues.length) {
|
if (!clusterValues.length) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
const maxChunk = 100;
|
const maxChunk = 100;
|
||||||
@ -103,7 +104,7 @@ class CpfpRepository {
|
|||||||
);
|
);
|
||||||
chunkIndex += maxChunk;
|
chunkIndex += maxChunk;
|
||||||
}
|
}
|
||||||
return;
|
return true;
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
|
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user