Merge pull request #4541 from mempool/mononaut/optimize-new-block-ws
Optimize websocket updates on new block
This commit is contained in:
commit
8492007a48
@ -282,8 +282,7 @@ class WebsocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (Object.keys(response).length) {
|
if (Object.keys(response).length) {
|
||||||
const serializedResponse = this.serializeResponse(response);
|
client.send(this.serializeResponse(response));
|
||||||
client.send(serializedResponse);
|
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.debug(`Error parsing websocket message from ${client['remoteAddress']}: ` + (e instanceof Error ? e.message : e));
|
logger.debug(`Error parsing websocket message from ${client['remoteAddress']}: ` + (e instanceof Error ? e.message : e));
|
||||||
@ -392,8 +391,7 @@ class WebsocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (Object.keys(response).length) {
|
if (Object.keys(response).length) {
|
||||||
const serializedResponse = this.serializeResponse(response);
|
client.send(this.serializeResponse(response));
|
||||||
client.send(serializedResponse);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -639,8 +637,7 @@ class WebsocketHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (Object.keys(response).length) {
|
if (Object.keys(response).length) {
|
||||||
const serializedResponse = this.serializeResponse(response);
|
client.send(this.serializeResponse(response));
|
||||||
client.send(serializedResponse);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -738,10 +735,13 @@ class WebsocketHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const confirmedTxids: { [txid: string]: boolean } = {};
|
||||||
|
|
||||||
// Update mempool to remove transactions included in the new block
|
// Update mempool to remove transactions included in the new block
|
||||||
for (const txId of txIds) {
|
for (const txId of txIds) {
|
||||||
delete _memPool[txId];
|
delete _memPool[txId];
|
||||||
rbfCache.mined(txId);
|
rbfCache.mined(txId);
|
||||||
|
confirmedTxids[txId] = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
|
if (config.MEMPOOL.ADVANCED_GBT_MEMPOOL) {
|
||||||
@ -773,6 +773,8 @@ class WebsocketHandler {
|
|||||||
'fees': fees,
|
'fees': fees,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions();
|
||||||
|
|
||||||
const responseCache = { ...this.socketData };
|
const responseCache = { ...this.socketData };
|
||||||
function getCachedResponse(key, data): string {
|
function getCachedResponse(key, data): string {
|
||||||
if (!responseCache[key]) {
|
if (!responseCache[key]) {
|
||||||
@ -808,7 +810,7 @@ class WebsocketHandler {
|
|||||||
|
|
||||||
if (client['track-tx']) {
|
if (client['track-tx']) {
|
||||||
const trackTxid = client['track-tx'];
|
const trackTxid = client['track-tx'];
|
||||||
if (trackTxid && txIds.indexOf(trackTxid) > -1) {
|
if (trackTxid && confirmedTxids[trackTxid]) {
|
||||||
response['txConfirmed'] = JSON.stringify(trackTxid);
|
response['txConfirmed'] = JSON.stringify(trackTxid);
|
||||||
} else {
|
} else {
|
||||||
const mempoolTx = _memPool[trackTxid];
|
const mempoolTx = _memPool[trackTxid];
|
||||||
@ -880,17 +882,24 @@ class WebsocketHandler {
|
|||||||
|
|
||||||
if (client['track-mempool-block'] >= 0 && memPool.isInSync()) {
|
if (client['track-mempool-block'] >= 0 && memPool.isInSync()) {
|
||||||
const index = client['track-mempool-block'];
|
const index = client['track-mempool-block'];
|
||||||
if (mBlockDeltas && mBlockDeltas[index]) {
|
|
||||||
response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
|
if (mBlockDeltas && mBlockDeltas[index] && mBlocksWithTransactions[index]?.transactions?.length) {
|
||||||
index: index,
|
if (mBlockDeltas[index].added.length > (mBlocksWithTransactions[index]?.transactions.length / 2)) {
|
||||||
delta: mBlockDeltas[index],
|
response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-full-${index}`, {
|
||||||
});
|
index: index,
|
||||||
|
blockTransactions: mBlocksWithTransactions[index].transactions,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-delta-${index}`, {
|
||||||
|
index: index,
|
||||||
|
delta: mBlockDeltas[index],
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Object.keys(response).length) {
|
if (Object.keys(response).length) {
|
||||||
const serializedResponse = this.serializeResponse(response);
|
client.send(this.serializeResponse(response));
|
||||||
client.send(serializedResponse);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -951,10 +960,27 @@ class WebsocketHandler {
|
|||||||
|
|
||||||
private printLogs(): void {
|
private printLogs(): void {
|
||||||
if (this.wss) {
|
if (this.wss) {
|
||||||
|
let numTxSubs = 0;
|
||||||
|
let numProjectedSubs = 0;
|
||||||
|
let numRbfSubs = 0;
|
||||||
|
|
||||||
|
this.wss.clients.forEach((client) => {
|
||||||
|
if (client['track-tx']) {
|
||||||
|
numTxSubs++;
|
||||||
|
}
|
||||||
|
if (client['track-mempool-block'] >= 0) {
|
||||||
|
numProjectedSubs++;
|
||||||
|
}
|
||||||
|
if (client['track-rbf']) {
|
||||||
|
numRbfSubs++;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
const count = this.wss?.clients?.size || 0;
|
const count = this.wss?.clients?.size || 0;
|
||||||
const diff = count - this.numClients;
|
const diff = count - this.numClients;
|
||||||
this.numClients = count;
|
this.numClients = count;
|
||||||
logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`);
|
logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`);
|
||||||
|
logger.debug(`websocket subscriptions: track-tx: ${numTxSubs}, track-mempool-block: ${numProjectedSubs} track-rbf: ${numRbfSubs}`);
|
||||||
this.numConnected = 0;
|
this.numConnected = 0;
|
||||||
this.numDisconnected = 0;
|
this.numDisconnected = 0;
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,8 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
|
|||||||
blockSub: Subscription;
|
blockSub: Subscription;
|
||||||
deltaSub: Subscription;
|
deltaSub: Subscription;
|
||||||
|
|
||||||
|
firstLoad: boolean = true;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
public stateService: StateService,
|
public stateService: StateService,
|
||||||
private websocketService: WebsocketService,
|
private websocketService: WebsocketService,
|
||||||
@ -58,7 +60,40 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
|
|||||||
)
|
)
|
||||||
.pipe(switchMap(() => this.stateService.mempoolBlockTransactions$))
|
.pipe(switchMap(() => this.stateService.mempoolBlockTransactions$))
|
||||||
.subscribe((transactionsStripped) => {
|
.subscribe((transactionsStripped) => {
|
||||||
this.replaceBlock(transactionsStripped);
|
if (this.firstLoad) {
|
||||||
|
this.replaceBlock(transactionsStripped);
|
||||||
|
} else {
|
||||||
|
const inOldBlock = {};
|
||||||
|
const inNewBlock = {};
|
||||||
|
const added: TransactionStripped[] = [];
|
||||||
|
const changed: { txid: string, rate: number | undefined, acc: boolean | undefined }[] = [];
|
||||||
|
const removed: string[] = [];
|
||||||
|
for (const tx of transactionsStripped) {
|
||||||
|
inNewBlock[tx.txid] = true;
|
||||||
|
}
|
||||||
|
for (const txid of Object.keys(this.blockGraph?.scene?.txs || {})) {
|
||||||
|
inOldBlock[txid] = true;
|
||||||
|
if (!inNewBlock[txid]) {
|
||||||
|
removed.push(txid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const tx of transactionsStripped) {
|
||||||
|
if (!inOldBlock[tx.txid]) {
|
||||||
|
added.push(tx);
|
||||||
|
} else {
|
||||||
|
changed.push({
|
||||||
|
txid: tx.txid,
|
||||||
|
rate: tx.rate,
|
||||||
|
acc: tx.acc
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.updateBlock({
|
||||||
|
removed,
|
||||||
|
changed,
|
||||||
|
added
|
||||||
|
});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
this.deltaSub = this.stateService.mempoolBlockDelta$.subscribe((delta) => {
|
this.deltaSub = this.stateService.mempoolBlockDelta$.subscribe((delta) => {
|
||||||
this.updateBlock(delta);
|
this.updateBlock(delta);
|
||||||
@ -67,6 +102,7 @@ export class MempoolBlockOverviewComponent implements OnInit, OnDestroy, OnChang
|
|||||||
|
|
||||||
ngOnChanges(changes): void {
|
ngOnChanges(changes): void {
|
||||||
if (changes.index) {
|
if (changes.index) {
|
||||||
|
this.firstLoad = true;
|
||||||
if (this.blockGraph) {
|
if (this.blockGraph) {
|
||||||
this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection);
|
this.blockGraph.clear(changes.index.currentValue > changes.index.previousValue ? this.chainDirection : this.poolDirection);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user