[Indexing] Link blocks to their closest known price

This commit is contained in:
nymkappa 2022-07-09 16:53:29 +02:00
parent 80b3b91a82
commit 40634a0eb8
No known key found for this signature in database
GPG Key ID: E155910B16E8BD04
8 changed files with 338 additions and 206 deletions

View File

@ -18,13 +18,10 @@ import BlocksRepository from '../repositories/BlocksRepository';
import HashratesRepository from '../repositories/HashratesRepository';
import indexer from '../indexer';
import fiatConversion from './fiat-conversion';
import RatesRepository from '../repositories/RatesRepository';
import database from '../database';
import poolsParser from './pools-parser';
import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository';
import mining from './mining/mining';
import DifficultyAdjustmentsRepository from '../repositories/DifficultyAdjustmentsRepository';
import difficultyAdjustment from './difficulty-adjustment';
class Blocks {
private blocks: BlockExtended[] = [];

View File

@ -4,7 +4,7 @@ import logger from '../logger';
import { Common } from './common';
class DatabaseMigration {
private static currentVersion = 29;
private static currentVersion = 30;
private queryTimeout = 120000;
private statisticsAddedIndexed = false;
private uniqueLogs: string[] = [];
@ -12,8 +12,6 @@ class DatabaseMigration {
private blocksTruncatedMessage = `'blocks' table has been truncated. Re-indexing from scratch.`;
private hashratesTruncatedMessage = `'hashrates' table has been truncated. Re-indexing from scratch.`;
constructor() { }
/**
* Avoid printing multiple time the same message
*/
@ -104,7 +102,7 @@ class DatabaseMigration {
await this.$setStatisticsAddedIndexedFlag(databaseSchemaVersion);
const isBitcoin = ['mainnet', 'testnet', 'signet'].includes(config.MEMPOOL.NETWORK);
try {
await this.$executeQuery(this.getCreateElementsTableQuery(), await this.$checkIfTableExists('elements_pegs'));
await this.$executeQuery(this.getCreateStatisticsQuery(), await this.$checkIfTableExists('statistics'));
if (databaseSchemaVersion < 2 && this.statisticsAddedIndexed === false) {
@ -265,15 +263,6 @@ class DatabaseMigration {
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD unannounced_nodes int(11) NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 27 && isBitcoin === true) {
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD avg_capacity bigint(20) unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD avg_fee_rate int(11) unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD avg_base_fee_mtokens bigint(20) unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD med_capacity bigint(20) unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD med_fee_rate int(11) unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD med_base_fee_mtokens bigint(20) unsigned NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 28 && isBitcoin === true) {
await this.$executeQuery(`TRUNCATE lightning_stats`);
await this.$executeQuery(`TRUNCATE node_stats`);
@ -290,9 +279,10 @@ class DatabaseMigration {
await this.$executeQuery('ALTER TABLE `nodes` ADD longitude double NULL DEFAULT NULL');
await this.$executeQuery('ALTER TABLE `nodes` ADD latitude double NULL DEFAULT NULL');
}
} catch (e) {
throw e;
if (databaseSchemaVersion < 25 && isBitcoin == true) { // Link blocks to prices
await this.$executeQuery('ALTER TABLE `prices` ADD `id` int NULL AUTO_INCREMENT UNIQUE');
await this.$executeQuery('DROP TABLE IF EXISTS `blocks_prices`');
await this.$executeQuery(this.getCreateBlocksPricesTableQuery(), await this.$checkIfTableExists('blocks_prices'));
}
}
@ -331,7 +321,7 @@ class DatabaseMigration {
/**
* Small query execution wrapper to log all executed queries
*/
private async $executeQuery(query: string, silent: boolean = false): Promise<any> {
private async $executeQuery(query: string, silent = false): Promise<any> {
if (!silent) {
logger.debug('MIGRATIONS: Execute query:\n' + query);
}
@ -360,7 +350,6 @@ class DatabaseMigration {
* Create the `state` table
*/
private async $createMigrationStateTable(): Promise<void> {
try {
const query = `CREATE TABLE IF NOT EXISTS state (
name varchar(25) NOT NULL,
number int(11) NULL,
@ -372,9 +361,6 @@ class DatabaseMigration {
// Set initial values
await this.$executeQuery(`INSERT INTO state VALUES('schema_version', 0, NULL);`);
await this.$executeQuery(`INSERT INTO state VALUES('last_elements_block', 0, NULL);`);
} catch (e) {
throw e;
}
}
/**
@ -714,6 +700,15 @@ class DatabaseMigration {
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`
}
private getCreateBlocksPricesTableQuery(): string {
return `CREATE TABLE IF NOT EXISTS blocks_prices (
height int(10) unsigned NOT NULL,
price_id int(10) unsigned NOT NULL,
PRIMARY KEY (height),
INDEX (price_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
}
public async $truncateIndexedData(tables: string[]) {
const allowedTables = ['blocks', 'hashrates', 'prices'];

View File

@ -11,8 +11,11 @@ import indexer from '../../indexer';
import DifficultyAdjustmentsRepository from '../../repositories/DifficultyAdjustmentsRepository';
import config from '../../config';
import BlocksAuditsRepository from '../../repositories/BlocksAuditsRepository';
import PricesRepository from '../repositories/PricesRepository';
class Mining {
blocksPriceIndexingRunning = false;
constructor() {
}
@ -453,6 +456,70 @@ class Mining {
}
}
/**
* Create a link between blocks and the latest price at when they were mined
*/
public async $indexBlockPrices() {
if (this.blocksPriceIndexingRunning === true) {
return;
}
this.blocksPriceIndexingRunning = true;
try {
const prices: any[] = await PricesRepository.$getPricesTimesAndId();
const blocksWithoutPrices: any[] = await BlocksRepository.$getBlocksWithoutPrice();
let totalInserted = 0;
const blocksPrices: BlockPrice[] = [];
for (const block of blocksWithoutPrices) {
// Quick optimisation, out mtgox feed only goes back to 2010-07-19 02:00:00, so skip the first 68951 blocks
if (block.height < 68951) {
blocksPrices.push({
height: block.height,
priceId: prices[0].id,
});
continue;
}
for (const price of prices) {
if (block.timestamp < price.time) {
blocksPrices.push({
height: block.height,
priceId: price.id,
});
break;
};
}
if (blocksPrices.length >= 100000) {
totalInserted += blocksPrices.length;
if (blocksWithoutPrices.length > 200000) {
logger.debug(`Linking ${blocksPrices.length} newly indexed blocks to their closest price | Progress ${Math.round(totalInserted / blocksWithoutPrices.length * 100)}%`);
} else {
logger.debug(`Linking ${blocksPrices.length} newly indexed blocks to their closest price`);
}
await BlocksRepository.$saveBlockPrices(blocksPrices);
blocksPrices.length = 0;
}
}
if (blocksPrices.length > 0) {
totalInserted += blocksPrices.length;
if (blocksWithoutPrices.length > 200000) {
logger.debug(`Linking ${blocksPrices.length} newly indexed blocks to their closest price | Progress ${Math.round(totalInserted / blocksWithoutPrices.length * 100)}%`);
} else {
logger.debug(`Linking ${blocksPrices.length} newly indexed blocks to their closest price`);
}
await BlocksRepository.$saveBlockPrices(blocksPrices);
}
} catch (e) {
this.blocksPriceIndexingRunning = false;
throw e;
}
this.blocksPriceIndexingRunning = false;
}
private getDateMidnight(date: Date): Date {
date.setUTCHours(0);
date.setUTCMinutes(0);

View File

@ -35,6 +35,8 @@ import miningRoutes from "./api/mining/mining-routes";
import bisqRoutes from "./api/bisq/bisq.routes";
import liquidRoutes from "./api/liquid/liquid.routes";
import bitcoinRoutes from "./api/bitcoin/bitcoin.routes";
import BlocksAuditsRepository from './repositories/BlocksAuditsRepository';
import mining from "./api/mining";
class Server {
private wss: WebSocket.Server | undefined;
@ -166,7 +168,7 @@ class Server {
await blocks.$updateBlocks();
await memPool.$updateMempool();
indexer.$run();
priceUpdater.$run();
priceUpdater.$run().then(mining.$indexBlockPrices.bind(this));
setTimeout(this.runMainUpdateLoop.bind(this), config.MEMPOOL.POLL_RATE_MS);
this.currentBackendRetryInterval = 5;

View File

@ -48,7 +48,7 @@ class Indexer {
}
await mining.$indexDifficultyAdjustments();
await this.$resetHashratesIndexingState();
await this.$resetHashratesIndexingState(); // TODO - Remove this as it's not efficient
await mining.$generateNetworkHashrateHistory();
await mining.$generatePoolHashrateHistory();
await blocks.$generateBlocksSummariesDatabase();

View File

@ -121,6 +121,11 @@ export interface BlockSummary {
transactions: TransactionStripped[];
}
export interface BlockPrice {
height: number;
priceId: number;
}
export interface TransactionMinerInfo {
vin: VinStrippedToScriptsig[];
vout: VoutStrippedToScriptPubkey[];

View File

@ -1,4 +1,4 @@
import { BlockExtended } from '../mempool.interfaces';
import { BlockExtended, BlockPrice } from '../mempool.interfaces';
import DB from '../database';
import logger from '../logger';
import { Common } from '../api/common';
@ -275,9 +275,7 @@ class BlocksRepository {
previous_block_hash as previousblockhash,
avg_fee,
avg_fee_rate,
IFNULL(JSON_EXTRACT(rates.bisq_rates, '$.USD'), null) as usd
FROM blocks
LEFT JOIN rates on rates.height = blocks.height
WHERE pool_id = ?`;
params.push(pool.id);
@ -335,12 +333,10 @@ class BlocksRepository {
merkle_root,
previous_block_hash as previousblockhash,
avg_fee,
avg_fee_rate,
IFNULL(JSON_EXTRACT(rates.bisq_rates, '$.USD'), null) as usd
avg_fee_rate
FROM blocks
JOIN pools ON blocks.pool_id = pools.id
LEFT JOIN rates on rates.height = blocks.height
WHERE blocks.height = ${height};
WHERE blocks.height = ${height}
`);
if (rows.length <= 0) {
@ -365,10 +361,8 @@ class BlocksRepository {
pools.id as pool_id, pools.name as pool_name, pools.link as pool_link, pools.slug as pool_slug,
pools.addresses as pool_addresses, pools.regexes as pool_regexes,
previous_block_hash as previousblockhash,
IFNULL(JSON_EXTRACT(rates.bisq_rates, '$.USD'), null) as usd
FROM blocks
JOIN pools ON blocks.pool_id = pools.id
LEFT JOIN rates on rates.height = blocks.height
WHERE hash = '${hash}';
`;
const [rows]: any[] = await DB.query(query);
@ -393,7 +387,20 @@ class BlocksRepository {
const [rows]: any[] = await DB.query(`SELECT UNIX_TIMESTAMP(blockTimestamp) as time, height, difficulty FROM blocks`);
return rows;
} catch (e) {
logger.err('Cannot generate difficulty history. Reason: ' + (e instanceof Error ? e.message : e));
logger.err('Cannot get blocks difficulty list from the db. Reason: ' + (e instanceof Error ? e.message : e));
throw e;
}
}
/**
* Return blocks height
*/
public async $getBlocksHeightsAndTimestamp(): Promise<object[]> {
try {
const [rows]: any[] = await DB.query(`SELECT height, blockTimestamp as timestamp FROM blocks`);
return rows;
} catch (e) {
logger.err('Cannot get blocks height and timestamp from the db. Reason: ' + (e instanceof Error ? e.message : e));
throw e;
}
}
@ -481,10 +488,9 @@ class BlocksRepository {
let query = `SELECT
CAST(AVG(blocks.height) as INT) as avgHeight,
CAST(AVG(UNIX_TIMESTAMP(blockTimestamp)) as INT) as timestamp,
CAST(AVG(fees) as INT) as avgFees,
IFNULL(JSON_EXTRACT(rates.bisq_rates, '$.USD'), null) as usd
CAST(AVG(fees) as INT) as avgFees
FROM blocks
LEFT JOIN rates on rates.height = blocks.height`;
`;
if (interval !== null) {
query += ` WHERE blockTimestamp BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW()`;
@ -508,10 +514,9 @@ class BlocksRepository {
let query = `SELECT
CAST(AVG(blocks.height) as INT) as avgHeight,
CAST(AVG(UNIX_TIMESTAMP(blockTimestamp)) as INT) as timestamp,
CAST(AVG(reward) as INT) as avgRewards,
IFNULL(JSON_EXTRACT(rates.bisq_rates, '$.USD'), null) as usd
CAST(AVG(reward) as INT) as avgRewards
FROM blocks
LEFT JOIN rates on rates.height = blocks.height`;
`;
if (interval !== null) {
query += ` WHERE blockTimestamp BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW()`;
@ -638,6 +643,62 @@ class BlocksRepository {
throw e;
}
}
/**
* Get all blocks which have not be linked to a price yet
*/
public async $getBlocksWithoutPrice(): Promise<object[]> {
try {
const [rows]: any[] = await DB.query(`
SELECT UNIX_TIMESTAMP(blocks.blockTimestamp) as timestamp, blocks.height
FROM blocks
LEFT JOIN blocks_prices ON blocks.height = blocks_prices.height
WHERE blocks_prices.height IS NULL
ORDER BY blocks.height
`);
return rows;
} catch (e) {
logger.err('Cannot get blocks height and timestamp from the db. Reason: ' + (e instanceof Error ? e.message : e));
throw e;
}
}
/**
* Save block price
*/
public async $saveBlockPrice(blockPrice: BlockPrice): Promise<void> {
try {
await DB.query(`INSERT INTO blocks_prices(height, price_id) VALUE (?, ?)`, [blockPrice.height, blockPrice.priceId]);
} catch (e: any) {
if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart
logger.debug(`Cannot save block price for block ${blockPrice.height} because it has already been indexed, ignoring`);
} else {
logger.err(`Cannot save block price into db. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
}
/**
* Save block price by batch
*/
public async $saveBlockPrices(blockPrices: BlockPrice[]): Promise<void> {
try {
let query = `INSERT INTO blocks_prices(height, price_id) VALUES`;
for (const price of blockPrices) {
query += ` (${price.height}, ${price.priceId}),`
}
query = query.slice(0, -1);
await DB.query(query);
} catch (e: any) {
if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart
logger.debug(`Cannot save blocks prices for blocks [${blockPrices[0].height} to ${blockPrices[blockPrices.length - 1].height}] because it has already been indexed, ignoring`);
} else {
logger.err(`Cannot save blocks prices for blocks [${blockPrices[0].height} to ${blockPrices[blockPrices.length - 1].height}] into db. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
}
}
export default new BlocksRepository();

View File

@ -33,9 +33,14 @@ class PricesRepository {
}
public async $getPricesTimes(): Promise<number[]> {
const [times]: any[] = await DB.query(`SELECT UNIX_TIMESTAMP(time) as time from prices WHERE USD != -1`);
const [times]: any[] = await DB.query(`SELECT UNIX_TIMESTAMP(time) as time from prices WHERE USD != -1 ORDER BY time`);
return times.map(time => time.time);
}
public async $getPricesTimesAndId(): Promise<number[]> {
const [times]: any[] = await DB.query(`SELECT UNIX_TIMESTAMP(time) as time, id, USD from prices ORDER BY time`);
return times;
}
}
export default new PricesRepository();