use bdk_chain::{ bitcoin::{OutPoint, ScriptBuf, Transaction, Txid}, collections::{BTreeMap, HashMap, HashSet}, local_chain::CheckPoint, spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, tx_graph::TxGraph, BlockId, ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor, }; use core::str::FromStr; use electrum_client::{ElectrumApi, Error, HeaderNotification}; use std::sync::{Arc, Mutex}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; /// Wrapper around an [`electrum_client::ElectrumApi`] which includes an internal in-memory /// transaction cache to avoid re-fetching already downloaded transactions. #[derive(Debug)] pub struct BdkElectrumClient { /// The internal [`electrum_client::ElectrumApi`] pub inner: E, /// The transaction cache tx_cache: Mutex>>, } impl BdkElectrumClient { /// Creates a new bdk client from a [`electrum_client::ElectrumApi`] pub fn new(client: E) -> Self { Self { inner: client, tx_cache: Default::default(), } } /// Inserts transactions into the transaction cache so that the client will not fetch these /// transactions. pub fn populate_tx_cache(&self, tx_graph: impl AsRef>) { let txs = tx_graph .as_ref() .full_txs() .map(|tx_node| (tx_node.txid, tx_node.tx)); let mut tx_cache = self.tx_cache.lock().unwrap(); for (txid, tx) in txs { tx_cache.insert(txid, tx); } } /// Fetch transaction of given `txid`. /// /// If it hits the cache it will return the cached version and avoid making the request. pub fn fetch_tx(&self, txid: Txid) -> Result, Error> { let tx_cache = self.tx_cache.lock().unwrap(); if let Some(tx) = tx_cache.get(&txid) { return Ok(Arc::clone(tx)); } drop(tx_cache); let tx = Arc::new(self.inner.transaction_get(&txid)?); self.tx_cache.lock().unwrap().insert(txid, Arc::clone(&tx)); Ok(tx) } /// Full scan the keychain scripts specified with the blockchain (via an Electrum client) and /// returns updates for [`bdk_chain`] data structures. /// /// - `request`: struct with data required to perform a spk-based blockchain client full scan, /// see [`FullScanRequest`] /// - `stop_gap`: the full scan for each keychain stops after a gap of script pubkeys with no /// associated transactions /// - `batch_size`: specifies the max number of script pubkeys to request for in a single batch /// request /// - `fetch_prev_txouts`: specifies whether or not we want previous `TxOut`s for fee pub fn full_scan( &self, request: FullScanRequest, stop_gap: usize, batch_size: usize, fetch_prev_txouts: bool, ) -> Result, Error> { let mut request_spks = request.spks_by_keychain; // We keep track of already-scanned spks just in case a reorg happens and we need to do a // rescan. We need to keep track of this as iterators in `keychain_spks` are "unbounded" so // cannot be collected. In addition, we keep track of whether an spk has an active tx // history for determining the `last_active_index`. // * key: (keychain, spk_index) that identifies the spk. // * val: (script_pubkey, has_tx_history). let mut scanned_spks = BTreeMap::<(K, u32), (ScriptBuf, bool)>::new(); let update = loop { let (tip, _) = construct_update_tip(&self.inner, request.chain_tip.clone())?; let mut graph_update = TxGraph::::default(); let cps = tip .iter() .take(10) .map(|cp| (cp.height(), cp)) .collect::>(); if !request_spks.is_empty() { if !scanned_spks.is_empty() { scanned_spks.append( &mut self.populate_with_spks( &cps, &mut graph_update, &mut scanned_spks .iter() .map(|(i, (spk, _))| (i.clone(), spk.clone())), stop_gap, batch_size, )?, ); } for (keychain, keychain_spks) in &mut request_spks { scanned_spks.extend( self.populate_with_spks( &cps, &mut graph_update, keychain_spks, stop_gap, batch_size, )? .into_iter() .map(|(spk_i, spk)| ((keychain.clone(), spk_i), spk)), ); } } // check for reorgs during scan process let server_blockhash = self.inner.block_header(tip.height() as usize)?.block_hash(); if tip.hash() != server_blockhash { continue; // reorg } // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { self.fetch_prev_txout(&mut graph_update)?; } let chain_update = tip; let keychain_update = request_spks .into_keys() .filter_map(|k| { scanned_spks .range((k.clone(), u32::MIN)..=(k.clone(), u32::MAX)) .rev() .find(|(_, (_, active))| *active) .map(|((_, i), _)| (k, *i)) }) .collect::>(); break FullScanResult { graph_update, chain_update, last_active_indices: keychain_update, }; }; Ok(ElectrumFullScanResult(update)) } /// Sync a set of scripts with the blockchain (via an Electrum client) for the data specified /// and returns updates for [`bdk_chain`] data structures. /// /// - `request`: struct with data required to perform a spk-based blockchain client sync, /// see [`SyncRequest`] /// - `batch_size`: specifies the max number of script pubkeys to request for in a single batch /// request /// - `fetch_prev_txouts`: specifies whether or not we want previous `TxOut`s for fee /// calculation /// /// If the scripts to sync are unknown, such as when restoring or importing a keychain that /// may include scripts that have been used, use [`full_scan`] with the keychain. /// /// [`full_scan`]: Self::full_scan pub fn sync( &self, request: SyncRequest, batch_size: usize, fetch_prev_txouts: bool, ) -> Result { let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone()) .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk))); let mut full_scan_res = self .full_scan(full_scan_req, usize::MAX, batch_size, false)? .with_confirmation_height_anchor(); let (tip, _) = construct_update_tip(&self.inner, request.chain_tip)?; let cps = tip .iter() .take(10) .map(|cp| (cp.height(), cp)) .collect::>(); self.populate_with_txids(&cps, &mut full_scan_res.graph_update, request.txids)?; self.populate_with_outpoints(&cps, &mut full_scan_res.graph_update, request.outpoints)?; // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { self.fetch_prev_txout(&mut full_scan_res.graph_update)?; } Ok(ElectrumSyncResult(SyncResult { chain_update: full_scan_res.chain_update, graph_update: full_scan_res.graph_update, })) } /// Populate the `graph_update` with transactions/anchors associated with the given `spks`. /// /// Transactions that contains an output with requested spk, or spends form an output with /// requested spk will be added to `graph_update`. Anchors of the aforementioned transactions are /// also included. /// /// Checkpoints (in `cps`) are used to create anchors. The `tx_cache` is self-explanatory. fn populate_with_spks( &self, cps: &BTreeMap, graph_update: &mut TxGraph, spks: &mut impl Iterator, stop_gap: usize, batch_size: usize, ) -> Result, Error> { let mut unused_spk_count = 0_usize; let mut scanned_spks = BTreeMap::new(); loop { let spks = (0..batch_size) .map_while(|_| spks.next()) .collect::>(); if spks.is_empty() { return Ok(scanned_spks); } let spk_histories = self .inner .batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?; for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) { if spk_history.is_empty() { scanned_spks.insert(spk_index, (spk, false)); unused_spk_count += 1; if unused_spk_count > stop_gap { return Ok(scanned_spks); } continue; } else { scanned_spks.insert(spk_index, (spk, true)); unused_spk_count = 0; } for tx_res in spk_history { let _ = graph_update.insert_tx(self.fetch_tx(tx_res.tx_hash)?); if let Some(anchor) = determine_tx_anchor(cps, tx_res.height, tx_res.tx_hash) { let _ = graph_update.insert_anchor(tx_res.tx_hash, anchor); } } } } } // Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions, // which we do not have by default. This data is needed to calculate the transaction fee. fn fetch_prev_txout( &self, graph_update: &mut TxGraph, ) -> Result<(), Error> { let full_txs: Vec> = graph_update.full_txs().map(|tx_node| tx_node.tx).collect(); for tx in full_txs { for vin in &tx.input { let outpoint = vin.previous_output; let vout = outpoint.vout; let prev_tx = self.fetch_tx(outpoint.txid)?; let txout = prev_tx.output[vout as usize].clone(); let _ = graph_update.insert_txout(outpoint, txout); } } Ok(()) } /// Populate the `graph_update` with associated transactions/anchors of `outpoints`. /// /// Transactions in which the outpoint resides, and transactions that spend from the outpoint are /// included. Anchors of the aforementioned transactions are included. /// /// Checkpoints (in `cps`) are used to create anchors. The `tx_cache` is self-explanatory. fn populate_with_outpoints( &self, cps: &BTreeMap, graph_update: &mut TxGraph, outpoints: impl IntoIterator, ) -> Result<(), Error> { for outpoint in outpoints { let op_txid = outpoint.txid; let op_tx = self.fetch_tx(op_txid)?; let op_txout = match op_tx.output.get(outpoint.vout as usize) { Some(txout) => txout, None => continue, }; debug_assert_eq!(op_tx.txid(), op_txid); // attempt to find the following transactions (alongside their chain positions), and // add to our sparsechain `update`: let mut has_residing = false; // tx in which the outpoint resides let mut has_spending = false; // tx that spends the outpoint for res in self.inner.script_get_history(&op_txout.script_pubkey)? { if has_residing && has_spending { break; } if !has_residing && res.tx_hash == op_txid { has_residing = true; let _ = graph_update.insert_tx(Arc::clone(&op_tx)); if let Some(anchor) = determine_tx_anchor(cps, res.height, res.tx_hash) { let _ = graph_update.insert_anchor(res.tx_hash, anchor); } } if !has_spending && res.tx_hash != op_txid { let res_tx = self.fetch_tx(res.tx_hash)?; // we exclude txs/anchors that do not spend our specified outpoint(s) has_spending = res_tx .input .iter() .any(|txin| txin.previous_output == outpoint); if !has_spending { continue; } let _ = graph_update.insert_tx(Arc::clone(&res_tx)); if let Some(anchor) = determine_tx_anchor(cps, res.height, res.tx_hash) { let _ = graph_update.insert_anchor(res.tx_hash, anchor); } } } } Ok(()) } /// Populate the `graph_update` with transactions/anchors of the provided `txids`. fn populate_with_txids( &self, cps: &BTreeMap, graph_update: &mut TxGraph, txids: impl IntoIterator, ) -> Result<(), Error> { for txid in txids { let tx = match self.fetch_tx(txid) { Ok(tx) => tx, Err(electrum_client::Error::Protocol(_)) => continue, Err(other_err) => return Err(other_err), }; let spk = tx .output .first() .map(|txo| &txo.script_pubkey) .expect("tx must have an output"); // because of restrictions of the Electrum API, we have to use the `script_get_history` // call to get confirmation status of our transaction let anchor = match self .inner .script_get_history(spk)? .into_iter() .find(|r| r.tx_hash == txid) { Some(r) => determine_tx_anchor(cps, r.height, txid), None => continue, }; let _ = graph_update.insert_tx(tx); if let Some(anchor) = anchor { let _ = graph_update.insert_anchor(txid, anchor); } } Ok(()) } } /// The result of [`BdkElectrumClient::full_scan`]. /// /// This can be transformed into a [`FullScanResult`] with either [`ConfirmationHeightAnchor`] or /// [`ConfirmationTimeHeightAnchor`] anchor types. pub struct ElectrumFullScanResult(FullScanResult); impl ElectrumFullScanResult { /// Return [`FullScanResult`] with [`ConfirmationHeightAnchor`]. pub fn with_confirmation_height_anchor(self) -> FullScanResult { self.0 } /// Return [`FullScanResult`] with [`ConfirmationTimeHeightAnchor`]. /// /// This requires additional calls to the Electrum server. pub fn with_confirmation_time_height_anchor( self, client: &BdkElectrumClient, ) -> Result, Error> { let res = self.0; Ok(FullScanResult { graph_update: try_into_confirmation_time_result(res.graph_update, &client.inner)?, chain_update: res.chain_update, last_active_indices: res.last_active_indices, }) } } /// The result of [`BdkElectrumClient::sync`]. /// /// This can be transformed into a [`SyncResult`] with either [`ConfirmationHeightAnchor`] or /// [`ConfirmationTimeHeightAnchor`] anchor types. pub struct ElectrumSyncResult(SyncResult); impl ElectrumSyncResult { /// Return [`SyncResult`] with [`ConfirmationHeightAnchor`]. pub fn with_confirmation_height_anchor(self) -> SyncResult { self.0 } /// Return [`SyncResult`] with [`ConfirmationTimeHeightAnchor`]. /// /// This requires additional calls to the Electrum server. pub fn with_confirmation_time_height_anchor( self, client: &BdkElectrumClient, ) -> Result, Error> { let res = self.0; Ok(SyncResult { graph_update: try_into_confirmation_time_result(res.graph_update, &client.inner)?, chain_update: res.chain_update, }) } } fn try_into_confirmation_time_result( graph_update: TxGraph, client: &impl ElectrumApi, ) -> Result, Error> { let relevant_heights = graph_update .all_anchors() .iter() .map(|(a, _)| a.confirmation_height) .collect::>(); let height_to_time = relevant_heights .clone() .into_iter() .zip( client .batch_block_header(relevant_heights)? .into_iter() .map(|bh| bh.time as u64), ) .collect::>(); Ok(graph_update.map_anchors(|a| ConfirmationTimeHeightAnchor { anchor_block: a.anchor_block, confirmation_height: a.confirmation_height, confirmation_time: height_to_time[&a.confirmation_height], })) } /// Return a [`CheckPoint`] of the latest tip, that connects with `prev_tip`. fn construct_update_tip( client: &impl ElectrumApi, prev_tip: CheckPoint, ) -> Result<(CheckPoint, Option), Error> { let HeaderNotification { height, .. } = client.block_headers_subscribe()?; let new_tip_height = height as u32; // If electrum returns a tip height that is lower than our previous tip, then checkpoints do // not need updating. We just return the previous tip and use that as the point of agreement. if new_tip_height < prev_tip.height() { return Ok((prev_tip.clone(), Some(prev_tip.height()))); } // Atomically fetch the latest `CHAIN_SUFFIX_LENGTH` count of blocks from Electrum. We use this // to construct our checkpoint update. let mut new_blocks = { let start_height = new_tip_height.saturating_sub(CHAIN_SUFFIX_LENGTH - 1); let hashes = client .block_headers(start_height as _, CHAIN_SUFFIX_LENGTH as _)? .headers .into_iter() .map(|h| h.block_hash()); (start_height..).zip(hashes).collect::>() }; // Find the "point of agreement" (if any). let agreement_cp = { let mut agreement_cp = Option::::None; for cp in prev_tip.iter() { let cp_block = cp.block_id(); let hash = match new_blocks.get(&cp_block.height) { Some(&hash) => hash, None => { assert!( new_tip_height >= cp_block.height, "already checked that electrum's tip cannot be smaller" ); let hash = client.block_header(cp_block.height as _)?.block_hash(); new_blocks.insert(cp_block.height, hash); hash } }; if hash == cp_block.hash { agreement_cp = Some(cp); break; } } agreement_cp }; let agreement_height = agreement_cp.as_ref().map(CheckPoint::height); let new_tip = new_blocks .into_iter() // Prune `new_blocks` to only include blocks that are actually new. .filter(|(height, _)| Some(*height) > agreement_height) .map(|(height, hash)| BlockId { height, hash }) .fold(agreement_cp, |prev_cp, block| { Some(match prev_cp { Some(cp) => cp.push(block).expect("must extend checkpoint"), None => CheckPoint::new(block), }) }) .expect("must have at least one checkpoint"); Ok((new_tip, agreement_height)) } /// A [tx status] comprises of a concatenation of `tx_hash:height:`s. We transform a single one of /// these concatenations into a [`ConfirmationHeightAnchor`] if possible. /// /// We use the lowest possible checkpoint as the anchor block (from `cps`). If an anchor block /// cannot be found, or the transaction is unconfirmed, [`None`] is returned. /// /// [tx status](https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html#status) fn determine_tx_anchor( cps: &BTreeMap, raw_height: i32, txid: Txid, ) -> Option { // The electrum API has a weird quirk where an unconfirmed transaction is presented with a // height of 0. To avoid invalid representation in our data structures, we manually set // transactions residing in the genesis block to have height 0, then interpret a height of 0 as // unconfirmed for all other transactions. if txid == Txid::from_str("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b") .expect("must deserialize genesis coinbase txid") { let anchor_block = cps.values().next()?.block_id(); return Some(ConfirmationHeightAnchor { anchor_block, confirmation_height: 0, }); } match raw_height { h if h <= 0 => { debug_assert!(h == 0 || h == -1, "unexpected height ({}) from electrum", h); None } h => { let h = h as u32; let anchor_block = cps.range(h..).next().map(|(_, cp)| cp.block_id())?; if h > anchor_block.height { None } else { Some(ConfirmationHeightAnchor { anchor_block, confirmation_height: h, }) } } } }