Improvments to the mempool transaction subscription.
This commit is contained in:
parent
e2671df4be
commit
590f1d2b04
@ -2,7 +2,7 @@ const config = require('../../mempool-config.json');
|
|||||||
|
|
||||||
import * as WebSocket from 'ws';
|
import * as WebSocket from 'ws';
|
||||||
import * as fs from 'fs';
|
import * as fs from 'fs';
|
||||||
import { Block, TransactionExtended, Statistic } from '../interfaces';
|
import { Block, TransactionExtended, Statistic, WebsocketResponse } from '../interfaces';
|
||||||
import blocks from './blocks';
|
import blocks from './blocks';
|
||||||
import memPool from './mempool';
|
import memPool from './mempool';
|
||||||
import mempoolBlocks from './mempool-blocks';
|
import mempoolBlocks from './mempool-blocks';
|
||||||
@ -36,7 +36,8 @@ class WebsocketHandler {
|
|||||||
this.wss.on('connection', (client: WebSocket) => {
|
this.wss.on('connection', (client: WebSocket) => {
|
||||||
client.on('message', (message: any) => {
|
client.on('message', (message: any) => {
|
||||||
try {
|
try {
|
||||||
const parsedMessage = JSON.parse(message);
|
const parsedMessage: WebsocketResponse = JSON.parse(message);
|
||||||
|
const response = {};
|
||||||
|
|
||||||
if (parsedMessage.action === 'want') {
|
if (parsedMessage.action === 'want') {
|
||||||
client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1;
|
client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1;
|
||||||
@ -48,6 +49,15 @@ class WebsocketHandler {
|
|||||||
if (parsedMessage && parsedMessage['track-tx']) {
|
if (parsedMessage && parsedMessage['track-tx']) {
|
||||||
if (/^[a-fA-F0-9]{64}$/.test(parsedMessage['track-tx'])) {
|
if (/^[a-fA-F0-9]{64}$/.test(parsedMessage['track-tx'])) {
|
||||||
client['track-tx'] = parsedMessage['track-tx'];
|
client['track-tx'] = parsedMessage['track-tx'];
|
||||||
|
// Client is telling the transaction wasn't found but it might have appeared before we had the time to start watching for it
|
||||||
|
if (parsedMessage['watch-mempool']) {
|
||||||
|
const tx = memPool.getMempool()[client['track-tx']];
|
||||||
|
if (tx) {
|
||||||
|
response['tx'] = tx;
|
||||||
|
} else {
|
||||||
|
client['track-mempool-tx'] = parsedMessage['track-tx'];
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
client['track-tx'] = null;
|
client['track-tx'] = null;
|
||||||
}
|
}
|
||||||
@ -78,7 +88,11 @@ class WebsocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (parsedMessage.action === 'ping') {
|
if (parsedMessage.action === 'ping') {
|
||||||
client.send(JSON.stringify({'pong': true}));
|
response['pong'] = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Object.keys(response).length) {
|
||||||
|
client.send(JSON.stringify(response));
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.log(e);
|
console.log(e);
|
||||||
@ -133,10 +147,11 @@ class WebsocketHandler {
|
|||||||
response['mempool-blocks'] = mBlocks;
|
response['mempool-blocks'] = mBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (client['track-tx']) {
|
if (client['track-mempool-tx']) {
|
||||||
const tx = newTransactions.find((t) => t.txid === client['track-tx']);
|
const tx = newTransactions.find((t) => t.txid === client['track-mempool-tx']);
|
||||||
if (tx) {
|
if (tx) {
|
||||||
response['tx'] = tx;
|
response['tx'] = tx;
|
||||||
|
client['track-mempool-tx'] = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,4 +180,10 @@ export interface Outspend {
|
|||||||
vin: number;
|
vin: number;
|
||||||
status: Status;
|
status: Status;
|
||||||
}
|
}
|
||||||
|
export interface WebsocketResponse {
|
||||||
|
action: string;
|
||||||
|
data: string[];
|
||||||
|
'track-tx': string;
|
||||||
|
'track-address': string;
|
||||||
|
'watch-mempool': boolean;
|
||||||
|
}
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import { Component, OnInit, OnDestroy } from '@angular/core';
|
import { Component, OnInit, OnDestroy } from '@angular/core';
|
||||||
import { ElectrsApiService } from '../../services/electrs-api.service';
|
import { ElectrsApiService } from '../../services/electrs-api.service';
|
||||||
import { ActivatedRoute, ParamMap } from '@angular/router';
|
import { ActivatedRoute, ParamMap } from '@angular/router';
|
||||||
import { switchMap, filter, take, catchError, mergeMap, flatMap, mergeAll, tap, map } from 'rxjs/operators';
|
import { switchMap, filter, take, catchError, flatMap } from 'rxjs/operators';
|
||||||
import { Transaction, Block } from '../../interfaces/electrs.interface';
|
import { Transaction, Block } from '../../interfaces/electrs.interface';
|
||||||
import { of, merge, Subscription, Observable, scheduled } from 'rxjs';
|
import { of, merge, Subscription, Observable } from 'rxjs';
|
||||||
import { StateService } from '../../services/state.service';
|
import { StateService } from '../../services/state.service';
|
||||||
import { WebsocketService } from '../../services/websocket.service';
|
import { WebsocketService } from '../../services/websocket.service';
|
||||||
import { AudioService } from 'src/app/services/audio.service';
|
import { AudioService } from 'src/app/services/audio.service';
|
||||||
@ -122,7 +122,7 @@ export class TransactionComponent implements OnInit, OnDestroy {
|
|||||||
|
|
||||||
handleLoadElectrsTransactionError(error: any): Observable<any> {
|
handleLoadElectrsTransactionError(error: any): Observable<any> {
|
||||||
if (error.status === 404 && /^[a-fA-F0-9]{64}$/.test(this.txId)) {
|
if (error.status === 404 && /^[a-fA-F0-9]{64}$/.test(this.txId)) {
|
||||||
this.websocketService.startTrackTransaction(this.txId);
|
this.websocketService.startMultiTrackTransaction(this.txId);
|
||||||
this.waitingForTransaction = true;
|
this.waitingForTransaction = true;
|
||||||
}
|
}
|
||||||
this.error = error;
|
this.error = error;
|
||||||
|
@ -13,6 +13,7 @@ export interface WebsocketResponse {
|
|||||||
tx?: Transaction;
|
tx?: Transaction;
|
||||||
'track-tx'?: string;
|
'track-tx'?: string;
|
||||||
'track-address'?: string;
|
'track-address'?: string;
|
||||||
|
'watch-mempool'?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface MempoolBlock {
|
export interface MempoolBlock {
|
||||||
|
@ -133,6 +133,11 @@ export class WebsocketService {
|
|||||||
this.isTrackingTx = true;
|
this.isTrackingTx = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startMultiTrackTransaction(txId: string) {
|
||||||
|
this.websocketSubject.next({ 'track-tx': txId, 'watch-mempool': true });
|
||||||
|
this.isTrackingTx = true;
|
||||||
|
}
|
||||||
|
|
||||||
stopTrackingTransaction() {
|
stopTrackingTransaction() {
|
||||||
if (this.isTrackingTx === false) {
|
if (this.isTrackingTx === false) {
|
||||||
return;
|
return;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user