feat(chain): SyncRequest
now uses ExactSizeIterator
s
This allows the caller to track sync progress.
This commit is contained in:
parent
0f94f24aaf
commit
c0374a0eeb
@ -1,6 +1,6 @@
|
|||||||
//! Helper types for spk-based blockchain clients.
|
//! Helper types for spk-based blockchain clients.
|
||||||
|
|
||||||
use core::{fmt::Debug, ops::RangeBounds};
|
use core::{fmt::Debug, marker::PhantomData, ops::RangeBounds};
|
||||||
|
|
||||||
use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
|
use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
|
||||||
use bitcoin::{OutPoint, Script, ScriptBuf, Txid};
|
use bitcoin::{OutPoint, Script, ScriptBuf, Txid};
|
||||||
@ -18,11 +18,11 @@ pub struct SyncRequest {
|
|||||||
/// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
|
/// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip
|
||||||
pub chain_tip: CheckPoint,
|
pub chain_tip: CheckPoint,
|
||||||
/// Transactions that spend from or to these indexed script pubkeys.
|
/// Transactions that spend from or to these indexed script pubkeys.
|
||||||
pub spks: Box<dyn Iterator<Item = ScriptBuf> + Send>,
|
pub spks: Box<dyn ExactSizeIterator<Item = ScriptBuf> + Send>,
|
||||||
/// Transactions with these txids.
|
/// Transactions with these txids.
|
||||||
pub txids: Box<dyn Iterator<Item = Txid> + Send>,
|
pub txids: Box<dyn ExactSizeIterator<Item = Txid> + Send>,
|
||||||
/// Transactions with these outpoints or spent from these outpoints.
|
/// Transactions with these outpoints or spent from these outpoints.
|
||||||
pub outpoints: Box<dyn Iterator<Item = OutPoint> + Send>,
|
pub outpoints: Box<dyn ExactSizeIterator<Item = OutPoint> + Send>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SyncRequest {
|
impl SyncRequest {
|
||||||
@ -42,7 +42,7 @@ impl SyncRequest {
|
|||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn set_spks(
|
pub fn set_spks(
|
||||||
mut self,
|
mut self,
|
||||||
spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send + 'static>,
|
spks: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.spks = Box::new(spks.into_iter());
|
self.spks = Box::new(spks.into_iter());
|
||||||
self
|
self
|
||||||
@ -54,7 +54,7 @@ impl SyncRequest {
|
|||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn set_txids(
|
pub fn set_txids(
|
||||||
mut self,
|
mut self,
|
||||||
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send + 'static>,
|
txids: impl IntoIterator<IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.txids = Box::new(txids.into_iter());
|
self.txids = Box::new(txids.into_iter());
|
||||||
self
|
self
|
||||||
@ -66,7 +66,9 @@ impl SyncRequest {
|
|||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn set_outpoints(
|
pub fn set_outpoints(
|
||||||
mut self,
|
mut self,
|
||||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send + 'static>,
|
outpoints: impl IntoIterator<
|
||||||
|
IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
|
||||||
|
>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.outpoints = Box::new(outpoints.into_iter());
|
self.outpoints = Box::new(outpoints.into_iter());
|
||||||
self
|
self
|
||||||
@ -79,11 +81,11 @@ impl SyncRequest {
|
|||||||
pub fn chain_spks(
|
pub fn chain_spks(
|
||||||
mut self,
|
mut self,
|
||||||
spks: impl IntoIterator<
|
spks: impl IntoIterator<
|
||||||
IntoIter = impl Iterator<Item = ScriptBuf> + Send + 'static,
|
IntoIter = impl ExactSizeIterator<Item = ScriptBuf> + Send + 'static,
|
||||||
Item = ScriptBuf,
|
Item = ScriptBuf,
|
||||||
>,
|
>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.spks = Box::new(self.spks.chain(spks));
|
self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter()));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,9 +95,12 @@ impl SyncRequest {
|
|||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn chain_txids(
|
pub fn chain_txids(
|
||||||
mut self,
|
mut self,
|
||||||
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send + 'static, Item = Txid>,
|
txids: impl IntoIterator<
|
||||||
|
IntoIter = impl ExactSizeIterator<Item = Txid> + Send + 'static,
|
||||||
|
Item = Txid,
|
||||||
|
>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.txids = Box::new(self.txids.chain(txids));
|
self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter()));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,39 +111,42 @@ impl SyncRequest {
|
|||||||
pub fn chain_outpoints(
|
pub fn chain_outpoints(
|
||||||
mut self,
|
mut self,
|
||||||
outpoints: impl IntoIterator<
|
outpoints: impl IntoIterator<
|
||||||
IntoIter = impl Iterator<Item = OutPoint> + Send + 'static,
|
IntoIter = impl ExactSizeIterator<Item = OutPoint> + Send + 'static,
|
||||||
Item = OutPoint,
|
Item = OutPoint,
|
||||||
>,
|
>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.outpoints = Box::new(self.outpoints.chain(outpoints));
|
self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter()));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a closure that will be called for each [`Script`] synced in this request.
|
/// Add a closure that will be called for [`Script`]s previously added to this request.
|
||||||
///
|
///
|
||||||
/// This consumes the [`SyncRequest`] and returns the updated one.
|
/// This consumes the [`SyncRequest`] and returns the updated one.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn inspect_spks(mut self, inspect: impl Fn(&Script) + Send + Sync + 'static) -> Self {
|
pub fn inspect_spks(
|
||||||
|
mut self,
|
||||||
|
mut inspect: impl FnMut(&Script) + Send + Sync + 'static,
|
||||||
|
) -> Self {
|
||||||
self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk)));
|
self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk)));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a closure that will be called for each [`Txid`] synced in this request.
|
/// Add a closure that will be called for [`Txid`]s previously added to this request.
|
||||||
///
|
///
|
||||||
/// This consumes the [`SyncRequest`] and returns the updated one.
|
/// This consumes the [`SyncRequest`] and returns the updated one.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn inspect_txids(mut self, inspect: impl Fn(&Txid) + Send + Sync + 'static) -> Self {
|
pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self {
|
||||||
self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid)));
|
self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid)));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a closure that will be called for each [`OutPoint`] synced in this request.
|
/// Add a closure that will be called for [`OutPoint`]s previously added to this request.
|
||||||
///
|
///
|
||||||
/// This consumes the [`SyncRequest`] and returns the updated one.
|
/// This consumes the [`SyncRequest`] and returns the updated one.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn inspect_outpoints(
|
pub fn inspect_outpoints(
|
||||||
mut self,
|
mut self,
|
||||||
inspect: impl Fn(&OutPoint) + Send + Sync + 'static,
|
mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op)));
|
self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op)));
|
||||||
self
|
self
|
||||||
@ -313,3 +321,64 @@ pub struct FullScanResult<K> {
|
|||||||
/// Last active indices for the corresponding keychains (`K`).
|
/// Last active indices for the corresponding keychains (`K`).
|
||||||
pub last_active_indices: BTreeMap<K, u32>,
|
pub last_active_indices: BTreeMap<K, u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new
|
||||||
|
/// [`ExactSizeIterator`].
|
||||||
|
///
|
||||||
|
/// The danger of this is explained in [the `ExactSizeIterator` docs]
|
||||||
|
/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator).
|
||||||
|
/// This does not apply here since it would be impossible to scan an item count that overflows
|
||||||
|
/// `usize` anyway.
|
||||||
|
struct ExactSizeChain<A, B, I> {
|
||||||
|
a: Option<A>,
|
||||||
|
b: Option<B>,
|
||||||
|
i: PhantomData<I>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, B, I> ExactSizeChain<A, B, I> {
|
||||||
|
fn new(a: A, b: B) -> Self {
|
||||||
|
ExactSizeChain {
|
||||||
|
a: Some(a),
|
||||||
|
b: Some(b),
|
||||||
|
i: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, B, I> Iterator for ExactSizeChain<A, B, I>
|
||||||
|
where
|
||||||
|
A: Iterator<Item = I>,
|
||||||
|
B: Iterator<Item = I>,
|
||||||
|
{
|
||||||
|
type Item = I;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
if let Some(a) = &mut self.a {
|
||||||
|
let item = a.next();
|
||||||
|
if item.is_some() {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
self.a = None;
|
||||||
|
}
|
||||||
|
if let Some(b) = &mut self.b {
|
||||||
|
let item = b.next();
|
||||||
|
if item.is_some() {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
self.b = None;
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, B, I> ExactSizeIterator for ExactSizeChain<A, B, I>
|
||||||
|
where
|
||||||
|
A: ExactSizeIterator<Item = I>,
|
||||||
|
B: ExactSizeIterator<Item = I>,
|
||||||
|
{
|
||||||
|
fn len(&self) -> usize {
|
||||||
|
let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0);
|
||||||
|
let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0);
|
||||||
|
a_len + b_len
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -248,7 +248,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned()))
|
.map(|(k, i, spk)| (k.to_owned(), i, spk.to_owned()))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| {
|
request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| {
|
||||||
eprintln!("scanning {}:{}", k, i);
|
eprint!("scanning {}:{}", k, i);
|
||||||
// Flush early to ensure we print at every iteration.
|
// Flush early to ensure we print at every iteration.
|
||||||
let _ = io::stderr().flush();
|
let _ = io::stderr().flush();
|
||||||
spk
|
spk
|
||||||
@ -262,7 +262,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
request =
|
request =
|
||||||
request.chain_spks(unused_spks.into_iter().map(move |(k, i, spk)| {
|
request.chain_spks(unused_spks.into_iter().map(move |(k, i, spk)| {
|
||||||
eprintln!(
|
eprint!(
|
||||||
"Checking if address {} {}:{} has been used",
|
"Checking if address {} {}:{} has been used",
|
||||||
Address::from_script(&spk, args.network).unwrap(),
|
Address::from_script(&spk, args.network).unwrap(),
|
||||||
k,
|
k,
|
||||||
@ -287,7 +287,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
utxos
|
utxos
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.inspect(|utxo| {
|
.inspect(|utxo| {
|
||||||
eprintln!(
|
eprint!(
|
||||||
"Checking if outpoint {} (value: {}) has been spent",
|
"Checking if outpoint {} (value: {}) has been spent",
|
||||||
utxo.outpoint, utxo.txout.value
|
utxo.outpoint, utxo.txout.value
|
||||||
);
|
);
|
||||||
@ -308,13 +308,38 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.map(|canonical_tx| canonical_tx.tx_node.txid)
|
.map(|canonical_tx| canonical_tx.tx_node.txid)
|
||||||
.collect::<Vec<Txid>>();
|
.collect::<Vec<Txid>>();
|
||||||
request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| {
|
request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| {
|
||||||
eprintln!("Checking if {} is confirmed yet", txid);
|
eprint!("Checking if {} is confirmed yet", txid);
|
||||||
// Flush early to ensure we print at every iteration.
|
// Flush early to ensure we print at every iteration.
|
||||||
let _ = io::stderr().flush();
|
let _ = io::stderr().flush();
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let total_spks = request.spks.len();
|
||||||
|
let total_txids = request.txids.len();
|
||||||
|
let total_ops = request.outpoints.len();
|
||||||
|
request = request
|
||||||
|
.inspect_spks({
|
||||||
|
let mut visited = 0;
|
||||||
|
move |_| {
|
||||||
|
visited += 1;
|
||||||
|
eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.inspect_txids({
|
||||||
|
let mut visited = 0;
|
||||||
|
move |_| {
|
||||||
|
visited += 1;
|
||||||
|
eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.inspect_outpoints({
|
||||||
|
let mut visited = 0;
|
||||||
|
move |_| {
|
||||||
|
visited += 1;
|
||||||
|
eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32)
|
||||||
|
}
|
||||||
|
});
|
||||||
let mut update = client.sync(request, scan_options.parallel_requests)?;
|
let mut update = client.sync(request, scan_options.parallel_requests)?;
|
||||||
|
|
||||||
// Update last seen unconfirmed
|
// Update last seen unconfirmed
|
||||||
|
Loading…
x
Reference in New Issue
Block a user