Improve statistics replication service
This commit is contained in:
		
							parent
							
								
									b0630de3cc
								
							
						
					
					
						commit
						60a07fb093
					
				@ -6,13 +6,13 @@ import { Common } from '../api/common';
 | 
			
		||||
import statistics from '../api/statistics/statistics-api';
 | 
			
		||||
 | 
			
		||||
interface MissingStatistics {
 | 
			
		||||
  '24h': number[];
 | 
			
		||||
  '1w': number[];
 | 
			
		||||
  '1m': number[];
 | 
			
		||||
  '3m': number[];
 | 
			
		||||
  '6m': number[];
 | 
			
		||||
  '2y': number[];
 | 
			
		||||
  'all': number[];
 | 
			
		||||
  '24h': Set<number>;
 | 
			
		||||
  '1w': Set<number>;
 | 
			
		||||
  '1m': Set<number>;
 | 
			
		||||
  '3m': Set<number>;
 | 
			
		||||
  '6m': Set<number>;
 | 
			
		||||
  '2y': Set<number>;
 | 
			
		||||
  'all': Set<number>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const steps = {
 | 
			
		||||
@ -30,7 +30,6 @@ const steps = {
 | 
			
		||||
 */
 | 
			
		||||
class StatisticsReplication {
 | 
			
		||||
  inProgress: boolean = false;
 | 
			
		||||
  skip: Set<number> = new Set();
 | 
			
		||||
 | 
			
		||||
  public async $sync(): Promise<void> {
 | 
			
		||||
    if (!config.REPLICATION.ENABLED || !config.REPLICATION.STATISTICS || !config.STATISTICS.ENABLED) {
 | 
			
		||||
@ -44,8 +43,8 @@ class StatisticsReplication {
 | 
			
		||||
    this.inProgress = true;
 | 
			
		||||
 | 
			
		||||
    const missingStatistics = await this.$getMissingStatistics();
 | 
			
		||||
    const missingIntervals = Object.keys(missingStatistics).filter(key => missingStatistics[key].length > 0);
 | 
			
		||||
    const totalMissing =  missingIntervals.reduce((total, key) => total + missingStatistics[key].length, 0);
 | 
			
		||||
    const missingIntervals = Object.keys(missingStatistics).filter(key => missingStatistics[key].size > 0);
 | 
			
		||||
    const totalMissing =  missingIntervals.reduce((total, key) => total + missingStatistics[key].size, 0);
 | 
			
		||||
 | 
			
		||||
    if (totalMissing === 0) {
 | 
			
		||||
      this.inProgress = false;
 | 
			
		||||
@ -54,7 +53,7 @@ class StatisticsReplication {
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    for (const interval of missingIntervals) {
 | 
			
		||||
      logger.debug(`Missing ${missingStatistics[interval].length} statistics rows in '${interval}' timespan`, 'Replication');
 | 
			
		||||
      logger.debug(`Missing ${missingStatistics[interval].size} statistics rows in '${interval}' timespan`, 'Replication');
 | 
			
		||||
    }
 | 
			
		||||
    logger.debug(`Fetching ${missingIntervals.join(', ')} statistics endpoints from trusted servers to fill ${totalMissing} rows missing in statistics`, 'Replication');
 | 
			
		||||
    
 | 
			
		||||
@ -75,31 +74,35 @@ class StatisticsReplication {
 | 
			
		||||
    this.inProgress = false;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async $syncStatistics(interval: string, missingTimes: number[]): Promise<any> {
 | 
			
		||||
  private async $syncStatistics(interval: string, missingTimes: Set<number>): Promise<any> {
 | 
			
		||||
  
 | 
			
		||||
    let success = false;
 | 
			
		||||
    let synced = 0;
 | 
			
		||||
    let missed = 0;
 | 
			
		||||
    let missed = new Set(missingTimes);
 | 
			
		||||
    const syncResult = await $sync(`/api/v1/statistics/${interval}`);
 | 
			
		||||
    if (syncResult) {
 | 
			
		||||
      if (syncResult.data?.length) {
 | 
			
		||||
    if (syncResult && syncResult.data?.length) {
 | 
			
		||||
      success = true;
 | 
			
		||||
      logger.info(`Fetched /api/v1/statistics/${interval} from ${syncResult.server}`);     
 | 
			
		||||
        const fetchedTimes = syncResult.data.map((stat: any) => stat.added);
 | 
			
		||||
        for (const time of missingTimes) {
 | 
			
		||||
          const closest = syncResult.data[this.getClosestElement(time, fetchedTimes).index];
 | 
			
		||||
          if (Math.abs(time - closest.added) < steps[interval]) {
 | 
			
		||||
            await statistics.$create(statistics.mapOptimizedStatisticToStatistic([closest])[0], true);
 | 
			
		||||
    
 | 
			
		||||
      for (const stat of syncResult.data) {
 | 
			
		||||
        const time = this.roundToNearestStep(stat.added, steps[interval]);
 | 
			
		||||
        if (missingTimes.has(time)) {
 | 
			
		||||
          try {
 | 
			
		||||
            await statistics.$create(statistics.mapOptimizedStatisticToStatistic([stat])[0], true);
 | 
			
		||||
            if (missed.delete(time)) {
 | 
			
		||||
              synced++;
 | 
			
		||||
          } else {
 | 
			
		||||
            missed++;
 | 
			
		||||
            this.skip.add(time);
 | 
			
		||||
            }
 | 
			
		||||
          } catch (e: any) {
 | 
			
		||||
            logger.err(`Failed to insert statistics row at ${stat.added} (${interval}) from ${syncResult.server}. Reason: ` + (e instanceof Error ? e.message : e));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
    return { success, synced, missed };
 | 
			
		||||
    } else {
 | 
			
		||||
      logger.warn(`An error occured when trying to fetch /api/v1/statistics/${interval}`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return { success, synced, missed: missed.size };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async $getMissingStatistics(): Promise<MissingStatistics> {
 | 
			
		||||
@ -109,7 +112,15 @@ class StatisticsReplication {
 | 
			
		||||
 | 
			
		||||
      const startTime = this.getStartTimeFromConfig();
 | 
			
		||||
 | 
			
		||||
      const missingStatistics: MissingStatistics = { '24h': [], '1w': [], '1m': [], '3m': [], '6m': [], '2y': [], 'all': [] };
 | 
			
		||||
      const missingStatistics: MissingStatistics = {
 | 
			
		||||
        '24h': new Set<number>(),
 | 
			
		||||
        '1w': new Set<number>(),
 | 
			
		||||
        '1m': new Set<number>(),
 | 
			
		||||
        '3m': new Set<number>(),
 | 
			
		||||
        '6m': new Set<number>(),
 | 
			
		||||
        '2y': new Set<number>(),
 | 
			
		||||
        'all': new Set<number>()
 | 
			
		||||
      };
 | 
			
		||||
 | 
			
		||||
      const intervals = [              // [start,               end,                 label ]
 | 
			
		||||
                                          [now - day,           now - 60,            '24h']       , // from 24 hours ago to now = 1 minute granularity
 | 
			
		||||
@ -135,7 +146,7 @@ class StatisticsReplication {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async $getMissingStatisticsInterval(interval: any, startTime: number): Promise<number[]> {
 | 
			
		||||
  private async $getMissingStatisticsInterval(interval: any, startTime: number): Promise<Set<number>> {
 | 
			
		||||
    try {
 | 
			
		||||
      const start = interval[0];
 | 
			
		||||
      const end = interval[1];
 | 
			
		||||
@ -145,23 +156,26 @@ class StatisticsReplication {
 | 
			
		||||
        SELECT UNIX_TIMESTAMP(added) as added
 | 
			
		||||
        FROM statistics
 | 
			
		||||
        WHERE added >= FROM_UNIXTIME(?) AND added <= FROM_UNIXTIME(?)
 | 
			
		||||
        GROUP BY UNIX_TIMESTAMP(added) DIV ${step} ORDER BY statistics.added DESC
 | 
			
		||||
      `, [start, end]);
 | 
			
		||||
      const timesAlreadyHere = rows.map(row => row.added);
 | 
			
		||||
 | 
			
		||||
      const missingTimes: number[] = [];
 | 
			
		||||
      const startingTime = Math.max(startTime, start) - Math.max(startTime, start) % step;
 | 
			
		||||
 | 
			
		||||
      if (timesAlreadyHere.length === 0) {
 | 
			
		||||
        for (let time = Math.max(startTime, start); time < end; time += step) {
 | 
			
		||||
            missingTimes.push(time);
 | 
			
		||||
        }
 | 
			
		||||
        return missingTimes;
 | 
			
		||||
      const timeSteps: number[] = [];
 | 
			
		||||
      for (let time = startingTime; time < end; time += step) {
 | 
			
		||||
        timeSteps.push(time);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      for (let time = Math.max(startTime, start); time < end; time += step) {
 | 
			
		||||
        const closest = this.getClosestElement(time, timesAlreadyHere);
 | 
			
		||||
        if (Math.abs(time - closest.value) > step && !this.skip.has(time)) { 
 | 
			
		||||
          missingTimes.push(time);
 | 
			
		||||
      if (timeSteps.length === 0) {
 | 
			
		||||
        return new Set<number>();
 | 
			
		||||
      }
 | 
			
		||||
      
 | 
			
		||||
      const roundedTimesAlreadyHere = new Set(rows.map(row => this.roundToNearestStep(row.added, step)));
 | 
			
		||||
      const missingTimes = new Set(timeSteps.filter(time => !roundedTimesAlreadyHere.has(time)));
 | 
			
		||||
 | 
			
		||||
      // Don't bother fetching if very few rows are missing
 | 
			
		||||
      if (missingTimes.size < timeSteps.length * 0.005) {
 | 
			
		||||
        return new Set();
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      return missingTimes;
 | 
			
		||||
@ -171,17 +185,14 @@ class StatisticsReplication {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getClosestElement(element: number, list: number[]): {index: number, value: number} {
 | 
			
		||||
    let closest = list[0];
 | 
			
		||||
    let index = 0;
 | 
			
		||||
    for (let i = 0; i < list.length; i++) {
 | 
			
		||||
      if (Math.abs(list[i] - element) < Math.abs(closest - element)) {
 | 
			
		||||
        closest = list[i];
 | 
			
		||||
        index = i;
 | 
			
		||||
  private roundToNearestStep(time: number, step: number): number {
 | 
			
		||||
    const remainder = time % step;
 | 
			
		||||
    if (remainder < step / 2) {
 | 
			
		||||
      return time - remainder;
 | 
			
		||||
    } else {
 | 
			
		||||
      return time + (step - remainder);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
    return { index, value: closest };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getStartTimeFromConfig(): number {
 | 
			
		||||
    const now = Math.floor(Date.now() / 1000);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user