Merge branch 'master' into mononaut/effective-rate-templates

This commit is contained in:
softsimon
2023-07-02 18:11:28 +02:00
committed by GitHub
27 changed files with 484 additions and 206 deletions

View File

@@ -224,7 +224,12 @@ class BitcoinRoutes {
} else {
let cpfpInfo;
if (config.DATABASE.ENABLED) {
cpfpInfo = await transactionRepository.$getCpfpInfo(req.params.txId);
try {
cpfpInfo = await transactionRepository.$getCpfpInfo(req.params.txId);
} catch (e) {
res.status(500).send('failed to get CPFP info');
return;
}
}
if (cpfpInfo) {
res.json(cpfpInfo);

View File

@@ -738,6 +738,11 @@ class Blocks {
this.currentDifficulty = block.difficulty;
}
// wait for pending async callbacks to finish
this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
await Promise.all(callbackPromises);
this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
this.blocks.push(blockExtended);
if (this.blocks.length > config.MEMPOOL.INITIAL_BLOCKS_AMOUNT * 4) {
this.blocks = this.blocks.slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT * 4);
@@ -754,11 +759,6 @@ class Blocks {
diskCache.$saveCacheToDisk();
}
// wait for pending async callbacks to finish
this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`);
await Promise.all(callbackPromises);
this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`);
handledBlocks++;
}
@@ -1072,9 +1072,13 @@ class Blocks {
}
public async $saveCpfp(hash: string, height: number, cpfpSummary: CpfpSummary): Promise<void> {
const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters);
if (!result) {
await cpfpRepository.$insertProgressMarker(height);
try {
const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters);
if (!result) {
await cpfpRepository.$insertProgressMarker(height);
}
} catch (e) {
// not a fatal error, we'll try again next time the indexer runs
}
}
}

View File

