diff --git a/backend/src/__fixtures__/mempool-config.template.json b/backend/src/__fixtures__/mempool-config.template.json index 1c973c45b..e2719c495 100644 --- a/backend/src/__fixtures__/mempool-config.template.json +++ b/backend/src/__fixtures__/mempool-config.template.json @@ -7,6 +7,7 @@ "BLOCKS_SUMMARIES_INDEXING": true, "GOGGLES_INDEXING": false, "HTTP_PORT": 1, + "UNIX_SOCKET_PATH": "/mempool/socket/mempool-bitcoin-mainnet", "SPAWN_CLUSTER_PROCS": 2, "API_URL_PREFIX": "__MEMPOOL_API_URL_PREFIX__", "AUTOMATIC_BLOCK_REINDEXING": false, diff --git a/backend/src/__tests__/config.test.ts b/backend/src/__tests__/config.test.ts index a7f447941..379e29f8e 100644 --- a/backend/src/__tests__/config.test.ts +++ b/backend/src/__tests__/config.test.ts @@ -20,6 +20,7 @@ describe('Mempool Backend Config', () => { BLOCKS_SUMMARIES_INDEXING: false, GOGGLES_INDEXING: false, HTTP_PORT: 8999, + UNIX_SOCKET_PATH: '/mempool/socket/mempool-bitcoin-mainnet', SPAWN_CLUSTER_PROCS: 0, API_URL_PREFIX: '/api/v1/', AUTOMATIC_BLOCK_REINDEXING: false, diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index ef4a02d4f..621f2996a 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -44,7 +44,7 @@ const wantable = [ ]; class WebsocketHandler { - private wss: WebSocket.Server | undefined; + private webSocketServers: WebSocket.Server[] = []; private extraInitProperties = {}; private numClients = 0; @@ -57,8 +57,8 @@ class WebsocketHandler { constructor() { } - setWebsocketServer(wss: WebSocket.Server) { - this.wss = wss; + addWebsocketServer(wss: WebSocket.Server) { + this.webSocketServers.push(wss); } setExtraInitData(property: string, value: any) { @@ -102,11 +102,13 @@ class WebsocketHandler { } setupConnectionHandling() { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } - this.wss.on('connection', (client: WebSocket, req) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.on('connection', (client: WebSocket, req) => { this.numConnected++; client['remoteAddress'] = req.headers['x-forwarded-for'] || req.socket?.remoteAddress || 'unknown'; client.on('error', (e) => { @@ -369,14 +371,17 @@ class WebsocketHandler { } }); }); + } } handleNewDonation(id: string) { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } @@ -384,43 +389,50 @@ class WebsocketHandler { client.send(JSON.stringify({ donationConfirmed: true })); } }); + } } handleLoadingChanged(indicators: ILoadingIndicators) { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } this.updateSocketDataFields({ 'loadingIndicators': indicators }); const response = JSON.stringify({ loadingIndicators: indicators }); - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } client.send(response); }); + } } handleNewConversionRates(conversionRates: ApiPrice) { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } this.updateSocketDataFields({ 'conversions': conversionRates }); const response = JSON.stringify({ conversions: conversionRates }); - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } client.send(response); }); + } } handleNewStatistic(stats: OptimizedStatistic) { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } this.printLogs(); @@ -429,7 +441,9 @@ class WebsocketHandler { 'live-2h-chart': stats }); - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } @@ -440,11 +454,12 @@ class WebsocketHandler { client.send(response); }); + } } handleReorg(): void { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } const da = difficultyAdjustment.getDifficultyAdjustment(); @@ -455,7 +470,9 @@ class WebsocketHandler { 'da': da?.previousTime ? da : undefined, }); - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } @@ -473,13 +490,14 @@ class WebsocketHandler { client.send(this.serializeResponse(response)); } }); + } } async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number, newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], candidates?: GbtCandidates): Promise { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } this.printLogs(); @@ -552,7 +570,9 @@ class WebsocketHandler { // pre-compute new tracked outspends const outspendCache: { [txid: string]: { [vout: number]: { vin: number, txid: string } } } = {}; const trackedTxs = new Set(); - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client['track-tx']) { trackedTxs.add(client['track-tx']); } @@ -562,6 +582,7 @@ class WebsocketHandler { } } }); + } if (trackedTxs.size > 0) { for (const tx of newTransactions) { for (let i = 0; i < tx.vin.length; i++) { @@ -581,7 +602,9 @@ class WebsocketHandler { const addressCache = this.makeAddressCache(newTransactions); const removedAddressCache = this.makeAddressCache(deletedTransactions); - this.wss.clients.forEach(async (client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach(async (client) => { if (client.readyState !== WebSocket.OPEN) { return; } @@ -821,11 +844,12 @@ class WebsocketHandler { client.send(this.serializeResponse(response)); } }); + } } async handleNewBlock(block: BlockExtended, txIds: string[], transactions: MempoolTransactionExtended[]): Promise { - if (!this.wss) { - throw new Error('WebSocket.Server is not set'); + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server have been set'); } this.printLogs(); @@ -969,7 +993,10 @@ class WebsocketHandler { return responseCache[key]; } - this.wss.clients.forEach((client) => { + + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } @@ -1150,6 +1177,7 @@ class WebsocketHandler { client.send(this.serializeResponse(response)); } }); + } await statistics.runStatistics(); } @@ -1231,13 +1259,15 @@ class WebsocketHandler { } private printLogs(): void { - if (this.wss) { + if (this.webSocketServers.length) { let numTxSubs = 0; let numTxsSubs = 0; let numProjectedSubs = 0; let numRbfSubs = 0; - this.wss.clients.forEach((client) => { + // TODO - Fix indentation after PR is merged + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { if (client['track-tx']) { numTxSubs++; } @@ -1251,8 +1281,12 @@ class WebsocketHandler { numRbfSubs++; } }) + } - const count = this.wss?.clients?.size || 0; + let count = 0; + for (const server of this.webSocketServers) { + count += server.clients?.size || 0; + } const diff = count - this.numClients; this.numClients = count; logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`); diff --git a/backend/src/config.ts b/backend/src/config.ts index 93ac90834..daaca34ba 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -9,6 +9,7 @@ interface IConfig { NETWORK: 'mainnet' | 'testnet' | 'signet' | 'liquid' | 'liquidtestnet'; BACKEND: 'esplora' | 'electrum' | 'none'; HTTP_PORT: number; + UNIX_SOCKET_PATH: string; SPAWN_CLUSTER_PROCS: number; API_URL_PREFIX: string; POLL_RATE_MS: number; @@ -164,6 +165,7 @@ const defaults: IConfig = { 'NETWORK': 'mainnet', 'BACKEND': 'none', 'HTTP_PORT': 8999, + 'UNIX_SOCKET_PATH': '/mempool/socket/mempool-bitcoin-mainnet', 'SPAWN_CLUSTER_PROCS': 0, 'API_URL_PREFIX': '/api/v1/', 'POLL_RATE_MS': 2000, diff --git a/backend/src/index.ts b/backend/src/index.ts index 0b2cbb003..3840afa10 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -48,7 +48,9 @@ import aboutRoutes from './api/about.routes'; class Server { private wss: WebSocket.Server | undefined; + private wssUnixSocket: WebSocket.Server | undefined; private server: http.Server | undefined; + private serverUnixSocket: http.Server | undefined; private app: Application; private currentBackendRetryInterval = 1; private backendRetryCount = 0; @@ -136,7 +138,9 @@ class Server { } this.server = http.createServer(this.app); + this.serverUnixSocket = http.createServer(this.app); this.wss = new WebSocket.Server({ server: this.server }); + this.wssUnixSocket = new WebSocket.Server({ server: this.serverUnixSocket }); this.setUpWebsocketHandling(); @@ -192,6 +196,14 @@ class Server { logger.notice(`Mempool Server is running on port ${config.MEMPOOL.HTTP_PORT}`); } }); + + this.serverUnixSocket.listen(config.MEMPOOL.UNIX_SOCKET_PATH, () => { + if (worker) { + logger.info(`Mempool Server worker #${process.pid} started`); + } else { + logger.notice(`Mempool Server is listening on ${config.MEMPOOL.UNIX_SOCKET_PATH}`); + } + }); } async runMainUpdateLoop(): Promise { @@ -265,8 +277,12 @@ class Server { setUpWebsocketHandling(): void { if (this.wss) { - websocketHandler.setWebsocketServer(this.wss); + websocketHandler.addWebsocketServer(this.wss); } + if (this.wssUnixSocket) { + websocketHandler.addWebsocketServer(this.wssUnixSocket); + } + if (Common.isLiquid() && config.DATABASE.ENABLED) { blocks.setNewBlockCallback(async () => { try { @@ -338,6 +354,10 @@ class Server { if (config.DATABASE.ENABLED) { DB.releasePidLock(); } + this.server?.close(); + this.serverUnixSocket?.close(); + this.wss?.close(); + this.wssUnixSocket?.close(); process.exit(code); } diff --git a/docker/backend/mempool-config.json b/docker/backend/mempool-config.json index ea3fb56a1..7b662bed4 100644 --- a/docker/backend/mempool-config.json +++ b/docker/backend/mempool-config.json @@ -6,6 +6,7 @@ "OFFICIAL": __MEMPOOL_OFFICIAL__, "HTTP_PORT": __MEMPOOL_HTTP_PORT__, "SPAWN_CLUSTER_PROCS": __MEMPOOL_SPAWN_CLUSTER_PROCS__, + "UNIX_SOCKET_PATH": "__MEMPOOL_UNIX_SOCKET_PATH__", "API_URL_PREFIX": "__MEMPOOL_API_URL_PREFIX__", "POLL_RATE_MS": __MEMPOOL_POLL_RATE_MS__, "CACHE_DIR": "__MEMPOOL_CACHE_DIR__", diff --git a/docker/backend/start.sh b/docker/backend/start.sh index 611cba387..6d0a0d0c4 100755 --- a/docker/backend/start.sh +++ b/docker/backend/start.sh @@ -7,6 +7,7 @@ __MEMPOOL_ENABLED__=${MEMPOOL_ENABLED:=true} __MEMPOOL_OFFICIAL__=${MEMPOOL_OFFICIAL:=false} __MEMPOOL_HTTP_PORT__=${BACKEND_HTTP_PORT:=8999} __MEMPOOL_SPAWN_CLUSTER_PROCS__=${MEMPOOL_SPAWN_CLUSTER_PROCS:=0} +__MEMPOOL_UNIX_SOCKET_PATH__=${MEMPOOL_UNIX_SOCKET_PATH:=/mempool/socket/mempool-bitcoin-mainnet} __MEMPOOL_API_URL_PREFIX__=${MEMPOOL_API_URL_PREFIX:=/api/v1/} __MEMPOOL_POLL_RATE_MS__=${MEMPOOL_POLL_RATE_MS:=2000} __MEMPOOL_CACHE_DIR__=${MEMPOOL_CACHE_DIR:=./cache} @@ -160,6 +161,7 @@ sed -i "s!__MEMPOOL_ENABLED__!${__MEMPOOL_ENABLED__}!g" mempool-config.json sed -i "s!__MEMPOOL_OFFICIAL__!${__MEMPOOL_OFFICIAL__}!g" mempool-config.json sed -i "s!__MEMPOOL_HTTP_PORT__!${__MEMPOOL_HTTP_PORT__}!g" mempool-config.json sed -i "s!__MEMPOOL_SPAWN_CLUSTER_PROCS__!${__MEMPOOL_SPAWN_CLUSTER_PROCS__}!g" mempool-config.json +sed -i "s!__MEMPOOL_UNIX_SOCKET_PATH__!${__MEMPOOL_UNIX_SOCKET_PATH__}!g" mempool-config.json sed -i "s!__MEMPOOL_API_URL_PREFIX__!${__MEMPOOL_API_URL_PREFIX__}!g" mempool-config.json sed -i "s!__MEMPOOL_POLL_RATE_MS__!${__MEMPOOL_POLL_RATE_MS__}!g" mempool-config.json sed -i "s!__MEMPOOL_CACHE_DIR__!${__MEMPOOL_CACHE_DIR__}!g" mempool-config.json