Add statistics to replication service
This commit is contained in:
		
							parent
							
								
									d92bf14b50
								
							
						
					
					
						commit
						f85f3a4eb5
					
				| @ -139,6 +139,8 @@ | ||||
|     "ENABLED": false, | ||||
|     "AUDIT": false, | ||||
|     "AUDIT_START_HEIGHT": 774000, | ||||
|     "STATISTICS": false, | ||||
|     "STATISTICS_START_TIME": 1481932800, | ||||
|     "SERVERS": [ | ||||
|       "list", | ||||
|       "of", | ||||
|  | ||||
| @ -131,6 +131,8 @@ | ||||
|     "ENABLED": false, | ||||
|     "AUDIT": false, | ||||
|     "AUDIT_START_HEIGHT": 774000, | ||||
|     "STATISTICS": false, | ||||
|     "STATISTICS_START_TIME": 1481932800, | ||||
|     "SERVERS": [] | ||||
|   }, | ||||
|   "MEMPOOL_SERVICES": { | ||||
|  | ||||
| @ -135,6 +135,8 @@ describe('Mempool Backend Config', () => { | ||||
|         ENABLED: false, | ||||
|         AUDIT: false, | ||||
|         AUDIT_START_HEIGHT: 774000, | ||||
|         STATISTICS: false, | ||||
|         STATISTICS_START_TIME: 1481932800, | ||||
|         SERVERS: [] | ||||
|       }); | ||||
| 
 | ||||
|  | ||||
| @ -64,7 +64,7 @@ class StatisticsApi { | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   public async $create(statistics: Statistic): Promise<number | undefined> { | ||||
|   public async $create(statistics: Statistic, convertToDatetime = false): Promise<number | undefined> { | ||||
|     try { | ||||
|       const query = `INSERT INTO statistics(
 | ||||
|               added, | ||||
| @ -114,7 +114,7 @@ class StatisticsApi { | ||||
|               vsize_1800, | ||||
|               vsize_2000 | ||||
|             ) | ||||
|             VALUES (${statistics.added}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, | ||||
|             VALUES (${convertToDatetime ? `FROM_UNIXTIME(${statistics.added})` : statistics.added}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, | ||||
|                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`;
 | ||||
| 
 | ||||
|       const params: (string | number)[] = [ | ||||
| @ -456,6 +456,59 @@ class StatisticsApi { | ||||
|       }; | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
|   public mapOptimizedStatisticToStatistic(statistic: OptimizedStatistic[]): Statistic[] { | ||||
|     return statistic.map((s) => { | ||||
|       return { | ||||
|         added: s.added, | ||||
|         unconfirmed_transactions: s.count, | ||||
|         tx_per_second: 0, | ||||
|         vbytes_per_second: s.vbytes_per_second, | ||||
|         mempool_byte_weight: s.mempool_byte_weight || 0, | ||||
|         total_fee: s.total_fee || 0, | ||||
|         min_fee: s.min_fee, | ||||
|         fee_data: '', | ||||
|         vsize_1: s.vsizes[0], | ||||
|         vsize_2: s.vsizes[1], | ||||
|         vsize_3: s.vsizes[2], | ||||
|         vsize_4: s.vsizes[3], | ||||
|         vsize_5: s.vsizes[4], | ||||
|         vsize_6: s.vsizes[5], | ||||
|         vsize_8: s.vsizes[6], | ||||
|         vsize_10: s.vsizes[7], | ||||
|         vsize_12: s.vsizes[8], | ||||
|         vsize_15: s.vsizes[9], | ||||
|         vsize_20: s.vsizes[10], | ||||
|         vsize_30: s.vsizes[11], | ||||
|         vsize_40: s.vsizes[12], | ||||
|         vsize_50: s.vsizes[13], | ||||
|         vsize_60: s.vsizes[14], | ||||
|         vsize_70: s.vsizes[15], | ||||
|         vsize_80: s.vsizes[16], | ||||
|         vsize_90: s.vsizes[17], | ||||
|         vsize_100: s.vsizes[18], | ||||
|         vsize_125: s.vsizes[19], | ||||
|         vsize_150: s.vsizes[20], | ||||
|         vsize_175: s.vsizes[21], | ||||
|         vsize_200: s.vsizes[22], | ||||
|         vsize_250: s.vsizes[23], | ||||
|         vsize_300: s.vsizes[24], | ||||
|         vsize_350: s.vsizes[25], | ||||
|         vsize_400: s.vsizes[26], | ||||
|         vsize_500: s.vsizes[27], | ||||
|         vsize_600: s.vsizes[28], | ||||
|         vsize_700: s.vsizes[29], | ||||
|         vsize_800: s.vsizes[30], | ||||
|         vsize_900: s.vsizes[31], | ||||
|         vsize_1000: s.vsizes[32], | ||||
|         vsize_1200: s.vsizes[33], | ||||
|         vsize_1400: s.vsizes[34], | ||||
|         vsize_1600: s.vsizes[35], | ||||
|         vsize_1800: s.vsizes[36], | ||||
|         vsize_2000: s.vsizes[37], | ||||
|       } | ||||
|     }); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| export default new StatisticsApi(); | ||||
|  | ||||
| @ -141,6 +141,8 @@ interface IConfig { | ||||
|     ENABLED: boolean; | ||||
|     AUDIT: boolean; | ||||
|     AUDIT_START_HEIGHT: number; | ||||
|     STATISTICS: boolean; | ||||
|     STATISTICS_START_TIME: number | string; | ||||
|     SERVERS: string[]; | ||||
|   }, | ||||
|   MEMPOOL_SERVICES: { | ||||
| @ -298,6 +300,8 @@ const defaults: IConfig = { | ||||
|     'ENABLED': false, | ||||
|     'AUDIT': false, | ||||
|     'AUDIT_START_HEIGHT': 774000, | ||||
|     'STATISTICS': false, | ||||
|     'STATISTICS_START_TIME': 1481932800, | ||||
|     'SERVERS': [], | ||||
|   }, | ||||
|   'MEMPOOL_SERVICES': { | ||||
|  | ||||
| @ -8,6 +8,7 @@ import priceUpdater from './tasks/price-updater'; | ||||
| import PricesRepository from './repositories/PricesRepository'; | ||||
| import config from './config'; | ||||
| import auditReplicator from './replication/AuditReplication'; | ||||
| import statisticsReplicator from './replication/StatisticsReplication'; | ||||
| import AccelerationRepository from './repositories/AccelerationRepository'; | ||||
| 
 | ||||
| export interface CoreIndex { | ||||
| @ -188,6 +189,7 @@ class Indexer { | ||||
|       await blocks.$generateCPFPDatabase(); | ||||
|       await blocks.$generateAuditStats(); | ||||
|       await auditReplicator.$sync(); | ||||
|       await statisticsReplicator.$sync(); | ||||
|       await AccelerationRepository.$indexPastAccelerations(); | ||||
|       // do not wait for classify blocks to finish
 | ||||
|       blocks.$classifyBlocks(); | ||||
|  | ||||
| @ -422,6 +422,7 @@ export interface Statistic { | ||||
| 
 | ||||
| export interface OptimizedStatistic { | ||||
|   added: string; | ||||
|   count: number; | ||||
|   vbytes_per_second: number; | ||||
|   total_fee: number; | ||||
|   mempool_byte_weight: number; | ||||
|  | ||||
							
								
								
									
										217
									
								
								backend/src/replication/StatisticsReplication.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										217
									
								
								backend/src/replication/StatisticsReplication.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,217 @@ | ||||
| import DB from '../database'; | ||||
| import logger from '../logger'; | ||||
| import { $sync } from './replicator'; | ||||
| import config from '../config'; | ||||
| import { Common } from '../api/common'; | ||||
| import statistics from '../api/statistics/statistics-api'; | ||||
| 
 | ||||
| interface MissingStatistics { | ||||
|   '24h': number[]; | ||||
|   '1w': number[]; | ||||
|   '1m': number[]; | ||||
|   '3m': number[]; | ||||
|   '6m': number[]; | ||||
|   '2y': number[]; | ||||
|   'all': number[]; | ||||
| } | ||||
| 
 | ||||
| const steps = { | ||||
|   '24h': 60, | ||||
|   '1w': 300, | ||||
|   '1m': 1800, | ||||
|   '3m': 7200, | ||||
|   '6m': 10800, | ||||
|   '2y': 28800, | ||||
|   'all': 43200, | ||||
| }; | ||||
| 
 | ||||
| /** | ||||
|  * Syncs missing statistics data from trusted servers | ||||
|  */ | ||||
| class StatisticsReplication { | ||||
|   inProgress: boolean = false; | ||||
|   skip: Set<number> = new Set(); | ||||
| 
 | ||||
|   public async $sync(): Promise<void> { | ||||
|     if (!config.REPLICATION.ENABLED || !config.REPLICATION.STATISTICS || !config.STATISTICS.ENABLED) { | ||||
|       // replication not enabled, or statistics not enabled
 | ||||
|       return; | ||||
|     } | ||||
|     if (this.inProgress) { | ||||
|       logger.info(`StatisticsReplication sync already in progress`, 'Replication'); | ||||
|       return; | ||||
|     } | ||||
|     this.inProgress = true; | ||||
| 
 | ||||
|     const missingStatistics = await this.$getMissingStatistics(); | ||||
|     const missingIntervals = Object.keys(missingStatistics).filter(key => missingStatistics[key].length > 0); | ||||
|     const totalMissing =  missingIntervals.reduce((total, key) => total + missingStatistics[key].length, 0); | ||||
| 
 | ||||
|     if (totalMissing === 0) { | ||||
|       this.inProgress = false; | ||||
|       logger.info(`Statistics table is complete, no replication needed`, 'Replication'); | ||||
|       return; | ||||
|     } | ||||
|      | ||||
|     for (const interval of missingIntervals) { | ||||
|       logger.debug(`Missing ${missingStatistics[interval].length} statistics rows in '${interval}' timespan`, 'Replication'); | ||||
|     } | ||||
|     logger.debug(`Fetching ${missingIntervals.join(', ')} statistics endpoints from trusted servers to fill ${totalMissing} rows missing in statistics`, 'Replication'); | ||||
|      | ||||
|     let totalSynced = 0; | ||||
|     let totalMissed = 0; | ||||
| 
 | ||||
|     for (const interval of missingIntervals) { | ||||
|       const results = await this.$syncStatistics(interval, missingStatistics[interval]); | ||||
|       totalSynced += results.synced; | ||||
|       totalMissed += results.missed; | ||||
| 
 | ||||
|       logger.info(`Found ${totalSynced} / ${totalSynced + totalMissed} of ${totalMissing} missing statistics rows`, 'Replication'); | ||||
|       await Common.sleep$(3000); | ||||
|     } | ||||
| 
 | ||||
|     logger.debug(`Synced ${totalSynced} statistics rows, ${totalMissed} still missing`, 'Replication'); | ||||
| 
 | ||||
|     this.inProgress = false; | ||||
|   } | ||||
| 
 | ||||
|   private async $syncStatistics(interval: string, missingTimes: number[]): Promise<any> { | ||||
|    | ||||
|     let success = false; | ||||
|     let synced = 0; | ||||
|     let missed = 0; | ||||
|     const syncResult = await $sync(`/api/v1/statistics/${interval}`); | ||||
|     if (syncResult) { | ||||
|       if (syncResult.data?.length) { | ||||
|         success = true; | ||||
|         logger.info(`Fetched /api/v1/statistics/${interval} from ${syncResult.server}`); | ||||
|         const fetchedTimes = syncResult.data.map((stat: any) => stat.added); | ||||
|         for (const time of missingTimes) { | ||||
|           const closest = syncResult.data[this.getClosestElement(time, fetchedTimes).index]; | ||||
|           if (Math.abs(time - closest.added) < steps[interval]) { | ||||
|             await statistics.$create(statistics.mapOptimizedStatisticToStatistic([closest])[0], true); | ||||
|             synced++; | ||||
|           } else { | ||||
|             missed++; | ||||
|             this.skip.add(time); | ||||
|           } | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|     return { success, synced, missed }; | ||||
|   } | ||||
| 
 | ||||
|   private async $getMissingStatistics(): Promise<MissingStatistics> { | ||||
|     try { | ||||
|       const now = Math.floor(Date.now() / 1000); | ||||
|       const day = 60 * 60 * 24; | ||||
| 
 | ||||
|       const startTime = this.getStartTimeFromConfig(); | ||||
| 
 | ||||
|       const missingStatistics: MissingStatistics = { '24h': [], '1w': [], '1m': [], '3m': [], '6m': [], '2y': [], 'all': [] }; | ||||
| 
 | ||||
|       const intervals = [              // [start,               end,                 label ]
 | ||||
|                                           [now - day,           now - 60,            '24h']       , // from 24 hours ago to now = 1 minute granularity
 | ||||
|         startTime < now - day ?           [now - day * 7,       now - day,           '1w' ] : null, // from 1 week ago to 24 hours ago = 5 minutes granularity
 | ||||
|         startTime < now - day * 7 ?       [now - day * 30,      now - day * 7,       '1m' ] : null, // from 1 month ago to 1 week ago = 30 minutes granularity
 | ||||
|         startTime < now - day * 30 ?      [now - day * 90,      now - day * 30,      '3m' ] : null, // from 3 months ago to 1 month ago = 2 hours granularity
 | ||||
|         startTime < now - day * 90 ?      [now - day * 180,     now - day * 90,      '6m' ] : null, // from 6 months ago to 3 months ago = 3 hours granularity
 | ||||
|         startTime < now - day * 180 ?     [now - day * 365 * 2, now - day * 180,     '2y' ] : null, // from 2 years ago to 6 months ago = 8 hours granularity
 | ||||
|         startTime < now - day * 365 * 2 ? [startTime,           now - day * 365 * 2, 'all'] : null, // from start of statistics to 2 years ago = 12 hours granularity   
 | ||||
|       ]; | ||||
| 
 | ||||
|       for (const interval of intervals) { | ||||
|         if (!interval) { | ||||
|           continue; | ||||
|         } | ||||
|         missingStatistics[interval[2] as string] = await this.$getMissingStatisticsInterval(interval, startTime); | ||||
|       } | ||||
|        | ||||
|       return missingStatistics; | ||||
|     } catch (e: any) { | ||||
|       logger.err(`Cannot fetch missing statistics times from db. Reason: ` + (e instanceof Error ? e.message : e)); | ||||
|       throw e; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private async $getMissingStatisticsInterval(interval: any, startTime: number): Promise<number[]> { | ||||
|     try { | ||||
|       const start = interval[0]; | ||||
|       const end = interval[1]; | ||||
|       const step = steps[interval[2]]; | ||||
| 
 | ||||
|       const [rows]: any[] = await DB.query(` | ||||
|         SELECT UNIX_TIMESTAMP(added) as added | ||||
|         FROM statistics | ||||
|         WHERE added >= FROM_UNIXTIME(?) AND added <= FROM_UNIXTIME(?) | ||||
|       `, [start, end]);
 | ||||
|       const timesAlreadyHere = rows.map(row => row.added); | ||||
| 
 | ||||
|       const missingTimes: number[] = []; | ||||
| 
 | ||||
|       if (timesAlreadyHere.length === 0) { | ||||
|         for (let time = Math.max(startTime, start); time < end; time += step) { | ||||
|             missingTimes.push(time); | ||||
|         } | ||||
|         return missingTimes; | ||||
|       } | ||||
| 
 | ||||
|       for (let time = Math.max(startTime, start); time < end; time += step) { | ||||
|         const closest = this.getClosestElement(time, timesAlreadyHere); | ||||
|         if (Math.abs(time - closest.value) > step && !this.skip.has(time)) {  | ||||
|           missingTimes.push(time); | ||||
|         } | ||||
|       } | ||||
| 
 | ||||
|       return missingTimes; | ||||
|     } catch (e: any) { | ||||
|       logger.err(`Cannot fetch missing statistics times from db. Reason: ` + (e instanceof Error ? e.message : e)); | ||||
|       throw e; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private getClosestElement(element: number, list: number[]): {index: number, value: number} { | ||||
|     let closest = list[0]; | ||||
|     let index = 0; | ||||
|     for (let i = 0; i < list.length; i++) { | ||||
|       if (Math.abs(list[i] - element) < Math.abs(closest - element)) { | ||||
|         closest = list[i]; | ||||
|         index = i; | ||||
|       } | ||||
|     } | ||||
|     return { index, value: closest }; | ||||
|   } | ||||
| 
 | ||||
|   private getStartTimeFromConfig(): number { | ||||
|     const now = Math.floor(Date.now() / 1000); | ||||
|     const day = 60 * 60 * 24; | ||||
| 
 | ||||
|     let startTime: number; | ||||
|     if (typeof(config.REPLICATION.STATISTICS_START_TIME) === 'string' && ['24h', '1w', '1m', '3m', '6m', '2y', 'all'].includes(config.REPLICATION.STATISTICS_START_TIME)) { | ||||
|       if (config.REPLICATION.STATISTICS_START_TIME === 'all') { | ||||
|         startTime = 1481932800; | ||||
|       } else if (config.REPLICATION.STATISTICS_START_TIME === '2y') { | ||||
|         startTime = now - day * 365 * 2; | ||||
|       } else if (config.REPLICATION.STATISTICS_START_TIME === '6m') { | ||||
|         startTime = now - day * 180; | ||||
|       } else if (config.REPLICATION.STATISTICS_START_TIME === '3m') { | ||||
|         startTime = now - day * 90; | ||||
|       } else if (config.REPLICATION.STATISTICS_START_TIME === '1m') { | ||||
|         startTime = now - day * 30; | ||||
|       } else if (config.REPLICATION.STATISTICS_START_TIME === '1w') { | ||||
|         startTime = now - day * 7; | ||||
|       } else { | ||||
|         startTime = now - day; | ||||
|       } | ||||
|     } else { | ||||
|       startTime = Math.max(config.REPLICATION.STATISTICS_START_TIME as number || 1481932800, 1481932800); | ||||
|     } | ||||
| 
 | ||||
|     return startTime; | ||||
|   } | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| export default new StatisticsReplication(); | ||||
| 
 | ||||
| @ -137,6 +137,8 @@ | ||||
|     "ENABLED": __REPLICATION_ENABLED__, | ||||
|     "AUDIT": __REPLICATION_AUDIT__, | ||||
|     "AUDIT_START_HEIGHT": __REPLICATION_AUDIT_START_HEIGHT__, | ||||
|     "STATISTICS": __REPLICATION_STATISTICS__, | ||||
|     "STATISTICS_START_TIME": __REPLICATION_STATISTICS_START_TIME__, | ||||
|     "SERVERS": __REPLICATION_SERVERS__ | ||||
|   }, | ||||
|   "MEMPOOL_SERVICES": { | ||||
|  | ||||
| @ -138,6 +138,8 @@ __MAXMIND_GEOIP2_ISP__=${MAXMIND_GEOIP2_ISP:=""} | ||||
| __REPLICATION_ENABLED__=${REPLICATION_ENABLED:=false} | ||||
| __REPLICATION_AUDIT__=${REPLICATION_AUDIT:=false} | ||||
| __REPLICATION_AUDIT_START_HEIGHT__=${REPLICATION_AUDIT_START_HEIGHT:=774000} | ||||
| __REPLICATION_STATISTICS__=${REPLICATION_STATISTICS:=false} | ||||
| __REPLICATION_STATISTICS_START_TIME__=${REPLICATION_STATISTICS_START_TIME:=1481932800} | ||||
| __REPLICATION_SERVERS__=${REPLICATION_SERVERS:=[]} | ||||
| 
 | ||||
| # MEMPOOL_SERVICES | ||||
| @ -284,6 +286,8 @@ sed -i "s!__MAXMIND_GEOIP2_ISP__!${__MAXMIND_GEOIP2_ISP__}!g" mempool-config.jso | ||||
| sed -i "s!__REPLICATION_ENABLED__!${__REPLICATION_ENABLED__}!g" mempool-config.json | ||||
| sed -i "s!__REPLICATION_AUDIT__!${__REPLICATION_AUDIT__}!g" mempool-config.json | ||||
| sed -i "s!__REPLICATION_AUDIT_START_HEIGHT__!${__REPLICATION_AUDIT_START_HEIGHT__}!g" mempool-config.json | ||||
| sed -i "s!__REPLICATION_STATISTICS__!${__REPLICATION_STATISTICS__}!g" mempool-config.json | ||||
| sed -i "s!__REPLICATION_STATISTICS_START_TIME__!${__REPLICATION_STATISTICS_START_TIME__}!g" mempool-config.json | ||||
| sed -i "s!__REPLICATION_SERVERS__!${__REPLICATION_SERVERS__}!g" mempool-config.json | ||||
| 
 | ||||
| # MEMPOOL_SERVICES | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user