From 69e6b164b98d4f87ecf10bb7f367027be35a7961 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 20 Jun 2023 16:21:54 -0400 Subject: [PATCH] Add audit data replication service --- backend/mempool-config.sample.json | 11 ++ .../__fixtures__/mempool-config.template.json | 6 + backend/src/__tests__/config.test.ts | 7 + backend/src/config.ts | 14 ++ backend/src/indexer.ts | 2 + backend/src/mempool.interfaces.ts | 9 ++ backend/src/replication/AuditReplication.ts | 123 ++++++++++++++++++ backend/src/replication/replicator.ts | 70 ++++++++++ production/mempool-config.mainnet.json | 25 ++++ 9 files changed, 267 insertions(+) create mode 100644 backend/src/replication/AuditReplication.ts create mode 100644 backend/src/replication/replicator.ts diff --git a/backend/mempool-config.sample.json b/backend/mempool-config.sample.json index c0a2d9d62..e3df7d2fe 100644 --- a/backend/mempool-config.sample.json +++ b/backend/mempool-config.sample.json @@ -125,5 +125,16 @@ "LIQUID_ONION": "http://liquidmom47f6s3m53ebfxn47p76a6tlnxib3wp6deux7wuzotdr6cyd.onion/api/v1", "BISQ_URL": "https://bisq.markets/api", "BISQ_ONION": "http://bisqmktse2cabavbr2xjq7xw3h6g5ottemo5rolfcwt6aly6tp5fdryd.onion/api" + }, + "REPLICATION": { + "ENABLED": false, + "AUDIT": false, + "AUDIT_START_HEIGHT": 774000, + "SERVERS": [ + "list", + "of", + "trusted", + "servers" + ] } } diff --git a/backend/src/__fixtures__/mempool-config.template.json b/backend/src/__fixtures__/mempool-config.template.json index 776f01de1..4213f0ffb 100644 --- a/backend/src/__fixtures__/mempool-config.template.json +++ b/backend/src/__fixtures__/mempool-config.template.json @@ -121,5 +121,11 @@ }, "CLIGHTNING": { "SOCKET": "__CLIGHTNING_SOCKET__" + }, + "REPLICATION": { + "ENABLED": false, + "AUDIT": false, + "AUDIT_START_HEIGHT": 774000, + "SERVERS": [] } } diff --git a/backend/src/__tests__/config.test.ts b/backend/src/__tests__/config.test.ts index fdd8a02de..dc1beaa46 100644 --- a/backend/src/__tests__/config.test.ts +++ b/backend/src/__tests__/config.test.ts @@ -120,6 +120,13 @@ describe('Mempool Backend Config', () => { GEOLITE2_ASN: '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb', GEOIP2_ISP: '/usr/local/share/GeoIP/GeoIP2-ISP.mmdb' }); + + expect(config.REPLICATION).toStrictEqual({ + ENABLED: false, + AUDIT: false, + AUDIT_START_HEIGHT: 774000, + SERVERS: [] + }); }); }); diff --git a/backend/src/config.ts b/backend/src/config.ts index 40b407a57..09d279537 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -132,6 +132,12 @@ interface IConfig { GEOLITE2_ASN: string; GEOIP2_ISP: string; }, + REPLICATION: { + ENABLED: boolean; + AUDIT: boolean; + AUDIT_START_HEIGHT: number; + SERVERS: string[]; + } } const defaults: IConfig = { @@ -264,6 +270,12 @@ const defaults: IConfig = { 'GEOLITE2_ASN': '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb', 'GEOIP2_ISP': '/usr/local/share/GeoIP/GeoIP2-ISP.mmdb' }, + 'REPLICATION': { + 'ENABLED': false, + 'AUDIT': false, + 'AUDIT_START_HEIGHT': 774000, + 'SERVERS': [], + } }; class Config implements IConfig { @@ -283,6 +295,7 @@ class Config implements IConfig { PRICE_DATA_SERVER: IConfig['PRICE_DATA_SERVER']; EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER']; MAXMIND: IConfig['MAXMIND']; + REPLICATION: IConfig['REPLICATION']; constructor() { const configs = this.merge(configFromFile, defaults); @@ -302,6 +315,7 @@ class Config implements IConfig { this.PRICE_DATA_SERVER = configs.PRICE_DATA_SERVER; this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER; this.MAXMIND = configs.MAXMIND; + this.REPLICATION = configs.REPLICATION; } merge = (...objects: object[]): IConfig => { diff --git a/backend/src/indexer.ts b/backend/src/indexer.ts index 88f44d587..d89a2647f 100644 --- a/backend/src/indexer.ts +++ b/backend/src/indexer.ts @@ -7,6 +7,7 @@ import bitcoinClient from './api/bitcoin/bitcoin-client'; import priceUpdater from './tasks/price-updater'; import PricesRepository from './repositories/PricesRepository'; import config from './config'; +import auditReplicator from './replication/AuditReplication'; export interface CoreIndex { name: string; @@ -136,6 +137,7 @@ class Indexer { await blocks.$generateBlocksSummariesDatabase(); await blocks.$generateCPFPDatabase(); await blocks.$generateAuditStats(); + await auditReplicator.$sync(); } catch (e) { this.indexerRunning = false; logger.err(`Indexer failed, trying again in 10 seconds. Reason: ` + (e instanceof Error ? e.message : e)); diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index a051eea4f..1971234f8 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -236,6 +236,15 @@ export interface BlockSummary { transactions: TransactionStripped[]; } +export interface AuditSummary extends BlockAudit { + timestamp?: number, + size?: number, + weight?: number, + tx_count?: number, + transactions: TransactionStripped[]; + template?: TransactionStripped[]; +} + export interface BlockPrice { height: number; priceId: number; diff --git a/backend/src/replication/AuditReplication.ts b/backend/src/replication/AuditReplication.ts new file mode 100644 index 000000000..b950acb6c --- /dev/null +++ b/backend/src/replication/AuditReplication.ts @@ -0,0 +1,123 @@ +import DB from '../database'; +import logger from '../logger'; +import { AuditSummary } from '../mempool.interfaces'; +import blocksAuditsRepository from '../repositories/BlocksAuditsRepository'; +import blocksSummariesRepository from '../repositories/BlocksSummariesRepository'; +import { $sync } from './replicator'; +import config from '../config'; +import { Common } from '../api/common'; + +const BATCH_SIZE = 16; + +/** + * Syncs missing block template and audit data from trusted servers + */ +class AuditReplication { + inProgress: boolean = false; + skip: Set = new Set(); + + public async $sync(): Promise { + if (!config.REPLICATION.ENABLED || !config.REPLICATION.AUDIT) { + // replication not enabled + return; + } + if (this.inProgress) { + logger.info(`AuditReplication sync already in progress`, 'Replication'); + return; + } + this.inProgress = true; + + const missingAudits = await this.$getMissingAuditBlocks(); + + logger.debug(`Fetching missing audit data for ${missingAudits.length} blocks from trusted servers`, 'Replication'); + + let totalSynced = 0; + let totalMissed = 0; + let loggerTimer = Date.now(); + // process missing audits in batches of + for (let i = 0; i < missingAudits.length; i += BATCH_SIZE) { + const results = await Promise.all(missingAudits.slice(i * BATCH_SIZE, (i + 1) * BATCH_SIZE).map(hash => this.$syncAudit(hash))); + const synced = results.reduce((total, status) => status ? total + 1 : total, 0); + totalSynced += synced; + totalMissed += (BATCH_SIZE - synced); + if (Date.now() - loggerTimer > 10000) { + loggerTimer = Date.now(); + logger.info(`Found ${totalSynced} / ${totalSynced + totalMissed} of ${missingAudits.length} missing audits`, 'Replication'); + } + await Common.sleep$(1000); + } + + logger.debug(`Fetched ${totalSynced} audits, ${totalMissed} still missing`, 'Replication'); + + this.inProgress = false; + } + + private async $syncAudit(hash: string): Promise { + if (this.skip.has(hash)) { + // we already know none of our trusted servers have this audit + return false; + } + + let success = false; + // start with a random server so load is uniformly spread + const syncResult = await $sync(`/api/v1/block/${hash}/audit-summary`); + if (syncResult) { + if (syncResult.data?.template?.length) { + await this.$saveAuditData(hash, syncResult.data); + success = true; + } + if (!syncResult.data && !syncResult.exists) { + this.skip.add(hash); + } + } + + return success; + } + + private async $getMissingAuditBlocks(): Promise { + try { + const startHeight = config.REPLICATION.AUDIT_START_HEIGHT || 0; + const [rows]: any[] = await DB.query(` + SELECT auditable.hash, auditable.height + FROM ( + SELECT hash, height + FROM blocks + WHERE height >= ? + ) AS auditable + LEFT JOIN blocks_audits ON auditable.hash = blocks_audits.hash + WHERE blocks_audits.hash IS NULL + ORDER BY auditable.height DESC + `, [startHeight]); + return rows.map(row => row.hash); + } catch (e: any) { + logger.err(`Cannot fetch missing audit blocks from db. Reason: ` + (e instanceof Error ? e.message : e)); + throw e; + } + } + + private async $saveAuditData(blockHash: string, auditSummary: AuditSummary): Promise { + // save audit & template to DB + await blocksSummariesRepository.$saveTemplate({ + height: auditSummary.height, + template: { + id: blockHash, + transactions: auditSummary.template || [] + } + }); + await blocksAuditsRepository.$saveAudit({ + hash: blockHash, + height: auditSummary.height, + time: auditSummary.timestamp || auditSummary.time, + missingTxs: auditSummary.missingTxs || [], + addedTxs: auditSummary.addedTxs || [], + freshTxs: auditSummary.freshTxs || [], + sigopTxs: auditSummary.sigopTxs || [], + matchRate: auditSummary.matchRate, + expectedFees: auditSummary.expectedFees, + expectedWeight: auditSummary.expectedWeight, + }); + } +} + +export default new AuditReplication(); + diff --git a/backend/src/replication/replicator.ts b/backend/src/replication/replicator.ts new file mode 100644 index 000000000..60dfa8a2d --- /dev/null +++ b/backend/src/replication/replicator.ts @@ -0,0 +1,70 @@ +import config from '../config'; +import backendInfo from '../api/backend-info'; +import axios, { AxiosResponse } from 'axios'; +import { SocksProxyAgent } from 'socks-proxy-agent'; +import * as https from 'https'; + +export async function $sync(path): Promise<{ data?: any, exists: boolean }> { + // start with a random server so load is uniformly spread + let allMissing = true; + const offset = Math.floor(Math.random() * config.REPLICATION.SERVERS.length); + for (let i = 0; i < config.REPLICATION.SERVERS.length; i++) { + const server = config.REPLICATION.SERVERS[(i + offset) % config.REPLICATION.SERVERS.length]; + // don't query ourself + if (server === backendInfo.getBackendInfo().hostname) { + continue; + } + + try { + const result = await query(`https://${server}${path}`); + if (result) { + return { data: result, exists: true }; + } + } catch (e: any) { + if (e?.response?.status === 404) { + // this server is also missing this data + } else { + // something else went wrong + allMissing = false; + } + } + } + + return { exists: !allMissing }; +} + +export async function query(path): Promise { + type axiosOptions = { + headers: { + 'User-Agent': string + }; + timeout: number; + httpsAgent?: https.Agent; + }; + const axiosOptions: axiosOptions = { + headers: { + 'User-Agent': (config.MEMPOOL.USER_AGENT === 'mempool') ? `mempool/v${backendInfo.getBackendInfo().version}` : `${config.MEMPOOL.USER_AGENT}` + }, + timeout: config.SOCKS5PROXY.ENABLED ? 30000 : 10000 + }; + + if (config.SOCKS5PROXY.ENABLED) { + const socksOptions = { + agentOptions: { + keepAlive: true, + }, + hostname: config.SOCKS5PROXY.HOST, + port: config.SOCKS5PROXY.PORT, + username: config.SOCKS5PROXY.USERNAME || 'circuit0', + password: config.SOCKS5PROXY.PASSWORD, + }; + + axiosOptions.httpsAgent = new SocksProxyAgent(socksOptions); + } + + const data: AxiosResponse = await axios.get(path, axiosOptions); + if (data.statusText === 'error' || !data.data) { + throw new Error(`${data.status}`); + } + return data.data; +} \ No newline at end of file diff --git a/production/mempool-config.mainnet.json b/production/mempool-config.mainnet.json index a76053913..5e25bcb76 100644 --- a/production/mempool-config.mainnet.json +++ b/production/mempool-config.mainnet.json @@ -48,5 +48,30 @@ "STATISTICS": { "ENABLED": true, "TX_PER_SECOND_SAMPLE_PERIOD": 150 + }, + "REPLICATION": { + "ENABLED": true, + "AUDIT": true, + "AUDIT_START_HEIGHT": 774000, + "SERVERS": [ + "node201.fmt.mempool.space", + "node202.fmt.mempool.space", + "node203.fmt.mempool.space", + "node204.fmt.mempool.space", + "node205.fmt.mempool.space", + "node206.fmt.mempool.space", + "node201.fra.mempool.space", + "node202.fra.mempool.space", + "node203.fra.mempool.space", + "node204.fra.mempool.space", + "node205.fra.mempool.space", + "node206.fra.mempool.space", + "node201.tk7.mempool.space", + "node202.tk7.mempool.space", + "node203.tk7.mempool.space", + "node204.tk7.mempool.space", + "node205.tk7.mempool.space", + "node206.tk7.mempool.space" + ] } }