Merge pull request #3736 from mempool/mononaut/optimize-websocket-updates

Optimize websocket updates
This commit is contained in:
wiz 2023-05-11 09:57:08 -05:00 committed by GitHub
commit 17dd02ed4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 152 additions and 76 deletions

View File

@ -130,8 +130,9 @@ class BitcoinRoutes {
private getInitData(req: Request, res: Response) {
try {
const result = websocketHandler.getInitData();
res.json(result);
const result = websocketHandler.getSerializedInitData();
res.set('Content-Type', 'application/json');
res.send(result);
} catch (e) {
res.status(500).send(e instanceof Error ? e.message : e);
}

View File

@ -30,6 +30,9 @@ class WebsocketHandler {
private numConnected = 0;
private numDisconnected = 0;
private initData: { [key: string]: string } = {};
private serializedInitData: string = '{}';
constructor() { }
setWebsocketServer(wss: WebSocket.Server) {
@ -38,6 +41,41 @@ class WebsocketHandler {
setExtraInitProperties(property: string, value: any) {
this.extraInitProperties[property] = value;
this.setInitDataFields(this.extraInitProperties);
}
private setInitDataFields(data: { [property: string]: any }): void {
for (const property of Object.keys(data)) {
if (data[property] != null) {
this.initData[property] = JSON.stringify(data[property]);
} else {
delete this.initData[property];
}
}
this.serializedInitData = '{'
+ Object.keys(this.initData).map(key => `"${key}": ${this.initData[key]}`).join(', ')
+ '}';
}
private updateInitData(): void {
const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
const da = difficultyAdjustment.getDifficultyAdjustment();
this.setInitDataFields({
'mempoolInfo': memPool.getMempoolInfo(),
'vBytesPerSecond': memPool.getVBytesPerSecond(),
'blocks': _blocks,
'conversions': priceUpdater.getLatestPrices(),
'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
'transactions': memPool.getLatestTransactions(),
'backendInfo': backendInfo.getBackendInfo(),
'loadingIndicators': loadingIndicators.getLoadingIndicators(),
'da': da?.previousTime ? da : undefined,
'fees': feeApi.getRecommendedFee(),
});
}
public getSerializedInitData(): string {
return this.serializedInitData;
}
setupConnectionHandling() {
@ -157,11 +195,13 @@ class WebsocketHandler {
}
if (parsedMessage.action === 'init') {
const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
if (!_blocks) {
if (!this.initData['blocks']?.length || !this.initData['da']) {
this.updateInitData();
}
if (!this.initData['blocks']?.length) {
return;
}
client.send(JSON.stringify(this.getInitData(_blocks)));
client.send(this.serializedInitData);
}
if (parsedMessage.action === 'ping') {
@ -210,11 +250,14 @@ class WebsocketHandler {
throw new Error('WebSocket.Server is not set');
}
this.setInitDataFields({ 'loadingIndicators': indicators });
const response = JSON.stringify({ loadingIndicators: indicators });
this.wss.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
client.send(JSON.stringify({ loadingIndicators: indicators }));
client.send(response);
});
}
@ -223,34 +266,17 @@ class WebsocketHandler {
throw new Error('WebSocket.Server is not set');
}
this.setInitDataFields({ 'conversions': conversionRates });
const response = JSON.stringify({ conversions: conversionRates });
this.wss.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
client.send(JSON.stringify({ conversions: conversionRates }));
client.send(response);
});
}
getInitData(_blocks?: BlockExtended[]) {
if (!_blocks) {
_blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
}
const da = difficultyAdjustment.getDifficultyAdjustment();
return {
'mempoolInfo': memPool.getMempoolInfo(),
'vBytesPerSecond': memPool.getVBytesPerSecond(),
'blocks': _blocks,
'conversions': priceUpdater.getLatestPrices(),
'mempool-blocks': mempoolBlocks.getMempoolBlocks(),
'transactions': memPool.getLatestTransactions(),
'backendInfo': backendInfo.getBackendInfo(),
'loadingIndicators': loadingIndicators.getLoadingIndicators(),
'da': da?.previousTime ? da : undefined,
'fees': feeApi.getRecommendedFee(),
...this.extraInitProperties
};
}
handleNewStatistic(stats: OptimizedStatistic) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
@ -258,6 +284,10 @@ class WebsocketHandler {
this.printLogs();
const response = JSON.stringify({
'live-2h-chart': stats
});
this.wss.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
@ -267,9 +297,7 @@ class WebsocketHandler {
return;
}
client.send(JSON.stringify({
'live-2h-chart': stats
}));
client.send(response);
});
}
@ -306,6 +334,43 @@ class WebsocketHandler {
}
const recommendedFees = feeApi.getRecommendedFee();
// update init data
this.updateInitData();
// cache serialized objects to avoid stringify-ing the same thing for every client
const responseCache = { ...this.initData };
function getCachedResponse(key: string, data): string {
if (!responseCache[key]) {
responseCache[key] = JSON.stringify(data);
}
return responseCache[key];
}
// pre-compute new tracked outspends
const outspendCache: { [txid: string]: { [vout: number]: { vin: number, txid: string } } } = {};
const trackedTxs = new Set<string>();
this.wss.clients.forEach((client) => {
if (client['track-tx']) {
trackedTxs.add(client['track-tx']);
}
});
if (trackedTxs.size > 0) {
for (const tx of newTransactions) {
for (let i = 0; i < tx.vin.length; i++) {
const vin = tx.vin[i];
if (trackedTxs.has(vin.txid)) {
if (!outspendCache[vin.txid]) {
outspendCache[vin.txid] = { [vin.vout]: { vin: i, txid: tx.txid }};
} else {
outspendCache[vin.txid][vin.vout] = { vin: i, txid: tx.txid };
}
}
}
}
}
const latestTransactions = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx));
this.wss.clients.forEach(async (client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
@ -314,17 +379,17 @@ class WebsocketHandler {
const response = {};
if (client['want-stats']) {
response['mempoolInfo'] = mempoolInfo;
response['vBytesPerSecond'] = vBytesPerSecond;
response['transactions'] = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx));
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', vBytesPerSecond);
response['transactions'] = getCachedResponse('transactions', latestTransactions);
if (da?.previousTime) {
response['da'] = da;
response['da'] = getCachedResponse('da', da);
}
response['fees'] = recommendedFees;
response['fees'] = getCachedResponse('fees', recommendedFees);
}
if (client['want-mempool-blocks']) {
response['mempool-blocks'] = mBlocks;
response['mempool-blocks'] = getCachedResponse('mempool-blocks', mBlocks);
}
if (client['track-mempool-tx']) {
@ -333,12 +398,12 @@ class WebsocketHandler {
if (config.MEMPOOL.BACKEND !== 'esplora') {
try {
const fullTx = await transactionUtils.$getTransactionExtended(tx.txid, true);
response['tx'] = fullTx;
response['tx'] = JSON.stringify(fullTx);
} catch (e) {
logger.debug('Error finding transaction in mempool: ' + (e instanceof Error ? e.message : e));
}
} else {
response['tx'] = tx;
response['tx'] = JSON.stringify(tx);
}
client['track-mempool-tx'] = null;
}
@ -378,7 +443,7 @@ class WebsocketHandler {
}
if (foundTransactions.length) {
response['address-transactions'] = foundTransactions;
response['address-transactions'] = JSON.stringify(foundTransactions);
}
}
@ -407,65 +472,60 @@ class WebsocketHandler {
});
if (foundTransactions.length) {
response['address-transactions'] = foundTransactions;
response['address-transactions'] = JSON.stringify(foundTransactions);
}
}
if (client['track-tx']) {
const trackTxid = client['track-tx'];
const outspends: object = {};
newTransactions.forEach((tx) => tx.vin.forEach((vin, i) => {
if (vin.txid === trackTxid) {
outspends[vin.vout] = {
vin: i,
txid: tx.txid,
};
}
}));
const outspends = outspendCache[trackTxid];
if (Object.keys(outspends).length) {
response['utxoSpent'] = outspends;
if (outspends && Object.keys(outspends).length) {
response['utxoSpent'] = JSON.stringify(outspends);
}
const rbfReplacedBy = rbfCache.getReplacedBy(client['track-tx']);
if (rbfReplacedBy) {
response['rbfTransaction'] = {
response['rbfTransaction'] = JSON.stringify({
txid: rbfReplacedBy,
}
})
}
const rbfChange = rbfChanges.map[client['track-tx']];
if (rbfChange) {
response['rbfInfo'] = rbfChanges.trees[rbfChange];
response['rbfInfo'] = JSON.stringify(rbfChanges.trees[rbfChange]);
}
const mempoolTx = newMempool[trackTxid];
if (mempoolTx && mempoolTx.position) {
response['txPosition'] = {
response['txPosition'] = JSON.stringify({
txid: trackTxid,
position: mempoolTx.position,
};
});
}
}
if (client['track-mempool-block'] >= 0) {
const index = client['track-mempool-block'];
if (mBlockDeltas[index]) {
response['projected-block-transactions'] = {
response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
index: index,
delta: mBlockDeltas[index],
};
});
}
}
if (client['track-rbf'] === 'all' && rbfReplacements) {
response['rbfLatest'] = rbfReplacements;
response['rbfLatest'] = getCachedResponse('rbfLatest', rbfReplacements);
} else if (client['track-rbf'] === 'fullRbf' && fullRbfReplacements) {
response['rbfLatest'] = fullRbfReplacements;
response['rbfLatest'] = getCachedResponse('fullrbfLatest', fullRbfReplacements);
}
if (Object.keys(response).length) {
client.send(JSON.stringify(response));
const serializedResponse = '{'
+ Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ')
+ '}';
client.send(serializedResponse);
}
});
}
@ -556,6 +616,19 @@ class WebsocketHandler {
const da = difficultyAdjustment.getDifficultyAdjustment();
const fees = feeApi.getRecommendedFee();
// update init data
this.updateInitData();
const responseCache = { ...this.initData };
function getCachedResponse(key, data): string {
if (!responseCache[key]) {
responseCache[key] = JSON.stringify(data);
}
return responseCache[key];
}
const mempoolInfo = memPool.getMempoolInfo();
this.wss.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
@ -565,28 +638,27 @@ class WebsocketHandler {
return;
}
const response = {
'block': block,
'mempoolInfo': memPool.getMempoolInfo(),
'da': da?.previousTime ? da : undefined,
'fees': fees,
};
const response = {};
response['block'] = getCachedResponse('block', block);
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
response['da'] = getCachedResponse('da', da?.previousTime ? da : undefined);
response['fees'] = getCachedResponse('fees', fees);
if (mBlocks && client['want-mempool-blocks']) {
response['mempool-blocks'] = mBlocks;
response['mempool-blocks'] = getCachedResponse('mempool-blocks', mBlocks);
}
if (client['track-tx']) {
const trackTxid = client['track-tx'];
if (txIds.indexOf(trackTxid) > -1) {
response['txConfirmed'] = true;
response['txConfirmed'] = 'true';
} else {
const mempoolTx = _memPool[trackTxid];
if (mempoolTx && mempoolTx.position) {
response['txPosition'] = {
response['txPosition'] = JSON.stringify({
txid: trackTxid,
position: mempoolTx.position,
};
});
}
}
}
@ -614,7 +686,7 @@ class WebsocketHandler {
};
});
response['block-transactions'] = foundTransactions;
response['block-transactions'] = JSON.stringify(foundTransactions);
}
}
@ -651,21 +723,24 @@ class WebsocketHandler {
};
});
response['block-transactions'] = foundTransactions;
response['block-transactions'] = JSON.stringify(foundTransactions);
}
}
if (client['track-mempool-block'] >= 0) {
const index = client['track-mempool-block'];
if (mBlockDeltas && mBlockDeltas[index]) {
response['projected-block-transactions'] = {
response['projected-block-transactions'] = getCachedResponse(`projected-block-transactions-${index}`, {
index: index,
delta: mBlockDeltas[index],
};
});
}
}
client.send(JSON.stringify(response));
const serializedResponse = '{'
+ Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ')
+ '}';
client.send(serializedResponse);
});
}