Replace json prices.avg_prices
with table columns - update prices logs
This commit is contained in:
parent
c22aee5e60
commit
e8175a90f4
@ -4,7 +4,7 @@ import logger from '../logger';
|
|||||||
import { Common } from './common';
|
import { Common } from './common';
|
||||||
|
|
||||||
class DatabaseMigration {
|
class DatabaseMigration {
|
||||||
private static currentVersion = 22;
|
private static currentVersion = 23;
|
||||||
private queryTimeout = 120000;
|
private queryTimeout = 120000;
|
||||||
private statisticsAddedIndexed = false;
|
private statisticsAddedIndexed = false;
|
||||||
private uniqueLogs: string[] = [];
|
private uniqueLogs: string[] = [];
|
||||||
@ -231,6 +231,18 @@ class DatabaseMigration {
|
|||||||
await this.$executeQuery('DROP TABLE IF EXISTS `difficulty_adjustments`');
|
await this.$executeQuery('DROP TABLE IF EXISTS `difficulty_adjustments`');
|
||||||
await this.$executeQuery(this.getCreateDifficultyAdjustmentsTableQuery(), await this.$checkIfTableExists('difficulty_adjustments'));
|
await this.$executeQuery(this.getCreateDifficultyAdjustmentsTableQuery(), await this.$checkIfTableExists('difficulty_adjustments'));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (databaseSchemaVersion < 23) {
|
||||||
|
await this.$executeQuery('TRUNCATE `prices`');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` DROP `avg_prices`');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `USD` float DEFAULT "0"');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `EUR` float DEFAULT "0"');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `GBP` float DEFAULT "0"');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `CAD` float DEFAULT "0"');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `CHF` float DEFAULT "0"');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `AUD` float DEFAULT "0"');
|
||||||
|
await this.$executeQuery('ALTER TABLE `prices` ADD `JPY` float DEFAULT "0"');
|
||||||
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,11 @@ import { Prices } from '../tasks/price-updater';
|
|||||||
class PricesRepository {
|
class PricesRepository {
|
||||||
public async $savePrices(time: number, prices: Prices): Promise<void> {
|
public async $savePrices(time: number, prices: Prices): Promise<void> {
|
||||||
try {
|
try {
|
||||||
await DB.query(`INSERT INTO prices(time, avg_prices) VALUE (FROM_UNIXTIME(?), ?)`, [time, JSON.stringify(prices)]);
|
await DB.query(`
|
||||||
|
INSERT INTO prices(time, USD, EUR, GBP, CAD, CHF, AUD, JPY)
|
||||||
|
VALUE (FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ? )`,
|
||||||
|
[time, prices.USD, prices.EUR, prices.GBP, prices.CAD, prices.CHF, prices.AUD, prices.JPY]
|
||||||
|
);
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
logger.err(`Cannot save exchange rate into db. Reason: ` + (e instanceof Error ? e.message : e));
|
logger.err(`Cannot save exchange rate into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -87,7 +87,7 @@ class KrakenApi implements PriceFeed {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (Object.keys(priceHistory).length > 0) {
|
if (Object.keys(priceHistory).length > 0) {
|
||||||
logger.info(`Inserted ${Object.keys(priceHistory).length} Kraken EUR, USD, GBP, JPY, CAD, CHF and AUD weekly price history into db`);
|
logger.notice(`Inserted ${Object.keys(priceHistory).length} Kraken EUR, USD, GBP, JPY, CAD, CHF and AUD weekly price history into db`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ class PriceUpdater {
|
|||||||
++insertedCount;
|
++insertedCount;
|
||||||
}
|
}
|
||||||
if (insertedCount > 0) {
|
if (insertedCount > 0) {
|
||||||
logger.info(`Inserted ${insertedCount} MtGox USD weekly price history into db`);
|
logger.notice(`Inserted ${insertedCount} MtGox USD weekly price history into db`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert Kraken weekly prices
|
// Insert Kraken weekly prices
|
||||||
@ -205,23 +205,23 @@ class PriceUpdater {
|
|||||||
try {
|
try {
|
||||||
historicalPrices.push(await feed.$fetchRecentHourlyPrice(this.currencies));
|
historicalPrices.push(await feed.$fetchRecentHourlyPrice(this.currencies));
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.info(`Cannot fetch hourly historical price from ${feed.name}. Ignoring this feed. Reason: ${e instanceof Error ? e.message : e}`);
|
logger.err(`Cannot fetch hourly historical price from ${feed.name}. Ignoring this feed. Reason: ${e instanceof Error ? e.message : e}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group them by timestamp and currency, for example
|
// Group them by timestamp and currency, for example
|
||||||
// grouped[123456789]['USD'] = [1, 2, 3, 4];
|
// grouped[123456789]['USD'] = [1, 2, 3, 4];
|
||||||
let grouped: Object = {};
|
const grouped: Object = {};
|
||||||
for (const historicalEntry of historicalPrices) {
|
for (const historicalEntry of historicalPrices) {
|
||||||
for (const time in historicalEntry) {
|
for (const time in historicalEntry) {
|
||||||
if (existingPriceTimes.includes(parseInt(time, 10))) {
|
if (existingPriceTimes.includes(parseInt(time, 10))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (grouped[time] == undefined) {
|
if (grouped[time] === undefined) {
|
||||||
grouped[time] = {
|
grouped[time] = {
|
||||||
USD: [], EUR: [], GBP: [], CAD: [], CHF: [], AUD: [], JPY: []
|
USD: [], EUR: [], GBP: [], CAD: [], CHF: [], AUD: [], JPY: []
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const currency of this.currencies) {
|
for (const currency of this.currencies) {
|
||||||
@ -238,13 +238,20 @@ class PriceUpdater {
|
|||||||
for (const time in grouped) {
|
for (const time in grouped) {
|
||||||
const prices: Prices = this.getEmptyPricesObj();
|
const prices: Prices = this.getEmptyPricesObj();
|
||||||
for (const currency in grouped[time]) {
|
for (const currency in grouped[time]) {
|
||||||
prices[currency] = Math.round((grouped[time][currency].reduce((partialSum, a) => partialSum + a, 0)) / grouped[time][currency].length);
|
if (grouped[time][currency].length === 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
prices[currency] = Math.round((grouped[time][currency].reduce(
|
||||||
|
(partialSum, a) => partialSum + a, 0)
|
||||||
|
) / grouped[time][currency].length);
|
||||||
}
|
}
|
||||||
await PricesRepository.$savePrices(parseInt(time, 10), prices);
|
await PricesRepository.$savePrices(parseInt(time, 10), prices);
|
||||||
++totalInserted;
|
++totalInserted;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`Inserted ${totalInserted} hourly historical prices into the db`);
|
if (totalInserted > 0) {
|
||||||
|
logger.notice(`Inserted ${totalInserted} hourly historical prices into the db`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,10 +50,14 @@ export async function query(path): Promise<object | undefined> {
|
|||||||
}
|
}
|
||||||
return data.data;
|
return data.data;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.err(`Could not connect to ${path}. Reason: ` + (e instanceof Error ? e.message : e));
|
logger.warn(`Could not connect to ${path} (Attempt ${retry + 1}/${config.MEMPOOL.EXTERNAL_MAX_RETRY}). Reason: ` + (e instanceof Error ? e.message : e));
|
||||||
retry++;
|
retry++;
|
||||||
}
|
}
|
||||||
await setDelay(config.MEMPOOL.EXTERNAL_RETRY_INTERVAL);
|
if (retry < config.MEMPOOL.EXTERNAL_MAX_RETRY) {
|
||||||
|
await setDelay(config.MEMPOOL.EXTERNAL_RETRY_INTERVAL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.err(`Could not connect to ${path}. All ${config.MEMPOOL.EXTERNAL_MAX_RETRY} attempts failed`);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user