Merge pull request #1940 from mempool/nymkappa/feature/price-columns
Replace json `prices.avg_prices` with table columns - update prices logs
This commit is contained in:
		
						commit
						ec2d079da4
					
				@ -4,7 +4,7 @@ import logger from '../logger';
 | 
			
		||||
import { Common } from './common';
 | 
			
		||||
 | 
			
		||||
class DatabaseMigration {
 | 
			
		||||
  private static currentVersion = 22;
 | 
			
		||||
  private static currentVersion = 23;
 | 
			
		||||
  private queryTimeout = 120000;
 | 
			
		||||
  private statisticsAddedIndexed = false;
 | 
			
		||||
  private uniqueLogs: string[] = [];
 | 
			
		||||
@ -231,6 +231,18 @@ class DatabaseMigration {
 | 
			
		||||
        await this.$executeQuery('DROP TABLE IF EXISTS `difficulty_adjustments`');
 | 
			
		||||
        await this.$executeQuery(this.getCreateDifficultyAdjustmentsTableQuery(), await this.$checkIfTableExists('difficulty_adjustments'));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (databaseSchemaVersion < 23) {
 | 
			
		||||
        await this.$executeQuery('TRUNCATE `prices`');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` DROP `avg_prices`');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `USD` float DEFAULT "0"');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `EUR` float DEFAULT "0"');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `GBP` float DEFAULT "0"');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `CAD` float DEFAULT "0"');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `CHF` float DEFAULT "0"');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `AUD` float DEFAULT "0"');
 | 
			
		||||
        await this.$executeQuery('ALTER TABLE `prices` ADD `JPY` float DEFAULT "0"');
 | 
			
		||||
      }
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      throw e;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,11 @@ import { Prices } from '../tasks/price-updater';
 | 
			
		||||
class PricesRepository {
 | 
			
		||||
  public async $savePrices(time: number, prices: Prices): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      await DB.query(`INSERT INTO prices(time, avg_prices) VALUE (FROM_UNIXTIME(?), ?)`, [time, JSON.stringify(prices)]);
 | 
			
		||||
      await DB.query(`
 | 
			
		||||
        INSERT INTO prices(time,             USD, EUR, GBP, CAD, CHF, AUD, JPY)
 | 
			
		||||
        VALUE             (FROM_UNIXTIME(?), ?,   ?,   ?,   ?,   ?,   ?,   ?  )`,
 | 
			
		||||
        [time, prices.USD, prices.EUR, prices.GBP, prices.CAD, prices.CHF, prices.AUD, prices.JPY]
 | 
			
		||||
      );
 | 
			
		||||
    } catch (e: any) {
 | 
			
		||||
      logger.err(`Cannot save exchange rate into db. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
      throw e;
 | 
			
		||||
 | 
			
		||||
@ -87,7 +87,7 @@ class KrakenApi implements PriceFeed {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (Object.keys(priceHistory).length > 0) {
 | 
			
		||||
      logger.info(`Inserted ${Object.keys(priceHistory).length} Kraken EUR, USD, GBP, JPY, CAD, CHF and AUD weekly price history into db`);
 | 
			
		||||
      logger.notice(`Inserted ${Object.keys(priceHistory).length} Kraken EUR, USD, GBP, JPY, CAD, CHF and AUD weekly price history into db`);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -176,7 +176,7 @@ class PriceUpdater {
 | 
			
		||||
      ++insertedCount;
 | 
			
		||||
    }
 | 
			
		||||
    if (insertedCount > 0) {
 | 
			
		||||
      logger.info(`Inserted ${insertedCount} MtGox USD weekly price history into db`);
 | 
			
		||||
      logger.notice(`Inserted ${insertedCount} MtGox USD weekly price history into db`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Insert Kraken weekly prices
 | 
			
		||||
@ -205,23 +205,23 @@ class PriceUpdater {
 | 
			
		||||
      try {
 | 
			
		||||
        historicalPrices.push(await feed.$fetchRecentHourlyPrice(this.currencies));
 | 
			
		||||
      } catch (e) {
 | 
			
		||||
        logger.info(`Cannot fetch hourly historical price from ${feed.name}. Ignoring this feed. Reason: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
        logger.err(`Cannot fetch hourly historical price from ${feed.name}. Ignoring this feed. Reason: ${e instanceof Error ? e.message : e}`);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Group them by timestamp and currency, for example
 | 
			
		||||
    // grouped[123456789]['USD'] = [1, 2, 3, 4];
 | 
			
		||||
    let grouped: Object = {};
 | 
			
		||||
    const grouped: Object = {};
 | 
			
		||||
    for (const historicalEntry of historicalPrices) {
 | 
			
		||||
      for (const time in historicalEntry) {
 | 
			
		||||
        if (existingPriceTimes.includes(parseInt(time, 10))) {
 | 
			
		||||
          continue;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (grouped[time] == undefined) {
 | 
			
		||||
        if (grouped[time] === undefined) {
 | 
			
		||||
          grouped[time] = {
 | 
			
		||||
            USD: [], EUR: [], GBP: [], CAD: [], CHF: [], AUD: [], JPY: []
 | 
			
		||||
          }
 | 
			
		||||
          };
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (const currency of this.currencies) {
 | 
			
		||||
@ -238,13 +238,20 @@ class PriceUpdater {
 | 
			
		||||
    for (const time in grouped) {
 | 
			
		||||
      const prices: Prices = this.getEmptyPricesObj();
 | 
			
		||||
      for (const currency in grouped[time]) {
 | 
			
		||||
        prices[currency] = Math.round((grouped[time][currency].reduce((partialSum, a) => partialSum + a, 0)) / grouped[time][currency].length);
 | 
			
		||||
        if (grouped[time][currency].length === 0) {
 | 
			
		||||
          continue;
 | 
			
		||||
        }
 | 
			
		||||
        prices[currency] = Math.round((grouped[time][currency].reduce(
 | 
			
		||||
          (partialSum, a) => partialSum + a, 0)
 | 
			
		||||
        ) / grouped[time][currency].length);
 | 
			
		||||
      }
 | 
			
		||||
      await PricesRepository.$savePrices(parseInt(time, 10), prices);
 | 
			
		||||
      ++totalInserted;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    logger.info(`Inserted ${totalInserted} hourly historical prices into the db`);
 | 
			
		||||
    if (totalInserted > 0) {
 | 
			
		||||
      logger.notice(`Inserted ${totalInserted} hourly historical prices into the db`);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -50,10 +50,14 @@ export async function query(path): Promise<object | undefined> {
 | 
			
		||||
     }
 | 
			
		||||
     return data.data;
 | 
			
		||||
   } catch (e) {
 | 
			
		||||
     logger.err(`Could not connect to ${path}. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
     logger.warn(`Could not connect to ${path} (Attempt ${retry + 1}/${config.MEMPOOL.EXTERNAL_MAX_RETRY}). Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
     retry++;
 | 
			
		||||
   }
 | 
			
		||||
   await setDelay(config.MEMPOOL.EXTERNAL_RETRY_INTERVAL);
 | 
			
		||||
   if (retry < config.MEMPOOL.EXTERNAL_MAX_RETRY) {
 | 
			
		||||
     await setDelay(config.MEMPOOL.EXTERNAL_RETRY_INTERVAL);
 | 
			
		||||
   }
 | 
			
		||||
 }
 | 
			
		||||
 | 
			
		||||
 logger.err(`Could not connect to ${path}. All ${config.MEMPOOL.EXTERNAL_MAX_RETRY} attempts failed`);
 | 
			
		||||
 return undefined;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user