@@ -143,7 +143,7 @@ class MempoolBlocks {
const stackWeight = transactionsSorted.slice(index).reduce((total, tx) => total + (tx.weight || 0), 0);
if (stackWeight > config.MEMPOOL.BLOCK_WEIGHT_UNITS) {
onlineStats = true;
feeStatsCalculator = new OnlineFeeStatsCalculator(stackWeight, 0.5);
feeStatsCalculator = new OnlineFeeStatsCalculator(stackWeight, 0.5, [10, 20, 30, 40, 50, 60, 70, 80, 90]);
feeStatsCalculator.processNext(tx);
}
}
@@ -334,7 +334,7 @@ class MempoolBlocks {
if (hasBlockStack) {
stackWeight = blocks[blocks.length - 1].reduce((total, tx) => total + (mempool[tx]?.weight || 0), 0);
hasBlockStack = stackWeight > config.MEMPOOL.BLOCK_WEIGHT_UNITS;
feeStatsCalculator = new OnlineFeeStatsCalculator(stackWeight, 0.5);
feeStatsCalculator = new OnlineFeeStatsCalculator(stackWeight, 0.5, [10, 20, 30, 40, 50, 60, 70, 80, 90]);
}
const readyBlocks: { transactionIds, transactions, totalSize, totalWeight, totalFees, feeStats }[] = [];

View File

@@ -211,7 +211,7 @@ class StatisticsApi {
CAST(avg(vsize_1800) as DOUBLE) as vsize_1800,
CAST(avg(vsize_2000) as DOUBLE) as vsize_2000 \
FROM statistics \
WHERE added BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW() \
${interval === 'all' ? '' : `WHERE added BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW()`} \
GROUP BY UNIX_TIMESTAMP(added) DIV ${div} \
ORDER BY statistics.added DESC;`;
}
@@ -259,7 +259,7 @@ class StatisticsApi {
vsize_1800,
vsize_2000 \
FROM statistics \
WHERE added BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW() \
${interval === 'all' ? '' : `WHERE added BETWEEN DATE_SUB(NOW(), INTERVAL ${interval}) AND NOW()`} \
GROUP BY UNIX_TIMESTAMP(added) DIV ${div} \
ORDER BY statistics.added DESC;`;
}
@@ -386,6 +386,17 @@ class StatisticsApi {
}
}
public async $listAll(): Promise<OptimizedStatistic[]> {
try {
const query = this.getQueryForDays(43200, 'all'); // 12h interval
const [rows] = await DB.query({ sql: query, timeout: this.queryTimeout });
return this.mapStatisticToOptimizedStatistic(rows as Statistic[]);
} catch (e) {
logger.err('$listAll() error' + (e instanceof Error ? e.message : e));
return [];
}
}
private mapStatisticToOptimizedStatistic(statistic: Statistic[]): OptimizedStatistic[] {
return statistic.map((s) => {
return {

View File

@@ -15,10 +15,11 @@ class StatisticsRoutes {
.get(config.MEMPOOL.API_URL_PREFIX + 'statistics/2y', this.$getStatisticsByTime.bind(this, '2y'))
.get(config.MEMPOOL.API_URL_PREFIX + 'statistics/3y', this.$getStatisticsByTime.bind(this, '3y'))
.get(config.MEMPOOL.API_URL_PREFIX + 'statistics/4y', this.$getStatisticsByTime.bind(this, '4y'))
.get(config.MEMPOOL.API_URL_PREFIX + 'statistics/all', this.$getStatisticsByTime.bind(this, 'all'))
;
}
private async $getStatisticsByTime(time: '2h' | '24h' | '1w' | '1m' | '3m' | '6m' | '1y' | '2y' | '3y' | '4y', req: Request, res: Response) {
private async $getStatisticsByTime(time: '2h' | '24h' | '1w' | '1m' | '3m' | '6m' | '1y' | '2y' | '3y' | '4y' | 'all', req: Request, res: Response) {
res.header('Pragma', 'public');
res.header('Cache-control', 'public');
res.setHeader('Expires', new Date(Date.now() + 1000 * 300).toUTCString());
@@ -26,10 +27,6 @@ class StatisticsRoutes {
try {
let result;
switch (time as string) {
case '2h':
result = await statisticsApi.$list2H();
res.setHeader('Expires', new Date(Date.now() + 1000 * 30).toUTCString());
break;
case '24h':
result = await statisticsApi.$list24H();
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
@@ -58,8 +55,13 @@ class StatisticsRoutes {
case '4y':
result = await statisticsApi.$list4Y();
break;
case 'all':
result = await statisticsApi.$listAll();
break;
default:
result = await statisticsApi.$list2H();
res.setHeader('Expires', new Date(Date.now() + 1000 * 30).toUTCString());
break;
}
res.json(result);
} catch (e) {

View File

@@ -22,6 +22,14 @@ import { deepClone } from '../utils/clone';
import priceUpdater from '../tasks/price-updater';
import { ApiPrice } from '../repositories/PricesRepository';
// valid 'want' subscriptions
const wantable = [
'blocks',
'mempool-blocks',
'live-2h-chart',
'stats',
];
class WebsocketHandler {
private wss: WebSocket.Server | undefined;
private extraInitProperties = {};
@@ -30,7 +38,7 @@ class WebsocketHandler {
private numConnected = 0;
private numDisconnected = 0;
private initData: { [key: string]: string } = {};
private socketData: { [key: string]: string } = {};
private serializedInitData: string = '{}';
constructor() { }
@@ -39,28 +47,28 @@ class WebsocketHandler {
this.wss = wss;
}
setExtraInitProperties(property: string, value: any) {
setExtraInitData(property: string, value: any) {
this.extraInitProperties[property] = value;
this.setInitDataFields(this.extraInitProperties);
this.updateSocketDataFields(this.extraInitProperties);
}
private setInitDataFields(data: { [property: string]: any }): void {
private updateSocketDataFields(data: { [property: string]: any }): void {
for (const property of Object.keys(data)) {
if (data[property] != null) {
this.initData[property] = JSON.stringify(data[property]);
this.socketData[property] = JSON.stringify(data[property]);
} else {
delete this.initData[property];
delete this.socketData[property];
}
}
this.serializedInitData = '{'
+ Object.keys(this.initData).map(key => `"${key}": ${this.initData[key]}`).join(', ')
+ '}';
+ Object.keys(this.socketData).map(key => `"${key}": ${this.socketData[key]}`).join(', ')
+ '}';
}
private updateInitData(): void {
private updateSocketData(): void {
const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT);
const da = difficultyAdjustment.getDifficultyAdjustment();
this.setInitDataFields({
this.updateSocketDataFields({
'mempoolInfo': memPool.getMempoolInfo(),
'vBytesPerSecond': memPool.getVBytesPerSecond(),
'blocks': _blocks,
@@ -94,11 +102,33 @@ class WebsocketHandler {
const parsedMessage: WebsocketResponse = JSON.parse(message);
const response = {};
if (parsedMessage.action === 'want') {
client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1;
client['want-mempool-blocks'] = parsedMessage.data.indexOf('mempool-blocks') > -1;
client['want-live-2h-chart'] = parsedMessage.data.indexOf('live-2h-chart') > -1;
client['want-stats'] = parsedMessage.data.indexOf('stats') > -1;
const wantNow = {};
if (parsedMessage && parsedMessage.action === 'want' && Array.isArray(parsedMessage.data)) {
for (const sub of wantable) {
const key = `want-${sub}`;
const wants = parsedMessage.data.includes(sub);
if (wants && client['wants'] && !client[key]) {
wantNow[key] = true;
}
client[key] = wants;
}
client['wants'] = true;
}
// send initial data when a client first starts a subscription
if (wantNow['want-blocks'] || (parsedMessage && parsedMessage['refresh-blocks'])) {
response['blocks'] = this.socketData['blocks'];
}
if (wantNow['want-mempool-blocks']) {
response['mempool-blocks'] = this.socketData['mempool-blocks'];
}
if (wantNow['want-stats']) {
response['mempoolInfo'] = this.socketData['mempoolInfo'];
response['vBytesPerSecond'] = this.socketData['vBytesPerSecond'];
response['fees'] = this.socketData['fees'];
response['da'] = this.socketData['da'];
}
if (parsedMessage && parsedMessage['track-tx']) {
@@ -109,21 +139,21 @@ class WebsocketHandler {
if (parsedMessage['watch-mempool']) {
const rbfCacheTxid = rbfCache.getReplacedBy(trackTxid);
if (rbfCacheTxid) {
response['txReplaced'] = {
response['txReplaced'] = JSON.stringify({
txid: rbfCacheTxid,
};
});
client['track-tx'] = null;
} else {
// It might have appeared before we had the time to start watching for it
const tx = memPool.getMempool()[trackTxid];
if (tx) {
if (config.MEMPOOL.BACKEND === 'esplora') {
response['tx'] = tx;
response['tx'] = JSON.stringify(tx);
} else {
// tx.prevout is missing from transactions when in bitcoind mode
try {
const fullTx = await transactionUtils.$getMempoolTransactionExtended(tx.txid, true);
response['tx'] = fullTx;
response['tx'] = JSON.stringify(fullTx);
} catch (e) {
logger.debug('Error finding transaction: ' + (e instanceof Error ? e.message : e));
}
@@ -131,7 +161,7 @@ class WebsocketHandler {
} else {
try {
const fullTx = await transactionUtils.$getMempoolTransactionExtended(client['track-tx'], true);
response['tx'] = fullTx;
response['tx'] = JSON.stringify(fullTx);
} catch (e) {
logger.debug('Error finding transaction. ' + (e instanceof Error ? e.message : e));
client['track-mempool-tx'] = parsedMessage['track-tx'];
@@ -141,10 +171,10 @@ class WebsocketHandler {
}
const tx = memPool.getMempool()[trackTxid];
if (tx && tx.position) {
response['txPosition'] = {
response['txPosition'] = JSON.stringify({
txid: trackTxid,
position: tx.position,
};
});
}
} else {
client['track-tx'] = null;
@@ -177,10 +207,10 @@ class WebsocketHandler {
const index = parsedMessage['track-mempool-block'];
client['track-mempool-block'] = index;
const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions();
response['projected-block-transactions'] = {
response['projected-block-transactions'] = JSON.stringify({
index: index,
blockTransactions: mBlocksWithTransactions[index]?.transactions || [],
};
});
} else {
client['track-mempool-block'] = null;
}
@@ -189,23 +219,24 @@ class WebsocketHandler {
if (parsedMessage && parsedMessage['track-rbf'] !== undefined) {
if (['all', 'fullRbf'].includes(parsedMessage['track-rbf'])) {
client['track-rbf'] = parsedMessage['track-rbf'];
response['rbfLatest'] = JSON.stringify(rbfCache.getRbfTrees(parsedMessage['track-rbf'] === 'fullRbf'));
} else {
client['track-rbf'] = false;
}
}
if (parsedMessage.action === 'init') {
if (!this.initData['blocks']?.length || !this.initData['da']) {
this.updateInitData();
if (!this.socketData['blocks']?.length || !this.socketData['da']) {
this.updateSocketData();
}
if (!this.initData['blocks']?.length) {
if (!this.socketData['blocks']?.length) {
return;
}
client.send(this.serializedInitData);
}
if (parsedMessage.action === 'ping') {
response['pong'] = true;
response['pong'] = JSON.stringify(true);
}
if (parsedMessage['track-donation'] && parsedMessage['track-donation'].length === 22) {
@@ -221,7 +252,8 @@ class WebsocketHandler {
}
if (Object.keys(response).length) {
client.send(JSON.stringify(response));
const serializedResponse = this.serializeResponse(response);
client.send(serializedResponse);
}
} catch (e) {
logger.debug('Error parsing websocket message: ' + (e instanceof Error ? e.message : e));
@@ -250,7 +282,7 @@ class WebsocketHandler {
throw new Error('WebSocket.Server is not set');
}
this.setInitDataFields({ 'loadingIndicators': indicators });
this.updateSocketDataFields({ 'loadingIndicators': indicators });
const response = JSON.stringify({ loadingIndicators: indicators });
this.wss.clients.forEach((client) => {
@@ -266,7 +298,7 @@ class WebsocketHandler {
throw new Error('WebSocket.Server is not set');
}
this.setInitDataFields({ 'conversions': conversionRates });
this.updateSocketDataFields({ 'conversions': conversionRates });
const response = JSON.stringify({ conversions: conversionRates });
this.wss.clients.forEach((client) => {
@@ -336,11 +368,21 @@ class WebsocketHandler {
memPool.addToSpendMap(newTransactions);
const recommendedFees = feeApi.getRecommendedFee();
const latestTransactions = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx));
// update init data
this.updateInitData();
this.updateSocketDataFields({
'mempoolInfo': mempoolInfo,
'vBytesPerSecond': vBytesPerSecond,
'mempool-blocks': mBlocks,
'transactions': latestTransactions,
'loadingIndicators': loadingIndicators.getLoadingIndicators(),
'da': da?.previousTime ? da : undefined,
'fees': recommendedFees,
});
// cache serialized objects to avoid stringify-ing the same thing for every client
const responseCache = { ...this.initData };
const responseCache = { ...this.socketData };
function getCachedResponse(key: string, data): string {
if (!responseCache[key]) {
responseCache[key] = JSON.stringify(data);
@@ -371,8 +413,6 @@ class WebsocketHandler {
}
}
const latestTransactions = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx));
this.wss.clients.forEach(async (client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
@@ -490,7 +530,7 @@ class WebsocketHandler {
if (rbfReplacedBy) {
response['rbfTransaction'] = JSON.stringify({
txid: rbfReplacedBy,
})
});
}
const rbfChange = rbfChanges.map[client['track-tx']];
@@ -524,9 +564,7 @@ class WebsocketHandler {
}
if (Object.keys(response).length) {
const serializedResponse = '{'
+ Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ')
+ '}';
const serializedResponse = this.serializeResponse(response);
client.send(serializedResponse);
}
});
@@ -626,11 +664,19 @@ class WebsocketHandler {
const da = difficultyAdjustment.getDifficultyAdjustment();
const fees = feeApi.getRecommendedFee();
const mempoolInfo = memPool.getMempoolInfo();
// update init data
this.updateInitData();
this.updateSocketDataFields({
'mempoolInfo': mempoolInfo,
'blocks': [...blocks.getBlocks(), block].slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT),
'mempool-blocks': mBlocks,
'loadingIndicators': loadingIndicators.getLoadingIndicators(),
'da': da?.previousTime ? da : undefined,
'fees': fees,
});
const responseCache = { ...this.initData };
const responseCache = { ...this.socketData };
function getCachedResponse(key, data): string {
if (!responseCache[key]) {
responseCache[key] = JSON.stringify(data);
@@ -638,22 +684,26 @@ class WebsocketHandler {
return responseCache[key];
}
const mempoolInfo = memPool.getMempoolInfo();
this.wss.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
if (!client['want-blocks']) {
return;
const response = {};
if (client['want-blocks']) {
response['block'] = getCachedResponse('block', block);
}
const response = {};
response['block'] = getCachedResponse('block', block);
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
response['da'] = getCachedResponse('da', da?.previousTime ? da : undefined);
response['fees'] = getCachedResponse('fees', fees);
if (client['want-stats']) {
response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo);
response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond());
response['fees'] = getCachedResponse('fees', fees);
if (da?.previousTime) {
response['da'] = getCachedResponse('da', da);
}
}
if (mBlocks && client['want-mempool-blocks']) {
response['mempool-blocks'] = getCachedResponse('mempool-blocks', mBlocks);
@@ -748,11 +798,19 @@ class WebsocketHandler {
}
}
const serializedResponse = '{'
if (Object.keys(response).length) {
const serializedResponse = this.serializeResponse(response);
client.send(serializedResponse);
}
});
}
// takes a dictionary of JSON serialized values
// and zips it together into a valid JSON object
private serializeResponse(response): string {
return '{'
+ Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ')
+ '}';
client.send(serializedResponse);
});
}
private printLogs(): void {

View File

@@ -30,7 +30,7 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
}
public async query<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
OkPacket[] | ResultSetHeader>(query, params?): Promise<[T, FieldPacket[]]>
OkPacket[] | ResultSetHeader>(query, params?, connection?: PoolConnection): Promise<[T, FieldPacket[]]>
{
this.checkDBFlag();
let hardTimeout;
@@ -45,7 +45,9 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
reject(new Error(`DB query failed to return, reject or time out within ${hardTimeout / 1000}s - ${query?.sql?.slice(0, 160) || (typeof(query) === 'string' || query instanceof String ? query?.slice(0, 160) : 'unknown query')}`));
}, hardTimeout);
this.getPool().then(pool => {
// Use a specific connection if provided, otherwise delegate to the pool
const connectionPromise = connection ? Promise.resolve(connection) : this.getPool();
connectionPromise.then((pool: PoolConnection | Pool) => {
return pool.query(query, params) as Promise<[T, FieldPacket[]]>;
}).then(result => {
resolve(result);
@@ -61,6 +63,33 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
}
}
public async $atomicQuery<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
OkPacket[] | ResultSetHeader>(queries: { query, params }[]): Promise<[T, FieldPacket[]][]>
{
const pool = await this.getPool();
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
const results: [T, FieldPacket[]][] = [];
for (const query of queries) {
const result = await this.query(query.query, query.params, connection) as [T, FieldPacket[]];
results.push(result);
}
await connection.commit();
return results;
} catch (e) {
logger.err('Could not complete db transaction, rolling back: ' + (e instanceof Error ? e.message : e));
connection.rollback();
connection.release();
throw e;
} finally {
connection.release();
}
}
public async checkDbConnection() {
this.checkDBFlag();
try {

View File

@@ -150,7 +150,7 @@ class Server {
if (config.BISQ.ENABLED) {
bisq.startBisqService();
bisq.setPriceCallbackFunction((price) => websocketHandler.setExtraInitProperties('bsq-price', price));
bisq.setPriceCallbackFunction((price) => websocketHandler.setExtraInitData('bsq-price', price));
blocks.setNewBlockCallback(bisq.handleNewBitcoinBlock.bind(bisq));
bisqMarkets.startBisqService();
}

View File

@@ -5,52 +5,10 @@ import { Ancestor, CpfpCluster } from '../mempool.interfaces';
import transactionRepository from '../repositories/TransactionRepository';
class CpfpRepository {
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<boolean> {
if (!txs[0]) {
return false;
}
// skip clusters of transactions with the same fees
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
const equalFee = txs.length > 1 && txs.reduce((acc, tx) => {
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
}, true);
if (equalFee) {
return false;
}
try {
const packedTxs = Buffer.from(this.pack(txs));
await DB.query(
`
INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
VALUE (UNHEX(?), ?, ?, ?)
ON DUPLICATE KEY UPDATE
height = ?,
txs = ?,
fee_rate = ?
`,
[clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
);
const maxChunk = 10;
let chunkIndex = 0;
while (chunkIndex < txs.length) {
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => {
return { txid: tx.txid, cluster: clusterRoot };
});
await transactionRepository.$batchSetCluster(chunk);
chunkIndex += maxChunk;
}
return true;
} catch (e: any) {
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
throw e;
}
}
public async $batchSaveClusters(clusters: { root: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number }[]): Promise<boolean> {
try {
const clusterValues: any[] = [];
const txs: any[] = [];
const clusterValues: [string, number, Buffer, number][] = [];
const txs: { txid: string, cluster: string }[] = [];
for (const cluster of clusters) {
if (cluster.txs?.length) {
@@ -76,16 +34,10 @@ class CpfpRepository {
return false;
}
const queries: { query, params }[] = [];
const maxChunk = 100;
let chunkIndex = 0;
// insert transactions in batches of up to 100 rows
while (chunkIndex < txs.length) {
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
await transactionRepository.$batchSetCluster(chunk);
chunkIndex += maxChunk;
}
chunkIndex = 0;
// insert clusters in batches of up to 100 rows
while (chunkIndex < clusterValues.length) {
const chunk = clusterValues.slice(chunkIndex, chunkIndex + maxChunk);
@@ -97,12 +49,23 @@ class CpfpRepository {
return (' (UNHEX(?), ?, ?, ?)');
}) + ';';
const values = chunk.flat();
await DB.query(
queries.push({
query,
values
);
params: values,
});
chunkIndex += maxChunk;
}
chunkIndex = 0;
// insert transactions in batches of up to 100 rows
while (chunkIndex < txs.length) {
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
queries.push(transactionRepository.buildBatchSetQuery(chunk));
chunkIndex += maxChunk;
}
await DB.$atomicQuery(queries);
return true;
} catch (e: any) {
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
@@ -120,8 +83,8 @@ class CpfpRepository {
[clusterRoot]
);
const cluster = clusterRows[0];
cluster.effectiveFeePerVsize = cluster.fee_rate;
if (cluster?.txs) {
cluster.effectiveFeePerVsize = cluster.fee_rate;
cluster.txs = this.unpack(cluster.txs);
return cluster;
}

View File

@@ -25,9 +25,8 @@ class TransactionRepository {
}
}
public async $batchSetCluster(txs): Promise<void> {
try {
let query = `
public buildBatchSetQuery(txs: { txid: string, cluster: string }[]): { query, params } {
let query = `
INSERT IGNORE INTO compact_transactions
(
txid,
@@ -35,13 +34,22 @@ class TransactionRepository {
)
VALUES
`;
query += txs.map(tx => {
return (' (UNHEX(?), UNHEX(?))');
}) + ';';
const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
query += txs.map(tx => {
return (' (UNHEX(?), UNHEX(?))');
}) + ';';
const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
return {
query,
params: values,
};
}
public async $batchSetCluster(txs): Promise<void> {
try {
const query = this.buildBatchSetQuery(txs);
await DB.query(
query,
values
query.query,
query.params,
);
} catch (e: any) {
logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));