feat(esplora)!: update to use new sync/full-scan structures

This commit is contained in:
志宇 2024-04-25 10:36:06 +08:00
parent 4c52f3e08e
commit 0f94f24aaf
No known key found for this signature in database
GPG Key ID: F6345C9837C2BDE8
8 changed files with 243 additions and 260 deletions

View File

@ -1,6 +1,7 @@
use std::collections::BTreeSet; use std::collections::BTreeSet;
use async_trait::async_trait; use async_trait::async_trait;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
use bdk_chain::Anchor; use bdk_chain::Anchor;
use bdk_chain::{ use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
@ -11,7 +12,7 @@ use bdk_chain::{
use esplora_client::{Amount, TxStatus}; use esplora_client::{Amount, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt}; use futures::{stream::FuturesOrdered, TryStreamExt};
use crate::{anchor_from_status, FullScanUpdate, SyncUpdate}; use crate::anchor_from_status;
/// [`esplora_client::Error`] /// [`esplora_client::Error`]
type Error = Box<esplora_client::Error>; type Error = Box<esplora_client::Error>;
@ -50,14 +51,10 @@ pub trait EsploraAsyncExt {
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
async fn full_scan<K: Ord + Clone + Send>( async fn full_scan<K: Ord + Clone + Send>(
&self, &self,
local_tip: CheckPoint, request: FullScanRequest<K>,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
stop_gap: usize, stop_gap: usize,
parallel_requests: usize, parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error>; ) -> Result<FullScanResult<K>, Error>;
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`]. /// specified and return a [`TxGraph`].
@ -75,12 +72,9 @@ pub trait EsploraAsyncExt {
/// [`full_scan`]: EsploraAsyncExt::full_scan /// [`full_scan`]: EsploraAsyncExt::full_scan
async fn sync( async fn sync(
&self, &self,
local_tip: CheckPoint, request: SyncRequest,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
parallel_requests: usize, parallel_requests: usize,
) -> Result<SyncUpdate, Error>; ) -> Result<SyncResult, Error>;
} }
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@ -88,42 +82,56 @@ pub trait EsploraAsyncExt {
impl EsploraAsyncExt for esplora_client::AsyncClient { impl EsploraAsyncExt for esplora_client::AsyncClient {
async fn full_scan<K: Ord + Clone + Send>( async fn full_scan<K: Ord + Clone + Send>(
&self, &self,
local_tip: CheckPoint, request: FullScanRequest<K>,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
stop_gap: usize, stop_gap: usize,
parallel_requests: usize, parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error> { ) -> Result<FullScanResult<K>, Error> {
let latest_blocks = fetch_latest_blocks(self).await?; let latest_blocks = fetch_latest_blocks(self).await?;
let (tx_graph, last_active_indices) = let (graph_update, last_active_indices) = full_scan_for_index_and_graph(
full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?; self,
let local_chain = request.spks_by_keychain,
chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?; stop_gap,
Ok(FullScanUpdate { parallel_requests,
local_chain, )
tx_graph, .await?;
let chain_update = chain_update(
self,
&latest_blocks,
&request.chain_tip,
graph_update.all_anchors(),
)
.await?;
Ok(FullScanResult {
chain_update,
graph_update,
last_active_indices, last_active_indices,
}) })
} }
async fn sync( async fn sync(
&self, &self,
local_tip: CheckPoint, request: SyncRequest,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
parallel_requests: usize, parallel_requests: usize,
) -> Result<SyncUpdate, Error> { ) -> Result<SyncResult, Error> {
let latest_blocks = fetch_latest_blocks(self).await?; let latest_blocks = fetch_latest_blocks(self).await?;
let tx_graph = let graph_update = sync_for_index_and_graph(
sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?; self,
let local_chain = request.spks,
chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?; request.txids,
Ok(SyncUpdate { request.outpoints,
tx_graph, parallel_requests,
local_chain, )
.await?;
let chain_update = chain_update(
self,
&latest_blocks,
&request.chain_tip,
graph_update.all_anchors(),
)
.await?;
Ok(SyncResult {
chain_update,
graph_update,
}) })
} }
} }

View File

@ -3,6 +3,7 @@ use std::thread::JoinHandle;
use std::usize; use std::usize;
use bdk_chain::collections::BTreeMap; use bdk_chain::collections::BTreeMap;
use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult};
use bdk_chain::Anchor; use bdk_chain::Anchor;
use bdk_chain::{ use bdk_chain::{
bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
@ -12,8 +13,6 @@ use bdk_chain::{
use esplora_client::TxStatus; use esplora_client::TxStatus;
use crate::anchor_from_status; use crate::anchor_from_status;
use crate::FullScanUpdate;
use crate::SyncUpdate;
/// [`esplora_client::Error`] /// [`esplora_client::Error`]
pub type Error = Box<esplora_client::Error>; pub type Error = Box<esplora_client::Error>;
@ -50,11 +49,10 @@ pub trait EsploraExt {
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
fn full_scan<K: Ord + Clone>( fn full_scan<K: Ord + Clone>(
&self, &self,
local_tip: CheckPoint, request: FullScanRequest<K>,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
stop_gap: usize, stop_gap: usize,
parallel_requests: usize, parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error>; ) -> Result<FullScanResult<K>, Error>;
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// Sync a set of scripts with the blockchain (via an Esplora client) for the data
/// specified and return a [`TxGraph`]. /// specified and return a [`TxGraph`].
@ -70,59 +68,54 @@ pub trait EsploraExt {
/// ///
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
/// [`full_scan`]: EsploraExt::full_scan /// [`full_scan`]: EsploraExt::full_scan
fn sync( fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error>;
&self,
local_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
) -> Result<SyncUpdate, Error>;
} }
impl EsploraExt for esplora_client::BlockingClient { impl EsploraExt for esplora_client::BlockingClient {
fn full_scan<K: Ord + Clone>( fn full_scan<K: Ord + Clone>(
&self, &self,
local_tip: CheckPoint, request: FullScanRequest<K>,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
stop_gap: usize, stop_gap: usize,
parallel_requests: usize, parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error> { ) -> Result<FullScanResult<K>, Error> {
let latest_blocks = fetch_latest_blocks(self)?; let latest_blocks = fetch_latest_blocks(self)?;
let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking( let (graph_update, last_active_indices) = full_scan_for_index_and_graph_blocking(
self, self,
keychain_spks, request.spks_by_keychain,
stop_gap, stop_gap,
parallel_requests, parallel_requests,
)?; )?;
let local_chain = chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors())?; let chain_update = chain_update(
Ok(FullScanUpdate { self,
local_chain, &latest_blocks,
tx_graph, &request.chain_tip,
graph_update.all_anchors(),
)?;
Ok(FullScanResult {
chain_update,
graph_update,
last_active_indices, last_active_indices,
}) })
} }
fn sync( fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result<SyncResult, Error> {
&self,
local_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
parallel_requests: usize,
) -> Result<SyncUpdate, Error> {
let latest_blocks = fetch_latest_blocks(self)?; let latest_blocks = fetch_latest_blocks(self)?;
let tx_graph = sync_for_index_and_graph_blocking( let graph_update = sync_for_index_and_graph_blocking(
self, self,
misc_spks, request.spks,
txids, request.txids,
outpoints, request.outpoints,
parallel_requests, parallel_requests,
)?; )?;
let local_chain = chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors())?; let chain_update = chain_update(
Ok(SyncUpdate { self,
local_chain, &latest_blocks,
tx_graph, &request.chain_tip,
graph_update.all_anchors(),
)?;
Ok(SyncResult {
chain_update,
graph_update,
}) })
} }
} }

View File

@ -16,9 +16,7 @@
//! [`TxGraph`]: bdk_chain::tx_graph::TxGraph //! [`TxGraph`]: bdk_chain::tx_graph::TxGraph
//! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora //! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora
use std::collections::BTreeMap; use bdk_chain::{BlockId, ConfirmationTimeHeightAnchor};
use bdk_chain::{local_chain::CheckPoint, BlockId, ConfirmationTimeHeightAnchor, TxGraph};
use esplora_client::TxStatus; use esplora_client::TxStatus;
pub use esplora_client; pub use esplora_client;
@ -50,21 +48,3 @@ fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeHeightAnchor>
None None
} }
} }
/// Update returns from a full scan.
pub struct FullScanUpdate<K> {
/// The update to apply to the receiving [`LocalChain`](bdk_chain::local_chain::LocalChain).
pub local_chain: CheckPoint,
/// The update to apply to the receiving [`TxGraph`].
pub tx_graph: TxGraph<ConfirmationTimeHeightAnchor>,
/// Last active indices for the corresponding keychains (`K`).
pub last_active_indices: BTreeMap<K, u32>,
}
/// Update returned from a sync.
pub struct SyncUpdate {
/// The update to apply to the receiving [`LocalChain`](bdk_chain::local_chain::LocalChain).
pub local_chain: CheckPoint,
/// The update to apply to the receiving [`TxGraph`].
pub tx_graph: TxGraph<ConfirmationTimeHeightAnchor>,
}

View File

@ -1,8 +1,9 @@
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
use bdk_esplora::EsploraAsyncExt; use bdk_esplora::EsploraAsyncExt;
use electrsd::bitcoind::anyhow; use electrsd::bitcoind::anyhow;
use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use electrsd::bitcoind::bitcoincore_rpc::RpcApi;
use esplora_client::{self, Builder}; use esplora_client::{self, Builder};
use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::collections::{BTreeSet, HashSet};
use std::str::FromStr; use std::str::FromStr;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
@ -55,20 +56,15 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
// use a full checkpoint linked list (since this is not what we are testing) // use a full checkpoint linked list (since this is not what we are testing)
let cp_tip = env.make_checkpoint_tip(); let cp_tip = env.make_checkpoint_tip();
let sync_update = client let sync_update = {
.sync( let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
cp_tip.clone(), client.sync(request, 1).await?
misc_spks.into_iter(), };
vec![].into_iter(),
vec![].into_iter(),
1,
)
.await?;
assert!( assert!(
{ {
let update_cps = sync_update let update_cps = sync_update
.local_chain .chain_update
.iter() .iter()
.map(|cp| cp.block_id()) .map(|cp| cp.block_id())
.collect::<BTreeSet<_>>(); .collect::<BTreeSet<_>>();
@ -81,7 +77,7 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
"update should not alter original checkpoint tip since we already started with all checkpoints", "update should not alter original checkpoint tip since we already started with all checkpoints",
); );
let graph_update = sync_update.tx_graph; let graph_update = sync_update.graph_update;
// Check to see if we have the floating txouts available from our two created transactions' // Check to see if we have the floating txouts available from our two created transactions'
// previous outputs in order to calculate transaction fees. // previous outputs in order to calculate transaction fees.
for tx in graph_update.full_txs() { for tx in graph_update.full_txs() {
@ -142,8 +138,6 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> {
.enumerate() .enumerate()
.map(|(i, addr)| (i as u32, addr.script_pubkey())) .map(|(i, addr)| (i as u32, addr.script_pubkey()))
.collect(); .collect();
let mut keychains = BTreeMap::new();
keychains.insert(0, spks);
// Then receive coins on the 4th address. // Then receive coins on the 4th address.
let txid_4th_addr = env.bitcoind.client.send_to_address( let txid_4th_addr = env.bitcoind.client.send_to_address(
@ -166,16 +160,25 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> {
// A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4 // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4
// will. // will.
let full_scan_update = client let full_scan_update = {
.full_scan(cp_tip.clone(), keychains.clone(), 3, 1) let request =
.await?; FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
assert!(full_scan_update.tx_graph.full_txs().next().is_none()); client.full_scan(request, 3, 1).await?
};
assert!(full_scan_update.graph_update.full_txs().next().is_none());
assert!(full_scan_update.last_active_indices.is_empty()); assert!(full_scan_update.last_active_indices.is_empty());
let full_scan_update = client let full_scan_update = {
.full_scan(cp_tip.clone(), keychains.clone(), 4, 1) let request =
.await?; FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 4, 1).await?
};
assert_eq!( assert_eq!(
full_scan_update.tx_graph.full_txs().next().unwrap().txid, full_scan_update
.graph_update
.full_txs()
.next()
.unwrap()
.txid,
txid_4th_addr txid_4th_addr
); );
assert_eq!(full_scan_update.last_active_indices[&0], 3); assert_eq!(full_scan_update.last_active_indices[&0], 3);
@ -198,20 +201,26 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> {
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active indice won't be updated in the first case but will in the second one. // The last active indice won't be updated in the first case but will in the second one.
let full_scan_update = client let full_scan_update = {
.full_scan(cp_tip.clone(), keychains.clone(), 5, 1) let request =
.await?; FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 5, 1).await?
};
let txs: HashSet<_> = full_scan_update let txs: HashSet<_> = full_scan_update
.tx_graph .graph_update
.full_txs() .full_txs()
.map(|tx| tx.txid) .map(|tx| tx.txid)
.collect(); .collect();
assert_eq!(txs.len(), 1); assert_eq!(txs.len(), 1);
assert!(txs.contains(&txid_4th_addr)); assert!(txs.contains(&txid_4th_addr));
assert_eq!(full_scan_update.last_active_indices[&0], 3); assert_eq!(full_scan_update.last_active_indices[&0], 3);
let full_scan_update = client.full_scan(cp_tip, keychains, 6, 1).await?; let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 6, 1).await?
};
let txs: HashSet<_> = full_scan_update let txs: HashSet<_> = full_scan_update
.tx_graph .graph_update
.full_txs() .full_txs()
.map(|tx| tx.txid) .map(|tx| tx.txid)
.collect(); .collect();

View File

@ -1,8 +1,9 @@
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
use bdk_esplora::EsploraExt; use bdk_esplora::EsploraExt;
use electrsd::bitcoind::anyhow; use electrsd::bitcoind::anyhow;
use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use electrsd::bitcoind::bitcoincore_rpc::RpcApi;
use esplora_client::{self, Builder}; use esplora_client::{self, Builder};
use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::collections::{BTreeSet, HashSet};
use std::str::FromStr; use std::str::FromStr;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
@ -55,18 +56,15 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
// use a full checkpoint linked list (since this is not what we are testing) // use a full checkpoint linked list (since this is not what we are testing)
let cp_tip = env.make_checkpoint_tip(); let cp_tip = env.make_checkpoint_tip();
let sync_update = client.sync( let sync_update = {
cp_tip.clone(), let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks);
misc_spks.into_iter(), client.sync(request, 1)?
vec![].into_iter(), };
vec![].into_iter(),
1,
)?;
assert!( assert!(
{ {
let update_cps = sync_update let update_cps = sync_update
.local_chain .chain_update
.iter() .iter()
.map(|cp| cp.block_id()) .map(|cp| cp.block_id())
.collect::<BTreeSet<_>>(); .collect::<BTreeSet<_>>();
@ -79,7 +77,7 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
"update should not alter original checkpoint tip since we already started with all checkpoints", "update should not alter original checkpoint tip since we already started with all checkpoints",
); );
let graph_update = sync_update.tx_graph; let graph_update = sync_update.graph_update;
// Check to see if we have the floating txouts available from our two created transactions' // Check to see if we have the floating txouts available from our two created transactions'
// previous outputs in order to calculate transaction fees. // previous outputs in order to calculate transaction fees.
for tx in graph_update.full_txs() { for tx in graph_update.full_txs() {
@ -141,8 +139,6 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
.enumerate() .enumerate()
.map(|(i, addr)| (i as u32, addr.script_pubkey())) .map(|(i, addr)| (i as u32, addr.script_pubkey()))
.collect(); .collect();
let mut keychains = BTreeMap::new();
keychains.insert(0, spks);
// Then receive coins on the 4th address. // Then receive coins on the 4th address.
let txid_4th_addr = env.bitcoind.client.send_to_address( let txid_4th_addr = env.bitcoind.client.send_to_address(
@ -165,12 +161,25 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
// A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4
// will. // will.
let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 3, 1)?; let full_scan_update = {
assert!(full_scan_update.tx_graph.full_txs().next().is_none()); let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 3, 1)?
};
assert!(full_scan_update.graph_update.full_txs().next().is_none());
assert!(full_scan_update.last_active_indices.is_empty()); assert!(full_scan_update.last_active_indices.is_empty());
let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 4, 1)?; let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 4, 1)?
};
assert_eq!( assert_eq!(
full_scan_update.tx_graph.full_txs().next().unwrap().txid, full_scan_update
.graph_update
.full_txs()
.next()
.unwrap()
.txid,
txid_4th_addr txid_4th_addr
); );
assert_eq!(full_scan_update.last_active_indices[&0], 3); assert_eq!(full_scan_update.last_active_indices[&0], 3);
@ -193,18 +202,26 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> {
// A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will.
// The last active indice won't be updated in the first case but will in the second one. // The last active indice won't be updated in the first case but will in the second one.
let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 5, 1)?; let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 5, 1)?
};
let txs: HashSet<_> = full_scan_update let txs: HashSet<_> = full_scan_update
.tx_graph .graph_update
.full_txs() .full_txs()
.map(|tx| tx.txid) .map(|tx| tx.txid)
.collect(); .collect();
assert_eq!(txs.len(), 1); assert_eq!(txs.len(), 1);
assert!(txs.contains(&txid_4th_addr)); assert!(txs.contains(&txid_4th_addr));
assert_eq!(full_scan_update.last_active_indices[&0], 3); assert_eq!(full_scan_update.last_active_indices[&0], 3);
let full_scan_update = client.full_scan(cp_tip.clone(), keychains, 6, 1)?; let full_scan_update = {
let request =
FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone());
client.full_scan(request, 6, 1)?
};
let txs: HashSet<_> = full_scan_update let txs: HashSet<_> = full_scan_update
.tx_graph .graph_update
.full_txs() .full_txs()
.map(|tx| tx.txid) .map(|tx| tx.txid)
.collect(); .collect();

View File

@ -1,14 +1,15 @@
use std::{ use std::{
collections::BTreeMap, collections::BTreeSet,
io::{self, Write}, io::{self, Write},
sync::Mutex, sync::Mutex,
}; };
use bdk_chain::{ use bdk_chain::{
bitcoin::{constants::genesis_block, Address, Network, OutPoint, ScriptBuf, Txid}, bitcoin::{constants::genesis_block, Address, Network, Txid},
indexed_tx_graph::{self, IndexedTxGraph}, indexed_tx_graph::{self, IndexedTxGraph},
keychain, keychain,
local_chain::{self, LocalChain}, local_chain::{self, LocalChain},
spk_client::{FullScanRequest, SyncRequest},
Append, ConfirmationTimeHeightAnchor, Append, ConfirmationTimeHeightAnchor,
}; };
@ -167,45 +168,34 @@ fn main() -> anyhow::Result<()> {
scan_options, scan_options,
.. ..
} => { } => {
let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); let request = {
let keychain_spks = graph let chain_tip = chain.lock().expect("mutex must not be poisoned").tip();
.lock() let indexed_graph = &*graph.lock().expect("mutex must not be poisoned");
.expect("mutex must not be poisoned") FullScanRequest::from_keychain_txout_index(chain_tip, &indexed_graph.index)
.index .inspect_spks_for_all_keychains({
.all_unbounded_spk_iters() let mut once = BTreeSet::<Keychain>::new();
.into_iter() move |keychain, spk_i, _| {
// This `map` is purely for logging. if once.insert(keychain) {
.map(|(keychain, iter)| { eprint!("\nscanning {}: ", keychain);
let mut first = true; }
let spk_iter = iter.inspect(move |(i, _)| { eprint!("{} ", spk_i);
if first { // Flush early to ensure we print at every iteration.
eprint!("\nscanning {}: ", keychain); let _ = io::stderr().flush();
first = false;
} }
eprint!("{} ", i); })
// Flush early to ensure we print at every iteration. };
let _ = io::stderr().flush();
});
(keychain, spk_iter)
})
.collect::<BTreeMap<_, _>>();
// The client scans keychain spks for transaction histories, stopping after `stop_gap` // The client scans keychain spks for transaction histories, stopping after `stop_gap`
// is reached. It returns a `TxGraph` update (`graph_update`) and a structure that // is reached. It returns a `TxGraph` update (`graph_update`) and a structure that
// represents the last active spk derivation indices of keychains // represents the last active spk derivation indices of keychains
// (`keychain_indices_update`). // (`keychain_indices_update`).
let mut update = client let mut update = client
.full_scan( .full_scan(request, *stop_gap, scan_options.parallel_requests)
local_tip,
keychain_spks,
*stop_gap,
scan_options.parallel_requests,
)
.context("scanning for transactions")?; .context("scanning for transactions")?;
// We want to keep track of the latest time a transaction was seen unconfirmed. // We want to keep track of the latest time a transaction was seen unconfirmed.
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.tx_graph.update_last_seen_unconfirmed(now); let _ = update.graph_update.update_last_seen_unconfirmed(now);
let mut graph = graph.lock().expect("mutex must not be poisoned"); let mut graph = graph.lock().expect("mutex must not be poisoned");
let mut chain = chain.lock().expect("mutex must not be poisoned"); let mut chain = chain.lock().expect("mutex must not be poisoned");
@ -213,11 +203,11 @@ fn main() -> anyhow::Result<()> {
// deriviation indices. Usually before a scan you are on a fresh wallet with no // deriviation indices. Usually before a scan you are on a fresh wallet with no
// addresses derived so we need to derive up to last active addresses the scan found // addresses derived so we need to derive up to last active addresses the scan found
// before adding the transactions. // before adding the transactions.
(chain.apply_update(update.local_chain)?, { (chain.apply_update(update.chain_update)?, {
let (_, index_changeset) = graph let (_, index_changeset) = graph
.index .index
.reveal_to_target_multi(&update.last_active_indices); .reveal_to_target_multi(&update.last_active_indices);
let mut indexed_tx_graph_changeset = graph.apply_update(update.tx_graph); let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update);
indexed_tx_graph_changeset.append(index_changeset.into()); indexed_tx_graph_changeset.append(index_changeset.into());
indexed_tx_graph_changeset indexed_tx_graph_changeset
}) })
@ -241,12 +231,9 @@ fn main() -> anyhow::Result<()> {
unused_spks = false; unused_spks = false;
} }
// Spks, outpoints and txids we want updates on will be accumulated here.
let mut spks: Box<dyn Iterator<Item = ScriptBuf>> = Box::new(core::iter::empty());
let mut outpoints: Box<dyn Iterator<Item = OutPoint>> = Box::new(core::iter::empty());
let mut txids: Box<dyn Iterator<Item = Txid>> = Box::new(core::iter::empty());
let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); let local_tip = chain.lock().expect("mutex must not be poisoned").tip();
// Spks, outpoints and txids we want updates on will be accumulated here.
let mut request = SyncRequest::from_chain_tip(local_tip.clone());
// Get a short lock on the structures to get spks, utxos, and txs that we are interested // Get a short lock on the structures to get spks, utxos, and txs that we are interested
// in. // in.
@ -260,12 +247,12 @@ fn main() -> anyhow::Result<()> {
.revealed_spks(..) .revealed_spks(..)
.map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned())) .map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
spks = Box::new(spks.chain(all_spks.into_iter().map(|(k, i, spk)| { request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| {
eprintln!("scanning {}:{}", k, i); eprintln!("scanning {}:{}", k, i);
// Flush early to ensure we print at every iteration. // Flush early to ensure we print at every iteration.
let _ = io::stderr().flush(); let _ = io::stderr().flush();
spk spk
}))); }));
} }
if unused_spks { if unused_spks {
let unused_spks = graph let unused_spks = graph
@ -273,17 +260,18 @@ fn main() -> anyhow::Result<()> {
.unused_spks() .unused_spks()
.map(|(k, i, spk)| (k, i, spk.to_owned())) .map(|(k, i, spk)| (k, i, spk.to_owned()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
spks = Box::new(spks.chain(unused_spks.into_iter().map(|(k, i, spk)| { request =
eprintln!( request.chain_spks(unused_spks.into_iter().map(move |(k, i, spk)| {
"Checking if address {} {}:{} has been used", eprintln!(
Address::from_script(&spk, args.network).unwrap(), "Checking if address {} {}:{} has been used",
k, Address::from_script(&spk, args.network).unwrap(),
i, k,
); i,
// Flush early to ensure we print at every iteration. );
let _ = io::stderr().flush(); // Flush early to ensure we print at every iteration.
spk let _ = io::stderr().flush();
}))); spk
}));
} }
if utxos { if utxos {
// We want to search for whether the UTXO is spent, and spent by which // We want to search for whether the UTXO is spent, and spent by which
@ -295,7 +283,7 @@ fn main() -> anyhow::Result<()> {
.filter_chain_unspents(&*chain, local_tip.block_id(), init_outpoints) .filter_chain_unspents(&*chain, local_tip.block_id(), init_outpoints)
.map(|(_, utxo)| utxo) .map(|(_, utxo)| utxo)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
outpoints = Box::new( request = request.chain_outpoints(
utxos utxos
.into_iter() .into_iter()
.inspect(|utxo| { .inspect(|utxo| {
@ -319,7 +307,7 @@ fn main() -> anyhow::Result<()> {
.filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
.map(|canonical_tx| canonical_tx.tx_node.txid) .map(|canonical_tx| canonical_tx.tx_node.txid)
.collect::<Vec<Txid>>(); .collect::<Vec<Txid>>();
txids = Box::new(unconfirmed_txids.into_iter().inspect(|txid| { request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| {
eprintln!("Checking if {} is confirmed yet", txid); eprintln!("Checking if {} is confirmed yet", txid);
// Flush early to ensure we print at every iteration. // Flush early to ensure we print at every iteration.
let _ = io::stderr().flush(); let _ = io::stderr().flush();
@ -327,21 +315,15 @@ fn main() -> anyhow::Result<()> {
} }
} }
let mut update = client.sync( let mut update = client.sync(request, scan_options.parallel_requests)?;
local_tip,
spks,
txids,
outpoints,
scan_options.parallel_requests,
)?;
// Update last seen unconfirmed // Update last seen unconfirmed
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.tx_graph.update_last_seen_unconfirmed(now); let _ = update.graph_update.update_last_seen_unconfirmed(now);
( (
chain.lock().unwrap().apply_update(update.local_chain)?, chain.lock().unwrap().apply_update(update.chain_update)?,
graph.lock().unwrap().apply_update(update.tx_graph), graph.lock().unwrap().apply_update(update.graph_update),
) )
} }
}; };

View File

@ -1,8 +1,7 @@
use std::{io::Write, str::FromStr}; use std::{collections::BTreeSet, io::Write, str::FromStr};
use bdk::{ use bdk::{
bitcoin::{Address, Network}, bitcoin::{Address, Network, Script},
wallet::Update,
KeychainKind, SignOptions, Wallet, KeychainKind, SignOptions, Wallet,
}; };
use bdk_esplora::{esplora_client, EsploraAsyncExt}; use bdk_esplora::{esplora_client, EsploraAsyncExt};
@ -37,34 +36,44 @@ async fn main() -> Result<(), anyhow::Error> {
let client = let client =
esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?; esplora_client::Builder::new("https://blockstream.info/testnet/api").build_async()?;
let prev_tip = wallet.latest_checkpoint(); fn generate_inspect(kind: KeychainKind) -> impl FnMut(u32, &Script) + Send + Sync + 'static {
let keychain_spks = wallet let mut once = Some(());
.all_unbounded_spk_iters() let mut stdout = std::io::stdout();
.into_iter() move |spk_i, _| {
.map(|(k, k_spks)| { match once.take() {
let mut once = Some(()); Some(_) => print!("\nScanning keychain [{:?}]", kind),
let mut stdout = std::io::stdout(); None => print!(" {:<3}", spk_i),
let k_spks = k_spks };
.inspect(move |(spk_i, _)| match once.take() { stdout.flush().expect("must flush");
Some(_) => print!("\nScanning keychain [{:?}]", k), }
None => print!(" {:<3}", spk_i), }
}) let request = wallet
.inspect(move |_| stdout.flush().expect("must flush")); .start_full_scan()
(k, k_spks) .inspect_spks_for_all_keychains({
let mut once = BTreeSet::<KeychainKind>::new();
move |keychain, spk_i, _| {
match once.insert(keychain) {
true => print!("\nScanning keychain [{:?}]", keychain),
false => print!(" {:<3}", spk_i),
}
std::io::stdout().flush().expect("must flush")
}
}) })
.collect(); .inspect_spks_for_keychain(
KeychainKind::External,
generate_inspect(KeychainKind::External),
)
.inspect_spks_for_keychain(
KeychainKind::Internal,
generate_inspect(KeychainKind::Internal),
);
let mut update = client let mut update = client
.full_scan(prev_tip, keychain_spks, STOP_GAP, PARALLEL_REQUESTS) .full_scan(request, STOP_GAP, PARALLEL_REQUESTS)
.await?; .await?;
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.tx_graph.update_last_seen_unconfirmed(now); let _ = update.graph_update.update_last_seen_unconfirmed(now);
let update = Update {
last_active_indices: update.last_active_indices,
graph: update.tx_graph,
chain: Some(update.local_chain),
};
wallet.apply_update(update)?; wallet.apply_update(update)?;
wallet.commit()?; wallet.commit()?;
println!(); println!();

View File

@ -3,11 +3,10 @@ const SEND_AMOUNT: u64 = 1000;
const STOP_GAP: usize = 5; const STOP_GAP: usize = 5;
const PARALLEL_REQUESTS: usize = 1; const PARALLEL_REQUESTS: usize = 1;
use std::{io::Write, str::FromStr}; use std::{collections::BTreeSet, io::Write, str::FromStr};
use bdk::{ use bdk::{
bitcoin::{Address, Network}, bitcoin::{Address, Network},
wallet::Update,
KeychainKind, SignOptions, Wallet, KeychainKind, SignOptions, Wallet,
}; };
use bdk_esplora::{esplora_client, EsploraExt}; use bdk_esplora::{esplora_client, EsploraExt};
@ -36,36 +35,22 @@ fn main() -> Result<(), anyhow::Error> {
let client = let client =
esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking(); esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking();
let keychain_spks = wallet let request = wallet.start_full_scan().inspect_spks_for_all_keychains({
.all_unbounded_spk_iters() let mut once = BTreeSet::<KeychainKind>::new();
.into_iter() move |keychain, spk_i, _| {
.map(|(k, k_spks)| { match once.insert(keychain) {
let mut once = Some(()); true => print!("\nScanning keychain [{:?}]", keychain),
let mut stdout = std::io::stdout(); false => print!(" {:<3}", spk_i),
let k_spks = k_spks };
.inspect(move |(spk_i, _)| match once.take() { std::io::stdout().flush().expect("must flush")
Some(_) => print!("\nScanning keychain [{:?}]", k), }
None => print!(" {:<3}", spk_i), });
})
.inspect(move |_| stdout.flush().expect("must flush"));
(k, k_spks)
})
.collect();
let mut update = client.full_scan( let mut update = client.full_scan(request, STOP_GAP, PARALLEL_REQUESTS)?;
wallet.latest_checkpoint(),
keychain_spks,
STOP_GAP,
PARALLEL_REQUESTS,
)?;
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.tx_graph.update_last_seen_unconfirmed(now); let _ = update.graph_update.update_last_seen_unconfirmed(now);
wallet.apply_update(Update { wallet.apply_update(update)?;
last_active_indices: update.last_active_indices,
graph: update.tx_graph,
chain: Some(update.local_chain),
})?;
wallet.commit()?; wallet.commit()?;
println!(); println!();