Merge branch 'master' into nymkappa/feature/rename-mining-pool
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
{
|
||||
"MEMPOOL": {
|
||||
"ENABLED": true,
|
||||
"NETWORK": "__MEMPOOL_NETWORK__",
|
||||
"BACKEND": "__MEMPOOL_BACKEND__",
|
||||
"ENABLED": true,
|
||||
"BLOCKS_SUMMARIES_INDEXING": true,
|
||||
"HTTP_PORT": 1,
|
||||
"SPAWN_CLUSTER_PROCS": 2,
|
||||
|
||||
@@ -13,6 +13,7 @@ describe('Mempool Backend Config', () => {
|
||||
const config = jest.requireActual('../config').default;
|
||||
|
||||
expect(config.MEMPOOL).toStrictEqual({
|
||||
ENABLED: true,
|
||||
NETWORK: 'mainnet',
|
||||
BACKEND: 'none',
|
||||
BLOCKS_SUMMARIES_INDEXING: false,
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
import logger from '../logger';
|
||||
import { BlockExtended, TransactionExtended, MempoolBlockWithTransactions } from '../mempool.interfaces';
|
||||
import config from '../config';
|
||||
import bitcoinApi from './bitcoin/bitcoin-api-factory';
|
||||
import { Common } from './common';
|
||||
import { TransactionExtended, MempoolBlockWithTransactions, AuditScore } from '../mempool.interfaces';
|
||||
import blocksRepository from '../repositories/BlocksRepository';
|
||||
import blocksAuditsRepository from '../repositories/BlocksAuditsRepository';
|
||||
import blocks from '../api/blocks';
|
||||
|
||||
const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners
|
||||
|
||||
@@ -44,8 +49,6 @@ class Audit {
|
||||
|
||||
displacedWeight += (4000 - transactions[0].weight);
|
||||
|
||||
logger.warn(`${fresh.length} fresh, ${Object.keys(isCensored).length} possibly censored, ${displacedWeight} displaced weight`);
|
||||
|
||||
// we can expect an honest miner to include 'displaced' transactions in place of recent arrivals and censored txs
|
||||
// these displaced transactions should occupy the first N weight units of the next projected block
|
||||
let displacedWeightRemaining = displacedWeight;
|
||||
@@ -73,6 +76,7 @@ class Audit {
|
||||
|
||||
// mark unexpected transactions in the mined block as 'added'
|
||||
let overflowWeight = 0;
|
||||
let totalWeight = 0;
|
||||
for (const tx of transactions) {
|
||||
if (inTemplate[tx.txid]) {
|
||||
matches.push(tx.txid);
|
||||
@@ -82,11 +86,13 @@ class Audit {
|
||||
}
|
||||
overflowWeight += tx.weight;
|
||||
}
|
||||
totalWeight += tx.weight;
|
||||
}
|
||||
|
||||
// transactions missing from near the end of our template are probably not being censored
|
||||
let overflowWeightRemaining = overflowWeight;
|
||||
let lastOverflowRate = 1.00;
|
||||
let overflowWeightRemaining = overflowWeight - (config.MEMPOOL.BLOCK_WEIGHT_UNITS - totalWeight);
|
||||
let maxOverflowRate = 0;
|
||||
let rateThreshold = 0;
|
||||
index = projectedBlocks[0].transactionIds.length - 1;
|
||||
while (index >= 0) {
|
||||
const txid = projectedBlocks[0].transactionIds[index];
|
||||
@@ -94,8 +100,11 @@ class Audit {
|
||||
if (isCensored[txid]) {
|
||||
delete isCensored[txid];
|
||||
}
|
||||
lastOverflowRate = mempool[txid].effectiveFeePerVsize;
|
||||
} else if (Math.floor(mempool[txid].effectiveFeePerVsize * 100) <= Math.ceil(lastOverflowRate * 100)) { // tolerance of 0.01 sat/vb
|
||||
if (mempool[txid].effectiveFeePerVsize > maxOverflowRate) {
|
||||
maxOverflowRate = mempool[txid].effectiveFeePerVsize;
|
||||
rateThreshold = (Math.ceil(maxOverflowRate * 100) / 100) + 0.005;
|
||||
}
|
||||
} else if (mempool[txid].effectiveFeePerVsize <= rateThreshold) { // tolerance of 0.01 sat/vb + rounding
|
||||
if (isCensored[txid]) {
|
||||
delete isCensored[txid];
|
||||
}
|
||||
@@ -113,6 +122,45 @@ class Audit {
|
||||
score
|
||||
};
|
||||
}
|
||||
|
||||
public async $getBlockAuditScores(fromHeight?: number, limit: number = 15): Promise<AuditScore[]> {
|
||||
let currentHeight = fromHeight !== undefined ? fromHeight : await blocksRepository.$mostRecentBlockHeight();
|
||||
const returnScores: AuditScore[] = [];
|
||||
|
||||
if (currentHeight < 0) {
|
||||
return returnScores;
|
||||
}
|
||||
|
||||
for (let i = 0; i < limit && currentHeight >= 0; i++) {
|
||||
const block = blocks.getBlocks().find((b) => b.height === currentHeight);
|
||||
if (block?.extras?.matchRate != null) {
|
||||
returnScores.push({
|
||||
hash: block.id,
|
||||
matchRate: block.extras.matchRate
|
||||
});
|
||||
} else {
|
||||
let currentHash;
|
||||
if (!currentHash && Common.indexingEnabled()) {
|
||||
const dbBlock = await blocksRepository.$getBlockByHeight(currentHeight);
|
||||
if (dbBlock && dbBlock['id']) {
|
||||
currentHash = dbBlock['id'];
|
||||
}
|
||||
}
|
||||
if (!currentHash) {
|
||||
currentHash = await bitcoinApi.$getBlockHash(currentHeight);
|
||||
}
|
||||
if (currentHash) {
|
||||
const auditScore = await blocksAuditsRepository.$getBlockAuditScore(currentHash);
|
||||
returnScores.push({
|
||||
hash: currentHash,
|
||||
matchRate: auditScore?.matchRate
|
||||
});
|
||||
}
|
||||
}
|
||||
currentHeight--;
|
||||
}
|
||||
return returnScores;
|
||||
}
|
||||
}
|
||||
|
||||
export default new Audit();
|
||||
@@ -130,7 +130,7 @@ class Blocks {
|
||||
const stripped = block.tx.map((tx) => {
|
||||
return {
|
||||
txid: tx.txid,
|
||||
vsize: tx.vsize,
|
||||
vsize: tx.weight / 4,
|
||||
fee: tx.fee ? Math.round(tx.fee * 100000000) : 0,
|
||||
value: Math.round(tx.vout.reduce((acc, vout) => acc + (vout.value ? vout.value : 0), 0) * 100000000)
|
||||
};
|
||||
@@ -195,9 +195,9 @@ class Blocks {
|
||||
};
|
||||
}
|
||||
|
||||
const auditSummary = await BlocksAuditsRepository.$getShortBlockAudit(block.id);
|
||||
if (auditSummary) {
|
||||
blockExtended.extras.matchRate = auditSummary.matchRate;
|
||||
const auditScore = await BlocksAuditsRepository.$getBlockAuditScore(block.id);
|
||||
if (auditScore != null) {
|
||||
blockExtended.extras.matchRate = auditScore.matchRate;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import logger from '../logger';
|
||||
import { Common } from './common';
|
||||
|
||||
class DatabaseMigration {
|
||||
private static currentVersion = 41;
|
||||
private static currentVersion = 43;
|
||||
private queryTimeout = 120000;
|
||||
private statisticsAddedIndexed = false;
|
||||
private uniqueLogs: string[] = [];
|
||||
@@ -352,6 +352,14 @@ class DatabaseMigration {
|
||||
if (databaseSchemaVersion < 41 && isBitcoin === true) {
|
||||
await this.$executeQuery('UPDATE channels SET closing_reason = NULL WHERE closing_reason = 1');
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 42 && isBitcoin === true) {
|
||||
await this.$executeQuery('ALTER TABLE `channels` ADD closing_resolved tinyint(1) DEFAULT 0');
|
||||
}
|
||||
|
||||
if (databaseSchemaVersion < 43 && isBitcoin === true) {
|
||||
await this.$executeQuery(this.getCreateLNNodeRecordsTableQuery(), await this.$checkIfTableExists('nodes_records'));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -787,6 +795,19 @@ class DatabaseMigration {
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
private getCreateLNNodeRecordsTableQuery(): string {
|
||||
return `CREATE TABLE IF NOT EXISTS nodes_records (
|
||||
public_key varchar(66) NOT NULL,
|
||||
type int(10) unsigned NOT NULL,
|
||||
payload blob NOT NULL,
|
||||
UNIQUE KEY public_key_type (public_key, type),
|
||||
INDEX (public_key),
|
||||
FOREIGN KEY (public_key)
|
||||
REFERENCES nodes (public_key)
|
||||
ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
|
||||
}
|
||||
|
||||
public async $truncateIndexedData(tables: string[]) {
|
||||
const allowedTables = ['blocks', 'hashrates', 'prices'];
|
||||
|
||||
|
||||
@@ -117,6 +117,17 @@ class ChannelsApi {
|
||||
}
|
||||
}
|
||||
|
||||
public async $getUnresolvedClosedChannels(): Promise<any[]> {
|
||||
try {
|
||||
const query = `SELECT * FROM channels WHERE status = 2 AND closing_reason = 2 AND closing_resolved = 0 AND closing_transaction_id != ''`;
|
||||
const [rows]: any = await DB.query(query);
|
||||
return rows;
|
||||
} catch (e) {
|
||||
logger.err('$getUnresolvedClosedChannels error: ' + (e instanceof Error ? e.message : e));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public async $getChannelsWithoutCreatedDate(): Promise<any[]> {
|
||||
try {
|
||||
const query = `SELECT * FROM channels WHERE created IS NULL`;
|
||||
|
||||
@@ -105,6 +105,18 @@ class NodesApi {
|
||||
node.closed_channel_count = rows[0].closed_channel_count;
|
||||
}
|
||||
|
||||
// Custom records
|
||||
query = `
|
||||
SELECT type, payload
|
||||
FROM nodes_records
|
||||
WHERE public_key = ?
|
||||
`;
|
||||
[rows] = await DB.query(query, [public_key]);
|
||||
node.custom_records = {};
|
||||
for (const record of rows) {
|
||||
node.custom_records[record.type] = Buffer.from(record.payload, 'binary').toString('hex');
|
||||
}
|
||||
|
||||
return node;
|
||||
} catch (e) {
|
||||
logger.err(`Cannot get node information for ${public_key}. Reason: ${(e instanceof Error ? e.message : e)}`);
|
||||
|
||||
@@ -7,6 +7,15 @@ import { Common } from '../../common';
|
||||
* Convert a clightning "listnode" entry to a lnd node entry
|
||||
*/
|
||||
export function convertNode(clNode: any): ILightningApi.Node {
|
||||
let custom_records: { [type: number]: string } | undefined = undefined;
|
||||
if (clNode.option_will_fund) {
|
||||
try {
|
||||
custom_records = { '1': Buffer.from(clNode.option_will_fund.compact_lease || '', 'hex').toString('base64') };
|
||||
} catch (e) {
|
||||
logger.err(`Cannot decode option_will_fund compact_lease for ${clNode.nodeid}). Reason: ` + (e instanceof Error ? e.message : e));
|
||||
custom_records = undefined;
|
||||
}
|
||||
}
|
||||
return {
|
||||
alias: clNode.alias ?? '',
|
||||
color: `#${clNode.color ?? ''}`,
|
||||
@@ -23,6 +32,7 @@ export function convertNode(clNode: any): ILightningApi.Node {
|
||||
};
|
||||
}) ?? [],
|
||||
last_update: clNode?.last_timestamp ?? 0,
|
||||
custom_records
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ export namespace ILightningApi {
|
||||
}[];
|
||||
color: string;
|
||||
features: { [key: number]: Feature };
|
||||
custom_records?: { [type: number]: string };
|
||||
}
|
||||
|
||||
export interface Info {
|
||||
|
||||
@@ -103,12 +103,11 @@ class Mempool {
|
||||
return txTimes;
|
||||
}
|
||||
|
||||
public async $updateMempool() {
|
||||
logger.debug('Updating mempool');
|
||||
public async $updateMempool(): Promise<void> {
|
||||
logger.debug(`Updating mempool...`);
|
||||
const start = new Date().getTime();
|
||||
let hasChange: boolean = false;
|
||||
const currentMempoolSize = Object.keys(this.mempoolCache).length;
|
||||
let txCount = 0;
|
||||
const transactions = await bitcoinApi.$getRawMempool();
|
||||
const diff = transactions.length - currentMempoolSize;
|
||||
const newTransactions: TransactionExtended[] = [];
|
||||
@@ -124,7 +123,6 @@ class Mempool {
|
||||
try {
|
||||
const transaction = await transactionUtils.$getTransactionExtended(txid);
|
||||
this.mempoolCache[txid] = transaction;
|
||||
txCount++;
|
||||
if (this.inSync) {
|
||||
this.txPerSecondArray.push(new Date().getTime());
|
||||
this.vBytesPerSecondArray.push({
|
||||
@@ -133,14 +131,9 @@ class Mempool {
|
||||
});
|
||||
}
|
||||
hasChange = true;
|
||||
if (diff > 0) {
|
||||
logger.debug('Fetched transaction ' + txCount + ' / ' + diff);
|
||||
} else {
|
||||
logger.debug('Fetched transaction ' + txCount);
|
||||
}
|
||||
newTransactions.push(transaction);
|
||||
} catch (e) {
|
||||
logger.debug('Error finding transaction in mempool: ' + (e instanceof Error ? e.message : e));
|
||||
logger.debug(`Error finding transaction '${txid}' in the mempool: ` + (e instanceof Error ? e.message : e));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,8 +190,7 @@ class Mempool {
|
||||
|
||||
const end = new Date().getTime();
|
||||
const time = end - start;
|
||||
logger.debug(`New mempool size: ${Object.keys(this.mempoolCache).length} Change: ${diff}`);
|
||||
logger.debug('Mempool updated in ' + time / 1000 + ' seconds');
|
||||
logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`);
|
||||
}
|
||||
|
||||
public handleRbfTransactions(rbfTransactions: { [txid: string]: TransactionExtended; }) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { Application, Request, Response } from 'express';
|
||||
import config from "../../config";
|
||||
import logger from '../../logger';
|
||||
import audits from '../audit';
|
||||
import BlocksAuditsRepository from '../../repositories/BlocksAuditsRepository';
|
||||
import BlocksRepository from '../../repositories/BlocksRepository';
|
||||
import DifficultyAdjustmentsRepository from '../../repositories/DifficultyAdjustmentsRepository';
|
||||
@@ -26,7 +27,11 @@ class MiningRoutes {
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/sizes-weights/:interval', this.$getHistoricalBlockSizeAndWeight)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/difficulty-adjustments/:interval', this.$getDifficultyAdjustments)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/predictions/:interval', this.$getHistoricalBlockPrediction)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/audit/scores', this.$getBlockAuditScores)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/audit/scores/:height', this.$getBlockAuditScores)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/audit/score/:hash', this.$getBlockAuditScore)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/audit/:hash', this.$getBlockAudit)
|
||||
.get(config.MEMPOOL.API_URL_PREFIX + 'mining/blocks/timestamp/:timestamp', this.$getHeightFromTimestamp)
|
||||
;
|
||||
}
|
||||
|
||||
@@ -252,6 +257,52 @@ class MiningRoutes {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
}
|
||||
}
|
||||
|
||||
private async $getHeightFromTimestamp(req: Request, res: Response) {
|
||||
try {
|
||||
const timestamp = parseInt(req.params.timestamp, 10);
|
||||
// This will prevent people from entering milliseconds etc.
|
||||
// Block timestamps are allowed to be up to 2 hours off, so 24 hours
|
||||
// will never put the maximum value before the most recent block
|
||||
const nowPlus1day = Math.floor(Date.now() / 1000) + 60 * 60 * 24;
|
||||
// Prevent non-integers that are not seconds
|
||||
if (!/^[1-9][0-9]*$/.test(req.params.timestamp) || timestamp > nowPlus1day) {
|
||||
throw new Error(`Invalid timestamp, value must be Unix seconds`);
|
||||
}
|
||||
const result = await BlocksRepository.$getBlockHeightFromTimestamp(
|
||||
timestamp,
|
||||
);
|
||||
res.header('Pragma', 'public');
|
||||
res.header('Cache-control', 'public');
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 300).toUTCString());
|
||||
res.json(result);
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
}
|
||||
}
|
||||
|
||||
private async $getBlockAuditScores(req: Request, res: Response) {
|
||||
try {
|
||||
const height = req.params.height === undefined ? undefined : parseInt(req.params.height, 10);
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 60).toUTCString());
|
||||
res.json(await audits.$getBlockAuditScores(height, 15));
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
}
|
||||
}
|
||||
|
||||
public async $getBlockAuditScore(req: Request, res: Response) {
|
||||
try {
|
||||
const audit = await BlocksAuditsRepository.$getBlockAuditScore(req.params.hash);
|
||||
|
||||
res.header('Pragma', 'public');
|
||||
res.header('Cache-control', 'public');
|
||||
res.setHeader('Expires', new Date(Date.now() + 1000 * 3600 * 24).toUTCString());
|
||||
res.json(audit || 'null');
|
||||
} catch (e) {
|
||||
res.status(500).send(e instanceof Error ? e.message : e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new MiningRoutes();
|
||||
|
||||
@@ -4,6 +4,7 @@ const configFromFile = require(
|
||||
|
||||
interface IConfig {
|
||||
MEMPOOL: {
|
||||
ENABLED: boolean;
|
||||
NETWORK: 'mainnet' | 'testnet' | 'signet' | 'liquid' | 'liquidtestnet';
|
||||
BACKEND: 'esplora' | 'electrum' | 'none';
|
||||
HTTP_PORT: number;
|
||||
@@ -119,6 +120,7 @@ interface IConfig {
|
||||
|
||||
const defaults: IConfig = {
|
||||
'MEMPOOL': {
|
||||
'ENABLED': true,
|
||||
'NETWORK': 'mainnet',
|
||||
'BACKEND': 'none',
|
||||
'HTTP_PORT': 8999,
|
||||
@@ -224,11 +226,11 @@ const defaults: IConfig = {
|
||||
'BISQ_URL': 'https://bisq.markets/api',
|
||||
'BISQ_ONION': 'http://bisqmktse2cabavbr2xjq7xw3h6g5ottemo5rolfcwt6aly6tp5fdryd.onion/api'
|
||||
},
|
||||
"MAXMIND": {
|
||||
'MAXMIND': {
|
||||
'ENABLED': false,
|
||||
"GEOLITE2_CITY": "/usr/local/share/GeoIP/GeoLite2-City.mmdb",
|
||||
"GEOLITE2_ASN": "/usr/local/share/GeoIP/GeoLite2-ASN.mmdb",
|
||||
"GEOIP2_ISP": "/usr/local/share/GeoIP/GeoIP2-ISP.mmdb"
|
||||
'GEOLITE2_CITY': '/usr/local/share/GeoIP/GeoLite2-City.mmdb',
|
||||
'GEOLITE2_ASN': '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb',
|
||||
'GEOIP2_ISP': '/usr/local/share/GeoIP/GeoIP2-ISP.mmdb'
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import express from "express";
|
||||
import express from 'express';
|
||||
import { Application, Request, Response, NextFunction } from 'express';
|
||||
import * as http from 'http';
|
||||
import * as WebSocket from 'ws';
|
||||
@@ -34,7 +34,7 @@ import miningRoutes from './api/mining/mining-routes';
|
||||
import bisqRoutes from './api/bisq/bisq.routes';
|
||||
import liquidRoutes from './api/liquid/liquid.routes';
|
||||
import bitcoinRoutes from './api/bitcoin/bitcoin.routes';
|
||||
import fundingTxFetcher from "./tasks/lightning/sync-tasks/funding-tx-fetcher";
|
||||
import fundingTxFetcher from './tasks/lightning/sync-tasks/funding-tx-fetcher';
|
||||
|
||||
class Server {
|
||||
private wss: WebSocket.Server | undefined;
|
||||
@@ -74,7 +74,7 @@ class Server {
|
||||
}
|
||||
}
|
||||
|
||||
async startServer(worker = false) {
|
||||
async startServer(worker = false): Promise<void> {
|
||||
logger.notice(`Starting Mempool Server${worker ? ' (worker)' : ''}... (${backendInfo.getShortCommitHash()})`);
|
||||
|
||||
this.app
|
||||
@@ -92,7 +92,9 @@ class Server {
|
||||
this.setUpWebsocketHandling();
|
||||
|
||||
await syncAssets.syncAssets$();
|
||||
diskCache.loadMempoolCache();
|
||||
if (config.MEMPOOL.ENABLED) {
|
||||
diskCache.loadMempoolCache();
|
||||
}
|
||||
|
||||
if (config.DATABASE.ENABLED) {
|
||||
await DB.checkDbConnection();
|
||||
@@ -127,7 +129,10 @@ class Server {
|
||||
fiatConversion.startService();
|
||||
|
||||
this.setUpHttpApiRoutes();
|
||||
this.runMainUpdateLoop();
|
||||
|
||||
if (config.MEMPOOL.ENABLED) {
|
||||
this.runMainUpdateLoop();
|
||||
}
|
||||
|
||||
if (config.BISQ.ENABLED) {
|
||||
bisq.startBisqService();
|
||||
@@ -149,7 +154,7 @@ class Server {
|
||||
});
|
||||
}
|
||||
|
||||
async runMainUpdateLoop() {
|
||||
async runMainUpdateLoop(): Promise<void> {
|
||||
try {
|
||||
try {
|
||||
await memPool.$updateMemPoolInfo();
|
||||
@@ -183,7 +188,7 @@ class Server {
|
||||
}
|
||||
}
|
||||
|
||||
async $runLightningBackend() {
|
||||
async $runLightningBackend(): Promise<void> {
|
||||
try {
|
||||
await fundingTxFetcher.$init();
|
||||
await networkSyncService.$startService();
|
||||
@@ -195,7 +200,7 @@ class Server {
|
||||
};
|
||||
}
|
||||
|
||||
setUpWebsocketHandling() {
|
||||
setUpWebsocketHandling(): void {
|
||||
if (this.wss) {
|
||||
websocketHandler.setWebsocketServer(this.wss);
|
||||
}
|
||||
@@ -209,19 +214,21 @@ class Server {
|
||||
});
|
||||
}
|
||||
websocketHandler.setupConnectionHandling();
|
||||
statistics.setNewStatisticsEntryCallback(websocketHandler.handleNewStatistic.bind(websocketHandler));
|
||||
blocks.setNewBlockCallback(websocketHandler.handleNewBlock.bind(websocketHandler));
|
||||
memPool.setMempoolChangedCallback(websocketHandler.handleMempoolChange.bind(websocketHandler));
|
||||
if (config.MEMPOOL.ENABLED) {
|
||||
statistics.setNewStatisticsEntryCallback(websocketHandler.handleNewStatistic.bind(websocketHandler));
|
||||
blocks.setNewBlockCallback(websocketHandler.handleNewBlock.bind(websocketHandler));
|
||||
memPool.setMempoolChangedCallback(websocketHandler.handleMempoolChange.bind(websocketHandler));
|
||||
}
|
||||
fiatConversion.setProgressChangedCallback(websocketHandler.handleNewConversionRates.bind(websocketHandler));
|
||||
loadingIndicators.setProgressChangedCallback(websocketHandler.handleLoadingChanged.bind(websocketHandler));
|
||||
}
|
||||
|
||||
setUpHttpApiRoutes() {
|
||||
|
||||
setUpHttpApiRoutes(): void {
|
||||
bitcoinRoutes.initRoutes(this.app);
|
||||
if (config.STATISTICS.ENABLED && config.DATABASE.ENABLED) {
|
||||
if (config.STATISTICS.ENABLED && config.DATABASE.ENABLED && config.MEMPOOL.ENABLED) {
|
||||
statisticsRoutes.initRoutes(this.app);
|
||||
}
|
||||
if (Common.indexingEnabled()) {
|
||||
if (Common.indexingEnabled() && config.MEMPOOL.ENABLED) {
|
||||
miningRoutes.initRoutes(this.app);
|
||||
}
|
||||
if (config.BISQ.ENABLED) {
|
||||
@@ -238,4 +245,4 @@ class Server {
|
||||
}
|
||||
}
|
||||
|
||||
const server = new Server();
|
||||
((): Server => new Server())();
|
||||
|
||||
@@ -32,6 +32,11 @@ export interface BlockAudit {
|
||||
matchRate: number,
|
||||
}
|
||||
|
||||
export interface AuditScore {
|
||||
hash: string,
|
||||
matchRate?: number,
|
||||
}
|
||||
|
||||
export interface MempoolBlock {
|
||||
blockSize: number;
|
||||
blockVSize: number;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import DB from '../database';
|
||||
import logger from '../logger';
|
||||
import { BlockAudit } from '../mempool.interfaces';
|
||||
import { BlockAudit, AuditScore } from '../mempool.interfaces';
|
||||
|
||||
class BlocksAuditRepositories {
|
||||
public async $saveAudit(audit: BlockAudit): Promise<void> {
|
||||
@@ -72,10 +72,10 @@ class BlocksAuditRepositories {
|
||||
}
|
||||
}
|
||||
|
||||
public async $getShortBlockAudit(hash: string): Promise<any> {
|
||||
public async $getBlockAuditScore(hash: string): Promise<AuditScore> {
|
||||
try {
|
||||
const [rows]: any[] = await DB.query(
|
||||
`SELECT hash as id, match_rate as matchRate
|
||||
`SELECT hash, match_rate as matchRate
|
||||
FROM blocks_audits
|
||||
WHERE blocks_audits.hash = "${hash}"
|
||||
`);
|
||||
|
||||
@@ -392,6 +392,36 @@ class BlocksRepository {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the first block at or directly after a given timestamp
|
||||
* @param timestamp number unix time in seconds
|
||||
* @returns The height and timestamp of a block (timestamp might vary from given timestamp)
|
||||
*/
|
||||
public async $getBlockHeightFromTimestamp(
|
||||
timestamp: number,
|
||||
): Promise<{ height: number; hash: string; timestamp: number }> {
|
||||
try {
|
||||
// Get first block at or after the given timestamp
|
||||
const query = `SELECT height, hash, blockTimestamp as timestamp FROM blocks
|
||||
WHERE blockTimestamp <= FROM_UNIXTIME(?)
|
||||
ORDER BY blockTimestamp DESC
|
||||
LIMIT 1`;
|
||||
const params = [timestamp];
|
||||
const [rows]: any[][] = await DB.query(query, params);
|
||||
if (rows.length === 0) {
|
||||
throw new Error(`No block was found before timestamp ${timestamp}`);
|
||||
}
|
||||
|
||||
return rows[0];
|
||||
} catch (e) {
|
||||
logger.err(
|
||||
'Cannot get block height from timestamp from the db. Reason: ' +
|
||||
(e instanceof Error ? e.message : e),
|
||||
);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return blocks height
|
||||
*/
|
||||
|
||||
67
backend/src/repositories/NodeRecordsRepository.ts
Normal file
67
backend/src/repositories/NodeRecordsRepository.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { ResultSetHeader, RowDataPacket } from 'mysql2';
|
||||
import DB from '../database';
|
||||
import logger from '../logger';
|
||||
|
||||
export interface NodeRecord {
|
||||
publicKey: string; // node public key
|
||||
type: number; // TLV extension record type
|
||||
payload: string; // base64 record payload
|
||||
}
|
||||
|
||||
class NodesRecordsRepository {
|
||||
public async $saveRecord(record: NodeRecord): Promise<void> {
|
||||
try {
|
||||
const payloadBytes = Buffer.from(record.payload, 'base64');
|
||||
await DB.query(`
|
||||
INSERT INTO nodes_records(public_key, type, payload)
|
||||
VALUE (?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
payload = ?
|
||||
`, [record.publicKey, record.type, payloadBytes, payloadBytes]);
|
||||
} catch (e: any) {
|
||||
if (e.errno !== 1062) { // ER_DUP_ENTRY - Not an issue, just ignore this
|
||||
logger.err(`Cannot save node record (${[record.publicKey, record.type, record.payload]}) into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
// We don't throw, not a critical issue if we miss some nodes records
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async $getRecordTypes(publicKey: string): Promise<any> {
|
||||
try {
|
||||
const query = `
|
||||
SELECT type FROM nodes_records
|
||||
WHERE public_key = ?
|
||||
`;
|
||||
const [rows] = await DB.query<RowDataPacket[][]>(query, [publicKey]);
|
||||
return rows.map(row => row['type']);
|
||||
} catch (e) {
|
||||
logger.err(`Cannot retrieve custom records for ${publicKey} from db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
public async $deleteUnusedRecords(publicKey: string, recordTypes: number[]): Promise<number> {
|
||||
try {
|
||||
let query;
|
||||
if (recordTypes.length) {
|
||||
query = `
|
||||
DELETE FROM nodes_records
|
||||
WHERE public_key = ?
|
||||
AND type NOT IN (${recordTypes.map(type => `${type}`).join(',')})
|
||||
`;
|
||||
} else {
|
||||
query = `
|
||||
DELETE FROM nodes_records
|
||||
WHERE public_key = ?
|
||||
`;
|
||||
}
|
||||
const [result] = await DB.query<ResultSetHeader>(query, [publicKey]);
|
||||
return result.affectedRows;
|
||||
} catch (e) {
|
||||
logger.err(`Cannot delete unused custom records for ${publicKey} from db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default new NodesRecordsRepository();
|
||||
@@ -13,6 +13,7 @@ import fundingTxFetcher from './sync-tasks/funding-tx-fetcher';
|
||||
import NodesSocketsRepository from '../../repositories/NodesSocketsRepository';
|
||||
import { Common } from '../../api/common';
|
||||
import blocks from '../../api/blocks';
|
||||
import NodeRecordsRepository from '../../repositories/NodeRecordsRepository';
|
||||
|
||||
class NetworkSyncService {
|
||||
loggerTimer = 0;
|
||||
@@ -63,6 +64,7 @@ class NetworkSyncService {
|
||||
let progress = 0;
|
||||
|
||||
let deletedSockets = 0;
|
||||
let deletedRecords = 0;
|
||||
const graphNodesPubkeys: string[] = [];
|
||||
for (const node of nodes) {
|
||||
const latestUpdated = await channelsApi.$getLatestChannelUpdateForNode(node.pub_key);
|
||||
@@ -84,8 +86,23 @@ class NetworkSyncService {
|
||||
addresses.push(socket.addr);
|
||||
}
|
||||
deletedSockets += await NodesSocketsRepository.$deleteUnusedSockets(node.pub_key, addresses);
|
||||
|
||||
const oldRecordTypes = await NodeRecordsRepository.$getRecordTypes(node.pub_key);
|
||||
const customRecordTypes: number[] = [];
|
||||
for (const [type, payload] of Object.entries(node.custom_records || {})) {
|
||||
const numericalType = parseInt(type);
|
||||
await NodeRecordsRepository.$saveRecord({
|
||||
publicKey: node.pub_key,
|
||||
type: numericalType,
|
||||
payload,
|
||||
});
|
||||
customRecordTypes.push(numericalType);
|
||||
}
|
||||
if (oldRecordTypes.reduce((changed, type) => changed || customRecordTypes.indexOf(type) === -1, false)) {
|
||||
deletedRecords += await NodeRecordsRepository.$deleteUnusedRecords(node.pub_key, customRecordTypes);
|
||||
}
|
||||
}
|
||||
logger.info(`${progress} nodes updated. ${deletedSockets} sockets deleted`);
|
||||
logger.info(`${progress} nodes updated. ${deletedSockets} sockets deleted. ${deletedRecords} custom records deleted.`);
|
||||
|
||||
// If a channel if not present in the graph, mark it as inactive
|
||||
await nodesApi.$setNodesInactive(graphNodesPubkeys);
|
||||
@@ -309,7 +326,7 @@ class NetworkSyncService {
|
||||
└──────────────────┘
|
||||
*/
|
||||
|
||||
private async $runClosedChannelsForensics(): Promise<void> {
|
||||
private async $runClosedChannelsForensics(skipUnresolved: boolean = false): Promise<void> {
|
||||
if (!config.ESPLORA.REST_API_URL) {
|
||||
return;
|
||||
}
|
||||
@@ -318,9 +335,18 @@ class NetworkSyncService {
|
||||
|
||||
try {
|
||||
logger.info(`Started running closed channel forensics...`);
|
||||
const channels = await channelsApi.$getClosedChannelsWithoutReason();
|
||||
let channels;
|
||||
const closedChannels = await channelsApi.$getClosedChannelsWithoutReason();
|
||||
if (skipUnresolved) {
|
||||
channels = closedChannels;
|
||||
} else {
|
||||
const unresolvedChannels = await channelsApi.$getUnresolvedClosedChannels();
|
||||
channels = [...closedChannels, ...unresolvedChannels];
|
||||
}
|
||||
|
||||
for (const channel of channels) {
|
||||
let reason = 0;
|
||||
let resolvedForceClose = false;
|
||||
// Only Esplora backend can retrieve spent transaction outputs
|
||||
try {
|
||||
let outspends: IEsploraApi.Outspend[] | undefined;
|
||||
@@ -350,6 +376,7 @@ class NetworkSyncService {
|
||||
reason = 3;
|
||||
} else {
|
||||
reason = 2;
|
||||
resolvedForceClose = true;
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
@@ -374,6 +401,9 @@ class NetworkSyncService {
|
||||
if (reason) {
|
||||
logger.debug('Setting closing reason ' + reason + ' for channel: ' + channel.id + '.');
|
||||
await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]);
|
||||
if (reason === 2 && resolvedForceClose) {
|
||||
await DB.query(`UPDATE channels SET closing_resolved = ? WHERE id = ?`, [true, channel.id]);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
logger.err(`$runClosedChannelsForensics() failed for channel ${channel.short_id}. Reason: ${e instanceof Error ? e.message : e}`);
|
||||
|
||||
Reference in New Issue
Block a user