Merge branch 'master' into nymkappa/internal-price-rest-api

This commit is contained in:
nymkappa
2024-12-06 10:17:52 +01:00
27 changed files with 343 additions and 88 deletions

View File

@@ -7,7 +7,7 @@ import cpfpRepository from '../repositories/CpfpRepository';
import { RowDataPacket } from 'mysql2';
class DatabaseMigration {
private static currentVersion = 83;
private static currentVersion = 93;
private queryTimeout = 3600_000;
private statisticsAddedIndexed = false;
private uniqueLogs: string[] = [];
@@ -710,6 +710,97 @@ class DatabaseMigration {
await this.$executeQuery('ALTER TABLE `blocks` ADD first_seen datetime(6) DEFAULT NULL');
await this.updateToSchemaVersion(83);
}
// add new pools indexes
if (databaseSchemaVersion < 84 && isBitcoin === true) {
await this.$executeQuery(`
ALTER TABLE \`pools\`
ADD INDEX \`slug\` (\`slug\`),
ADD INDEX \`unique_id\` (\`unique_id\`)
`);
await this.updateToSchemaVersion(84);
}
// lightning channels indexes
if (databaseSchemaVersion < 85 && isBitcoin === true) {
await this.$executeQuery(`
ALTER TABLE \`channels\`
ADD INDEX \`created\` (\`created\`),
ADD INDEX \`capacity\` (\`capacity\`),
ADD INDEX \`closing_reason\` (\`closing_reason\`),
ADD INDEX \`closing_resolved\` (\`closing_resolved\`)
`);
await this.updateToSchemaVersion(85);
}
// lightning nodes indexes
if (databaseSchemaVersion < 86 && isBitcoin === true) {
await this.$executeQuery(`
ALTER TABLE \`nodes\`
ADD INDEX \`status\` (\`status\`),
ADD INDEX \`channels\` (\`channels\`),
ADD INDEX \`country_id\` (\`country_id\`),
ADD INDEX \`as_number\` (\`as_number\`),
ADD INDEX \`first_seen\` (\`first_seen\`)
`);
await this.updateToSchemaVersion(86);
}
// lightning node sockets indexes
if (databaseSchemaVersion < 87 && isBitcoin === true) {
await this.$executeQuery('ALTER TABLE `nodes_sockets` ADD INDEX `type` (`type`)');
await this.updateToSchemaVersion(87);
}
// lightning stats indexes
if (databaseSchemaVersion < 88 && isBitcoin === true) {
await this.$executeQuery('ALTER TABLE `lightning_stats` ADD INDEX `added` (`added`)');
await this.updateToSchemaVersion(88);
}
// geo names indexes
if (databaseSchemaVersion < 89 && isBitcoin === true) {
await this.$executeQuery('ALTER TABLE `geo_names` ADD INDEX `names` (`names`)');
await this.updateToSchemaVersion(89);
}
// hashrates indexes
if (databaseSchemaVersion < 90 && isBitcoin === true) {
await this.$executeQuery('ALTER TABLE `hashrates` ADD INDEX `type` (`type`)');
await this.updateToSchemaVersion(90);
}
// block audits indexes
if (databaseSchemaVersion < 91 && isBitcoin === true) {
await this.$executeQuery('ALTER TABLE `blocks_audits` ADD INDEX `time` (`time`)');
await this.updateToSchemaVersion(91);
}
// elements_pegs indexes
if (databaseSchemaVersion < 92 && config.MEMPOOL.NETWORK === 'liquid') {
await this.$executeQuery(`
ALTER TABLE \`elements_pegs\`
ADD INDEX \`block\` (\`block\`),
ADD INDEX \`datetime\` (\`datetime\`),
ADD INDEX \`amount\` (\`amount\`),
ADD INDEX \`bitcoinaddress\` (\`bitcoinaddress\`),
ADD INDEX \`bitcointxid\` (\`bitcointxid\`)
`);
await this.updateToSchemaVersion(92);
}
// federation_txos indexes
if (databaseSchemaVersion < 93 && config.MEMPOOL.NETWORK === 'liquid') {
await this.$executeQuery(`
ALTER TABLE \`federation_txos\`
ADD INDEX \`unspent\` (\`unspent\`),
ADD INDEX \`lastblockupdate\` (\`lastblockupdate\`),
ADD INDEX \`blocktime\` (\`blocktime\`),
ADD INDEX \`emergencyKey\` (\`emergencyKey\`),
ADD INDEX \`expiredAt\` (\`expiredAt\`)
`);
await this.updateToSchemaVersion(93);
}
}
/**

View File

@@ -382,7 +382,7 @@ class MempoolBlocks {
const ancestors: Ancestor[] = [];
const descendants: Ancestor[] = [];
let ancestor: MempoolTransactionExtended
let ancestor: MempoolTransactionExtended;
for (const cluster of clusters) {
for (const memberTxid of cluster) {
const mempoolTx = mempool[memberTxid];
@@ -462,7 +462,7 @@ class MempoolBlocks {
for (let i = 0; i < block.length; i++) {
const txid = block[i];
if (txid) {
if (txid in mempool) {
mempoolTx = mempool[txid];
// save position in projected blocks
mempoolTx.position = {
@@ -481,6 +481,9 @@ class MempoolBlocks {
mempoolTx.acceleratedAt = acceleration?.added;
mempoolTx.feeDelta = acceleration?.feeDelta;
for (const ancestor of mempoolTx.ancestors || []) {
if (!(ancestor.txid in mempool)) {
continue;
}
if (!mempool[ancestor.txid].acceleration) {
mempool[ancestor.txid].cpfpDirty = true;
}
@@ -688,7 +691,7 @@ class MempoolBlocks {
[pool: string]: { name: string, block: number, vsize: number, accelerations: string[], complete: boolean };
} = {};
// prepare a list of accelerations in ascending order (we'll pop items off the end of the list)
const accQueue: { acceleration: Acceleration, rate: number, vsize: number }[] = Object.values(accelerations).map(acc => {
const accQueue: { acceleration: Acceleration, rate: number, vsize: number }[] = Object.values(accelerations).filter(acc => acc.txid in mempoolCache).map(acc => {
let vsize = mempoolCache[acc.txid].vsize;
for (const ancestor of mempoolCache[acc.txid].ancestors || []) {
vsize += (ancestor.weight / 4);

View File

@@ -246,17 +246,22 @@ class AccelerationApi {
this.startedWebsocketLoop = true;
if (!this.ws) {
this.ws = new WebSocket(this.websocketPath);
this.websocketConnected = true;
this.lastPing = 0;
this.ws.on('open', () => {
logger.info(`Acceleration websocket opened to ${this.websocketPath}`);
this.websocketConnected = true;
this.ws?.send(JSON.stringify({
'watch-accelerations': true
}));
});
this.ws.on('error', (error) => {
logger.err(`Acceleration websocket error on ${this.websocketPath}: ` + error);
let errMsg = `Acceleration websocket error on ${this.websocketPath}: ${error['code']}`;
if (error['errors']) {
errMsg += ' - ' + error['errors'].join(' - ');
}
logger.err(errMsg);
this.ws = null;
this.websocketConnected = false;
});
@@ -285,16 +290,28 @@ class AccelerationApi {
logger.debug('received pong from acceleration websocket server');
this.lastPong = Date.now();
});
} else {
if (this.lastPing > this.lastPong && Date.now() - this.lastPing > 10000) {
} else if (this.websocketConnected) {
if (this.lastPing && this.lastPing > this.lastPong && (Date.now() - this.lastPing > 10000)) {
logger.warn('No pong received within 10 seconds, terminating connection');
this.ws.terminate();
this.ws = null;
this.websocketConnected = false;
} else if (Date.now() - this.lastPing > 30000) {
try {
this.ws?.terminate();
} catch (e) {
logger.warn('failed to terminate acceleration websocket connection: ' + (e instanceof Error ? e.message : e));
} finally {
this.ws = null;
this.websocketConnected = false;
this.lastPing = 0;
}
} else if (!this.lastPing || (Date.now() - this.lastPing > 30000)) {
logger.debug('sending ping to acceleration websocket server');
this.ws.ping();
this.lastPing = Date.now();
if (this.ws?.readyState === WebSocket.OPEN) {
try {
this.ws?.ping();
this.lastPing = Date.now();
} catch (e) {
logger.warn('failed to send ping to acceleration websocket server: ' + (e instanceof Error ? e.message : e));
}
}
}
}
await new Promise(resolve => setTimeout(resolve, 5000));