From 60a07fb093a36eec9c93ba9d6dd9ee6282408d33 Mon Sep 17 00:00:00 2001 From: natsoni Date: Tue, 21 May 2024 11:33:37 +0200 Subject: [PATCH] Improve statistics replication service --- .../src/replication/StatisticsReplication.ts | 111 ++++++++++-------- 1 file changed, 61 insertions(+), 50 deletions(-) diff --git a/backend/src/replication/StatisticsReplication.ts b/backend/src/replication/StatisticsReplication.ts index 905e7a15e..f3ebb5a97 100644 --- a/backend/src/replication/StatisticsReplication.ts +++ b/backend/src/replication/StatisticsReplication.ts @@ -6,13 +6,13 @@ import { Common } from '../api/common'; import statistics from '../api/statistics/statistics-api'; interface MissingStatistics { - '24h': number[]; - '1w': number[]; - '1m': number[]; - '3m': number[]; - '6m': number[]; - '2y': number[]; - 'all': number[]; + '24h': Set; + '1w': Set; + '1m': Set; + '3m': Set; + '6m': Set; + '2y': Set; + 'all': Set; } const steps = { @@ -30,7 +30,6 @@ const steps = { */ class StatisticsReplication { inProgress: boolean = false; - skip: Set = new Set(); public async $sync(): Promise { if (!config.REPLICATION.ENABLED || !config.REPLICATION.STATISTICS || !config.STATISTICS.ENABLED) { @@ -44,8 +43,8 @@ class StatisticsReplication { this.inProgress = true; const missingStatistics = await this.$getMissingStatistics(); - const missingIntervals = Object.keys(missingStatistics).filter(key => missingStatistics[key].length > 0); - const totalMissing = missingIntervals.reduce((total, key) => total + missingStatistics[key].length, 0); + const missingIntervals = Object.keys(missingStatistics).filter(key => missingStatistics[key].size > 0); + const totalMissing = missingIntervals.reduce((total, key) => total + missingStatistics[key].size, 0); if (totalMissing === 0) { this.inProgress = false; @@ -54,7 +53,7 @@ class StatisticsReplication { } for (const interval of missingIntervals) { - logger.debug(`Missing ${missingStatistics[interval].length} statistics rows in '${interval}' timespan`, 'Replication'); + logger.debug(`Missing ${missingStatistics[interval].size} statistics rows in '${interval}' timespan`, 'Replication'); } logger.debug(`Fetching ${missingIntervals.join(', ')} statistics endpoints from trusted servers to fill ${totalMissing} rows missing in statistics`, 'Replication'); @@ -75,31 +74,35 @@ class StatisticsReplication { this.inProgress = false; } - private async $syncStatistics(interval: string, missingTimes: number[]): Promise { + private async $syncStatistics(interval: string, missingTimes: Set): Promise { let success = false; let synced = 0; - let missed = 0; + let missed = new Set(missingTimes); const syncResult = await $sync(`/api/v1/statistics/${interval}`); - if (syncResult) { - if (syncResult.data?.length) { - success = true; - logger.info(`Fetched /api/v1/statistics/${interval} from ${syncResult.server}`); - const fetchedTimes = syncResult.data.map((stat: any) => stat.added); - for (const time of missingTimes) { - const closest = syncResult.data[this.getClosestElement(time, fetchedTimes).index]; - if (Math.abs(time - closest.added) < steps[interval]) { - await statistics.$create(statistics.mapOptimizedStatisticToStatistic([closest])[0], true); - synced++; - } else { - missed++; - this.skip.add(time); + if (syncResult && syncResult.data?.length) { + success = true; + logger.info(`Fetched /api/v1/statistics/${interval} from ${syncResult.server}`); + + for (const stat of syncResult.data) { + const time = this.roundToNearestStep(stat.added, steps[interval]); + if (missingTimes.has(time)) { + try { + await statistics.$create(statistics.mapOptimizedStatisticToStatistic([stat])[0], true); + if (missed.delete(time)) { + synced++; + } + } catch (e: any) { + logger.err(`Failed to insert statistics row at ${stat.added} (${interval}) from ${syncResult.server}. Reason: ` + (e instanceof Error ? e.message : e)); } } } + + } else { + logger.warn(`An error occured when trying to fetch /api/v1/statistics/${interval}`); } - return { success, synced, missed }; + return { success, synced, missed: missed.size }; } private async $getMissingStatistics(): Promise { @@ -109,7 +112,15 @@ class StatisticsReplication { const startTime = this.getStartTimeFromConfig(); - const missingStatistics: MissingStatistics = { '24h': [], '1w': [], '1m': [], '3m': [], '6m': [], '2y': [], 'all': [] }; + const missingStatistics: MissingStatistics = { + '24h': new Set(), + '1w': new Set(), + '1m': new Set(), + '3m': new Set(), + '6m': new Set(), + '2y': new Set(), + 'all': new Set() + }; const intervals = [ // [start, end, label ] [now - day, now - 60, '24h'] , // from 24 hours ago to now = 1 minute granularity @@ -135,7 +146,7 @@ class StatisticsReplication { } } - private async $getMissingStatisticsInterval(interval: any, startTime: number): Promise { + private async $getMissingStatisticsInterval(interval: any, startTime: number): Promise> { try { const start = interval[0]; const end = interval[1]; @@ -145,23 +156,26 @@ class StatisticsReplication { SELECT UNIX_TIMESTAMP(added) as added FROM statistics WHERE added >= FROM_UNIXTIME(?) AND added <= FROM_UNIXTIME(?) + GROUP BY UNIX_TIMESTAMP(added) DIV ${step} ORDER BY statistics.added DESC `, [start, end]); - const timesAlreadyHere = rows.map(row => row.added); - const missingTimes: number[] = []; + const startingTime = Math.max(startTime, start) - Math.max(startTime, start) % step; - if (timesAlreadyHere.length === 0) { - for (let time = Math.max(startTime, start); time < end; time += step) { - missingTimes.push(time); - } - return missingTimes; + const timeSteps: number[] = []; + for (let time = startingTime; time < end; time += step) { + timeSteps.push(time); } - for (let time = Math.max(startTime, start); time < end; time += step) { - const closest = this.getClosestElement(time, timesAlreadyHere); - if (Math.abs(time - closest.value) > step && !this.skip.has(time)) { - missingTimes.push(time); - } + if (timeSteps.length === 0) { + return new Set(); + } + + const roundedTimesAlreadyHere = new Set(rows.map(row => this.roundToNearestStep(row.added, step))); + const missingTimes = new Set(timeSteps.filter(time => !roundedTimesAlreadyHere.has(time))); + + // Don't bother fetching if very few rows are missing + if (missingTimes.size < timeSteps.length * 0.005) { + return new Set(); } return missingTimes; @@ -171,16 +185,13 @@ class StatisticsReplication { } } - private getClosestElement(element: number, list: number[]): {index: number, value: number} { - let closest = list[0]; - let index = 0; - for (let i = 0; i < list.length; i++) { - if (Math.abs(list[i] - element) < Math.abs(closest - element)) { - closest = list[i]; - index = i; - } + private roundToNearestStep(time: number, step: number): number { + const remainder = time % step; + if (remainder < step / 2) { + return time - remainder; + } else { + return time + (step - remainder); } - return { index, value: closest }; } private getStartTimeFromConfig(): number {