Add wallet_esplora_async
example and various fixes
Fixes include: * Allow `bdk_esplora` to use async with tls * Reorganize `bdk_esplora` crate to have separate files for async vs blocking * Use optional dependencies for `bdk_esplora` async
This commit is contained in:
parent
26ab2e2d6c
commit
def0c9ed39
@ -9,6 +9,7 @@ members = [
|
|||||||
"example-crates/keychain_tracker_example_cli",
|
"example-crates/keychain_tracker_example_cli",
|
||||||
"example-crates/wallet_electrum",
|
"example-crates/wallet_electrum",
|
||||||
"example-crates/wallet_esplora",
|
"example-crates/wallet_esplora",
|
||||||
|
"example-crates/wallet_esplora_async",
|
||||||
"nursery/tmp_plan",
|
"nursery/tmp_plan",
|
||||||
"nursery/coin_select"
|
"nursery/coin_select"
|
||||||
]
|
]
|
||||||
|
@ -14,10 +14,11 @@ readme = "README.md"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] }
|
bdk_chain = { path = "../chain", version = "0.3.1", features = ["serde", "miniscript"] }
|
||||||
esplora-client = { version = "0.3", default-features = false }
|
esplora-client = { version = "0.3", default-features = false }
|
||||||
async-trait = "0.1.66"
|
async-trait = { version = "0.1.66", optional = true }
|
||||||
futures = "0.3.26"
|
futures = { version = "0.3.26", optional = true }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["async", "blocking"]
|
default = ["async-https", "blocking"]
|
||||||
async = ["esplora-client/async"]
|
async = ["async-trait", "futures", "esplora-client/async"]
|
||||||
|
async-https = ["async", "esplora-client/async-https"]
|
||||||
blocking = ["esplora-client/blocking"]
|
blocking = ["esplora-client/blocking"]
|
||||||
|
296
crates/esplora/src/async_ext.rs
Normal file
296
crates/esplora/src/async_ext.rs
Normal file
@ -0,0 +1,296 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use bdk_chain::{
|
||||||
|
bitcoin::{BlockHash, OutPoint, Script, Txid},
|
||||||
|
chain_graph::ChainGraph,
|
||||||
|
keychain::KeychainScan,
|
||||||
|
sparse_chain, BlockId, ConfirmationTime,
|
||||||
|
};
|
||||||
|
use esplora_client::{Error, OutputStatus};
|
||||||
|
use futures::stream::{FuturesOrdered, TryStreamExt};
|
||||||
|
|
||||||
|
use crate::map_confirmation_time;
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
pub trait EsploraAsyncExt {
|
||||||
|
/// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
|
||||||
|
///
|
||||||
|
/// - `local_chain`: the most recent block hashes present locally
|
||||||
|
/// - `keychain_spks`: keychains that we want to scan transactions for
|
||||||
|
/// - `txids`: transactions that we want updated [`ChainPosition`]s for
|
||||||
|
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||||
|
/// want to included in the update
|
||||||
|
///
|
||||||
|
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
||||||
|
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
||||||
|
/// parallel.
|
||||||
|
///
|
||||||
|
/// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
|
||||||
|
#[allow(clippy::result_large_err)] // FIXME
|
||||||
|
async fn scan<K: Ord + Clone>(
|
||||||
|
&self,
|
||||||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||||||
|
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||||||
|
txids: impl IntoIterator<Item = Txid>,
|
||||||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||||
|
stop_gap: usize,
|
||||||
|
parallel_requests: usize,
|
||||||
|
) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
|
||||||
|
|
||||||
|
/// Convenience method to call [`scan`] without requiring a keychain.
|
||||||
|
///
|
||||||
|
/// [`scan`]: EsploraAsyncExt::scan
|
||||||
|
#[allow(clippy::result_large_err)] // FIXME
|
||||||
|
async fn scan_without_keychain(
|
||||||
|
&self,
|
||||||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||||||
|
misc_spks: impl IntoIterator<Item = Script>,
|
||||||
|
txids: impl IntoIterator<Item = Txid>,
|
||||||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||||
|
parallel_requests: usize,
|
||||||
|
) -> Result<ChainGraph<ConfirmationTime>, Error> {
|
||||||
|
let wallet_scan = self
|
||||||
|
.scan(
|
||||||
|
local_chain,
|
||||||
|
[(
|
||||||
|
(),
|
||||||
|
misc_spks
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, spk)| (i as u32, spk)),
|
||||||
|
)]
|
||||||
|
.into(),
|
||||||
|
txids,
|
||||||
|
outpoints,
|
||||||
|
usize::MAX,
|
||||||
|
parallel_requests,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(wallet_scan.update)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl EsploraAsyncExt for esplora_client::AsyncClient {
|
||||||
|
async fn scan<K: Ord + Clone>(
|
||||||
|
&self,
|
||||||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||||||
|
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||||||
|
txids: impl IntoIterator<Item = Txid>,
|
||||||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||||
|
stop_gap: usize,
|
||||||
|
parallel_requests: usize,
|
||||||
|
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
|
||||||
|
let parallel_requests = parallel_requests.max(1);
|
||||||
|
let mut scan = KeychainScan::default();
|
||||||
|
let update = &mut scan.update;
|
||||||
|
let last_active_indices = &mut scan.last_active_indices;
|
||||||
|
|
||||||
|
for (&height, &original_hash) in local_chain.iter().rev() {
|
||||||
|
let update_block_id = BlockId {
|
||||||
|
height,
|
||||||
|
hash: self.get_block_hash(height).await?,
|
||||||
|
};
|
||||||
|
let _ = update
|
||||||
|
.insert_checkpoint(update_block_id)
|
||||||
|
.expect("cannot repeat height here");
|
||||||
|
if update_block_id.hash == original_hash {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let tip_at_start = BlockId {
|
||||||
|
height: self.get_height().await?,
|
||||||
|
hash: self.get_tip_hash().await?,
|
||||||
|
};
|
||||||
|
if let Err(failure) = update.insert_checkpoint(tip_at_start) {
|
||||||
|
match failure {
|
||||||
|
sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
|
||||||
|
// there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
|
||||||
|
return EsploraAsyncExt::scan(
|
||||||
|
self,
|
||||||
|
local_chain,
|
||||||
|
keychain_spks,
|
||||||
|
txids,
|
||||||
|
outpoints,
|
||||||
|
stop_gap,
|
||||||
|
parallel_requests,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (keychain, spks) in keychain_spks {
|
||||||
|
let mut spks = spks.into_iter();
|
||||||
|
let mut last_active_index = None;
|
||||||
|
let mut empty_scripts = 0;
|
||||||
|
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let futures: FuturesOrdered<_> = (0..parallel_requests)
|
||||||
|
.filter_map(|_| {
|
||||||
|
let (index, script) = spks.next()?;
|
||||||
|
let client = self.clone();
|
||||||
|
Some(async move {
|
||||||
|
let mut related_txs = client.scripthash_txs(&script, None).await?;
|
||||||
|
|
||||||
|
let n_confirmed =
|
||||||
|
related_txs.iter().filter(|tx| tx.status.confirmed).count();
|
||||||
|
// esplora pages on 25 confirmed transactions. If there's 25 or more we
|
||||||
|
// keep requesting to see if there's more.
|
||||||
|
if n_confirmed >= 25 {
|
||||||
|
loop {
|
||||||
|
let new_related_txs = client
|
||||||
|
.scripthash_txs(
|
||||||
|
&script,
|
||||||
|
Some(related_txs.last().unwrap().txid),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let n = new_related_txs.len();
|
||||||
|
related_txs.extend(new_related_txs);
|
||||||
|
// we've reached the end
|
||||||
|
if n < 25 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Result::<_, esplora_client::Error>::Ok((index, related_txs))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let n_futures = futures.len();
|
||||||
|
|
||||||
|
let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
|
||||||
|
|
||||||
|
for (index, related_txs) in idx_with_tx {
|
||||||
|
if related_txs.is_empty() {
|
||||||
|
empty_scripts += 1;
|
||||||
|
} else {
|
||||||
|
last_active_index = Some(index);
|
||||||
|
empty_scripts = 0;
|
||||||
|
}
|
||||||
|
for tx in related_txs {
|
||||||
|
let confirmation_time =
|
||||||
|
map_confirmation_time(&tx.status, tip_at_start.height);
|
||||||
|
|
||||||
|
if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
|
||||||
|
use bdk_chain::{
|
||||||
|
chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
|
||||||
|
};
|
||||||
|
match failure {
|
||||||
|
InsertTxError::Chain(TxTooHigh { .. }) => {
|
||||||
|
unreachable!("chain position already checked earlier")
|
||||||
|
}
|
||||||
|
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
||||||
|
| InsertTxError::UnresolvableConflict(_) => {
|
||||||
|
/* implies reorg during scan. We deal with that below */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if n_futures == 0 || empty_scripts >= stop_gap {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(last_active_index) = last_active_index {
|
||||||
|
last_active_indices.insert(keychain, last_active_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for txid in txids.into_iter() {
|
||||||
|
let (tx, tx_status) =
|
||||||
|
match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
|
||||||
|
(Some(tx), Some(tx_status)) => (tx, tx_status),
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
|
||||||
|
|
||||||
|
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
||||||
|
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
||||||
|
match failure {
|
||||||
|
InsertTxError::Chain(TxTooHigh { .. }) => {
|
||||||
|
unreachable!("chain position already checked earlier")
|
||||||
|
}
|
||||||
|
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
||||||
|
| InsertTxError::UnresolvableConflict(_) => {
|
||||||
|
/* implies reorg during scan. We deal with that below */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for op in outpoints.into_iter() {
|
||||||
|
let mut op_txs = Vec::with_capacity(2);
|
||||||
|
if let (Some(tx), Some(tx_status)) = (
|
||||||
|
self.get_tx(&op.txid).await?,
|
||||||
|
self.get_tx_status(&op.txid).await?,
|
||||||
|
) {
|
||||||
|
op_txs.push((tx, tx_status));
|
||||||
|
if let Some(OutputStatus {
|
||||||
|
txid: Some(txid),
|
||||||
|
status: Some(spend_status),
|
||||||
|
..
|
||||||
|
}) = self.get_output_status(&op.txid, op.vout as _).await?
|
||||||
|
{
|
||||||
|
if let Some(spend_tx) = self.get_tx(&txid).await? {
|
||||||
|
op_txs.push((spend_tx, spend_status));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (tx, status) in op_txs {
|
||||||
|
let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
|
||||||
|
|
||||||
|
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
||||||
|
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
||||||
|
match failure {
|
||||||
|
InsertTxError::Chain(TxTooHigh { .. }) => {
|
||||||
|
unreachable!("chain position already checked earlier")
|
||||||
|
}
|
||||||
|
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
||||||
|
| InsertTxError::UnresolvableConflict(_) => {
|
||||||
|
/* implies reorg during scan. We deal with that below */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let reorg_occurred = {
|
||||||
|
if let Some(checkpoint) = update.chain().latest_checkpoint() {
|
||||||
|
self.get_block_hash(checkpoint.height).await? != checkpoint.hash
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if reorg_occurred {
|
||||||
|
// A reorg occurred so lets find out where all the txids we found are in the chain now.
|
||||||
|
// XXX: collect required because of weird type naming issues
|
||||||
|
let txids_found = update
|
||||||
|
.chain()
|
||||||
|
.txids()
|
||||||
|
.map(|(_, txid)| *txid)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
scan.update = EsploraAsyncExt::scan_without_keychain(
|
||||||
|
self,
|
||||||
|
local_chain,
|
||||||
|
[],
|
||||||
|
txids_found,
|
||||||
|
[],
|
||||||
|
parallel_requests,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(scan)
|
||||||
|
}
|
||||||
|
}
|
290
crates/esplora/src/blocking_ext.rs
Normal file
290
crates/esplora/src/blocking_ext.rs
Normal file
@ -0,0 +1,290 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use bdk_chain::{
|
||||||
|
bitcoin::{BlockHash, OutPoint, Script, Txid},
|
||||||
|
chain_graph::ChainGraph,
|
||||||
|
keychain::KeychainScan,
|
||||||
|
sparse_chain, BlockId, ConfirmationTime,
|
||||||
|
};
|
||||||
|
use esplora_client::{Error, OutputStatus};
|
||||||
|
|
||||||
|
use crate::map_confirmation_time;
|
||||||
|
|
||||||
|
/// Trait to extend [`esplora_client::BlockingClient`] functionality.
|
||||||
|
///
|
||||||
|
/// Refer to [crate-level documentation] for more.
|
||||||
|
///
|
||||||
|
/// [crate-level documentation]: crate
|
||||||
|
pub trait EsploraExt {
|
||||||
|
/// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
|
||||||
|
///
|
||||||
|
/// - `local_chain`: the most recent block hashes present locally
|
||||||
|
/// - `keychain_spks`: keychains that we want to scan transactions for
|
||||||
|
/// - `txids`: transactions that we want updated [`ChainPosition`]s for
|
||||||
|
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||||
|
/// want to included in the update
|
||||||
|
///
|
||||||
|
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
||||||
|
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
||||||
|
/// parallel.
|
||||||
|
///
|
||||||
|
/// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
|
||||||
|
#[allow(clippy::result_large_err)] // FIXME
|
||||||
|
fn scan<K: Ord + Clone>(
|
||||||
|
&self,
|
||||||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||||||
|
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||||||
|
txids: impl IntoIterator<Item = Txid>,
|
||||||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||||
|
stop_gap: usize,
|
||||||
|
parallel_requests: usize,
|
||||||
|
) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
|
||||||
|
|
||||||
|
/// Convenience method to call [`scan`] without requiring a keychain.
|
||||||
|
///
|
||||||
|
/// [`scan`]: EsploraExt::scan
|
||||||
|
#[allow(clippy::result_large_err)] // FIXME
|
||||||
|
fn scan_without_keychain(
|
||||||
|
&self,
|
||||||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||||||
|
misc_spks: impl IntoIterator<Item = Script>,
|
||||||
|
txids: impl IntoIterator<Item = Txid>,
|
||||||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||||
|
parallel_requests: usize,
|
||||||
|
) -> Result<ChainGraph<ConfirmationTime>, Error> {
|
||||||
|
let wallet_scan = self.scan(
|
||||||
|
local_chain,
|
||||||
|
[(
|
||||||
|
(),
|
||||||
|
misc_spks
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, spk)| (i as u32, spk)),
|
||||||
|
)]
|
||||||
|
.into(),
|
||||||
|
txids,
|
||||||
|
outpoints,
|
||||||
|
usize::MAX,
|
||||||
|
parallel_requests,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(wallet_scan.update)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EsploraExt for esplora_client::BlockingClient {
|
||||||
|
fn scan<K: Ord + Clone>(
|
||||||
|
&self,
|
||||||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||||||
|
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||||||
|
txids: impl IntoIterator<Item = Txid>,
|
||||||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||||
|
stop_gap: usize,
|
||||||
|
parallel_requests: usize,
|
||||||
|
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
|
||||||
|
let parallel_requests = parallel_requests.max(1);
|
||||||
|
let mut scan = KeychainScan::default();
|
||||||
|
let update = &mut scan.update;
|
||||||
|
let last_active_indices = &mut scan.last_active_indices;
|
||||||
|
|
||||||
|
for (&height, &original_hash) in local_chain.iter().rev() {
|
||||||
|
let update_block_id = BlockId {
|
||||||
|
height,
|
||||||
|
hash: self.get_block_hash(height)?,
|
||||||
|
};
|
||||||
|
let _ = update
|
||||||
|
.insert_checkpoint(update_block_id)
|
||||||
|
.expect("cannot repeat height here");
|
||||||
|
if update_block_id.hash == original_hash {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let tip_at_start = BlockId {
|
||||||
|
height: self.get_height()?,
|
||||||
|
hash: self.get_tip_hash()?,
|
||||||
|
};
|
||||||
|
if let Err(failure) = update.insert_checkpoint(tip_at_start) {
|
||||||
|
match failure {
|
||||||
|
sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
|
||||||
|
// there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
|
||||||
|
return EsploraExt::scan(
|
||||||
|
self,
|
||||||
|
local_chain,
|
||||||
|
keychain_spks,
|
||||||
|
txids,
|
||||||
|
outpoints,
|
||||||
|
stop_gap,
|
||||||
|
parallel_requests,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (keychain, spks) in keychain_spks {
|
||||||
|
let mut spks = spks.into_iter();
|
||||||
|
let mut last_active_index = None;
|
||||||
|
let mut empty_scripts = 0;
|
||||||
|
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let handles = (0..parallel_requests)
|
||||||
|
.filter_map(
|
||||||
|
|_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
|
||||||
|
let (index, script) = spks.next()?;
|
||||||
|
let client = self.clone();
|
||||||
|
Some(std::thread::spawn(move || {
|
||||||
|
let mut related_txs = client.scripthash_txs(&script, None)?;
|
||||||
|
|
||||||
|
let n_confirmed =
|
||||||
|
related_txs.iter().filter(|tx| tx.status.confirmed).count();
|
||||||
|
// esplora pages on 25 confirmed transactions. If there's 25 or more we
|
||||||
|
// keep requesting to see if there's more.
|
||||||
|
if n_confirmed >= 25 {
|
||||||
|
loop {
|
||||||
|
let new_related_txs = client.scripthash_txs(
|
||||||
|
&script,
|
||||||
|
Some(related_txs.last().unwrap().txid),
|
||||||
|
)?;
|
||||||
|
let n = new_related_txs.len();
|
||||||
|
related_txs.extend(new_related_txs);
|
||||||
|
// we've reached the end
|
||||||
|
if n < 25 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Result::<_, esplora_client::Error>::Ok((index, related_txs))
|
||||||
|
}))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let n_handles = handles.len();
|
||||||
|
|
||||||
|
for handle in handles {
|
||||||
|
let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
|
||||||
|
if related_txs.is_empty() {
|
||||||
|
empty_scripts += 1;
|
||||||
|
} else {
|
||||||
|
last_active_index = Some(index);
|
||||||
|
empty_scripts = 0;
|
||||||
|
}
|
||||||
|
for tx in related_txs {
|
||||||
|
let confirmation_time =
|
||||||
|
map_confirmation_time(&tx.status, tip_at_start.height);
|
||||||
|
|
||||||
|
if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
|
||||||
|
use bdk_chain::{
|
||||||
|
chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
|
||||||
|
};
|
||||||
|
match failure {
|
||||||
|
InsertTxError::Chain(TxTooHigh { .. }) => {
|
||||||
|
unreachable!("chain position already checked earlier")
|
||||||
|
}
|
||||||
|
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
||||||
|
| InsertTxError::UnresolvableConflict(_) => {
|
||||||
|
/* implies reorg during scan. We deal with that below */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if n_handles == 0 || empty_scripts >= stop_gap {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(last_active_index) = last_active_index {
|
||||||
|
last_active_indices.insert(keychain, last_active_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for txid in txids.into_iter() {
|
||||||
|
let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) {
|
||||||
|
(Some(tx), Some(tx_status)) => (tx, tx_status),
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
|
||||||
|
|
||||||
|
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
||||||
|
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
||||||
|
match failure {
|
||||||
|
InsertTxError::Chain(TxTooHigh { .. }) => {
|
||||||
|
unreachable!("chain position already checked earlier")
|
||||||
|
}
|
||||||
|
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
||||||
|
| InsertTxError::UnresolvableConflict(_) => {
|
||||||
|
/* implies reorg during scan. We deal with that below */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for op in outpoints.into_iter() {
|
||||||
|
let mut op_txs = Vec::with_capacity(2);
|
||||||
|
if let (Some(tx), Some(tx_status)) =
|
||||||
|
(self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
|
||||||
|
{
|
||||||
|
op_txs.push((tx, tx_status));
|
||||||
|
if let Some(OutputStatus {
|
||||||
|
txid: Some(txid),
|
||||||
|
status: Some(spend_status),
|
||||||
|
..
|
||||||
|
}) = self.get_output_status(&op.txid, op.vout as _)?
|
||||||
|
{
|
||||||
|
if let Some(spend_tx) = self.get_tx(&txid)? {
|
||||||
|
op_txs.push((spend_tx, spend_status));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (tx, status) in op_txs {
|
||||||
|
let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
|
||||||
|
|
||||||
|
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
||||||
|
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
||||||
|
match failure {
|
||||||
|
InsertTxError::Chain(TxTooHigh { .. }) => {
|
||||||
|
unreachable!("chain position already checked earlier")
|
||||||
|
}
|
||||||
|
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
||||||
|
| InsertTxError::UnresolvableConflict(_) => {
|
||||||
|
/* implies reorg during scan. We deal with that below */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let reorg_occurred = {
|
||||||
|
if let Some(checkpoint) = update.chain().latest_checkpoint() {
|
||||||
|
self.get_block_hash(checkpoint.height)? != checkpoint.hash
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if reorg_occurred {
|
||||||
|
// A reorg occurred so lets find out where all the txids we found are in the chain now.
|
||||||
|
// XXX: collect required because of weird type naming issues
|
||||||
|
let txids_found = update
|
||||||
|
.chain()
|
||||||
|
.txids()
|
||||||
|
.map(|(_, txid)| *txid)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
scan.update = EsploraExt::scan_without_keychain(
|
||||||
|
self,
|
||||||
|
local_chain,
|
||||||
|
[],
|
||||||
|
txids_found,
|
||||||
|
[],
|
||||||
|
parallel_requests,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(scan)
|
||||||
|
}
|
||||||
|
}
|
@ -1,305 +1,27 @@
|
|||||||
//! This crate is used for updating structures of [`bdk_chain`] with data from an esplora server.
|
//! This crate is used for updating structures of [`bdk_chain`] with data from an esplora server.
|
||||||
//!
|
//!
|
||||||
//! The star of the show is the [`EsploraExt::scan`] method which scans for relevant
|
//! The star of the show is the [`EsploraExt::scan`] method which scans for relevant
|
||||||
//! blockchain data (via esplora) and outputs a [`KeychainScan`].
|
//! blockchain data (via esplora) and outputs a [`KeychainScan`](bdk_chain::keychain::KeychainScan).
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use bdk_chain::ConfirmationTime;
|
||||||
use bdk_chain::{
|
use esplora_client::TxStatus;
|
||||||
bitcoin::{BlockHash, OutPoint, Script, Txid},
|
|
||||||
chain_graph::ChainGraph,
|
|
||||||
keychain::KeychainScan,
|
|
||||||
sparse_chain, BlockId, ConfirmationTime,
|
|
||||||
};
|
|
||||||
use esplora_client::{OutputStatus, TxStatus};
|
|
||||||
use futures::stream::{FuturesOrdered, TryStreamExt};
|
|
||||||
use std::collections::BTreeMap;
|
|
||||||
|
|
||||||
pub use esplora_client;
|
pub use esplora_client;
|
||||||
use esplora_client::Error;
|
|
||||||
|
|
||||||
/// Trait to extend [`esplora_client::BlockingClient`] functionality.
|
|
||||||
///
|
|
||||||
/// Refer to [crate-level documentation] for more.
|
|
||||||
///
|
|
||||||
/// [crate-level documentation]: crate
|
|
||||||
|
|
||||||
#[cfg(feature = "blocking")]
|
#[cfg(feature = "blocking")]
|
||||||
pub trait EsploraExt {
|
mod blocking_ext;
|
||||||
/// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
|
|
||||||
///
|
|
||||||
/// - `local_chain`: the most recent block hashes present locally
|
|
||||||
/// - `keychain_spks`: keychains that we want to scan transactions for
|
|
||||||
/// - `txids`: transactions that we want updated [`ChainPosition`]s for
|
|
||||||
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
|
||||||
/// want to included in the update
|
|
||||||
///
|
|
||||||
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
|
||||||
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
|
||||||
/// parallel.
|
|
||||||
///
|
|
||||||
/// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
|
|
||||||
#[allow(clippy::result_large_err)] // FIXME
|
|
||||||
fn scan<K: Ord + Clone>(
|
|
||||||
&self,
|
|
||||||
local_chain: &BTreeMap<u32, BlockHash>,
|
|
||||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
|
||||||
txids: impl IntoIterator<Item = Txid>,
|
|
||||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
|
||||||
stop_gap: usize,
|
|
||||||
parallel_requests: usize,
|
|
||||||
) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
|
|
||||||
|
|
||||||
/// Convenience method to call [`scan`] without requiring a keychain.
|
|
||||||
///
|
|
||||||
/// [`scan`]: EsploraExt::scan
|
|
||||||
#[allow(clippy::result_large_err)] // FIXME
|
|
||||||
fn scan_without_keychain(
|
|
||||||
&self,
|
|
||||||
local_chain: &BTreeMap<u32, BlockHash>,
|
|
||||||
misc_spks: impl IntoIterator<Item = Script>,
|
|
||||||
txids: impl IntoIterator<Item = Txid>,
|
|
||||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
|
||||||
parallel_requests: usize,
|
|
||||||
) -> Result<ChainGraph<ConfirmationTime>, Error> {
|
|
||||||
let wallet_scan = self.scan(
|
|
||||||
local_chain,
|
|
||||||
[(
|
|
||||||
(),
|
|
||||||
misc_spks
|
|
||||||
.into_iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(i, spk)| (i as u32, spk)),
|
|
||||||
)]
|
|
||||||
.into(),
|
|
||||||
txids,
|
|
||||||
outpoints,
|
|
||||||
usize::MAX,
|
|
||||||
parallel_requests,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(wallet_scan.update)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "blocking")]
|
#[cfg(feature = "blocking")]
|
||||||
impl EsploraExt for esplora_client::BlockingClient {
|
pub use blocking_ext::*;
|
||||||
fn scan<K: Ord + Clone>(
|
|
||||||
&self,
|
|
||||||
local_chain: &BTreeMap<u32, BlockHash>,
|
|
||||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
|
||||||
txids: impl IntoIterator<Item = Txid>,
|
|
||||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
|
||||||
stop_gap: usize,
|
|
||||||
parallel_requests: usize,
|
|
||||||
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
|
|
||||||
let parallel_requests = parallel_requests.max(1);
|
|
||||||
let mut scan = KeychainScan::default();
|
|
||||||
let update = &mut scan.update;
|
|
||||||
let last_active_indices = &mut scan.last_active_indices;
|
|
||||||
|
|
||||||
for (&height, &original_hash) in local_chain.iter().rev() {
|
#[cfg(feature = "async")]
|
||||||
let update_block_id = BlockId {
|
mod async_ext;
|
||||||
height,
|
#[cfg(feature = "async")]
|
||||||
hash: self.get_block_hash(height)?,
|
pub use async_ext::*;
|
||||||
};
|
|
||||||
let _ = update
|
|
||||||
.insert_checkpoint(update_block_id)
|
|
||||||
.expect("cannot repeat height here");
|
|
||||||
if update_block_id.hash == original_hash {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let tip_at_start = BlockId {
|
|
||||||
height: self.get_height()?,
|
|
||||||
hash: self.get_tip_hash()?,
|
|
||||||
};
|
|
||||||
if let Err(failure) = update.insert_checkpoint(tip_at_start) {
|
|
||||||
match failure {
|
|
||||||
sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
|
|
||||||
// there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
|
|
||||||
return EsploraExt::scan(
|
|
||||||
self,
|
|
||||||
local_chain,
|
|
||||||
keychain_spks,
|
|
||||||
txids,
|
|
||||||
outpoints,
|
|
||||||
stop_gap,
|
|
||||||
parallel_requests,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (keychain, spks) in keychain_spks {
|
pub(crate) fn map_confirmation_time(
|
||||||
let mut spks = spks.into_iter();
|
tx_status: &TxStatus,
|
||||||
let mut last_active_index = None;
|
height_at_start: u32,
|
||||||
let mut empty_scripts = 0;
|
) -> ConfirmationTime {
|
||||||
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let handles = (0..parallel_requests)
|
|
||||||
.filter_map(
|
|
||||||
|_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
|
|
||||||
let (index, script) = spks.next()?;
|
|
||||||
let client = self.clone();
|
|
||||||
Some(std::thread::spawn(move || {
|
|
||||||
let mut related_txs = client.scripthash_txs(&script, None)?;
|
|
||||||
|
|
||||||
let n_confirmed =
|
|
||||||
related_txs.iter().filter(|tx| tx.status.confirmed).count();
|
|
||||||
// esplora pages on 25 confirmed transactions. If there's 25 or more we
|
|
||||||
// keep requesting to see if there's more.
|
|
||||||
if n_confirmed >= 25 {
|
|
||||||
loop {
|
|
||||||
let new_related_txs = client.scripthash_txs(
|
|
||||||
&script,
|
|
||||||
Some(related_txs.last().unwrap().txid),
|
|
||||||
)?;
|
|
||||||
let n = new_related_txs.len();
|
|
||||||
related_txs.extend(new_related_txs);
|
|
||||||
// we've reached the end
|
|
||||||
if n < 25 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Result::<_, esplora_client::Error>::Ok((index, related_txs))
|
|
||||||
}))
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let n_handles = handles.len();
|
|
||||||
|
|
||||||
for handle in handles {
|
|
||||||
let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
|
|
||||||
if related_txs.is_empty() {
|
|
||||||
empty_scripts += 1;
|
|
||||||
} else {
|
|
||||||
last_active_index = Some(index);
|
|
||||||
empty_scripts = 0;
|
|
||||||
}
|
|
||||||
for tx in related_txs {
|
|
||||||
let confirmation_time =
|
|
||||||
map_confirmation_time(&tx.status, tip_at_start.height);
|
|
||||||
|
|
||||||
if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
|
|
||||||
use bdk_chain::{
|
|
||||||
chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
|
|
||||||
};
|
|
||||||
match failure {
|
|
||||||
InsertTxError::Chain(TxTooHigh { .. }) => {
|
|
||||||
unreachable!("chain position already checked earlier")
|
|
||||||
}
|
|
||||||
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
|
||||||
| InsertTxError::UnresolvableConflict(_) => {
|
|
||||||
/* implies reorg during scan. We deal with that below */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if n_handles == 0 || empty_scripts >= stop_gap {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(last_active_index) = last_active_index {
|
|
||||||
last_active_indices.insert(keychain, last_active_index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for txid in txids.into_iter() {
|
|
||||||
let (tx, tx_status) = match (self.get_tx(&txid)?, self.get_tx_status(&txid)?) {
|
|
||||||
(Some(tx), Some(tx_status)) => (tx, tx_status),
|
|
||||||
_ => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
|
|
||||||
|
|
||||||
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
|
||||||
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
|
||||||
match failure {
|
|
||||||
InsertTxError::Chain(TxTooHigh { .. }) => {
|
|
||||||
unreachable!("chain position already checked earlier")
|
|
||||||
}
|
|
||||||
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
|
||||||
| InsertTxError::UnresolvableConflict(_) => {
|
|
||||||
/* implies reorg during scan. We deal with that below */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for op in outpoints.into_iter() {
|
|
||||||
let mut op_txs = Vec::with_capacity(2);
|
|
||||||
if let (Some(tx), Some(tx_status)) =
|
|
||||||
(self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
|
|
||||||
{
|
|
||||||
op_txs.push((tx, tx_status));
|
|
||||||
if let Some(OutputStatus {
|
|
||||||
txid: Some(txid),
|
|
||||||
status: Some(spend_status),
|
|
||||||
..
|
|
||||||
}) = self.get_output_status(&op.txid, op.vout as _)?
|
|
||||||
{
|
|
||||||
if let Some(spend_tx) = self.get_tx(&txid)? {
|
|
||||||
op_txs.push((spend_tx, spend_status));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (tx, status) in op_txs {
|
|
||||||
let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
|
|
||||||
|
|
||||||
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
|
||||||
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
|
||||||
match failure {
|
|
||||||
InsertTxError::Chain(TxTooHigh { .. }) => {
|
|
||||||
unreachable!("chain position already checked earlier")
|
|
||||||
}
|
|
||||||
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
|
||||||
| InsertTxError::UnresolvableConflict(_) => {
|
|
||||||
/* implies reorg during scan. We deal with that below */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let reorg_occurred = {
|
|
||||||
if let Some(checkpoint) = update.chain().latest_checkpoint() {
|
|
||||||
self.get_block_hash(checkpoint.height)? != checkpoint.hash
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if reorg_occurred {
|
|
||||||
// A reorg occurred so lets find out where all the txids we found are in the chain now.
|
|
||||||
// XXX: collect required because of weird type naming issues
|
|
||||||
let txids_found = update
|
|
||||||
.chain()
|
|
||||||
.txids()
|
|
||||||
.map(|(_, txid)| *txid)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
scan.update = EsploraExt::scan_without_keychain(
|
|
||||||
self,
|
|
||||||
local_chain,
|
|
||||||
[],
|
|
||||||
txids_found,
|
|
||||||
[],
|
|
||||||
parallel_requests,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(scan)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> ConfirmationTime {
|
|
||||||
match (tx_status.block_time, tx_status.block_height) {
|
match (tx_status.block_time, tx_status.block_height) {
|
||||||
(Some(time), Some(height)) if height <= height_at_start => {
|
(Some(time), Some(height)) if height <= height_at_start => {
|
||||||
ConfirmationTime::Confirmed { height, time }
|
ConfirmationTime::Confirmed { height, time }
|
||||||
@ -307,288 +29,3 @@ fn map_confirmation_time(tx_status: &TxStatus, height_at_start: u32) -> Confirma
|
|||||||
_ => ConfirmationTime::Unconfirmed,
|
_ => ConfirmationTime::Unconfirmed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "async")]
|
|
||||||
#[async_trait(?Send)]
|
|
||||||
pub trait EsploraAsyncExt {
|
|
||||||
/// Scan the blockchain (via esplora) for the data specified and returns a [`KeychainScan`].
|
|
||||||
///
|
|
||||||
/// - `local_chain`: the most recent block hashes present locally
|
|
||||||
/// - `keychain_spks`: keychains that we want to scan transactions for
|
|
||||||
/// - `txids`: transactions that we want updated [`ChainPosition`]s for
|
|
||||||
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
|
||||||
/// want to included in the update
|
|
||||||
///
|
|
||||||
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
|
||||||
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
|
||||||
/// parallel.
|
|
||||||
///
|
|
||||||
/// [`ChainPosition`]: bdk_chain::sparse_chain::ChainPosition
|
|
||||||
#[allow(clippy::result_large_err)] // FIXME
|
|
||||||
async fn scan<K: Ord + Clone>(
|
|
||||||
&self,
|
|
||||||
local_chain: &BTreeMap<u32, BlockHash>,
|
|
||||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
|
||||||
txids: impl IntoIterator<Item = Txid>,
|
|
||||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
|
||||||
stop_gap: usize,
|
|
||||||
parallel_requests: usize,
|
|
||||||
) -> Result<KeychainScan<K, ConfirmationTime>, Error>;
|
|
||||||
|
|
||||||
/// Convenience method to call [`scan`] without requiring a keychain.
|
|
||||||
///
|
|
||||||
/// [`scan`]: EsploraAsyncExt::scan
|
|
||||||
#[allow(clippy::result_large_err)] // FIXME
|
|
||||||
async fn scan_without_keychain(
|
|
||||||
&self,
|
|
||||||
local_chain: &BTreeMap<u32, BlockHash>,
|
|
||||||
misc_spks: impl IntoIterator<Item = Script>,
|
|
||||||
txids: impl IntoIterator<Item = Txid>,
|
|
||||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
|
||||||
parallel_requests: usize,
|
|
||||||
) -> Result<ChainGraph<ConfirmationTime>, Error> {
|
|
||||||
let wallet_scan = self
|
|
||||||
.scan(
|
|
||||||
local_chain,
|
|
||||||
[(
|
|
||||||
(),
|
|
||||||
misc_spks
|
|
||||||
.into_iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(i, spk)| (i as u32, spk)),
|
|
||||||
)]
|
|
||||||
.into(),
|
|
||||||
txids,
|
|
||||||
outpoints,
|
|
||||||
usize::MAX,
|
|
||||||
parallel_requests,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(wallet_scan.update)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "async")]
|
|
||||||
#[async_trait(?Send)]
|
|
||||||
impl EsploraAsyncExt for esplora_client::AsyncClient {
|
|
||||||
async fn scan<K: Ord + Clone>(
|
|
||||||
&self,
|
|
||||||
local_chain: &BTreeMap<u32, BlockHash>,
|
|
||||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
|
||||||
txids: impl IntoIterator<Item = Txid>,
|
|
||||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
|
||||||
stop_gap: usize,
|
|
||||||
parallel_requests: usize,
|
|
||||||
) -> Result<KeychainScan<K, ConfirmationTime>, Error> {
|
|
||||||
let parallel_requests = parallel_requests.max(1);
|
|
||||||
let mut scan = KeychainScan::default();
|
|
||||||
let update = &mut scan.update;
|
|
||||||
let last_active_indices = &mut scan.last_active_indices;
|
|
||||||
|
|
||||||
for (&height, &original_hash) in local_chain.iter().rev() {
|
|
||||||
let update_block_id = BlockId {
|
|
||||||
height,
|
|
||||||
hash: self.get_block_hash(height).await?,
|
|
||||||
};
|
|
||||||
let _ = update
|
|
||||||
.insert_checkpoint(update_block_id)
|
|
||||||
.expect("cannot repeat height here");
|
|
||||||
if update_block_id.hash == original_hash {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let tip_at_start = BlockId {
|
|
||||||
height: self.get_height().await?,
|
|
||||||
hash: self.get_tip_hash().await?,
|
|
||||||
};
|
|
||||||
if let Err(failure) = update.insert_checkpoint(tip_at_start) {
|
|
||||||
match failure {
|
|
||||||
sparse_chain::InsertCheckpointError::HashNotMatching { .. } => {
|
|
||||||
// there has been a re-org before we started scanning. We haven't consumed any iterators so it's safe to recursively call.
|
|
||||||
return EsploraAsyncExt::scan(
|
|
||||||
self,
|
|
||||||
local_chain,
|
|
||||||
keychain_spks,
|
|
||||||
txids,
|
|
||||||
outpoints,
|
|
||||||
stop_gap,
|
|
||||||
parallel_requests,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (keychain, spks) in keychain_spks {
|
|
||||||
let mut spks = spks.into_iter();
|
|
||||||
let mut last_active_index = None;
|
|
||||||
let mut empty_scripts = 0;
|
|
||||||
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let futures: FuturesOrdered<_> = (0..parallel_requests)
|
|
||||||
.filter_map(|_| {
|
|
||||||
let (index, script) = spks.next()?;
|
|
||||||
let client = self.clone();
|
|
||||||
Some(async move {
|
|
||||||
let mut related_txs = client.scripthash_txs(&script, None).await?;
|
|
||||||
|
|
||||||
let n_confirmed =
|
|
||||||
related_txs.iter().filter(|tx| tx.status.confirmed).count();
|
|
||||||
// esplora pages on 25 confirmed transactions. If there's 25 or more we
|
|
||||||
// keep requesting to see if there's more.
|
|
||||||
if n_confirmed >= 25 {
|
|
||||||
loop {
|
|
||||||
let new_related_txs = client
|
|
||||||
.scripthash_txs(
|
|
||||||
&script,
|
|
||||||
Some(related_txs.last().unwrap().txid),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
let n = new_related_txs.len();
|
|
||||||
related_txs.extend(new_related_txs);
|
|
||||||
// we've reached the end
|
|
||||||
if n < 25 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Result::<_, esplora_client::Error>::Ok((index, related_txs))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let n_futures = futures.len();
|
|
||||||
|
|
||||||
let idx_with_tx: Vec<IndexWithTxs> = futures.try_collect().await?;
|
|
||||||
|
|
||||||
for (index, related_txs) in idx_with_tx {
|
|
||||||
if related_txs.is_empty() {
|
|
||||||
empty_scripts += 1;
|
|
||||||
} else {
|
|
||||||
last_active_index = Some(index);
|
|
||||||
empty_scripts = 0;
|
|
||||||
}
|
|
||||||
for tx in related_txs {
|
|
||||||
let confirmation_time =
|
|
||||||
map_confirmation_time(&tx.status, tip_at_start.height);
|
|
||||||
|
|
||||||
if let Err(failure) = update.insert_tx(tx.to_tx(), confirmation_time) {
|
|
||||||
use bdk_chain::{
|
|
||||||
chain_graph::InsertTxError, sparse_chain::InsertTxError::*,
|
|
||||||
};
|
|
||||||
match failure {
|
|
||||||
InsertTxError::Chain(TxTooHigh { .. }) => {
|
|
||||||
unreachable!("chain position already checked earlier")
|
|
||||||
}
|
|
||||||
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
|
||||||
| InsertTxError::UnresolvableConflict(_) => {
|
|
||||||
/* implies reorg during scan. We deal with that below */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if n_futures == 0 || empty_scripts >= stop_gap {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(last_active_index) = last_active_index {
|
|
||||||
last_active_indices.insert(keychain, last_active_index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for txid in txids.into_iter() {
|
|
||||||
let (tx, tx_status) =
|
|
||||||
match (self.get_tx(&txid).await?, self.get_tx_status(&txid).await?) {
|
|
||||||
(Some(tx), Some(tx_status)) => (tx, tx_status),
|
|
||||||
_ => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let confirmation_time = map_confirmation_time(&tx_status, tip_at_start.height);
|
|
||||||
|
|
||||||
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
|
||||||
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
|
||||||
match failure {
|
|
||||||
InsertTxError::Chain(TxTooHigh { .. }) => {
|
|
||||||
unreachable!("chain position already checked earlier")
|
|
||||||
}
|
|
||||||
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
|
||||||
| InsertTxError::UnresolvableConflict(_) => {
|
|
||||||
/* implies reorg during scan. We deal with that below */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for op in outpoints.into_iter() {
|
|
||||||
let mut op_txs = Vec::with_capacity(2);
|
|
||||||
if let (Some(tx), Some(tx_status)) = (
|
|
||||||
self.get_tx(&op.txid).await?,
|
|
||||||
self.get_tx_status(&op.txid).await?,
|
|
||||||
) {
|
|
||||||
op_txs.push((tx, tx_status));
|
|
||||||
if let Some(OutputStatus {
|
|
||||||
txid: Some(txid),
|
|
||||||
status: Some(spend_status),
|
|
||||||
..
|
|
||||||
}) = self.get_output_status(&op.txid, op.vout as _).await?
|
|
||||||
{
|
|
||||||
if let Some(spend_tx) = self.get_tx(&txid).await? {
|
|
||||||
op_txs.push((spend_tx, spend_status));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (tx, status) in op_txs {
|
|
||||||
let confirmation_time = map_confirmation_time(&status, tip_at_start.height);
|
|
||||||
|
|
||||||
if let Err(failure) = update.insert_tx(tx, confirmation_time) {
|
|
||||||
use bdk_chain::{chain_graph::InsertTxError, sparse_chain::InsertTxError::*};
|
|
||||||
match failure {
|
|
||||||
InsertTxError::Chain(TxTooHigh { .. }) => {
|
|
||||||
unreachable!("chain position already checked earlier")
|
|
||||||
}
|
|
||||||
InsertTxError::Chain(TxMovedUnexpectedly { .. })
|
|
||||||
| InsertTxError::UnresolvableConflict(_) => {
|
|
||||||
/* implies reorg during scan. We deal with that below */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let reorg_occurred = {
|
|
||||||
if let Some(checkpoint) = update.chain().latest_checkpoint() {
|
|
||||||
self.get_block_hash(checkpoint.height).await? != checkpoint.hash
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if reorg_occurred {
|
|
||||||
// A reorg occurred so lets find out where all the txids we found are in the chain now.
|
|
||||||
// XXX: collect required because of weird type naming issues
|
|
||||||
let txids_found = update
|
|
||||||
.chain()
|
|
||||||
.txids()
|
|
||||||
.map(|(_, txid)| *txid)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
scan.update = EsploraAsyncExt::scan_without_keychain(
|
|
||||||
self,
|
|
||||||
local_chain,
|
|
||||||
[],
|
|
||||||
txids_found,
|
|
||||||
[],
|
|
||||||
parallel_requests,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(scan)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -8,5 +8,5 @@ publish = false
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bdk = { path = "../../crates/bdk" }
|
bdk = { path = "../../crates/bdk" }
|
||||||
bdk_esplora = { path = "../../crates/esplora" }
|
bdk_esplora = { path = "../../crates/esplora", features = ["blocking"] }
|
||||||
bdk_file_store = { path = "../../crates/file_store" }
|
bdk_file_store = { path = "../../crates/file_store" }
|
||||||
|
12
example-crates/wallet_esplora_async/Cargo.toml
Normal file
12
example-crates/wallet_esplora_async/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
[package]
|
||||||
|
name = "wallet_esplora_async"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
bdk = { path = "../../crates/bdk" }
|
||||||
|
bdk_esplora = { path = "../../crates/esplora", features = ["async-https"] }
|
||||||
|
bdk_file_store = { path = "../../crates/file_store" }
|
||||||
|
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
|
99
example-crates/wallet_esplora_async/src/main.rs
Normal file
99
example-crates/wallet_esplora_async/src/main.rs
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
use std::{io::Write, str::FromStr};
|
||||||
|
|
||||||
|
use bdk::{
|
||||||
|
bitcoin::{Address, Network},
|
||||||
|
wallet::AddressIndex,
|
||||||
|
SignOptions, Wallet,
|
||||||
|
};
|
||||||
|
use bdk_esplora::{esplora_client, EsploraAsyncExt};
|
||||||
|
use bdk_file_store::KeychainStore;
|
||||||
|
|
||||||
|
const SEND_AMOUNT: u64 = 5000;
|
||||||
|
const STOP_GAP: usize = 50;
|
||||||
|
const PARALLEL_REQUESTS: usize = 5;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let db_path = std::env::temp_dir().join("bdk-esplora-example");
|
||||||
|
let db = KeychainStore::new_from_path(db_path)?;
|
||||||
|
let external_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/0/*)";
|
||||||
|
let internal_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/1/*)";
|
||||||
|
|
||||||
|
let mut wallet = Wallet::new(
|
||||||
|
external_descriptor,
|
||||||
|
Some(internal_descriptor),
|
||||||
|
db,
|
||||||
|
Network::Testnet,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let address = wallet.get_address(AddressIndex::New);
|
||||||
|
println!("Generated Address: {}", address);
|
||||||
|
|
||||||
|
let balance = wallet.get_balance();
|
||||||
|
println!("Wallet balance before syncing: {} sats", balance.total());
|
||||||
|
|
||||||
|
print!("Syncing...");
|
||||||
|
// Scanning the blockchain
|
||||||
|
let esplora_url = "https://mempool.space/testnet/api";
|
||||||
|
let client = esplora_client::Builder::new(esplora_url).build_async()?;
|
||||||
|
let checkpoints = wallet.checkpoints();
|
||||||
|
let spks = wallet
|
||||||
|
.spks_of_all_keychains()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(k, spks)| {
|
||||||
|
let mut first = true;
|
||||||
|
(
|
||||||
|
k,
|
||||||
|
spks.inspect(move |(spk_i, _)| {
|
||||||
|
if first {
|
||||||
|
first = false;
|
||||||
|
print!("\nScanning keychain [{:?}]:", k);
|
||||||
|
}
|
||||||
|
print!(" {}", spk_i);
|
||||||
|
let _ = std::io::stdout().flush();
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let update = client
|
||||||
|
.scan(
|
||||||
|
checkpoints,
|
||||||
|
spks,
|
||||||
|
std::iter::empty(),
|
||||||
|
std::iter::empty(),
|
||||||
|
STOP_GAP,
|
||||||
|
PARALLEL_REQUESTS,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
println!();
|
||||||
|
wallet.apply_update(update)?;
|
||||||
|
wallet.commit()?;
|
||||||
|
|
||||||
|
let balance = wallet.get_balance();
|
||||||
|
println!("Wallet balance after syncing: {} sats", balance.total());
|
||||||
|
|
||||||
|
if balance.total() < SEND_AMOUNT {
|
||||||
|
println!(
|
||||||
|
"Please send at least {} sats to the receiving address",
|
||||||
|
SEND_AMOUNT
|
||||||
|
);
|
||||||
|
std::process::exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?;
|
||||||
|
|
||||||
|
let mut tx_builder = wallet.build_tx();
|
||||||
|
tx_builder
|
||||||
|
.add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
|
||||||
|
.enable_rbf();
|
||||||
|
|
||||||
|
let (mut psbt, _) = tx_builder.finish()?;
|
||||||
|
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
|
||||||
|
assert!(finalized);
|
||||||
|
|
||||||
|
let tx = psbt.extract_tx();
|
||||||
|
client.broadcast(&tx).await?;
|
||||||
|
println!("Tx broadcasted! Txid: {}", tx.txid());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user