feat(esplora)!: remove EsploraExt::update_local_chain
Previously, we would update the `TxGraph` and `KeychainTxOutIndex` first, then create a second update for the `LocalChain`. This required locking the receiving structures three times instead of the optimal two. This PR removes that extra lock by making use of the new `query` method of `CheckPoint`: `full_scan` and `sync` now return a single update (`FullScanUpdate`/`SyncUpdate`) containing both the chain update and the transaction-graph update. Examples are also updated to use the new API.
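For illustration, here is a rough usage sketch (not part of this commit) of how a downstream caller might consume the new combined update with the async client. The `refresh` function, the receiving `chain`/`graph` values, and the `IndexedTxGraph`/`KeychainTxOutIndex` wiring are assumptions made for the example; the point is that each receiving structure only needs to be locked (or mutably borrowed) once per update.

use std::collections::BTreeMap;

use bdk_chain::{
    bitcoin::ScriptBuf, indexed_tx_graph::IndexedTxGraph, keychain::KeychainTxOutIndex,
    local_chain::LocalChain, ConfirmationTimeHeightAnchor,
};
use bdk_esplora::{esplora_client, EsploraAsyncExt};

async fn refresh(
    client: &esplora_client::AsyncClient,
    chain: &mut LocalChain,
    graph: &mut IndexedTxGraph<ConfirmationTimeHeightAnchor, KeychainTxOutIndex<&'static str>>,
    keychain_spks: BTreeMap<&'static str, Vec<(u32, ScriptBuf)>>,
) -> Result<(), Box<dyn std::error::Error>> {
    // One call now returns the chain update, the tx-graph update, and the last
    // active indices together.
    let update = client
        .full_scan(chain.tip(), keychain_spks, /* stop_gap */ 10, /* parallel_requests */ 5)
        .await?;

    // Each receiving structure is touched only once per update.
    let _ = chain.apply_update(update.local_chain)?;
    let _ = graph.index.reveal_to_target_multi(&update.last_active_indices);
    let _ = graph.apply_update(update.tx_graph);
    Ok(())
}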
@@ -1,15 +1,18 @@
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bdk_chain::collections::btree_map;
|
||||
use bdk_chain::Anchor;
|
||||
use bdk_chain::{
|
||||
bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
|
||||
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
|
||||
collections::BTreeMap,
|
||||
local_chain::{self, CheckPoint},
|
||||
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
|
||||
};
|
||||
use esplora_client::TxStatus;
|
||||
use esplora_client::{Amount, TxStatus};
|
||||
use futures::{stream::FuturesOrdered, TryStreamExt};
|
||||
|
||||
use crate::anchor_from_status;
|
||||
use crate::{anchor_from_status, FullScanUpdate, SyncUpdate};
|
||||
|
||||
/// [`esplora_client::Error`]
|
||||
type Error = Box<esplora_client::Error>;
|
||||
@@ -22,36 +25,15 @@ type Error = Box<esplora_client::Error>;
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait EsploraAsyncExt {
|
||||
/// Prepare a [`LocalChain`] update with blocks fetched from Esplora.
|
||||
///
|
||||
/// * `local_tip` is the previous tip of [`LocalChain::tip`].
|
||||
/// * `request_heights` is the block heights that we are interested in fetching from Esplora.
|
||||
///
|
||||
/// The result of this method can be applied to [`LocalChain::apply_update`].
|
||||
///
|
||||
/// ## Consistency
|
||||
///
|
||||
/// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org
/// during the call. The size of re-org we can tolerate is server-dependent, but will be at
/// least 10 blocks.
|
||||
///
|
||||
/// [`LocalChain`]: bdk_chain::local_chain::LocalChain
|
||||
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
|
||||
/// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
|
||||
async fn update_local_chain(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
|
||||
) -> Result<local_chain::Update, Error>;
|
||||
|
||||
/// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
|
||||
/// returns a [`TxGraph`] and a map of last active indices.
|
||||
/// Scan keychain scripts for transactions against Esplora, returning an update that can be
|
||||
/// applied to the receiving structures.
|
||||
///
|
||||
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
|
||||
/// * `keychain_spks`: keychains that we want to scan transactions for
|
||||
///
|
||||
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
||||
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
||||
/// parallel.
|
||||
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
|
||||
/// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
|
||||
/// make in parallel.
|
||||
///
|
||||
/// ## Note
|
||||
///
|
||||
@@ -65,19 +47,23 @@ pub trait EsploraAsyncExt {
|
||||
/// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
|
||||
///
|
||||
/// A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
|
||||
///
|
||||
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
|
||||
async fn full_scan<K: Ord + Clone + Send>(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
keychain_spks: BTreeMap<
|
||||
K,
|
||||
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
|
||||
>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error>;
|
||||
) -> Result<FullScanUpdate<K>, Error>;
|
||||
|
||||
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
|
||||
/// specified and return a [`TxGraph`].
|
||||
///
|
||||
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
|
||||
/// * `misc_spks`: scripts that we want to sync transactions for
|
||||
/// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s
|
||||
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||
@@ -86,210 +72,216 @@ pub trait EsploraAsyncExt {
|
||||
/// If the scripts to sync are unknown, such as when restoring or importing a keychain that
|
||||
/// may include scripts that have been used, use [`full_scan`] with the keychain.
|
||||
///
|
||||
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
|
||||
/// [`full_scan`]: EsploraAsyncExt::full_scan
|
||||
async fn sync(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
|
||||
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
|
||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
|
||||
parallel_requests: usize,
|
||||
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
|
||||
) -> Result<SyncUpdate, Error>;
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
impl EsploraAsyncExt for esplora_client::AsyncClient {
|
||||
async fn update_local_chain(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
|
||||
) -> Result<local_chain::Update, Error> {
|
||||
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
|
||||
// consistent.
|
||||
let mut fetched_blocks = self
|
||||
.get_blocks(None)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|b| (b.time.height, b.id))
|
||||
.collect::<BTreeMap<u32, BlockHash>>();
|
||||
let new_tip_height = fetched_blocks
|
||||
.keys()
|
||||
.last()
|
||||
.copied()
|
||||
.expect("must have at least one block");
|
||||
|
||||
// Fetch blocks of heights that the caller is interested in, skipping blocks that are
|
||||
// already fetched when constructing `fetched_blocks`.
|
||||
for height in request_heights {
|
||||
// do not fetch blocks higher than remote tip
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
// only fetch what is missing
|
||||
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
||||
// ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent
|
||||
// with the chain at the time of `get_blocks` above (there could have been a deep
|
||||
// re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's
|
||||
// not possible to have a re-org deeper than that.
|
||||
entry.insert(self.get_block_hash(height).await?);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure `fetched_blocks` can create an update that connects with the original chain by
|
||||
// finding a "Point of Agreement".
|
||||
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
|
||||
let fetched_hash = match fetched_blocks.entry(height) {
|
||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||
btree_map::Entry::Vacant(entry) => {
|
||||
*entry.insert(self.get_block_hash(height).await?)
|
||||
}
|
||||
};
|
||||
|
||||
// We have found a point of agreement so the update will connect!
|
||||
if fetched_hash == local_hash {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(local_chain::Update {
|
||||
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
|
||||
.expect("must be in height order"),
|
||||
introduce_older_blocks: true,
|
||||
})
|
||||
}
|
||||
|
||||
async fn full_scan<K: Ord + Clone + Send>(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
keychain_spks: BTreeMap<
|
||||
K,
|
||||
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
|
||||
>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
|
||||
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
|
||||
let parallel_requests = Ord::max(parallel_requests, 1);
|
||||
let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
|
||||
let mut last_active_indexes = BTreeMap::<K, u32>::new();
|
||||
let stop_gap = Ord::max(stop_gap, 1);
|
||||
|
||||
for (keychain, spks) in keychain_spks {
|
||||
let mut spks = spks.into_iter();
|
||||
let mut last_index = Option::<u32>::None;
|
||||
let mut last_active_index = Option::<u32>::None;
|
||||
|
||||
loop {
|
||||
let handles = spks
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.map(|(spk_index, spk)| {
|
||||
let client = self.clone();
|
||||
async move {
|
||||
let mut last_seen = None;
|
||||
let mut spk_txs = Vec::new();
|
||||
loop {
|
||||
let txs = client.scripthash_txs(&spk, last_seen).await?;
|
||||
let tx_count = txs.len();
|
||||
last_seen = txs.last().map(|tx| tx.txid);
|
||||
spk_txs.extend(txs);
|
||||
if tx_count < 25 {
|
||||
break Result::<_, Error>::Ok((spk_index, spk_txs));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect::<FuturesOrdered<_>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
|
||||
last_index = Some(index);
|
||||
if !txs.is_empty() {
|
||||
last_active_index = Some(index);
|
||||
}
|
||||
for tx in txs {
|
||||
let _ = graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor_from_status(&tx.status) {
|
||||
let _ = graph.insert_anchor(tx.txid, anchor);
|
||||
}
|
||||
|
||||
let previous_outputs = tx.vin.iter().filter_map(|vin| {
|
||||
let prevout = vin.prevout.as_ref()?;
|
||||
Some((
|
||||
OutPoint {
|
||||
txid: vin.txid,
|
||||
vout: vin.vout,
|
||||
},
|
||||
TxOut {
|
||||
script_pubkey: prevout.scriptpubkey.clone(),
|
||||
value: Amount::from_sat(prevout.value),
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
for (outpoint, txout) in previous_outputs {
|
||||
let _ = graph.insert_txout(outpoint, txout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let last_index = last_index.expect("Must be set since handles wasn't empty.");
|
||||
let gap_limit_reached = if let Some(i) = last_active_index {
|
||||
last_index >= i.saturating_add(stop_gap as u32)
|
||||
} else {
|
||||
last_index + 1 >= stop_gap as u32
|
||||
};
|
||||
if gap_limit_reached {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(last_active_index) = last_active_index {
|
||||
last_active_indexes.insert(keychain, last_active_index);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((graph, last_active_indexes))
|
||||
) -> Result<FullScanUpdate<K>, Error> {
|
||||
let update_blocks = init_chain_update(self, &local_tip).await?;
|
||||
let (tx_graph, last_active_indices) =
|
||||
full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?;
|
||||
let local_chain =
|
||||
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
|
||||
Ok(FullScanUpdate {
|
||||
local_chain,
|
||||
tx_graph,
|
||||
last_active_indices,
|
||||
})
|
||||
}
|
||||
|
||||
async fn sync(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
|
||||
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
|
||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
|
||||
parallel_requests: usize,
|
||||
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
|
||||
let mut graph = self
|
||||
.full_scan(
|
||||
[(
|
||||
(),
|
||||
misc_spks
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, spk)| (i as u32, spk)),
|
||||
)]
|
||||
.into(),
|
||||
usize::MAX,
|
||||
parallel_requests,
|
||||
)
|
||||
.await
|
||||
.map(|(g, _)| g)?;
|
||||
) -> Result<SyncUpdate, Error> {
|
||||
let update_blocks = init_chain_update(self, &local_tip).await?;
|
||||
let tx_graph =
|
||||
sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?;
|
||||
let local_chain =
|
||||
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
|
||||
Ok(SyncUpdate {
|
||||
tx_graph,
|
||||
local_chain,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Create the initial chain update.
|
||||
///
|
||||
/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
/// update can connect to `local_tip`.
///
/// We want to do this before fetching transactions and anchors, as we cannot fetch the latest
/// blocks and transactions atomically, and the checkpoint tip is used to determine the
/// last-scanned block (for block-based chain sources). It is therefore better to be conservative
/// when setting the tip (use an earlier tip rather than a later one); otherwise the caller may
/// accidentally skip blocks when alternating between chain sources.
|
||||
#[doc(hidden)]
|
||||
pub async fn init_chain_update(
|
||||
client: &esplora_client::AsyncClient,
|
||||
local_tip: &CheckPoint,
|
||||
) -> Result<BTreeMap<u32, BlockHash>, Error> {
|
||||
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
|
||||
// consistent.
|
||||
let mut fetched_blocks = client
|
||||
.get_blocks(None)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|b| (b.time.height, b.id))
|
||||
.collect::<BTreeMap<u32, BlockHash>>();
|
||||
let new_tip_height = fetched_blocks
|
||||
.keys()
|
||||
.last()
|
||||
.copied()
|
||||
.expect("must have at least one block");
|
||||
|
||||
// Ensure `fetched_blocks` can create an update that connects with the original chain by
|
||||
// finding a "Point of Agreement".
|
||||
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
|
||||
let fetched_hash = match fetched_blocks.entry(height) {
|
||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||
btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?),
|
||||
};
|
||||
|
||||
// We have found a point of agreement so the update will connect!
|
||||
if fetched_hash == local_hash {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(fetched_blocks)
|
||||
}
|
||||
|
||||
/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
|
||||
///
|
||||
/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
|
||||
/// existing checkpoint/block under `local_tip` or `update_blocks`.
|
||||
#[doc(hidden)]
|
||||
pub async fn finalize_chain_update<A: Anchor>(
|
||||
client: &esplora_client::AsyncClient,
|
||||
local_tip: &CheckPoint,
|
||||
anchors: &BTreeSet<(A, Txid)>,
|
||||
mut update_blocks: BTreeMap<u32, BlockHash>,
|
||||
) -> Result<local_chain::Update, Error> {
|
||||
let update_tip_height = update_blocks
|
||||
.keys()
|
||||
.last()
|
||||
.copied()
|
||||
.expect("must have at least one block");
|
||||
|
||||
// We want to have a corresponding checkpoint per height. We iterate the heights of anchors
|
||||
// backwards, comparing it against our `local_tip`'s chain and our current set of
|
||||
// `update_blocks` to see if a corresponding checkpoint already exists.
|
||||
let anchor_heights = anchors
|
||||
.iter()
|
||||
.rev()
|
||||
.map(|(a, _)| a.anchor_block().height)
|
||||
// filter out heights that surpass the update tip
|
||||
.filter(|h| *h <= update_tip_height)
|
||||
// filter out duplicate heights
|
||||
.filter({
|
||||
let mut prev_height = Option::<u32>::None;
|
||||
move |h| match prev_height.replace(*h) {
|
||||
None => true,
|
||||
Some(prev_h) => prev_h != *h,
|
||||
}
|
||||
});
|
||||
|
||||
// We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
|
||||
// checkpoints more efficient.
|
||||
let mut curr_cp = local_tip.clone();
|
||||
|
||||
for h in anchor_heights {
|
||||
if let Some(cp) = curr_cp.range(h..).last() {
|
||||
curr_cp = cp.clone();
|
||||
if cp.height() == h {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
|
||||
entry.insert(client.get_block_hash(h).await?);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(local_chain::Update {
|
||||
tip: CheckPoint::from_block_ids(
|
||||
update_blocks
|
||||
.into_iter()
|
||||
.map(|(height, hash)| BlockId { height, hash }),
|
||||
)
|
||||
.expect("must be in order"),
|
||||
introduce_older_blocks: true,
|
||||
})
|
||||
}
|
||||
|
||||
/// This performs a full scan to get an update for the [`TxGraph`] and
|
||||
/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
|
||||
#[doc(hidden)]
|
||||
pub async fn full_scan_for_index_and_graph<K: Ord + Clone + Send>(
|
||||
client: &esplora_client::AsyncClient,
|
||||
keychain_spks: BTreeMap<
|
||||
K,
|
||||
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
|
||||
>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
|
||||
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
|
||||
let parallel_requests = Ord::max(parallel_requests, 1);
|
||||
let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
|
||||
let mut last_active_indexes = BTreeMap::<K, u32>::new();
|
||||
|
||||
for (keychain, spks) in keychain_spks {
|
||||
let mut spks = spks.into_iter();
|
||||
let mut last_index = Option::<u32>::None;
|
||||
let mut last_active_index = Option::<u32>::None;
|
||||
|
||||
let mut txids = txids.into_iter();
|
||||
loop {
|
||||
let handles = txids
|
||||
let handles = spks
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.filter(|&txid| graph.get_tx(txid).is_none())
|
||||
.map(|txid| {
|
||||
let client = self.clone();
|
||||
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
|
||||
.map(|(spk_index, spk)| {
|
||||
let client = client.clone();
|
||||
async move {
|
||||
let mut last_seen = None;
|
||||
let mut spk_txs = Vec::new();
|
||||
loop {
|
||||
let txs = client.scripthash_txs(&spk, last_seen).await?;
|
||||
let tx_count = txs.len();
|
||||
last_seen = txs.last().map(|tx| tx.txid);
|
||||
spk_txs.extend(txs);
|
||||
if tx_count < 25 {
|
||||
break Result::<_, Error>::Ok((spk_index, spk_txs));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect::<FuturesOrdered<_>>();
|
||||
|
||||
@@ -297,38 +289,128 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
|
||||
break;
|
||||
}
|
||||
|
||||
for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
|
||||
last_index = Some(index);
|
||||
if !txs.is_empty() {
|
||||
last_active_index = Some(index);
|
||||
}
|
||||
for tx in txs {
|
||||
let _ = graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor_from_status(&tx.status) {
|
||||
let _ = graph.insert_anchor(tx.txid, anchor);
|
||||
}
|
||||
|
||||
let previous_outputs = tx.vin.iter().filter_map(|vin| {
|
||||
let prevout = vin.prevout.as_ref()?;
|
||||
Some((
|
||||
OutPoint {
|
||||
txid: vin.txid,
|
||||
vout: vin.vout,
|
||||
},
|
||||
TxOut {
|
||||
script_pubkey: prevout.scriptpubkey.clone(),
|
||||
value: Amount::from_sat(prevout.value),
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
for (outpoint, txout) in previous_outputs {
|
||||
let _ = graph.insert_txout(outpoint, txout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let last_index = last_index.expect("Must be set since handles wasn't empty.");
|
||||
let gap_limit_reached = if let Some(i) = last_active_index {
|
||||
last_index >= i.saturating_add(stop_gap as u32)
|
||||
} else {
|
||||
last_index + 1 >= stop_gap as u32
|
||||
};
|
||||
if gap_limit_reached {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for op in outpoints.into_iter() {
|
||||
if graph.get_tx(op.txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&op.txid).await? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&op.txid).await?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(op.txid, anchor);
|
||||
}
|
||||
}
|
||||
if let Some(last_active_index) = last_active_index {
|
||||
last_active_indexes.insert(keychain, last_active_index);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
|
||||
if let Some(txid) = op_status.txid {
|
||||
if graph.get_tx(txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&txid).await? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&txid).await?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
Ok((graph, last_active_indexes))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub async fn sync_for_index_and_graph(
|
||||
client: &esplora_client::AsyncClient,
|
||||
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
|
||||
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
|
||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
|
||||
parallel_requests: usize,
|
||||
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
|
||||
let mut graph = full_scan_for_index_and_graph(
|
||||
client,
|
||||
[(
|
||||
(),
|
||||
misc_spks
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, spk)| (i as u32, spk)),
|
||||
)]
|
||||
.into(),
|
||||
usize::MAX,
|
||||
parallel_requests,
|
||||
)
|
||||
.await
|
||||
.map(|(g, _)| g)?;
|
||||
|
||||
let mut txids = txids.into_iter();
|
||||
loop {
|
||||
let handles = txids
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.filter(|&txid| graph.get_tx(txid).is_none())
|
||||
.map(|txid| {
|
||||
let client = client.clone();
|
||||
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
|
||||
})
|
||||
.collect::<FuturesOrdered<_>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for op in outpoints.into_iter() {
|
||||
if graph.get_tx(op.txid).is_none() {
|
||||
if let Some(tx) = client.get_tx(&op.txid).await? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = client.get_tx_status(&op.txid).await?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(op.txid, anchor);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _).await? {
|
||||
if let Some(txid) = op_status.txid {
|
||||
if graph.get_tx(txid).is_none() {
|
||||
if let Some(tx) = client.get_tx(&txid).await? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = client.get_tx_status(&txid).await?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(graph)
|
||||
}
|
||||
|
||||
Ok(graph)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use std::collections::BTreeSet;
|
||||
use std::thread::JoinHandle;
|
||||
use std::usize;
|
||||
|
||||
use bdk_chain::collections::btree_map;
|
||||
use bdk_chain::collections::BTreeMap;
|
||||
use bdk_chain::Anchor;
|
||||
use bdk_chain::{
|
||||
bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
|
||||
local_chain::{self, CheckPoint},
|
||||
@@ -10,9 +13,11 @@ use bdk_chain::{
|
||||
use esplora_client::TxStatus;
|
||||
|
||||
use crate::anchor_from_status;
|
||||
use crate::FullScanUpdate;
|
||||
use crate::SyncUpdate;
|
||||
|
||||
/// [`esplora_client::Error`]
|
||||
type Error = Box<esplora_client::Error>;
|
||||
pub type Error = Box<esplora_client::Error>;
|
||||
|
||||
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
|
||||
///
|
||||
@@ -20,36 +25,15 @@ type Error = Box<esplora_client::Error>;
|
||||
///
|
||||
/// [crate-level documentation]: crate
|
||||
pub trait EsploraExt {
|
||||
/// Prepare a [`LocalChain`] update with blocks fetched from Esplora.
|
||||
///
|
||||
/// * `local_tip` is the previous tip of [`LocalChain::tip`].
|
||||
/// * `request_heights` is the block heights that we are interested in fetching from Esplora.
|
||||
///
|
||||
/// The result of this method can be applied to [`LocalChain::apply_update`].
|
||||
///
|
||||
/// ## Consistency
|
||||
///
|
||||
/// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org
/// during the call. The size of re-org we can tolerate is server-dependent, but will be at
/// least 10 blocks.
|
||||
///
|
||||
/// [`LocalChain`]: bdk_chain::local_chain::LocalChain
|
||||
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
|
||||
/// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
|
||||
fn update_local_chain(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
request_heights: impl IntoIterator<Item = u32>,
|
||||
) -> Result<local_chain::Update, Error>;
|
||||
|
||||
/// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
|
||||
/// returns a [`TxGraph`] and a map of last active indices.
|
||||
/// Scan keychain scripts for transactions against Esplora, returning an update that can be
|
||||
/// applied to the receiving structures.
|
||||
///
|
||||
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
|
||||
/// * `keychain_spks`: keychains that we want to scan transactions for
|
||||
///
|
||||
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
||||
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
||||
/// parallel.
|
||||
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no
|
||||
/// associated transactions. `parallel_requests` specifies the max number of HTTP requests to
|
||||
/// make in parallel.
|
||||
///
|
||||
/// ## Note
|
||||
///
|
||||
@@ -63,16 +47,20 @@ pub trait EsploraExt {
|
||||
/// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing).
|
||||
///
|
||||
/// A `stop_gap` of 0 will be treated as a `stop_gap` of 1.
|
||||
///
|
||||
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
|
||||
fn full_scan<K: Ord + Clone>(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error>;
|
||||
) -> Result<FullScanUpdate<K>, Error>;
|
||||
|
||||
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
|
||||
/// specified and return a [`TxGraph`].
|
||||
///
|
||||
/// * `local_tip`: the previously seen tip from [`LocalChain::tip`].
|
||||
/// * `misc_spks`: scripts that we want to sync transactions for
|
||||
/// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s
|
||||
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||
@@ -81,251 +69,365 @@ pub trait EsploraExt {
|
||||
/// If the scripts to sync are unknown, such as when restoring or importing a keychain that
|
||||
/// may include scripts that have been used, use [`full_scan`] with the keychain.
|
||||
///
|
||||
/// [`LocalChain::tip`]: local_chain::LocalChain::tip
|
||||
/// [`full_scan`]: EsploraExt::full_scan
|
||||
fn sync(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
misc_spks: impl IntoIterator<Item = ScriptBuf>,
|
||||
txids: impl IntoIterator<Item = Txid>,
|
||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||
parallel_requests: usize,
|
||||
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error>;
|
||||
) -> Result<SyncUpdate, Error>;
|
||||
}
|
||||
|
||||
impl EsploraExt for esplora_client::BlockingClient {
|
||||
fn update_local_chain(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
request_heights: impl IntoIterator<Item = u32>,
|
||||
) -> Result<local_chain::Update, Error> {
|
||||
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
|
||||
// consistent.
|
||||
let mut fetched_blocks = self
|
||||
.get_blocks(None)?
|
||||
.into_iter()
|
||||
.map(|b| (b.time.height, b.id))
|
||||
.collect::<BTreeMap<u32, BlockHash>>();
|
||||
let new_tip_height = fetched_blocks
|
||||
.keys()
|
||||
.last()
|
||||
.copied()
|
||||
.expect("must have at least one block");
|
||||
|
||||
// Fetch blocks of heights that the caller is interested in, skipping blocks that are
|
||||
// already fetched when constructing `fetched_blocks`.
|
||||
for height in request_heights {
|
||||
// do not fetch blocks higher than remote tip
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
// only fetch what is missing
|
||||
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
||||
// ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent
|
||||
// with the chain at the time of `get_blocks` above (there could have been a deep
|
||||
// re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's
|
||||
// not possible to have a re-org deeper than that.
|
||||
entry.insert(self.get_block_hash(height)?);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure `fetched_blocks` can create an update that connects with the original chain by
|
||||
// finding a "Point of Agreement".
|
||||
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
|
||||
let fetched_hash = match fetched_blocks.entry(height) {
|
||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||
btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?),
|
||||
};
|
||||
|
||||
// We have found a point of agreement so the update will connect!
|
||||
if fetched_hash == local_hash {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(local_chain::Update {
|
||||
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
|
||||
.expect("must be in height order"),
|
||||
introduce_older_blocks: true,
|
||||
})
|
||||
}
|
||||
|
||||
fn full_scan<K: Ord + Clone>(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
|
||||
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
|
||||
let parallel_requests = Ord::max(parallel_requests, 1);
|
||||
let mut graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
|
||||
let mut last_active_indexes = BTreeMap::<K, u32>::new();
|
||||
let stop_gap = Ord::max(stop_gap, 1);
|
||||
|
||||
for (keychain, spks) in keychain_spks {
|
||||
let mut spks = spks.into_iter();
|
||||
let mut last_index = Option::<u32>::None;
|
||||
let mut last_active_index = Option::<u32>::None;
|
||||
|
||||
loop {
|
||||
let handles = spks
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.map(|(spk_index, spk)| {
|
||||
std::thread::spawn({
|
||||
let client = self.clone();
|
||||
move || -> Result<TxsOfSpkIndex, Error> {
|
||||
let mut last_seen = None;
|
||||
let mut spk_txs = Vec::new();
|
||||
loop {
|
||||
let txs = client.scripthash_txs(&spk, last_seen)?;
|
||||
let tx_count = txs.len();
|
||||
last_seen = txs.last().map(|tx| tx.txid);
|
||||
spk_txs.extend(txs);
|
||||
if tx_count < 25 {
|
||||
break Ok((spk_index, spk_txs));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let (index, txs) = handle.join().expect("thread must not panic")?;
|
||||
last_index = Some(index);
|
||||
if !txs.is_empty() {
|
||||
last_active_index = Some(index);
|
||||
}
|
||||
for tx in txs {
|
||||
let _ = graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor_from_status(&tx.status) {
|
||||
let _ = graph.insert_anchor(tx.txid, anchor);
|
||||
}
|
||||
|
||||
let previous_outputs = tx.vin.iter().filter_map(|vin| {
|
||||
let prevout = vin.prevout.as_ref()?;
|
||||
Some((
|
||||
OutPoint {
|
||||
txid: vin.txid,
|
||||
vout: vin.vout,
|
||||
},
|
||||
TxOut {
|
||||
script_pubkey: prevout.scriptpubkey.clone(),
|
||||
value: Amount::from_sat(prevout.value),
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
for (outpoint, txout) in previous_outputs {
|
||||
let _ = graph.insert_txout(outpoint, txout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let last_index = last_index.expect("Must be set since handles wasn't empty.");
|
||||
let gap_limit_reached = if let Some(i) = last_active_index {
|
||||
last_index >= i.saturating_add(stop_gap as u32)
|
||||
} else {
|
||||
last_index + 1 >= stop_gap as u32
|
||||
};
|
||||
if gap_limit_reached {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(last_active_index) = last_active_index {
|
||||
last_active_indexes.insert(keychain, last_active_index);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((graph, last_active_indexes))
|
||||
) -> Result<FullScanUpdate<K>, Error> {
|
||||
let update_blocks = init_chain_update_blocking(self, &local_tip)?;
|
||||
let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking(
|
||||
self,
|
||||
keychain_spks,
|
||||
stop_gap,
|
||||
parallel_requests,
|
||||
)?;
|
||||
let local_chain = finalize_chain_update_blocking(
|
||||
self,
|
||||
&local_tip,
|
||||
tx_graph.all_anchors(),
|
||||
update_blocks,
|
||||
)?;
|
||||
Ok(FullScanUpdate {
|
||||
local_chain,
|
||||
tx_graph,
|
||||
last_active_indices,
|
||||
})
|
||||
}
|
||||
|
||||
fn sync(
|
||||
&self,
|
||||
local_tip: CheckPoint,
|
||||
misc_spks: impl IntoIterator<Item = ScriptBuf>,
|
||||
txids: impl IntoIterator<Item = Txid>,
|
||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||
parallel_requests: usize,
|
||||
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
|
||||
let mut graph = self
|
||||
.full_scan(
|
||||
[(
|
||||
(),
|
||||
misc_spks
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, spk)| (i as u32, spk)),
|
||||
)]
|
||||
.into(),
|
||||
usize::MAX,
|
||||
parallel_requests,
|
||||
)
|
||||
.map(|(g, _)| g)?;
|
||||
) -> Result<SyncUpdate, Error> {
|
||||
let update_blocks = init_chain_update_blocking(self, &local_tip)?;
|
||||
let tx_graph = sync_for_index_and_graph_blocking(
|
||||
self,
|
||||
misc_spks,
|
||||
txids,
|
||||
outpoints,
|
||||
parallel_requests,
|
||||
)?;
|
||||
let local_chain = finalize_chain_update_blocking(
|
||||
self,
|
||||
&local_tip,
|
||||
tx_graph.all_anchors(),
|
||||
update_blocks,
|
||||
)?;
|
||||
Ok(SyncUpdate {
|
||||
local_chain,
|
||||
tx_graph,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Create the initial chain update.
|
||||
///
|
||||
/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
/// update can connect to `local_tip`.
///
/// We want to do this before fetching transactions and anchors, as we cannot fetch the latest
/// blocks and transactions atomically, and the checkpoint tip is used to determine the
/// last-scanned block (for block-based chain sources). It is therefore better to be conservative
/// when setting the tip (use an earlier tip rather than a later one); otherwise the caller may
/// accidentally skip blocks when alternating between chain sources.
|
||||
#[doc(hidden)]
|
||||
pub fn init_chain_update_blocking(
|
||||
client: &esplora_client::BlockingClient,
|
||||
local_tip: &CheckPoint,
|
||||
) -> Result<BTreeMap<u32, BlockHash>, Error> {
|
||||
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
|
||||
// consistent.
|
||||
let mut fetched_blocks = client
|
||||
.get_blocks(None)?
|
||||
.into_iter()
|
||||
.map(|b| (b.time.height, b.id))
|
||||
.collect::<BTreeMap<u32, BlockHash>>();
|
||||
let new_tip_height = fetched_blocks
|
||||
.keys()
|
||||
.last()
|
||||
.copied()
|
||||
.expect("must have at least one block");
|
||||
|
||||
// Ensure `fetched_blocks` can create an update that connects with the original chain by
|
||||
// finding a "Point of Agreement".
|
||||
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
|
||||
let fetched_hash = match fetched_blocks.entry(height) {
|
||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||
btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?),
|
||||
};
|
||||
|
||||
// We have found a point of agreement so the update will connect!
|
||||
if fetched_hash == local_hash {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(fetched_blocks)
|
||||
}
|
||||
|
||||
/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
|
||||
///
|
||||
/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
|
||||
/// existing checkpoint/block under `local_tip` or `update_blocks`.
|
||||
#[doc(hidden)]
|
||||
pub fn finalize_chain_update_blocking<A: Anchor>(
|
||||
client: &esplora_client::BlockingClient,
|
||||
local_tip: &CheckPoint,
|
||||
anchors: &BTreeSet<(A, Txid)>,
|
||||
mut update_blocks: BTreeMap<u32, BlockHash>,
|
||||
) -> Result<local_chain::Update, Error> {
|
||||
let update_tip_height = update_blocks
|
||||
.keys()
|
||||
.last()
|
||||
.copied()
|
||||
.expect("must have at least one block");
|
||||
|
||||
// We want to have a corresponding checkpoint per height. We iterate the heights of anchors
|
||||
// backwards, comparing it against our `local_tip`'s chain and our current set of
|
||||
// `update_blocks` to see if a corresponding checkpoint already exists.
|
||||
let anchor_heights = anchors
|
||||
.iter()
|
||||
.rev()
|
||||
.map(|(a, _)| a.anchor_block().height)
|
||||
// filter out heights that surpass the update tip
|
||||
.filter(|h| *h <= update_tip_height)
|
||||
// filter out duplicate heights
|
||||
.filter({
|
||||
let mut prev_height = Option::<u32>::None;
|
||||
move |h| match prev_height.replace(*h) {
|
||||
None => true,
|
||||
Some(prev_h) => prev_h != *h,
|
||||
}
|
||||
});
|
||||
|
||||
// We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
|
||||
// checkpoints more efficient.
|
||||
let mut curr_cp = local_tip.clone();
|
||||
|
||||
for h in anchor_heights {
|
||||
if let Some(cp) = curr_cp.range(h..).last() {
|
||||
curr_cp = cp.clone();
|
||||
if cp.height() == h {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
|
||||
entry.insert(client.get_block_hash(h)?);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(local_chain::Update {
|
||||
tip: CheckPoint::from_block_ids(
|
||||
update_blocks
|
||||
.into_iter()
|
||||
.map(|(height, hash)| BlockId { height, hash }),
|
||||
)
|
||||
.expect("must be in order"),
|
||||
introduce_older_blocks: true,
|
||||
})
|
||||
}
|
||||
|
||||
/// This performs a full scan to get an update for the [`TxGraph`] and
|
||||
/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex).
|
||||
#[doc(hidden)]
|
||||
pub fn full_scan_for_index_and_graph_blocking<K: Ord + Clone>(
|
||||
client: &esplora_client::BlockingClient,
|
||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<(TxGraph<ConfirmationTimeHeightAnchor>, BTreeMap<K, u32>), Error> {
|
||||
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
|
||||
let parallel_requests = Ord::max(parallel_requests, 1);
|
||||
let mut tx_graph = TxGraph::<ConfirmationTimeHeightAnchor>::default();
|
||||
let mut last_active_indices = BTreeMap::<K, u32>::new();
|
||||
|
||||
for (keychain, spks) in keychain_spks {
|
||||
let mut spks = spks.into_iter();
|
||||
let mut last_index = Option::<u32>::None;
|
||||
let mut last_active_index = Option::<u32>::None;
|
||||
|
||||
let mut txids = txids.into_iter();
|
||||
loop {
|
||||
let handles = txids
|
||||
let handles = spks
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.filter(|&txid| graph.get_tx(txid).is_none())
|
||||
.map(|txid| {
|
||||
.map(|(spk_index, spk)| {
|
||||
std::thread::spawn({
|
||||
let client = self.clone();
|
||||
move || {
|
||||
client
|
||||
.get_tx_status(&txid)
|
||||
.map_err(Box::new)
|
||||
.map(|s| (txid, s))
|
||||
let client = client.clone();
|
||||
move || -> Result<TxsOfSpkIndex, Error> {
|
||||
let mut last_seen = None;
|
||||
let mut spk_txs = Vec::new();
|
||||
loop {
|
||||
let txs = client.scripthash_txs(&spk, last_seen)?;
|
||||
let tx_count = txs.len();
|
||||
last_seen = txs.last().map(|tx| tx.txid);
|
||||
spk_txs.extend(txs);
|
||||
if tx_count < 25 {
|
||||
break Ok((spk_index, spk_txs));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
|
||||
.collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let (txid, status) = handle.join().expect("thread must not panic")?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
let (index, txs) = handle.join().expect("thread must not panic")?;
|
||||
last_index = Some(index);
|
||||
if !txs.is_empty() {
|
||||
last_active_index = Some(index);
|
||||
}
|
||||
for tx in txs {
|
||||
let _ = tx_graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor_from_status(&tx.status) {
|
||||
let _ = tx_graph.insert_anchor(tx.txid, anchor);
|
||||
}
|
||||
|
||||
let previous_outputs = tx.vin.iter().filter_map(|vin| {
|
||||
let prevout = vin.prevout.as_ref()?;
|
||||
Some((
|
||||
OutPoint {
|
||||
txid: vin.txid,
|
||||
vout: vin.vout,
|
||||
},
|
||||
TxOut {
|
||||
script_pubkey: prevout.scriptpubkey.clone(),
|
||||
value: Amount::from_sat(prevout.value),
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
for (outpoint, txout) in previous_outputs {
|
||||
let _ = tx_graph.insert_txout(outpoint, txout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let last_index = last_index.expect("Must be set since handles wasn't empty.");
|
||||
let gap_limit_reached = if let Some(i) = last_active_index {
|
||||
last_index >= i.saturating_add(stop_gap as u32)
|
||||
} else {
|
||||
last_index + 1 >= stop_gap as u32
|
||||
};
|
||||
if gap_limit_reached {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for op in outpoints {
|
||||
if graph.get_tx(op.txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&op.txid)? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&op.txid)?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(op.txid, anchor);
|
||||
}
|
||||
}
|
||||
if let Some(last_active_index) = last_active_index {
|
||||
last_active_indices.insert(keychain, last_active_index);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
|
||||
if let Some(txid) = op_status.txid {
|
||||
if graph.get_tx(txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&txid)? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&txid)?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
Ok((tx_graph, last_active_indices))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn sync_for_index_and_graph_blocking(
|
||||
client: &esplora_client::BlockingClient,
|
||||
misc_spks: impl IntoIterator<Item = ScriptBuf>,
|
||||
txids: impl IntoIterator<Item = Txid>,
|
||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||
parallel_requests: usize,
|
||||
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
|
||||
let (mut tx_graph, _) = full_scan_for_index_and_graph_blocking(
|
||||
client,
|
||||
{
|
||||
let mut keychains = BTreeMap::new();
|
||||
keychains.insert(
|
||||
(),
|
||||
misc_spks
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, spk)| (i as u32, spk)),
|
||||
);
|
||||
keychains
|
||||
},
|
||||
usize::MAX,
|
||||
parallel_requests,
|
||||
)?;
|
||||
|
||||
let mut txids = txids.into_iter();
|
||||
loop {
|
||||
let handles = txids
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.filter(|&txid| tx_graph.get_tx(txid).is_none())
|
||||
.map(|txid| {
|
||||
std::thread::spawn({
|
||||
let client = client.clone();
|
||||
move || {
|
||||
client
|
||||
.get_tx_status(&txid)
|
||||
.map_err(Box::new)
|
||||
.map(|s| (txid, s))
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let (txid, status) = handle.join().expect("thread must not panic")?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = tx_graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for op in outpoints {
|
||||
if tx_graph.get_tx(op.txid).is_none() {
|
||||
if let Some(tx) = client.get_tx(&op.txid)? {
|
||||
let _ = tx_graph.insert_tx(tx);
|
||||
}
|
||||
let status = client.get_tx_status(&op.txid)?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = tx_graph.insert_anchor(op.txid, anchor);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _)? {
|
||||
if let Some(txid) = op_status.txid {
|
||||
if tx_graph.get_tx(txid).is_none() {
|
||||
if let Some(tx) = client.get_tx(&txid)? {
|
||||
let _ = tx_graph.insert_tx(tx);
|
||||
}
|
||||
let status = client.get_tx_status(&txid)?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = tx_graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(graph)
|
||||
}
|
||||
|
||||
Ok(tx_graph)
|
||||
}
|
||||
|
||||
@@ -16,7 +16,9 @@
|
||||
//! [`TxGraph`]: bdk_chain::tx_graph::TxGraph
|
||||
//! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora
|
||||
|
||||
use bdk_chain::{BlockId, ConfirmationTimeHeightAnchor};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use bdk_chain::{local_chain, BlockId, ConfirmationTimeHeightAnchor, TxGraph};
|
||||
use esplora_client::TxStatus;
|
||||
|
||||
pub use esplora_client;
|
||||
@@ -48,3 +50,21 @@ fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeHeightAnchor>
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Update returned from a full scan.
|
||||
pub struct FullScanUpdate<K> {
|
||||
/// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain).
|
||||
pub local_chain: local_chain::Update,
|
||||
/// The update to apply to the receiving [`TxGraph`].
|
||||
pub tx_graph: TxGraph<ConfirmationTimeHeightAnchor>,
|
||||
/// Last active indices for the corresponding keychains (`K`).
|
||||
pub last_active_indices: BTreeMap<K, u32>,
|
||||
}
|
||||
|
||||
/// Update returned from a sync.
|
||||
pub struct SyncUpdate {
|
||||
/// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain).
|
||||
pub local_chain: local_chain::Update,
|
||||
/// The update to apply to the receiving [`TxGraph`].
|
||||
pub tx_graph: TxGraph<ConfirmationTimeHeightAnchor>,
|
||||
}
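
// Hedged usage sketch (not part of this commit), written from a downstream
// caller's perspective: applying the blocking `EsploraExt::sync` result. The
// function name, the receiving `chain`/`graph` values, and the empty
// `txids`/`outpoints` iterators are assumptions made for illustration only.
use bdk_chain::bitcoin::{OutPoint, ScriptBuf, Txid};
use bdk_chain::{local_chain::LocalChain, ConfirmationTimeHeightAnchor, TxGraph};
use bdk_esplora::{esplora_client, EsploraExt};

fn apply_sync_update(
    client: &esplora_client::BlockingClient,
    chain: &mut LocalChain,
    graph: &mut TxGraph<ConfirmationTimeHeightAnchor>,
    spks: Vec<ScriptBuf>,
) -> Result<(), Box<dyn std::error::Error>> {
    // `sync` now returns the chain update and the tx-graph update together.
    let update = client.sync(
        chain.tip(),
        spks,
        core::iter::empty::<Txid>(),
        core::iter::empty::<OutPoint>(),
        /* parallel_requests */ 5,
    )?;
    let _ = chain.apply_update(update.local_chain)?;
    let _ = graph.apply_update(update.tx_graph);
    Ok(())
}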