From cdfec5f90726f2313963c3a71e5d18bb10624736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 24 Apr 2024 16:05:59 +0800 Subject: [PATCH 1/4] feat(chain): add sync/full-scan structures for spk-based syncing These structures allows spk-based chain-sources to have a universal API. Co-authored-by: Steve Myers --- crates/chain/src/lib.rs | 1 + crates/chain/src/spk_client.rs | 315 +++++++++++++++++++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 crates/chain/src/spk_client.rs diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index 20656697..a05a4c3e 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -51,6 +51,7 @@ pub use descriptor_ext::DescriptorExt; mod spk_iter; #[cfg(feature = "miniscript")] pub use spk_iter::*; +pub mod spk_client; #[allow(unused_imports)] #[macro_use] diff --git a/crates/chain/src/spk_client.rs b/crates/chain/src/spk_client.rs new file mode 100644 index 00000000..7873ba22 --- /dev/null +++ b/crates/chain/src/spk_client.rs @@ -0,0 +1,315 @@ +//! Helper types for spk-based blockchain clients. + +use core::{fmt::Debug, ops::RangeBounds}; + +use alloc::{boxed::Box, collections::BTreeMap, vec::Vec}; +use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; + +use crate::{local_chain::CheckPoint, ConfirmationTimeHeightAnchor, TxGraph}; + +/// Data required to perform a spk-based blockchain client sync. +/// +/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and +/// outpoints. The sync process also updates the chain from the given [`CheckPoint`]. +pub struct SyncRequest { + /// A checkpoint for the current chain [`LocalChain::tip`]. + /// The sync process will return a new chain update that extends this tip. + /// + /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip + pub chain_tip: CheckPoint, + /// Transactions that spend from or to these indexed script pubkeys. + pub spks: Box + Send>, + /// Transactions with these txids. + pub txids: Box + Send>, + /// Transactions with these outpoints or spent from these outpoints. + pub outpoints: Box + Send>, +} + +impl SyncRequest { + /// Construct a new [`SyncRequest`] from a given `cp` tip. + pub fn from_chain_tip(cp: CheckPoint) -> Self { + Self { + chain_tip: cp, + spks: Box::new(core::iter::empty()), + txids: Box::new(core::iter::empty()), + outpoints: Box::new(core::iter::empty()), + } + } + + /// Set the [`Script`]s that will be synced against. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn set_spks( + mut self, + spks: impl IntoIterator + Send + 'static>, + ) -> Self { + self.spks = Box::new(spks.into_iter()); + self + } + + /// Set the [`Txid`]s that will be synced against. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn set_txids( + mut self, + txids: impl IntoIterator + Send + 'static>, + ) -> Self { + self.txids = Box::new(txids.into_iter()); + self + } + + /// Set the [`OutPoint`]s that will be synced against. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn set_outpoints( + mut self, + outpoints: impl IntoIterator + Send + 'static>, + ) -> Self { + self.outpoints = Box::new(outpoints.into_iter()); + self + } + + /// Chain on additional [`Script`]s that will be synced against. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn chain_spks( + mut self, + spks: impl IntoIterator< + IntoIter = impl Iterator + Send + 'static, + Item = ScriptBuf, + >, + ) -> Self { + self.spks = Box::new(self.spks.chain(spks)); + self + } + + /// Chain on additional [`Txid`]s that will be synced against. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn chain_txids( + mut self, + txids: impl IntoIterator + Send + 'static, Item = Txid>, + ) -> Self { + self.txids = Box::new(self.txids.chain(txids)); + self + } + + /// Chain on additional [`OutPoint`]s that will be synced against. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn chain_outpoints( + mut self, + outpoints: impl IntoIterator< + IntoIter = impl Iterator + Send + 'static, + Item = OutPoint, + >, + ) -> Self { + self.outpoints = Box::new(self.outpoints.chain(outpoints)); + self + } + + /// Add a closure that will be called for each [`Script`] synced in this request. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn inspect_spks(mut self, inspect: impl Fn(&Script) + Send + Sync + 'static) -> Self { + self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk))); + self + } + + /// Add a closure that will be called for each [`Txid`] synced in this request. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn inspect_txids(mut self, inspect: impl Fn(&Txid) + Send + Sync + 'static) -> Self { + self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid))); + self + } + + /// Add a closure that will be called for each [`OutPoint`] synced in this request. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn inspect_outpoints( + mut self, + inspect: impl Fn(&OutPoint) + Send + Sync + 'static, + ) -> Self { + self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op))); + self + } + + /// Populate the request with revealed script pubkeys from `index` with the given `spk_range`. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[cfg(feature = "miniscript")] + #[must_use] + pub fn populate_with_revealed_spks( + self, + index: &crate::keychain::KeychainTxOutIndex, + spk_range: impl RangeBounds, + ) -> Self { + use alloc::borrow::ToOwned; + self.chain_spks( + index + .revealed_spks(spk_range) + .map(|(_, _, spk)| spk.to_owned()) + .collect::>(), + ) + } +} + +/// Data returned from a spk-based blockchain client sync. +/// +/// See also [`SyncRequest`]. +pub struct SyncResult { + /// The update to apply to the receiving [`TxGraph`]. + pub graph_update: TxGraph, + /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain). + pub chain_update: CheckPoint, +} + +/// Data required to perform a spk-based blockchain client full scan. +/// +/// A client full scan iterates through all the scripts for the given keychains, fetching relevant +/// data until some stop gap number of scripts is found that have no data. This operation is +/// generally only used when importing or restoring previously used keychains in which the list of +/// used scripts is not known. The full scan process also updates the chain from the given [`CheckPoint`]. +pub struct FullScanRequest { + /// A checkpoint for the current [`LocalChain::tip`]. + /// The full scan process will return a new chain update that extends this tip. + /// + /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip + pub chain_tip: CheckPoint, + /// Iterators of script pubkeys indexed by the keychain index. + pub spks_by_keychain: BTreeMap + Send>>, +} + +impl FullScanRequest { + /// Construct a new [`FullScanRequest`] from a given `chain_tip`. + #[must_use] + pub fn from_chain_tip(chain_tip: CheckPoint) -> Self { + Self { + chain_tip, + spks_by_keychain: BTreeMap::new(), + } + } + + /// Construct a new [`FullScanRequest`] from a given `chain_tip` and `index`. + /// + /// Unbounded script pubkey iterators for each keychain (`K`) are extracted using + /// [`KeychainTxOutIndex::all_unbounded_spk_iters`] and is used to populate the + /// [`FullScanRequest`]. + /// + /// [`KeychainTxOutIndex::all_unbounded_spk_iters`]: crate::keychain::KeychainTxOutIndex::all_unbounded_spk_iters + #[cfg(feature = "miniscript")] + #[must_use] + pub fn from_keychain_txout_index( + chain_tip: CheckPoint, + index: &crate::keychain::KeychainTxOutIndex, + ) -> Self + where + K: Debug, + { + let mut req = Self::from_chain_tip(chain_tip); + for (keychain, spks) in index.all_unbounded_spk_iters() { + req = req.set_spks_for_keychain(keychain, spks); + } + req + } + + /// Set the [`Script`]s for a given `keychain`. + /// + /// This consumes the [`FullScanRequest`] and returns the updated one. + #[must_use] + pub fn set_spks_for_keychain( + mut self, + keychain: K, + spks: impl IntoIterator + Send + 'static>, + ) -> Self { + self.spks_by_keychain + .insert(keychain, Box::new(spks.into_iter())); + self + } + + /// Chain on additional [`Script`]s that will be synced against. + /// + /// This consumes the [`FullScanRequest`] and returns the updated one. + #[must_use] + pub fn chain_spks_for_keychain( + mut self, + keychain: K, + spks: impl IntoIterator + Send + 'static>, + ) -> Self { + match self.spks_by_keychain.remove(&keychain) { + Some(keychain_spks) => self + .spks_by_keychain + .insert(keychain, Box::new(keychain_spks.chain(spks.into_iter()))), + None => self + .spks_by_keychain + .insert(keychain, Box::new(spks.into_iter())), + }; + self + } + + /// Add a closure that will be called for every [`Script`] previously added to any keychain in + /// this request. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn inspect_spks_for_all_keychains( + mut self, + inspect: impl FnMut(K, u32, &Script) + Send + Sync + Clone + 'static, + ) -> Self + where + K: Send + 'static, + { + for (keychain, spks) in core::mem::take(&mut self.spks_by_keychain) { + let mut inspect = inspect.clone(); + self.spks_by_keychain.insert( + keychain.clone(), + Box::new(spks.inspect(move |(i, spk)| inspect(keychain.clone(), *i, spk))), + ); + } + self + } + + /// Add a closure that will be called for every [`Script`] previously added to a given + /// `keychain` in this request. + /// + /// This consumes the [`SyncRequest`] and returns the updated one. + #[must_use] + pub fn inspect_spks_for_keychain( + mut self, + keychain: K, + mut inspect: impl FnMut(u32, &Script) + Send + Sync + 'static, + ) -> Self + where + K: Send + 'static, + { + if let Some(spks) = self.spks_by_keychain.remove(&keychain) { + self.spks_by_keychain.insert( + keychain, + Box::new(spks.inspect(move |(i, spk)| inspect(*i, spk))), + ); + } + self + } +} + +/// Data returned from a spk-based blockchain client full scan. +/// +/// See also [`FullScanRequest`]. +pub struct FullScanResult { + /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain). + pub graph_update: TxGraph, + /// The update to apply to the receiving [`TxGraph`]. + pub chain_update: CheckPoint, + /// Last active indices for the corresponding keychains (`K`). + pub last_active_indices: BTreeMap, +} From 4c52f3e08e85e73db379ee3411cceb8dbcec92e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 24 Apr 2024 16:54:03 +0800 Subject: [PATCH 2/4] feat(wallet): make wallet compatible with sync/full-scan structures * Changed `Wallet::apply_update` to also take in anything that implements `Into`. This allows us to directly apply a `FullScanResult` or `SyncResult`. * Added `start_full_scan` and `start_sync_with_revealed_spks` methods to `Wallet`. Co-authored-by: Steve Myers --- crates/bdk/src/wallet/mod.rs | 49 +++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/crates/bdk/src/wallet/mod.rs b/crates/bdk/src/wallet/mod.rs index 1e24dd50..b959a3ad 100644 --- a/crates/bdk/src/wallet/mod.rs +++ b/crates/bdk/src/wallet/mod.rs @@ -26,6 +26,7 @@ use bdk_chain::{ local_chain::{ self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain, }, + spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeHeightAnchor, FullTxOut, IndexedTxGraph, Persist, PersistBackend, @@ -110,6 +111,26 @@ pub struct Update { pub chain: Option, } +impl From> for Update { + fn from(value: FullScanResult) -> Self { + Self { + last_active_indices: value.last_active_indices, + graph: value.graph_update, + chain: Some(value.chain_update), + } + } +} + +impl From for Update { + fn from(value: SyncResult) -> Self { + Self { + last_active_indices: BTreeMap::new(), + graph: value.graph_update, + chain: Some(value.chain_update), + } + } +} + /// The changes made to a wallet by applying an [`Update`]. #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, Default)] pub struct ChangeSet { @@ -2262,7 +2283,8 @@ impl Wallet { /// transactions related to your wallet into it. /// /// [`commit`]: Self::commit - pub fn apply_update(&mut self, update: Update) -> Result<(), CannotConnectError> { + pub fn apply_update(&mut self, update: impl Into) -> Result<(), CannotConnectError> { + let update = update.into(); let mut changeset = match update.chain { Some(chain_update) => ChangeSet::from(self.chain.apply_update(chain_update)?), None => ChangeSet::default(), @@ -2387,6 +2409,31 @@ impl Wallet { } } +/// Methods to construct sync/full-scan requests for spk-based chain sources. +impl Wallet { + /// Create a partial [`SyncRequest`] for this wallet for all revealed spks. + /// + /// This is the first step when performing a spk-based wallet partial sync, the returned + /// [`SyncRequest`] collects all revealed script pubkeys from the wallet keychain needed to + /// start a blockchain sync with a spk based blockchain client. + pub fn start_sync_with_revealed_spks(&self) -> SyncRequest { + SyncRequest::from_chain_tip(self.chain.tip()) + .populate_with_revealed_spks(&self.indexed_graph.index, ..) + } + + /// Create a [`FullScanRequest] for this wallet. + /// + /// This is the first step when performing a spk-based wallet full scan, the returned + /// [`FullScanRequest] collects iterators for the wallet's keychain script pub keys needed to + /// start a blockchain full scan with a spk based blockchain client. + /// + /// This operation is generally only used when importing or restoring a previously used wallet + /// in which the list of used scripts is not known. + pub fn start_full_scan(&self) -> FullScanRequest { + FullScanRequest::from_keychain_txout_index(self.chain.tip(), &self.indexed_graph.index) + } +} + impl AsRef> for Wallet { fn as_ref(&self) -> &bdk_chain::tx_graph::TxGraph { self.indexed_graph.graph() From 0f94f24aaf6374fe2d7c2abf32a870d3849fb8cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 25 Apr 2024 10:36:06 +0800 Subject: [PATCH 3/4] feat(esplora)!: update to use new sync/full-scan structures --- crates/esplora/src/async_ext.rs | 82 ++++++++------ crates/esplora/src/blocking_ext.rs | 69 +++++------- crates/esplora/src/lib.rs | 22 +--- crates/esplora/tests/async_ext.rs | 65 ++++++----- crates/esplora/tests/blocking_ext.rs | 57 ++++++---- example-crates/example_esplora/src/main.rs | 106 ++++++++---------- .../wallet_esplora_async/src/main.rs | 59 +++++----- .../wallet_esplora_blocking/src/main.rs | 43 +++---- 8 files changed, 243 insertions(+), 260 deletions(-) diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 9d02646c..2942d274 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,6 +1,7 @@ use std::collections::BTreeSet; use async_trait::async_trait; +use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; use bdk_chain::Anchor; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, @@ -11,7 +12,7 @@ use bdk_chain::{ use esplora_client::{Amount, TxStatus}; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::{anchor_from_status, FullScanUpdate, SyncUpdate}; +use crate::anchor_from_status; /// [`esplora_client::Error`] type Error = Box; @@ -50,14 +51,10 @@ pub trait EsploraAsyncExt { /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip async fn full_scan( &self, - local_tip: CheckPoint, - keychain_spks: BTreeMap< - K, - impl IntoIterator + Send> + Send, - >, + request: FullScanRequest, stop_gap: usize, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. @@ -75,12 +72,9 @@ pub trait EsploraAsyncExt { /// [`full_scan`]: EsploraAsyncExt::full_scan async fn sync( &self, - local_tip: CheckPoint, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, + request: SyncRequest, parallel_requests: usize, - ) -> Result; + ) -> Result; } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -88,42 +82,56 @@ pub trait EsploraAsyncExt { impl EsploraAsyncExt for esplora_client::AsyncClient { async fn full_scan( &self, - local_tip: CheckPoint, - keychain_spks: BTreeMap< - K, - impl IntoIterator + Send> + Send, - >, + request: FullScanRequest, stop_gap: usize, parallel_requests: usize, - ) -> Result, Error> { + ) -> Result, Error> { let latest_blocks = fetch_latest_blocks(self).await?; - let (tx_graph, last_active_indices) = - full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?; - let local_chain = - chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?; - Ok(FullScanUpdate { - local_chain, - tx_graph, + let (graph_update, last_active_indices) = full_scan_for_index_and_graph( + self, + request.spks_by_keychain, + stop_gap, + parallel_requests, + ) + .await?; + let chain_update = chain_update( + self, + &latest_blocks, + &request.chain_tip, + graph_update.all_anchors(), + ) + .await?; + Ok(FullScanResult { + chain_update, + graph_update, last_active_indices, }) } async fn sync( &self, - local_tip: CheckPoint, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, + request: SyncRequest, parallel_requests: usize, - ) -> Result { + ) -> Result { let latest_blocks = fetch_latest_blocks(self).await?; - let tx_graph = - sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?; - let local_chain = - chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?; - Ok(SyncUpdate { - tx_graph, - local_chain, + let graph_update = sync_for_index_and_graph( + self, + request.spks, + request.txids, + request.outpoints, + parallel_requests, + ) + .await?; + let chain_update = chain_update( + self, + &latest_blocks, + &request.chain_tip, + graph_update.all_anchors(), + ) + .await?; + Ok(SyncResult { + chain_update, + graph_update, }) } } diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 70373856..469ab52e 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -3,6 +3,7 @@ use std::thread::JoinHandle; use std::usize; use bdk_chain::collections::BTreeMap; +use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; use bdk_chain::Anchor; use bdk_chain::{ bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, @@ -12,8 +13,6 @@ use bdk_chain::{ use esplora_client::TxStatus; use crate::anchor_from_status; -use crate::FullScanUpdate; -use crate::SyncUpdate; /// [`esplora_client::Error`] pub type Error = Box; @@ -50,11 +49,10 @@ pub trait EsploraExt { /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip fn full_scan( &self, - local_tip: CheckPoint, - keychain_spks: BTreeMap>, + request: FullScanRequest, stop_gap: usize, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. @@ -70,59 +68,54 @@ pub trait EsploraExt { /// /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip /// [`full_scan`]: EsploraExt::full_scan - fn sync( - &self, - local_tip: CheckPoint, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result; + fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result; } impl EsploraExt for esplora_client::BlockingClient { fn full_scan( &self, - local_tip: CheckPoint, - keychain_spks: BTreeMap>, + request: FullScanRequest, stop_gap: usize, parallel_requests: usize, - ) -> Result, Error> { + ) -> Result, Error> { let latest_blocks = fetch_latest_blocks(self)?; - let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking( + let (graph_update, last_active_indices) = full_scan_for_index_and_graph_blocking( self, - keychain_spks, + request.spks_by_keychain, stop_gap, parallel_requests, )?; - let local_chain = chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors())?; - Ok(FullScanUpdate { - local_chain, - tx_graph, + let chain_update = chain_update( + self, + &latest_blocks, + &request.chain_tip, + graph_update.all_anchors(), + )?; + Ok(FullScanResult { + chain_update, + graph_update, last_active_indices, }) } - fn sync( - &self, - local_tip: CheckPoint, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result { + fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result { let latest_blocks = fetch_latest_blocks(self)?; - let tx_graph = sync_for_index_and_graph_blocking( + let graph_update = sync_for_index_and_graph_blocking( self, - misc_spks, - txids, - outpoints, + request.spks, + request.txids, + request.outpoints, parallel_requests, )?; - let local_chain = chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors())?; - Ok(SyncUpdate { - local_chain, - tx_graph, + let chain_update = chain_update( + self, + &latest_blocks, + &request.chain_tip, + graph_update.all_anchors(), + )?; + Ok(SyncResult { + chain_update, + graph_update, }) } } diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 37d7dd26..535167ff 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -16,9 +16,7 @@ //! [`TxGraph`]: bdk_chain::tx_graph::TxGraph //! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora -use std::collections::BTreeMap; - -use bdk_chain::{local_chain::CheckPoint, BlockId, ConfirmationTimeHeightAnchor, TxGraph}; +use bdk_chain::{BlockId, ConfirmationTimeHeightAnchor}; use esplora_client::TxStatus; pub use esplora_client; @@ -50,21 +48,3 @@ fn anchor_from_status(status: &TxStatus) -> Option None } } - -/// Update returns from a full scan. -pub struct FullScanUpdate { - /// The update to apply to the receiving [`LocalChain`](bdk_chain::local_chain::LocalChain). - pub local_chain: CheckPoint, - /// The update to apply to the receiving [`TxGraph`]. - pub tx_graph: TxGraph, - /// Last active indices for the corresponding keychains (`K`). - pub last_active_indices: BTreeMap, -} - -/// Update returned from a sync. -pub struct SyncUpdate { - /// The update to apply to the receiving [`LocalChain`](bdk_chain::local_chain::LocalChain). - pub local_chain: CheckPoint, - /// The update to apply to the receiving [`TxGraph`]. - pub tx_graph: TxGraph, -} diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index f6954fe1..6f7956d4 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -1,8 +1,9 @@ +use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_esplora::EsploraAsyncExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use esplora_client::{self, Builder}; -use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; @@ -55,20 +56,15 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { // use a full checkpoint linked list (since this is not what we are testing) let cp_tip = env.make_checkpoint_tip(); - let sync_update = client - .sync( - cp_tip.clone(), - misc_spks.into_iter(), - vec![].into_iter(), - vec![].into_iter(), - 1, - ) - .await?; + let sync_update = { + let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + client.sync(request, 1).await? + }; assert!( { let update_cps = sync_update - .local_chain + .chain_update .iter() .map(|cp| cp.block_id()) .collect::>(); @@ -81,7 +77,7 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { "update should not alter original checkpoint tip since we already started with all checkpoints", ); - let graph_update = sync_update.tx_graph; + let graph_update = sync_update.graph_update; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. for tx in graph_update.full_txs() { @@ -142,8 +138,6 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { .enumerate() .map(|(i, addr)| (i as u32, addr.script_pubkey())) .collect(); - let mut keychains = BTreeMap::new(); - keychains.insert(0, spks); // Then receive coins on the 4th address. let txid_4th_addr = env.bitcoind.client.send_to_address( @@ -166,16 +160,25 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4 // will. - let full_scan_update = client - .full_scan(cp_tip.clone(), keychains.clone(), 3, 1) - .await?; - assert!(full_scan_update.tx_graph.full_txs().next().is_none()); + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 3, 1).await? + }; + assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); - let full_scan_update = client - .full_scan(cp_tip.clone(), keychains.clone(), 4, 1) - .await?; + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 4, 1).await? + }; assert_eq!( - full_scan_update.tx_graph.full_txs().next().unwrap().txid, + full_scan_update + .graph_update + .full_txs() + .next() + .unwrap() + .txid, txid_4th_addr ); assert_eq!(full_scan_update.last_active_indices[&0], 3); @@ -198,20 +201,26 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. - let full_scan_update = client - .full_scan(cp_tip.clone(), keychains.clone(), 5, 1) - .await?; + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 5, 1).await? + }; let txs: HashSet<_> = full_scan_update - .tx_graph + .graph_update .full_txs() .map(|tx| tx.txid) .collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); - let full_scan_update = client.full_scan(cp_tip, keychains, 6, 1).await?; + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 6, 1).await? + }; let txs: HashSet<_> = full_scan_update - .tx_graph + .graph_update .full_txs() .map(|tx| tx.txid) .collect(); diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 40e446a4..61c2466d 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,8 +1,9 @@ +use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_esplora::EsploraExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use esplora_client::{self, Builder}; -use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; @@ -55,18 +56,15 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { // use a full checkpoint linked list (since this is not what we are testing) let cp_tip = env.make_checkpoint_tip(); - let sync_update = client.sync( - cp_tip.clone(), - misc_spks.into_iter(), - vec![].into_iter(), - vec![].into_iter(), - 1, - )?; + let sync_update = { + let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + client.sync(request, 1)? + }; assert!( { let update_cps = sync_update - .local_chain + .chain_update .iter() .map(|cp| cp.block_id()) .collect::>(); @@ -79,7 +77,7 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { "update should not alter original checkpoint tip since we already started with all checkpoints", ); - let graph_update = sync_update.tx_graph; + let graph_update = sync_update.graph_update; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. for tx in graph_update.full_txs() { @@ -141,8 +139,6 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { .enumerate() .map(|(i, addr)| (i as u32, addr.script_pubkey())) .collect(); - let mut keychains = BTreeMap::new(); - keychains.insert(0, spks); // Then receive coins on the 4th address. let txid_4th_addr = env.bitcoind.client.send_to_address( @@ -165,12 +161,25 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. - let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 3, 1)?; - assert!(full_scan_update.tx_graph.full_txs().next().is_none()); + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 3, 1)? + }; + assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); - let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 4, 1)?; + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 4, 1)? + }; assert_eq!( - full_scan_update.tx_graph.full_txs().next().unwrap().txid, + full_scan_update + .graph_update + .full_txs() + .next() + .unwrap() + .txid, txid_4th_addr ); assert_eq!(full_scan_update.last_active_indices[&0], 3); @@ -193,18 +202,26 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. - let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 5, 1)?; + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 5, 1)? + }; let txs: HashSet<_> = full_scan_update - .tx_graph + .graph_update .full_txs() .map(|tx| tx.txid) .collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); - let full_scan_update = client.full_scan(cp_tip.clone(), keychains, 6, 1)?; + let full_scan_update = { + let request = + FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + client.full_scan(request, 6, 1)? + }; let txs: HashSet<_> = full_scan_update - .tx_graph + .graph_update .full_txs() .map(|tx| tx.txid) .collect(); diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 33aab276..46eb18b8 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -1,14 +1,15 @@ use std::{ - collections::BTreeMap, + collections::BTreeSet, io::{self, Write}, sync::Mutex, }; use bdk_chain::{ - bitcoin::{constants::genesis_block, Address, Network, OutPoint, ScriptBuf, Txid}, + bitcoin::{constants::genesis_block, Address, Network, Txid}, indexed_tx_graph::{self, IndexedTxGraph}, keychain, local_chain::{self, LocalChain}, + spk_client::{FullScanRequest, SyncRequest}, Append, ConfirmationTimeHeightAnchor, }; @@ -167,45 +168,34 @@ fn main() -> anyhow::Result<()> { scan_options, .. } => { - let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); - let keychain_spks = graph - .lock() - .expect("mutex must not be poisoned") - .index - .all_unbounded_spk_iters() - .into_iter() - // This `map` is purely for logging. - .map(|(keychain, iter)| { - let mut first = true; - let spk_iter = iter.inspect(move |(i, _)| { - if first { - eprint!("\nscanning {}: ", keychain); - first = false; + let request = { + let chain_tip = chain.lock().expect("mutex must not be poisoned").tip(); + let indexed_graph = &*graph.lock().expect("mutex must not be poisoned"); + FullScanRequest::from_keychain_txout_index(chain_tip, &indexed_graph.index) + .inspect_spks_for_all_keychains({ + let mut once = BTreeSet::::new(); + move |keychain, spk_i, _| { + if once.insert(keychain) { + eprint!("\nscanning {}: ", keychain); + } + eprint!("{} ", spk_i); + // Flush early to ensure we print at every iteration. + let _ = io::stderr().flush(); } - eprint!("{} ", i); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - }); - (keychain, spk_iter) - }) - .collect::>(); + }) + }; // The client scans keychain spks for transaction histories, stopping after `stop_gap` // is reached. It returns a `TxGraph` update (`graph_update`) and a structure that // represents the last active spk derivation indices of keychains // (`keychain_indices_update`). let mut update = client - .full_scan( - local_tip, - keychain_spks, - *stop_gap, - scan_options.parallel_requests, - ) + .full_scan(request, *stop_gap, scan_options.parallel_requests) .context("scanning for transactions")?; // We want to keep track of the latest time a transaction was seen unconfirmed. let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = update.tx_graph.update_last_seen_unconfirmed(now); + let _ = update.graph_update.update_last_seen_unconfirmed(now); let mut graph = graph.lock().expect("mutex must not be poisoned"); let mut chain = chain.lock().expect("mutex must not be poisoned"); @@ -213,11 +203,11 @@ fn main() -> anyhow::Result<()> { // deriviation indices. Usually before a scan you are on a fresh wallet with no // addresses derived so we need to derive up to last active addresses the scan found // before adding the transactions. - (chain.apply_update(update.local_chain)?, { + (chain.apply_update(update.chain_update)?, { let (_, index_changeset) = graph .index .reveal_to_target_multi(&update.last_active_indices); - let mut indexed_tx_graph_changeset = graph.apply_update(update.tx_graph); + let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update); indexed_tx_graph_changeset.append(index_changeset.into()); indexed_tx_graph_changeset }) @@ -241,12 +231,9 @@ fn main() -> anyhow::Result<()> { unused_spks = false; } - // Spks, outpoints and txids we want updates on will be accumulated here. - let mut spks: Box> = Box::new(core::iter::empty()); - let mut outpoints: Box> = Box::new(core::iter::empty()); - let mut txids: Box> = Box::new(core::iter::empty()); - let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); + // Spks, outpoints and txids we want updates on will be accumulated here. + let mut request = SyncRequest::from_chain_tip(local_tip.clone()); // Get a short lock on the structures to get spks, utxos, and txs that we are interested // in. @@ -260,12 +247,12 @@ fn main() -> anyhow::Result<()> { .revealed_spks(..) .map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned())) .collect::>(); - spks = Box::new(spks.chain(all_spks.into_iter().map(|(k, i, spk)| { + request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| { eprintln!("scanning {}:{}", k, i); // Flush early to ensure we print at every iteration. let _ = io::stderr().flush(); spk - }))); + })); } if unused_spks { let unused_spks = graph @@ -273,17 +260,18 @@ fn main() -> anyhow::Result<()> { .unused_spks() .map(|(k, i, spk)| (k, i, spk.to_owned())) .collect::>(); - spks = Box::new(spks.chain(unused_spks.into_iter().map(|(k, i, spk)| { - eprintln!( - "Checking if address {} {}:{} has been used", - Address::from_script(&spk, args.network).unwrap(), - k, - i, - ); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - spk - }))); + request = + request.chain_spks(unused_spks.into_iter().map(move |(k, i, spk)| { + eprintln!( + "Checking if address {} {}:{} has been used", + Address::from_script(&spk, args.network).unwrap(), + k, + i, + ); + // Flush early to ensure we print at every iteration. + let _ = io::stderr().flush(); + spk + })); } if utxos { // We want to search for whether the UTXO is spent, and spent by which @@ -295,7 +283,7 @@ fn main() -> anyhow::Result<()> { .filter_chain_unspents(&*chain, local_tip.block_id(), init_outpoints) .map(|(_, utxo)| utxo) .collect::>(); - outpoints = Box::new( + request = request.chain_outpoints( utxos .into_iter() .inspect(|utxo| { @@ -319,7 +307,7 @@ fn main() -> anyhow::Result<()> { .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) .map(|canonical_tx| canonical_tx.tx_node.txid) .collect::>(); - txids = Box::new(unconfirmed_txids.into_iter().inspect(|txid| { + request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| { eprintln!("Checking if {} is confirmed yet", txid); // Flush early to ensure we print at every iteration. let _ = io::stderr().flush(); @@ -327,21 +315,15 @@ fn main() -> anyhow::Result<()> { } } - let mut update = client.sync( - local_tip, - spks, - txids, - outpoints, - scan_options.parallel_requests, - )?; + let mut update = client.sync(request, scan_options.parallel_requests)?; // Update last seen unconfirmed let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = update.tx_graph.update_last_seen_unconfirmed(now); + let _ = update.graph_update.update_last_seen_unconfirmed(now); ( - chain.lock().unwrap().apply_update(update.local_chain)?, - graph.lock().unwrap().apply_update(update.tx_graph), + chain.lock().unwrap().apply_update(update.chain_update)?, + graph.lock().unwrap().apply_update(update.graph_update), ) } }; diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index c37b6e66..d89e5fd2 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -1,8 +1,7 @@ -use std::{io::Write, str::FromStr}; +use std::{collections::BTreeSet, io::Write, str::FromStr}; use bdk::{ - bitcoin::{Address, Network}, - wallet::Update, + bitcoin::{Address, Network, Script}, KeychainKind, SignOptions, Wallet, }; use bdk_esplora::{esplora_client, EsploraAsyncExt}; @@ -37,34 +36,44 @@ async fn main() -> Result<(), anyhow::Error> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?; - let prev_tip = wallet.latest_checkpoint(); - let keychain_spks = wallet - .all_unbounded_spk_iters() - .into_iter() - .map(|(k, k_spks)| { - let mut once = Some(()); - let mut stdout = std::io::stdout(); - let k_spks = k_spks - .inspect(move |(spk_i, _)| match once.take() { - Some(_) => print!("\nScanning keychain [{:?}]", k), - None => print!(" {:<3}", spk_i), - }) - .inspect(move |_| stdout.flush().expect("must flush")); - (k, k_spks) + fn generate_inspect(kind: KeychainKind) -> impl FnMut(u32, &Script) + Send + Sync + 'static { + let mut once = Some(()); + let mut stdout = std::io::stdout(); + move |spk_i, _| { + match once.take() { + Some(_) => print!("\nScanning keychain [{:?}]", kind), + None => print!(" {:<3}", spk_i), + }; + stdout.flush().expect("must flush"); + } + } + let request = wallet + .start_full_scan() + .inspect_spks_for_all_keychains({ + let mut once = BTreeSet::::new(); + move |keychain, spk_i, _| { + match once.insert(keychain) { + true => print!("\nScanning keychain [{:?}]", keychain), + false => print!(" {:<3}", spk_i), + } + std::io::stdout().flush().expect("must flush") + } }) - .collect(); + .inspect_spks_for_keychain( + KeychainKind::External, + generate_inspect(KeychainKind::External), + ) + .inspect_spks_for_keychain( + KeychainKind::Internal, + generate_inspect(KeychainKind::Internal), + ); let mut update = client - .full_scan(prev_tip, keychain_spks, STOP_GAP, PARALLEL_REQUESTS) + .full_scan(request, STOP_GAP, PARALLEL_REQUESTS) .await?; let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = update.tx_graph.update_last_seen_unconfirmed(now); + let _ = update.graph_update.update_last_seen_unconfirmed(now); - let update = Update { - last_active_indices: update.last_active_indices, - graph: update.tx_graph, - chain: Some(update.local_chain), - }; wallet.apply_update(update)?; wallet.commit()?; println!(); diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 979e272f..6028bb7d 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -3,11 +3,10 @@ const SEND_AMOUNT: u64 = 1000; const STOP_GAP: usize = 5; const PARALLEL_REQUESTS: usize = 1; -use std::{io::Write, str::FromStr}; +use std::{collections::BTreeSet, io::Write, str::FromStr}; use bdk::{ bitcoin::{Address, Network}, - wallet::Update, KeychainKind, SignOptions, Wallet, }; use bdk_esplora::{esplora_client, EsploraExt}; @@ -36,36 +35,22 @@ fn main() -> Result<(), anyhow::Error> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking(); - let keychain_spks = wallet - .all_unbounded_spk_iters() - .into_iter() - .map(|(k, k_spks)| { - let mut once = Some(()); - let mut stdout = std::io::stdout(); - let k_spks = k_spks - .inspect(move |(spk_i, _)| match once.take() { - Some(_) => print!("\nScanning keychain [{:?}]", k), - None => print!(" {:<3}", spk_i), - }) - .inspect(move |_| stdout.flush().expect("must flush")); - (k, k_spks) - }) - .collect(); + let request = wallet.start_full_scan().inspect_spks_for_all_keychains({ + let mut once = BTreeSet::::new(); + move |keychain, spk_i, _| { + match once.insert(keychain) { + true => print!("\nScanning keychain [{:?}]", keychain), + false => print!(" {:<3}", spk_i), + }; + std::io::stdout().flush().expect("must flush") + } + }); - let mut update = client.full_scan( - wallet.latest_checkpoint(), - keychain_spks, - STOP_GAP, - PARALLEL_REQUESTS, - )?; + let mut update = client.full_scan(request, STOP_GAP, PARALLEL_REQUESTS)?; let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = update.tx_graph.update_last_seen_unconfirmed(now); + let _ = update.graph_update.update_last_seen_unconfirmed(now); - wallet.apply_update(Update { - last_active_indices: update.last_active_indices, - graph: update.tx_graph, - chain: Some(update.local_chain), - })?; + wallet.apply_update(update)?; wallet.commit()?; println!(); From c0374a0eeb155f1cb4f9d99223f6bc65d24f7df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 27 Apr 2024 20:40:08 +0800 Subject: [PATCH 4/4] feat(chain): `SyncRequest` now uses `ExactSizeIterator`s This allows the caller to track sync progress. --- crates/chain/src/spk_client.rs | 107 +++++++++++++++++---- example-crates/example_esplora/src/main.rs | 33 ++++++- 2 files changed, 117 insertions(+), 23 deletions(-) diff --git a/crates/chain/src/spk_client.rs b/crates/chain/src/spk_client.rs index 7873ba22..eefa211c 100644 --- a/crates/chain/src/spk_client.rs +++ b/crates/chain/src/spk_client.rs @@ -1,6 +1,6 @@ //! Helper types for spk-based blockchain clients. -use core::{fmt::Debug, ops::RangeBounds}; +use core::{fmt::Debug, marker::PhantomData, ops::RangeBounds}; use alloc::{boxed::Box, collections::BTreeMap, vec::Vec}; use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; @@ -18,11 +18,11 @@ pub struct SyncRequest { /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip pub chain_tip: CheckPoint, /// Transactions that spend from or to these indexed script pubkeys. - pub spks: Box + Send>, + pub spks: Box + Send>, /// Transactions with these txids. - pub txids: Box + Send>, + pub txids: Box + Send>, /// Transactions with these outpoints or spent from these outpoints. - pub outpoints: Box + Send>, + pub outpoints: Box + Send>, } impl SyncRequest { @@ -42,7 +42,7 @@ impl SyncRequest { #[must_use] pub fn set_spks( mut self, - spks: impl IntoIterator + Send + 'static>, + spks: impl IntoIterator + Send + 'static>, ) -> Self { self.spks = Box::new(spks.into_iter()); self @@ -54,7 +54,7 @@ impl SyncRequest { #[must_use] pub fn set_txids( mut self, - txids: impl IntoIterator + Send + 'static>, + txids: impl IntoIterator + Send + 'static>, ) -> Self { self.txids = Box::new(txids.into_iter()); self @@ -66,7 +66,9 @@ impl SyncRequest { #[must_use] pub fn set_outpoints( mut self, - outpoints: impl IntoIterator + Send + 'static>, + outpoints: impl IntoIterator< + IntoIter = impl ExactSizeIterator + Send + 'static, + >, ) -> Self { self.outpoints = Box::new(outpoints.into_iter()); self @@ -79,11 +81,11 @@ impl SyncRequest { pub fn chain_spks( mut self, spks: impl IntoIterator< - IntoIter = impl Iterator + Send + 'static, + IntoIter = impl ExactSizeIterator + Send + 'static, Item = ScriptBuf, >, ) -> Self { - self.spks = Box::new(self.spks.chain(spks)); + self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter())); self } @@ -93,9 +95,12 @@ impl SyncRequest { #[must_use] pub fn chain_txids( mut self, - txids: impl IntoIterator + Send + 'static, Item = Txid>, + txids: impl IntoIterator< + IntoIter = impl ExactSizeIterator + Send + 'static, + Item = Txid, + >, ) -> Self { - self.txids = Box::new(self.txids.chain(txids)); + self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter())); self } @@ -106,39 +111,42 @@ impl SyncRequest { pub fn chain_outpoints( mut self, outpoints: impl IntoIterator< - IntoIter = impl Iterator + Send + 'static, + IntoIter = impl ExactSizeIterator + Send + 'static, Item = OutPoint, >, ) -> Self { - self.outpoints = Box::new(self.outpoints.chain(outpoints)); + self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter())); self } - /// Add a closure that will be called for each [`Script`] synced in this request. + /// Add a closure that will be called for [`Script`]s previously added to this request. /// /// This consumes the [`SyncRequest`] and returns the updated one. #[must_use] - pub fn inspect_spks(mut self, inspect: impl Fn(&Script) + Send + Sync + 'static) -> Self { + pub fn inspect_spks( + mut self, + mut inspect: impl FnMut(&Script) + Send + Sync + 'static, + ) -> Self { self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk))); self } - /// Add a closure that will be called for each [`Txid`] synced in this request. + /// Add a closure that will be called for [`Txid`]s previously added to this request. /// /// This consumes the [`SyncRequest`] and returns the updated one. #[must_use] - pub fn inspect_txids(mut self, inspect: impl Fn(&Txid) + Send + Sync + 'static) -> Self { + pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self { self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid))); self } - /// Add a closure that will be called for each [`OutPoint`] synced in this request. + /// Add a closure that will be called for [`OutPoint`]s previously added to this request. /// /// This consumes the [`SyncRequest`] and returns the updated one. #[must_use] pub fn inspect_outpoints( mut self, - inspect: impl Fn(&OutPoint) + Send + Sync + 'static, + mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static, ) -> Self { self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op))); self @@ -313,3 +321,64 @@ pub struct FullScanResult { /// Last active indices for the corresponding keychains (`K`). pub last_active_indices: BTreeMap, } + +/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new +/// [`ExactSizeIterator`]. +/// +/// The danger of this is explained in [the `ExactSizeIterator` docs] +/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator). +/// This does not apply here since it would be impossible to scan an item count that overflows +/// `usize` anyway. +struct ExactSizeChain { + a: Option, + b: Option, + i: PhantomData, +} + +impl ExactSizeChain { + fn new(a: A, b: B) -> Self { + ExactSizeChain { + a: Some(a), + b: Some(b), + i: PhantomData, + } + } +} + +impl Iterator for ExactSizeChain +where + A: Iterator, + B: Iterator, +{ + type Item = I; + + fn next(&mut self) -> Option { + if let Some(a) = &mut self.a { + let item = a.next(); + if item.is_some() { + return item; + } + self.a = None; + } + if let Some(b) = &mut self.b { + let item = b.next(); + if item.is_some() { + return item; + } + self.b = None; + } + None + } +} + +impl ExactSizeIterator for ExactSizeChain +where + A: ExactSizeIterator, + B: ExactSizeIterator, +{ + fn len(&self) -> usize { + let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0); + let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0); + a_len + b_len + } +} diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 46eb18b8..e785bcc3 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -248,7 +248,7 @@ fn main() -> anyhow::Result<()> { .map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned())) .collect::>(); request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| { - eprintln!("scanning {}:{}", k, i); + eprint!("scanning {}:{}", k, i); // Flush early to ensure we print at every iteration. let _ = io::stderr().flush(); spk @@ -262,7 +262,7 @@ fn main() -> anyhow::Result<()> { .collect::>(); request = request.chain_spks(unused_spks.into_iter().map(move |(k, i, spk)| { - eprintln!( + eprint!( "Checking if address {} {}:{} has been used", Address::from_script(&spk, args.network).unwrap(), k, @@ -287,7 +287,7 @@ fn main() -> anyhow::Result<()> { utxos .into_iter() .inspect(|utxo| { - eprintln!( + eprint!( "Checking if outpoint {} (value: {}) has been spent", utxo.outpoint, utxo.txout.value ); @@ -308,13 +308,38 @@ fn main() -> anyhow::Result<()> { .map(|canonical_tx| canonical_tx.tx_node.txid) .collect::>(); request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| { - eprintln!("Checking if {} is confirmed yet", txid); + eprint!("Checking if {} is confirmed yet", txid); // Flush early to ensure we print at every iteration. let _ = io::stderr().flush(); })); } } + let total_spks = request.spks.len(); + let total_txids = request.txids.len(); + let total_ops = request.outpoints.len(); + request = request + .inspect_spks({ + let mut visited = 0; + move |_| { + visited += 1; + eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32) + } + }) + .inspect_txids({ + let mut visited = 0; + move |_| { + visited += 1; + eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32) + } + }) + .inspect_outpoints({ + let mut visited = 0; + move |_| { + visited += 1; + eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32) + } + }); let mut update = client.sync(request, scan_options.parallel_requests)?; // Update last seen unconfirmed