diff --git a/crates/electrum/src/lib.rs b/crates/electrum/src/lib.rs
index 6d352ca1..051b6375 100644
--- a/crates/electrum/src/lib.rs
+++ b/crates/electrum/src/lib.rs
@@ -20,12 +20,6 @@
 //! [`batch_transaction_get`]: ElectrumApi::batch_transaction_get
 //! [`bdk_electrum_example`]: https://github.com/LLFourn/bdk_core_staging/tree/master/bdk_electrum_example
 
-use std::{
-    collections::{BTreeMap, HashMap},
-    fmt::Debug,
-};
-
-pub use bdk_chain;
 use bdk_chain::{
     bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
     chain_graph::{self, ChainGraph},
@@ -34,8 +28,15 @@ use bdk_chain::{
     tx_graph::TxGraph,
     BlockId, ConfirmationTime, TxHeight,
 };
-pub use electrum_client;
 use electrum_client::{Client, ElectrumApi, Error};
+use std::{
+    collections::{BTreeMap, HashMap},
+    fmt::Debug,
+};
+
+pub mod v2;
+pub use bdk_chain;
+pub use electrum_client;
 
 /// Trait to extend [`electrum_client::Client`] functionality.
 ///
diff --git a/crates/electrum/src/v2.rs b/crates/electrum/src/v2.rs
new file mode 100644
index 00000000..6a942a1f
--- /dev/null
+++ b/crates/electrum/src/v2.rs
@@ -0,0 +1,507 @@
+use bdk_chain::{
+    bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
+    keychain::LocalUpdate,
+    local_chain::LocalChain,
+    tx_graph::{self, TxGraph},
+    Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
+};
+use electrum_client::{Client, ElectrumApi, Error};
+use std::{
+    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
+    fmt::Debug,
+};
+
+use crate::InternalError;
+
+/// Scan result returned by [`ElectrumExt::scan`].
+///
+/// Use [`ElectrumUpdate::finalize`] (or [`ElectrumUpdate::finalize_as_confirmation_time`]) to
+/// turn this into a [`LocalUpdate`] that can be applied to local wallet state.
+#[derive(Debug, Clone)]
+pub struct ElectrumUpdate<K, A> {
+    /// Anchors discovered for each relevant txid.
+    pub graph_update: HashMap<Txid, BTreeSet<A>>,
+    /// Update to the local block chain.
+    pub chain_update: LocalChain,
+    /// Last active derivation index discovered for each keychain.
+    pub keychain_update: BTreeMap<K, u32>,
+}
+
+impl<K, A> Default for ElectrumUpdate<K, A> {
+    fn default() -> Self {
+        Self {
+            graph_update: Default::default(),
+            chain_update: Default::default(),
+            keychain_update: Default::default(),
+        }
+    }
+}
+
+impl<'a, K, A: Anchor> ElectrumUpdate<K, A> {
+    /// Txids of this update for which `graph` has no full transaction.
+    pub fn missing_full_txs<A2: Anchor>(
+        &'a self,
+        graph: &'a TxGraph<A2>,
+    ) -> impl Iterator<Item = &'a Txid> + 'a {
+        self.graph_update
+            .keys()
+            .filter(move |&&txid| graph.as_ref().get_tx(txid).is_none())
+    }
+
+    /// Finalize the update into a [`LocalUpdate`], supplying the missing full transactions as
+    /// `new_txs` and an optional last-seen timestamp for unconfirmed transactions.
+    pub fn finalize<T>(self, seen_at: Option<u64>, new_txs: T) -> LocalUpdate<K, A>
+    where
+        T: IntoIterator<Item = Transaction>,
+    {
+        let mut graph_update = TxGraph::<A>::new(new_txs);
+        for (txid, anchors) in self.graph_update {
+            if let Some(seen_at) = seen_at {
+                let _ = graph_update.insert_seen_at(txid, seen_at);
+            }
+            for anchor in anchors {
+                let _ = graph_update.insert_anchor(txid, anchor);
+            }
+        }
+        LocalUpdate {
+            keychain: self.keychain_update,
+            graph: graph_update,
+            chain: self.chain_update,
+        }
+    }
+}
+
+impl<K> ElectrumUpdate<K, ConfirmationHeightAnchor> {
+    /// Like [`ElectrumUpdate::finalize`], but fetches the relevant block headers so height
+    /// anchors can be converted into [`ConfirmationTimeAnchor`]s.
+    pub fn finalize_as_confirmation_time<T>(
+        self,
+        client: &Client,
+        seen_at: Option<u64>,
+        new_txs: T,
+    ) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>
+    where
+        T: IntoIterator<Item = Transaction>,
+    {
+        let update = self.finalize(seen_at, new_txs);
+        let update_tip = update.chain.tip().expect("must have tip");
+
+        let relevant_heights = {
+            let mut visited_heights = HashSet::new();
+            update
+                .graph
+                .all_anchors()
+                .iter()
+                .map(|(a, _)| a.confirmation_height_upper_bound())
+                .filter(move |&h| visited_heights.insert(h))
+                .collect::<Vec<u32>>()
+        };
+
+        let height_to_time = relevant_heights
+            .clone()
+            .into_iter()
+            .zip(
+                client
+                    .batch_block_header(relevant_heights)?
+                    .into_iter()
+                    .map(|bh| bh.time as u64),
+            )
+            .collect::<HashMap<u32, u64>>();
+
+        if update_tip.hash != client.block_header(update_tip.height as _)?.block_hash() {
+            // TODO: We should alter the logic so we won't have to return an error. This is to
+            // ensure obtained block times are "anchored" to our tip. If we exclude this, it
+            // should be "safe" as well. Tx confirmation times would just slightly vary.
+            return Err(Error::Message(format!(
+                "tip changed during update: update_tip={:?}",
+                update_tip
+            )));
+        }
+
+        let graph_additions = {
+            let old_additions = TxGraph::default().determine_additions(&update.graph);
+            tx_graph::Additions {
+                tx: old_additions.tx,
+                txout: old_additions.txout,
+                last_seen: old_additions.last_seen,
+                anchors: old_additions
+                    .anchors
+                    .into_iter()
+                    .map(|(height_anchor, txid)| {
+                        let confirmation_height = height_anchor.confirmation_height;
+                        let confirmation_time = height_to_time[&confirmation_height];
+                        let time_anchor = ConfirmationTimeAnchor {
+                            anchor_block: height_anchor.anchor_block,
+                            confirmation_height,
+                            confirmation_time,
+                        };
+                        (time_anchor, txid)
+                    })
+                    .collect(),
+            }
+        };
+
+        Ok(LocalUpdate {
+            keychain: update.keychain,
+            graph: {
+                let mut graph = TxGraph::default();
+                graph.apply_additions(graph_additions);
+                graph
+            },
+            chain: update.chain,
+        })
+    }
+}
+
+/// Extension trait on [`electrum_client::Client`] for scanning script pubkeys, txids and
+/// outpoints into an [`ElectrumUpdate`].
+pub trait ElectrumExt<A> {
+    /// Fetch the latest block height and hash known by the server.
+    fn get_tip(&self) -> Result<(u32, BlockHash), Error>;
+
+    /// Scan `keychain_spks` (plus any additional `txids` and `outpoints`), stopping a keychain
+    /// once the gap of consecutive unused script pubkeys exceeds `stop_gap`.
+    fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        batch_size: usize,
+    ) -> Result<ElectrumUpdate<K, A>, Error>;
+
+    /// Convenience wrapper around [`ElectrumExt::scan`] for scanning arbitrary script pubkeys
+    /// that do not belong to a keychain.
+    fn scan_without_keychain(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        misc_spks: impl IntoIterator<Item = Script>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        batch_size: usize,
+    ) -> Result<ElectrumUpdate<(), A>, Error> {
+        let spk_iter = misc_spks
+            .into_iter()
+            .enumerate()
+            .map(|(i, spk)| (i as u32, spk));
+
+        self.scan(
+            local_chain,
+            [((), spk_iter)].into(),
+            txids,
+            outpoints,
+            usize::MAX,
+            batch_size,
+        )
+    }
+}
+
+impl ElectrumExt<ConfirmationHeightAnchor> for Client {
+    fn get_tip(&self) -> Result<(u32, BlockHash), Error> {
+        // TODO: unsubscribe when added to the client, or is there a better call to use here?
+        self.block_headers_subscribe()
+            .map(|data| (data.height as u32, data.header.block_hash()))
+    }
+
+    fn scan<K: Ord + Clone>(
+        &self,
+        local_chain: &BTreeMap<u32, BlockHash>,
+        keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
+        txids: impl IntoIterator<Item = Txid>,
+        outpoints: impl IntoIterator<Item = OutPoint>,
+        stop_gap: usize,
+        batch_size: usize,
+    ) -> Result<ElectrumUpdate<K, ConfirmationHeightAnchor>, Error> {
+        let mut request_spks = keychain_spks
+            .into_iter()
+            .map(|(k, s)| (k, s.into_iter()))
+            .collect::<BTreeMap<K, _>>();
+        let mut scanned_spks = BTreeMap::<(K, u32), (Script, bool)>::new();
+
+        let txids = txids.into_iter().collect::<Vec<_>>();
+        let outpoints = outpoints.into_iter().collect::<Vec<_>>();
+
+        let update = loop {
+            let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor> {
+                chain_update: prepare_chain_update(self, local_chain)?,
+                ..Default::default()
+            };
+            let anchor_block = update
+                .chain_update
+                .tip()
+                .expect("must have at least one block");
+
+            if !request_spks.is_empty() {
+                if !scanned_spks.is_empty() {
+                    let mut scanned_spk_iter = scanned_spks
+                        .iter()
+                        .map(|(i, (spk, _))| (i.clone(), spk.clone()));
+                    match populate_with_spks(
+                        self,
+                        anchor_block,
+                        &mut update,
+                        &mut scanned_spk_iter,
+                        stop_gap,
+                        batch_size,
+                    ) {
+                        Err(InternalError::Reorg) => continue,
+                        Err(InternalError::ElectrumError(e)) => return Err(e),
+                        Ok(mut spks) => scanned_spks.append(&mut spks),
+                    };
+                }
+                for (keychain, keychain_spks) in &mut request_spks {
+                    match populate_with_spks(
+                        self,
+                        anchor_block,
+                        &mut update,
+                        keychain_spks,
+                        stop_gap,
+                        batch_size,
+                    ) {
+                        Err(InternalError::Reorg) => continue,
+                        Err(InternalError::ElectrumError(e)) => return Err(e),
+                        Ok(spks) => scanned_spks.extend(
+                            spks.into_iter()
+                                .map(|(spk_i, spk)| ((keychain.clone(), spk_i), spk)),
+                        ),
+                    };
+                }
+            }
+
+            match populate_with_txids(self, anchor_block, &mut update, &mut txids.iter().cloned())
+            {
+                Err(InternalError::Reorg) => continue,
+                Err(InternalError::ElectrumError(e)) => return Err(e),
+                Ok(_) => {}
+            }
+
+            match populate_with_outpoints(
+                self,
+                anchor_block,
+                &mut update,
+                &mut outpoints.iter().cloned(),
+            ) {
+                Err(InternalError::Reorg) => continue,
+                Err(InternalError::ElectrumError(e)) => return Err(e),
+                Ok(_txs) => { /* TODO: cache full txs to reduce bandwidth */ }
+            }
+
+            // check for reorgs during the scan process
+            let server_blockhash = self
+                .block_header(anchor_block.height as usize)?
+                .block_hash();
+            if anchor_block.hash != server_blockhash {
+                continue; // reorg
+            }
+
+            update.keychain_update = request_spks
+                .into_keys()
+                .filter_map(|k| {
+                    scanned_spks
+                        .range((k.clone(), u32::MIN)..=(k.clone(), u32::MAX))
+                        .rev()
+                        .find(|(_, (_, active))| *active)
+                        .map(|((_, i), _)| (k, *i))
+                })
+                .collect::<BTreeMap<K, u32>>();
+            break update;
+        };
+
+        Ok(update)
+    }
+}
+
+/// Prepare an update "template" based on the checkpoints of the `local_chain`.
+fn prepare_chain_update(
+    client: &Client,
+    local_chain: &BTreeMap<u32, BlockHash>,
+) -> Result<LocalChain, Error> {
+    let mut update = LocalChain::default();
+
+    // Find the local chain block that is still there so our update can connect to the local chain.
+    for (&existing_height, &existing_hash) in local_chain.iter().rev() {
+        // TODO: a batch request may be safer, as a reorg that happens when we are obtaining
+        //       `block_header`s will result in inconsistencies
+        let current_hash = client.block_header(existing_height as usize)?.block_hash();
+        let _ = update
+            .insert_block(BlockId {
+                height: existing_height,
+                hash: current_hash,
+            })
+            .expect("This never errors because we are working with a fresh chain");
+
+        if current_hash == existing_hash {
+            break;
+        }
+    }
+
+    // Insert the new tip so new transactions will be accepted into the chain update.
+    let tip = {
+        let (height, hash) = crate::get_tip(client)?;
+        BlockId { height, hash }
+    };
+    if update.insert_block(tip).is_err() {
+        // There has been a re-org before we even began scanning addresses.
+        // Just recursively call (this should never happen).
+        return prepare_chain_update(client, local_chain);
+    }
+
+    Ok(update)
+}
+
+/// Map an Electrum-reported `raw_height` to an anchor at `anchor_block`, returning `None` for
+/// unconfirmed transactions.
+fn determine_tx_anchor(
+    anchor_block: BlockId,
+    raw_height: i32,
+    txid: Txid,
+) -> Option<ConfirmationHeightAnchor> {
+    // special case: the genesis coinbase tx is always anchored at height 0
+    if txid
+        == Txid::from_hex("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b")
+            .expect("must deserialize genesis coinbase txid")
+    {
+        return Some(ConfirmationHeightAnchor {
+            anchor_block,
+            confirmation_height: 0,
+        });
+    }
+    match raw_height {
+        h if h <= 0 => {
+            debug_assert!(h == 0 || h == -1, "unexpected height ({}) from electrum", h);
+            None
+        }
+        h => {
+            let h = h as u32;
+            if h > anchor_block.height {
+                None
+            } else {
+                Some(ConfirmationHeightAnchor {
+                    anchor_block,
+                    confirmation_height: h,
+                })
+            }
+        }
+    }
+}
+
+/// Populate `update` with the transactions that reside in, or spend, the given `outpoints`,
+/// returning the full transactions fetched along the way.
+fn populate_with_outpoints<K>(
+    client: &Client,
+    anchor_block: BlockId,
+    update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
+    outpoints: &mut impl Iterator<Item = OutPoint>,
+) -> Result<HashMap<Txid, Transaction>, InternalError> {
+    let mut full_txs = HashMap::new();
+    for outpoint in outpoints {
+        let txid = outpoint.txid;
+        let tx = client.transaction_get(&txid)?;
+        debug_assert_eq!(tx.txid(), txid);
+        let txout = match tx.output.get(outpoint.vout as usize) {
+            Some(txout) => txout,
+            None => continue,
+        };
+        // attempt to find the following transactions (alongside their chain positions), and
+        // add them to our `update`:
+        let mut has_residing = false; // tx in which the outpoint resides
+        let mut has_spending = false; // tx that spends the outpoint
+        for res in client.script_get_history(&txout.script_pubkey)? {
+            if has_residing && has_spending {
+                break;
+            }
+
+            if res.tx_hash == txid {
+                if has_residing {
+                    continue;
+                }
+                has_residing = true;
+                full_txs.insert(res.tx_hash, tx.clone());
+            } else {
+                if has_spending {
+                    continue;
+                }
+                let res_tx = match full_txs.get(&res.tx_hash) {
+                    Some(tx) => tx,
+                    None => {
+                        let res_tx = client.transaction_get(&res.tx_hash)?;
+                        full_txs.insert(res.tx_hash, res_tx);
+                        full_txs.get(&res.tx_hash).expect("just inserted")
+                    }
+                };
+                has_spending = res_tx
+                    .input
+                    .iter()
+                    .any(|txin| txin.previous_output == outpoint);
+                if !has_spending {
+                    continue;
+                }
+            };
+
+            let anchor = determine_tx_anchor(anchor_block, res.height, res.tx_hash);
+
+            let tx_entry = update.graph_update.entry(res.tx_hash).or_default();
+            if let Some(anchor) = anchor {
+                tx_entry.insert(anchor);
+            }
+        }
+    }
+    Ok(full_txs)
+}
+
+/// Populate `update` with anchors for the given `txids`, skipping any the server does not know
+/// about.
+fn populate_with_txids<K>(
+    client: &Client,
+    anchor_block: BlockId,
+    update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
+    txids: &mut impl Iterator<Item = Txid>,
+) -> Result<(), InternalError> {
+    for txid in txids {
+        let tx = match client.transaction_get(&txid) {
+            Ok(tx) => tx,
+            Err(electrum_client::Error::Protocol(_)) => continue,
+            Err(other_err) => return Err(other_err.into()),
+        };
+
+        let spk = tx
+            .output
+            .get(0)
+            .map(|txo| &txo.script_pubkey)
+            .expect("tx must have an output");
+
+        let anchor = match client
+            .script_get_history(spk)?
+            .into_iter()
+            .find(|r| r.tx_hash == txid)
+        {
+            Some(r) => determine_tx_anchor(anchor_block, r.height, txid),
+            None => continue,
+        };
+
+        let tx_entry = update.graph_update.entry(txid).or_default();
+        if let Some(anchor) = anchor {
+            tx_entry.insert(anchor);
+        }
+    }
+    Ok(())
+}
+
+/// Scan `spks` in batches of `batch_size`, populating `update` with discovered anchors and
+/// stopping once the gap of consecutive unused script pubkeys exceeds `stop_gap`. Returns the
+/// scanned spks, each flagged by whether it has history.
+fn populate_with_spks<K, I: Ord + Clone>(
+    client: &Client,
+    anchor_block: BlockId,
+    update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
+    spks: &mut impl Iterator<Item = (I, Script)>,
+    stop_gap: usize,
+    batch_size: usize,
+) -> Result<BTreeMap<I, (Script, bool)>, InternalError> {
+    let mut unused_spk_count = 0_usize;
+    let mut scanned_spks = BTreeMap::new();
+
+    loop {
+        let spks = (0..batch_size)
+            .map_while(|_| spks.next())
+            .collect::<Vec<_>>();
+        if spks.is_empty() {
+            return Ok(scanned_spks);
+        }
+
+        let spk_histories = client.batch_script_get_history(spks.iter().map(|(_, s)| s))?;
+
+        for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
+            if spk_history.is_empty() {
+                scanned_spks.insert(spk_index, (spk, false));
+                unused_spk_count += 1;
+                if unused_spk_count > stop_gap {
+                    return Ok(scanned_spks);
+                }
+                continue;
+            } else {
+                scanned_spks.insert(spk_index, (spk, true));
+                unused_spk_count = 0;
+            }
+
+            for tx in spk_history {
+                let tx_entry = update.graph_update.entry(tx.tx_hash).or_default();
+                if let Some(anchor) = determine_tx_anchor(anchor_block, tx.height, tx.tx_hash) {
+                    tx_entry.insert(anchor);
+                }
+            }
+        }
+    }
+}
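
Not part of the diff: a rough usage sketch for reviewers showing how the new `v2` API fits together. It assumes the crate is depended on as `bdk_electrum`, that the caller keeps its checkpoints as a `BTreeMap<u32, BlockHash>` and its transactions in a `TxGraph<ConfirmationHeightAnchor>`; the `sync_once` helper name and the batch size of 10 are purely illustrative.

use bdk_chain::{
    bitcoin::{BlockHash, OutPoint, Script, Txid},
    keychain::LocalUpdate,
    tx_graph::TxGraph,
    ConfirmationHeightAnchor,
};
use bdk_electrum::v2::ElectrumExt;
use electrum_client::{Client, ElectrumApi};
use std::collections::BTreeMap;

// Illustrative helper: scan a set of loose script pubkeys and produce a `LocalUpdate`
// that the caller can then apply to its own chain/graph state.
fn sync_once(
    client: &Client,
    local_checkpoints: &BTreeMap<u32, BlockHash>,
    local_graph: &TxGraph<ConfirmationHeightAnchor>,
    spks: Vec<Script>,
) -> Result<LocalUpdate<(), ConfirmationHeightAnchor>, electrum_client::Error> {
    // Query the server for histories of `spks`; no extra txids/outpoints, batches of 10.
    let update = client.scan_without_keychain(
        local_checkpoints,
        spks,
        Vec::<Txid>::new(),
        Vec::<OutPoint>::new(),
        10,
    )?;

    // Fetch full transactions referenced by the update that the local graph does not have yet.
    let missing = update
        .missing_full_txs(local_graph)
        .cloned()
        .collect::<Vec<Txid>>();
    let new_txs = client.batch_transaction_get(&missing)?;

    // Convert into a `LocalUpdate` (no `seen_at` timestamp for unconfirmed transactions here).
    Ok(update.finalize(None, new_txs))
}

The keychain-aware path has the same shape: call `scan` with a map of keychain-indexed spk iterators, and use the returned `keychain_update` afterwards to reveal addresses up to the last active index of each keychain.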