Use new bulk endpoints to speed up forensics

This commit is contained in:
Mononaut 2023-08-17 02:42:59 +09:00
parent 7ec7ae7b95
commit 5bee54a2bf
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
4 changed files with 155 additions and 69 deletions

View File

@ -24,6 +24,7 @@ export interface AbstractBitcoinApi {
$getOutspend(txId: string, vout: number): Promise<IEsploraApi.Outspend>; $getOutspend(txId: string, vout: number): Promise<IEsploraApi.Outspend>;
$getOutspends(txId: string): Promise<IEsploraApi.Outspend[]>; $getOutspends(txId: string): Promise<IEsploraApi.Outspend[]>;
$getBatchedOutspends(txId: string[]): Promise<IEsploraApi.Outspend[][]>; $getBatchedOutspends(txId: string[]): Promise<IEsploraApi.Outspend[][]>;
$getBatchedOutspendsInternal(txId: string[]): Promise<IEsploraApi.Outspend[][]>;
startHealthChecks(): void; startHealthChecks(): void;
} }

View File

@ -60,8 +60,17 @@ class BitcoinApi implements AbstractBitcoinApi {
}); });
} }
$getRawTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> { async $getRawTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> {
throw new Error('Method getRawTransactions not supported by the Bitcoin RPC API.'); const txs: IEsploraApi.Transaction[] = [];
for (const txid of txids) {
try {
const tx = await this.$getRawTransaction(txid, false, true);
txs.push(tx);
} catch (err) {
// skip failures
}
}
return txs;
} }
$getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> { $getMempoolTransactions(txids: string[]): Promise<IEsploraApi.Transaction[]> {
@ -202,6 +211,10 @@ class BitcoinApi implements AbstractBitcoinApi {
return outspends; return outspends;
} }
async $getBatchedOutspendsInternal(txId: string[]): Promise<IEsploraApi.Outspend[][]> {
return this.$getBatchedOutspends(txId);
}
$getEstimatedHashrate(blockHeight: number): Promise<number> { $getEstimatedHashrate(blockHeight: number): Promise<number> {
// 120 is the default block span in Core // 120 is the default block span in Core
return this.bitcoindClient.getNetworkHashPs(120, blockHeight); return this.bitcoindClient.getNetworkHashPs(120, blockHeight);

View File

@ -301,6 +301,19 @@ class ElectrsApi implements AbstractBitcoinApi {
throw new Error('Method not implemented.'); throw new Error('Method not implemented.');
} }
async $getBatchedOutspendsInternal(txids: string[]): Promise<IEsploraApi.Outspend[][]> {
const allOutspends: IEsploraApi.Outspend[][] = [];
const sliceLength = 50;
for (let i = 0; i < Math.ceil(txids.length / sliceLength); i++) {
const slice = txids.slice(i * sliceLength, (i + 1) * sliceLength);
const sliceOutspends = await this.failoverRouter.$get<IEsploraApi.Outspend[][]>('/txs/outspends', 'json', { txids: slice.join(',') });
for (const outspends of sliceOutspends) {
allOutspends.push(outspends);
}
}
return allOutspends;
}
public startHealthChecks(): void { public startHealthChecks(): void {
this.failoverRouter.startHealthChecks(); this.failoverRouter.startHealthChecks();
} }

View File

@ -15,8 +15,6 @@ class ForensicsService {
txCache: { [txid: string]: IEsploraApi.Transaction } = {}; txCache: { [txid: string]: IEsploraApi.Transaction } = {};
tempCached: string[] = []; tempCached: string[] = [];
constructor() {}
public async $startService(): Promise<void> { public async $startService(): Promise<void> {
logger.info('Starting lightning network forensics service'); logger.info('Starting lightning network forensics service');
@ -66,39 +64,73 @@ class ForensicsService {
*/ */
public async $runClosedChannelsForensics(onlyNewChannels: boolean = false): Promise<void> { public async $runClosedChannelsForensics(onlyNewChannels: boolean = false): Promise<void> {
// Only Esplora backend can retrieve spent transaction outputs
if (config.MEMPOOL.BACKEND !== 'esplora') { if (config.MEMPOOL.BACKEND !== 'esplora') {
return; return;
} }
let progress = 0;
try { try {
logger.debug(`Started running closed channel forensics...`); logger.debug(`Started running closed channel forensics...`);
let channels; let remainingChannels;
if (onlyNewChannels) { if (onlyNewChannels) {
channels = await channelsApi.$getClosedChannelsWithoutReason(); remainingChannels = await channelsApi.$getClosedChannelsWithoutReason();
} else { } else {
channels = await channelsApi.$getUnresolvedClosedChannels(); remainingChannels = await channelsApi.$getUnresolvedClosedChannels();
} }
for (const channel of channels) { let progress = 0;
let reason = 0; const sliceLength = 1000;
let resolvedForceClose = false; // process batches of 1000 channels
// Only Esplora backend can retrieve spent transaction outputs for (let i = 0; i < Math.ceil(remainingChannels.length / sliceLength); i++) {
const cached: string[] = []; const channels = remainingChannels.slice(i * sliceLength, (i + 1) * sliceLength);
let allOutspends: IEsploraApi.Outspend[][] = [];
const forceClosedChannels: { channel: any, cachedSpends: string[] }[] = [];
// fetch outspends in bulk
try { try {
let outspends: IEsploraApi.Outspend[] | undefined; const outspendTxids = channels.map(channel => channel.closing_transaction_id);
try { allOutspends = await bitcoinApi.$getBatchedOutspendsInternal(outspendTxids);
outspends = await bitcoinApi.$getOutspends(channel.closing_transaction_id); logger.info(`Fetched outspends for ${allOutspends.length} txs from esplora for lightning forensics`);
await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT); await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
} catch (e) { } catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + channel.closing_transaction_id + '/outspends'}. Reason ${e instanceof Error ? e.message : e}`); logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs/outspends'}. Reason ${e instanceof Error ? e.message : e}`);
}
// fetch spending transactions in bulk and load into txCache
try {
const newSpendingTxids: { [txid: string]: boolean } = {};
for (const outspends of allOutspends) {
for (const outspend of outspends) {
if (outspend.spent && outspend.txid) {
if (!this.txCache[outspend.txid]) {
newSpendingTxids[outspend.txid] = true;
}
}
}
}
const allOutspendTxs = await bitcoinApi.$getRawTransactions(Object.keys(newSpendingTxids));
logger.info(`Fetched ${allOutspendTxs.length} out-spending txs from esplora for lightning forensics`);
for (const tx of allOutspendTxs) {
this.txCache[tx.txid] = tx;
}
} catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs'}. Reason ${e instanceof Error ? e.message : e}`);
}
// process each outspend
for (const [index, channel] of channels.entries()) {
let reason = 0;
const cached: string[] = [];
try {
const outspends = allOutspends[index];
if (!outspends || !outspends.length) {
// outspends are missing
continue; continue;
} }
const lightningScriptReasons: number[] = []; const lightningScriptReasons: number[] = [];
for (const outspend of outspends) { for (const outspend of outspends) {
if (outspend.spent && outspend.txid) { if (outspend.spent && outspend.txid) {
let spendingTx = await this.fetchTransaction(outspend.txid); const spendingTx = this.txCache[outspend.txid];
if (!spendingTx) { if (!spendingTx) {
continue; continue;
} }
@ -110,49 +142,76 @@ class ForensicsService {
const filteredReasons = lightningScriptReasons.filter((r) => r !== 1); const filteredReasons = lightningScriptReasons.filter((r) => r !== 1);
if (filteredReasons.length) { if (filteredReasons.length) {
if (filteredReasons.some((r) => r === 2 || r === 4)) { if (filteredReasons.some((r) => r === 2 || r === 4)) {
// Force closed with penalty
reason = 3; reason = 3;
} else { } else {
// Force closed without penalty
reason = 2; reason = 2;
resolvedForceClose = true;
}
} else {
/*
We can detect a commitment transaction (force close) by reading Sequence and Locktime
https://github.com/lightning/bolts/blob/master/03-transactions.md#commitment-transaction
*/
let closingTx = await this.fetchTransaction(channel.closing_transaction_id, true);
if (!closingTx) {
continue;
}
cached.push(closingTx.txid);
const sequenceHex: string = closingTx.vin[0].sequence.toString(16);
const locktimeHex: string = closingTx.locktime.toString(16);
if (sequenceHex.substring(0, 2) === '80' && locktimeHex.substring(0, 2) === '20') {
reason = 2; // Here we can't be sure if it's a penalty or not
} else {
reason = 1;
}
}
if (reason) {
logger.debug('Setting closing reason ' + reason + ' for channel: ' + channel.id + '.');
await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]);
if (reason === 2 && resolvedForceClose) {
await DB.query(`UPDATE channels SET closing_resolved = ? WHERE id = ?`, [true, channel.id]); await DB.query(`UPDATE channels SET closing_resolved = ? WHERE id = ?`, [true, channel.id]);
} }
if (reason !== 2 || resolvedForceClose) { await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]);
// clean up cached transactions
cached.forEach(txid => { cached.forEach(txid => {
delete this.txCache[txid]; delete this.txCache[txid];
}); });
} } else {
forceClosedChannels.push({ channel, cachedSpends: cached });
} }
} catch (e) { } catch (e) {
logger.err(`$runClosedChannelsForensics() failed for channel ${channel.short_id}. Reason: ${e instanceof Error ? e.message : e}`); logger.err(`$runClosedChannelsForensics() failed for channel ${channel.short_id}. Reason: ${e instanceof Error ? e.message : e}`);
} }
}
++progress; // fetch force-closing transactions in bulk
try {
const newClosingTxids: { [txid: string]: boolean } = {};
for (const { channel } of forceClosedChannels) {
if (!this.txCache[channel.closing_transaction_id]) {
newClosingTxids[channel.closing_transaction_id] = true;
}
}
const closingTxs = await bitcoinApi.$getRawTransactions(Object.keys(newClosingTxids));
logger.info(`Fetched ${closingTxs.length} closing txs from esplora for lightning forensics`);
for (const tx of closingTxs) {
this.txCache[tx.txid] = tx;
}
} catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/txs'}. Reason ${e instanceof Error ? e.message : e}`);
}
// process channels with no lightning script reasons
for (const { channel, cachedSpends } of forceClosedChannels) {
const closingTx = this.txCache[channel.closing_transaction_id];
if (!closingTx) {
// no channel close transaction found yet
continue;
}
/*
We can detect a commitment transaction (force close) by reading Sequence and Locktime
https://github.com/lightning/bolts/blob/master/03-transactions.md#commitment-transaction
*/
const sequenceHex: string = closingTx.vin[0].sequence.toString(16);
const locktimeHex: string = closingTx.locktime.toString(16);
let reason;
if (sequenceHex.substring(0, 2) === '80' && locktimeHex.substring(0, 2) === '20') {
// Force closed, but we can't be sure if it's a penalty or not
reason = 2;
} else {
// Mutually closed
reason = 1;
// clean up cached transactions
delete this.txCache[closingTx.txid];
for (const txid of cachedSpends) {
delete this.txCache[txid];
}
}
await DB.query(`UPDATE channels SET closing_reason = ? WHERE id = ?`, [reason, channel.id]);
}
progress += channels.length;
const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer); const elapsedSeconds = Math.round((new Date().getTime() / 1000) - this.loggerTimer);
if (elapsedSeconds > 10) { if (elapsedSeconds > 10) {
logger.debug(`Updating channel closed channel forensics ${progress}/${channels.length}`); logger.debug(`Updating channel closed channel forensics ${progress}/${remainingChannels.length}`);
this.loggerTimer = new Date().getTime() / 1000; this.loggerTimer = new Date().getTime() / 1000;
} }
} }
@ -221,7 +280,7 @@ class ForensicsService {
const channels = await channelsApi.$getChannelsWithoutSourceChecked(); const channels = await channelsApi.$getChannelsWithoutSourceChecked();
for (const openChannel of channels) { for (const openChannel of channels) {
let openTx = await this.fetchTransaction(openChannel.transaction_id, true); const openTx = await this.fetchTransaction(openChannel.transaction_id, true);
if (!openTx) { if (!openTx) {
continue; continue;
} }
@ -276,7 +335,7 @@ class ForensicsService {
// Check if a channel open tx input spends the result of a swept channel close output // Check if a channel open tx input spends the result of a swept channel close output
private async $attributeSweptChannelCloses(openChannel: ILightningApi.Channel, input: IEsploraApi.Vin): Promise<void> { private async $attributeSweptChannelCloses(openChannel: ILightningApi.Channel, input: IEsploraApi.Vin): Promise<void> {
let sweepTx = await this.fetchTransaction(input.txid, true); const sweepTx = await this.fetchTransaction(input.txid, true);
if (!sweepTx) { if (!sweepTx) {
logger.err(`couldn't find input transaction for channel forensics ${openChannel.channel_id} ${input.txid}`); logger.err(`couldn't find input transaction for channel forensics ${openChannel.channel_id} ${input.txid}`);
return; return;
@ -335,7 +394,7 @@ class ForensicsService {
if (matched && !ambiguous) { if (matched && !ambiguous) {
// fetch closing channel transaction and perform forensics on the outputs // fetch closing channel transaction and perform forensics on the outputs
let prevChannelTx = await this.fetchTransaction(input.txid, true); const prevChannelTx = await this.fetchTransaction(input.txid, true);
let outspends: IEsploraApi.Outspend[] | undefined; let outspends: IEsploraApi.Outspend[] | undefined;
try { try {
outspends = await bitcoinApi.$getOutspends(input.txid); outspends = await bitcoinApi.$getOutspends(input.txid);
@ -430,7 +489,7 @@ class ForensicsService {
} }
await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT); await Common.sleep$(config.LIGHTNING.FORENSICS_RATE_LIMIT);
} catch (e) { } catch (e) {
logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + txid + '/outspends'}. Reason ${e instanceof Error ? e.message : e}`); logger.err(`Failed to call ${config.ESPLORA.REST_API_URL + '/tx/' + txid}. Reason ${e instanceof Error ? e.message : e}`);
return null; return null;
} }
} }