From 72fe65b65f297ebb7160eee6859c46e29c2d9528 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 16 Apr 2024 17:30:58 +0800 Subject: [PATCH] feat(esplora)!: simplify chain update logic Co-authored-by: LLFourn --- crates/chain/src/local_chain.rs | 3 +- crates/esplora/src/async_ext.rs | 193 +++++++++++++------------ crates/esplora/src/blocking_ext.rs | 219 ++++++++++++++--------------- 3 files changed, 200 insertions(+), 215 deletions(-) diff --git a/crates/chain/src/local_chain.rs b/crates/chain/src/local_chain.rs index a86f1a77..6fa85d42 100644 --- a/crates/chain/src/local_chain.rs +++ b/crates/chain/src/local_chain.rs @@ -220,8 +220,7 @@ impl CheckPoint { cp = cp.prev().expect("will break before genesis block"); }; - base - .extend(core::iter::once(block_id).chain(tail.into_iter().rev())) + base.extend(core::iter::once(block_id).chain(tail.into_iter().rev())) .expect("tail is in order") } } diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index b387cb77..1abc28c8 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,7 +1,6 @@ use std::collections::BTreeSet; use async_trait::async_trait; -use bdk_chain::collections::btree_map; use bdk_chain::Anchor; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, @@ -97,11 +96,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let update_blocks = init_chain_update(self, &local_tip).await?; + let latest_blocks = fetch_latest_blocks(self).await?; let (tx_graph, last_active_indices) = full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?; let local_chain = - finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?; + chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?; Ok(FullScanUpdate { local_chain, tx_graph, @@ -117,11 +116,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { outpoints: impl IntoIterator + Send> + Send, parallel_requests: usize, ) -> Result { - let update_blocks = init_chain_update(self, &local_tip).await?; + let latest_blocks = fetch_latest_blocks(self).await?; let tx_graph = sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?; let local_chain = - finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?; + chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?; Ok(SyncUpdate { tx_graph, local_chain, @@ -129,112 +128,105 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } } -/// Create the initial chain update. +/// Fetch latest blocks from Esplora in an atomic call. /// -/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the -/// update can connect to the `start_tip`. -/// -/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and +/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks AND /// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for /// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use /// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when /// alternating between chain-sources. -async fn init_chain_update( +async fn fetch_latest_blocks( client: &esplora_client::AsyncClient, - local_tip: &CheckPoint, ) -> Result, Error> { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = client + Ok(client .get_blocks(None) .await? .into_iter() .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks + .collect()) +} + +/// Used instead of [`esplora_client::BlockingClient::get_block_hash`]. +/// +/// This first checks the previously fetched `latest_blocks` before fetching from Esplora again. +async fn fetch_block( + client: &esplora_client::AsyncClient, + latest_blocks: &BTreeMap, + height: u32, +) -> Result, Error> { + if let Some(&hash) = latest_blocks.get(&height) { + return Ok(Some(hash)); + } + + // We avoid fetching blocks higher than previously fetched `latest_blocks` as the local chain + // tip is used to signal for the last-synced-up-to-height. + let &tip_height = latest_blocks .keys() .last() - .copied() - .expect("must atleast have one block"); + .expect("must have atleast one entry"); + if height > tip_height { + return Ok(None); + } - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } + Ok(Some(client.get_block_hash(height).await?)) +} - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?), +/// Create the [`local_chain::Update`]. +/// +/// We want to have a corresponding checkpoint per anchor height. However, checkpoints fetched +/// should not surpass `latest_blocks`. +async fn chain_update( + client: &esplora_client::AsyncClient, + latest_blocks: &BTreeMap, + local_tip: &CheckPoint, + anchors: &BTreeSet<(A, Txid)>, +) -> Result { + let mut point_of_agreement = None; + let mut conflicts = vec![]; + for local_cp in local_tip.iter() { + let remote_hash = match fetch_block(client, latest_blocks, local_cp.height()).await? { + Some(hash) => hash, + None => continue, }; - - // We have found point of agreement so the update will connect! - if fetched_hash == local_hash { + if remote_hash == local_cp.hash() { + point_of_agreement = Some(local_cp.clone()); break; + } else { + // it is not strictly necessary to include all the conflicted heights (we do need the + // first one) but it seems prudent to make sure the updated chain's heights are a + // superset of the existing chain after update. + conflicts.push(BlockId { + height: local_cp.height(), + hash: remote_hash, + }); } } - Ok(fetched_blocks) -} + let mut tip = point_of_agreement.expect("remote esplora should have same genesis block"); -/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. -/// -/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an -/// existing checkpoint/block under `local_tip` or `update_blocks`. -async fn finalize_chain_update( - client: &esplora_client::AsyncClient, - local_tip: &CheckPoint, - anchors: &BTreeSet<(A, Txid)>, - mut update_blocks: BTreeMap, -) -> Result { - let update_tip_height = update_blocks - .keys() - .last() - .copied() - .expect("must atleast have one block"); + tip = tip + .extend(conflicts.into_iter().rev()) + .expect("evicted are in order"); - // We want to have a corresponding checkpoint per height. We iterate the heights of anchors - // backwards, comparing it against our `local_tip`'s chain and our current set of - // `update_blocks` to see if a corresponding checkpoint already exists. - let anchor_heights = anchors - .iter() - .rev() - .map(|(a, _)| a.anchor_block().height) - // filter out heights that surpass the update tip - .filter(|h| *h <= update_tip_height) - // filter out duplicate heights - .filter({ - let mut prev_height = Option::::None; - move |h| match prev_height.replace(*h) { - None => true, - Some(prev_h) => prev_h != *h, - } - }); - - // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of - // checkpoints more efficient. - let mut curr_cp = local_tip.clone(); - - for h in anchor_heights { - if let Some(cp) = curr_cp.range(h..).last() { - curr_cp = cp.clone(); - if cp.height() == h { - continue; - } - } - if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { - entry.insert(client.get_block_hash(h).await?); + for anchor in anchors { + let height = anchor.0.anchor_block().height; + if tip.get(height).is_none() { + let hash = match fetch_block(client, latest_blocks, height).await? { + Some(hash) => hash, + None => continue, + }; + tip = tip.insert(BlockId { height, hash }); } } + // insert the most recent blocks at the tip to make sure we update the tip and make the update + // robust. + for (&height, &hash) in latest_blocks.iter() { + tip = tip.insert(BlockId { height, hash }); + } + Ok(local_chain::Update { - tip: CheckPoint::from_block_ids( - update_blocks - .into_iter() - .map(|(height, hash)| BlockId { height, hash }), - ) - .expect("must be in order"), + tip, introduce_older_blocks: true, }) } @@ -424,7 +416,7 @@ mod test { use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use esplora_client::Builder; - use crate::async_ext::{finalize_chain_update, init_chain_update}; + use crate::async_ext::{chain_update, fetch_latest_blocks}; macro_rules! h { ($index:literal) => {{ @@ -493,9 +485,8 @@ mod test { // craft initial `local_chain` let local_chain = { let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?); - let chain_tip = chain.tip(); - let update_blocks = init_chain_update(&client, &chain_tip).await?; - let update_anchors = t + // force `chain_update_blocking` to add all checkpoints in `t.initial_cps` + let anchors = t .initial_cps .iter() .map(|&height| -> anyhow::Result<_> { @@ -508,10 +499,14 @@ mod test { )) }) .collect::>>()?; - let chain_update = - finalize_chain_update(&client, &chain_tip, &update_anchors, update_blocks) - .await?; - chain.apply_update(chain_update)?; + let update = chain_update( + &client, + &fetch_latest_blocks(&client).await?, + &chain.tip(), + &anchors, + ) + .await?; + chain.apply_update(update)?; chain }; println!("local chain height: {}", local_chain.tip().height()); @@ -529,9 +524,7 @@ mod test { // craft update let update = { - let local_tip = local_chain.tip(); - let update_blocks = init_chain_update(&client, &local_tip).await?; - let update_anchors = t + let anchors = t .anchors .iter() .map(|&(height, txid)| -> anyhow::Result<_> { @@ -544,7 +537,13 @@ mod test { )) }) .collect::>()?; - finalize_chain_update(&client, &local_tip, &update_anchors, update_blocks).await? + chain_update( + &client, + &fetch_latest_blocks(&client).await?, + &local_chain.tip(), + &anchors, + ) + .await? }; // apply update diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 419a2ae6..5b7cd628 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -2,7 +2,6 @@ use std::collections::BTreeSet; use std::thread::JoinHandle; use std::usize; -use bdk_chain::collections::btree_map; use bdk_chain::collections::BTreeMap; use bdk_chain::Anchor; use bdk_chain::{ @@ -89,19 +88,14 @@ impl EsploraExt for esplora_client::BlockingClient { stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let update_blocks = init_chain_update_blocking(self, &local_tip)?; + let latest_blocks = fetch_latest_blocks(self)?; let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking( self, keychain_spks, stop_gap, parallel_requests, )?; - let local_chain = finalize_chain_update_blocking( - self, - &local_tip, - tx_graph.all_anchors(), - update_blocks, - )?; + let local_chain = chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors())?; Ok(FullScanUpdate { local_chain, tx_graph, @@ -117,7 +111,7 @@ impl EsploraExt for esplora_client::BlockingClient { outpoints: impl IntoIterator, parallel_requests: usize, ) -> Result { - let update_blocks = init_chain_update_blocking(self, &local_tip)?; + let latest_blocks = fetch_latest_blocks(self)?; let tx_graph = sync_for_index_and_graph_blocking( self, misc_spks, @@ -125,12 +119,7 @@ impl EsploraExt for esplora_client::BlockingClient { outpoints, parallel_requests, )?; - let local_chain = finalize_chain_update_blocking( - self, - &local_tip, - tx_graph.all_anchors(), - update_blocks, - )?; + let local_chain = chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors())?; Ok(SyncUpdate { local_chain, tx_graph, @@ -138,111 +127,104 @@ impl EsploraExt for esplora_client::BlockingClient { } } -/// Create the initial chain update. +/// Fetch latest blocks from Esplora in an atomic call. /// -/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the -/// update can connect to the `start_tip`. -/// -/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and +/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks AND /// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for /// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use /// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when /// alternating between chain-sources. -fn init_chain_update_blocking( +fn fetch_latest_blocks( client: &esplora_client::BlockingClient, - local_tip: &CheckPoint, ) -> Result, Error> { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = client + Ok(client .get_blocks(None)? .into_iter() .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks + .collect()) +} + +/// Used instead of [`esplora_client::BlockingClient::get_block_hash`]. +/// +/// This first checks the previously fetched `latest_blocks` before fetching from Esplora again. +fn fetch_block( + client: &esplora_client::BlockingClient, + latest_blocks: &BTreeMap, + height: u32, +) -> Result, Error> { + if let Some(&hash) = latest_blocks.get(&height) { + return Ok(Some(hash)); + } + + // We avoid fetching blocks higher than previously fetched `latest_blocks` as the local chain + // tip is used to signal for the last-synced-up-to-height. + let &tip_height = latest_blocks .keys() .last() - .copied() - .expect("must atleast have one block"); + .expect("must have atleast one entry"); + if height > tip_height { + return Ok(None); + } - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } + Ok(Some(client.get_block_hash(height)?)) +} - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?), +/// Create the [`local_chain::Update`]. +/// +/// We want to have a corresponding checkpoint per anchor height. However, checkpoints fetched +/// should not surpass `latest_blocks`. +fn chain_update( + client: &esplora_client::BlockingClient, + latest_blocks: &BTreeMap, + local_tip: &CheckPoint, + anchors: &BTreeSet<(A, Txid)>, +) -> Result { + let mut point_of_agreement = None; + let mut conflicts = vec![]; + for local_cp in local_tip.iter() { + let remote_hash = match fetch_block(client, latest_blocks, local_cp.height())? { + Some(hash) => hash, + None => continue, }; - - // We have found point of agreement so the update will connect! - if fetched_hash == local_hash { + if remote_hash == local_cp.hash() { + point_of_agreement = Some(local_cp.clone()); break; + } else { + // it is not strictly necessary to include all the conflicted heights (we do need the + // first one) but it seems prudent to make sure the updated chain's heights are a + // superset of the existing chain after update. + conflicts.push(BlockId { + height: local_cp.height(), + hash: remote_hash, + }); } } - Ok(fetched_blocks) -} + let mut tip = point_of_agreement.expect("remote esplora should have same genesis block"); -/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. -/// -/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an -/// existing checkpoint/block under `local_tip` or `update_blocks`. -fn finalize_chain_update_blocking( - client: &esplora_client::BlockingClient, - local_tip: &CheckPoint, - anchors: &BTreeSet<(A, Txid)>, - mut update_blocks: BTreeMap, -) -> Result { - let update_tip_height = update_blocks - .keys() - .last() - .copied() - .expect("must atleast have one block"); + tip = tip + .extend(conflicts.into_iter().rev()) + .expect("evicted are in order"); - // We want to have a corresponding checkpoint per height. We iterate the heights of anchors - // backwards, comparing it against our `local_tip`'s chain and our current set of - // `update_blocks` to see if a corresponding checkpoint already exists. - let anchor_heights = anchors - .iter() - .rev() - .map(|(a, _)| a.anchor_block().height) - // filter out heights that surpass the update tip - .filter(|h| *h <= update_tip_height) - // filter out duplicate heights - .filter({ - let mut prev_height = Option::::None; - move |h| match prev_height.replace(*h) { - None => true, - Some(prev_h) => prev_h != *h, - } - }); - - // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of - // checkpoints more efficient. - let mut curr_cp = local_tip.clone(); - - for h in anchor_heights { - if let Some(cp) = curr_cp.range(h..).last() { - curr_cp = cp.clone(); - if cp.height() == h { - continue; - } - } - if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { - entry.insert(client.get_block_hash(h)?); + for anchor in anchors { + let height = anchor.0.anchor_block().height; + if tip.get(height).is_none() { + let hash = match fetch_block(client, latest_blocks, height)? { + Some(hash) => hash, + None => continue, + }; + tip = tip.insert(BlockId { height, hash }); } } + // insert the most recent blocks at the tip to make sure we update the tip and make the update + // robust. + for (&height, &hash) in latest_blocks.iter() { + tip = tip.insert(BlockId { height, hash }); + } + Ok(local_chain::Update { - tip: CheckPoint::from_block_ids( - update_blocks - .into_iter() - .map(|(height, hash)| BlockId { height, hash }), - ) - .expect("must be in order"), + tip, introduce_older_blocks: true, }) } @@ -430,7 +412,7 @@ fn sync_for_index_and_graph_blocking( #[cfg(test)] mod test { - use crate::blocking_ext::{finalize_chain_update_blocking, init_chain_update_blocking}; + use crate::blocking_ext::{chain_update, fetch_latest_blocks}; use bdk_chain::bitcoin::hashes::Hash; use bdk_chain::bitcoin::Txid; use bdk_chain::local_chain::LocalChain; @@ -462,7 +444,7 @@ mod test { name: &'a str, /// Initial blockchain height to start the env with. initial_env_height: u32, - /// Initial checkpoint heights to start with. + /// Initial checkpoint heights to start with in the local chain. initial_cps: &'a [u32], /// The final blockchain height of the env. final_env_height: u32, @@ -516,9 +498,8 @@ mod test { // craft initial `local_chain` let local_chain = { let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?); - let chain_tip = chain.tip(); - let update_blocks = init_chain_update_blocking(&client, &chain_tip)?; - let update_anchors = t + // force `chain_update_blocking` to add all checkpoints in `t.initial_cps` + let anchors = t .initial_cps .iter() .map(|&height| -> anyhow::Result<_> { @@ -531,13 +512,13 @@ mod test { )) }) .collect::>>()?; - let chain_update = finalize_chain_update_blocking( + let update = chain_update( &client, - &chain_tip, - &update_anchors, - update_blocks, + &fetch_latest_blocks(&client)?, + &chain.tip(), + &anchors, )?; - chain.apply_update(chain_update)?; + chain.apply_update(update)?; chain }; println!("local chain height: {}", local_chain.tip().height()); @@ -555,9 +536,7 @@ mod test { // craft update let update = { - let local_tip = local_chain.tip(); - let update_blocks = init_chain_update_blocking(&client, &local_tip)?; - let update_anchors = t + let anchors = t .anchors .iter() .map(|&(height, txid)| -> anyhow::Result<_> { @@ -570,7 +549,12 @@ mod test { )) }) .collect::>()?; - finalize_chain_update_blocking(&client, &local_tip, &update_anchors, update_blocks)? + chain_update( + &client, + &fetch_latest_blocks(&client)?, + &local_chain.tip(), + &anchors, + )? }; // apply update @@ -640,8 +624,12 @@ mod test { struct TestCase { name: &'static str, + /// Original local chain to start off with. chain: LocalChain, + /// Heights of floating anchors. [`chain_update_blocking`] will request for checkpoints + /// of these heights. request_heights: &'static [u32], + /// The expected local chain result (heights only). exp_update_heights: &'static [u32], } @@ -738,11 +726,6 @@ mod test { for (i, t) in test_cases.into_iter().enumerate() { println!("Case {}: {}", i, t.name); let mut chain = t.chain; - let cp_tip = chain.tip(); - - let new_blocks = init_chain_update_blocking(&client, &cp_tip).map_err(|err| { - anyhow::format_err!("[{}:{}] `init_chain_update` failed: {}", i, t.name, err) - })?; let mock_anchors = t .request_heights @@ -761,9 +744,13 @@ mod test { (anchor, txid) }) .collect::>(); + let chain_update = chain_update( + &client, + &fetch_latest_blocks(&client)?, + &chain.tip(), + &mock_anchors, + )?; - let chain_update = - finalize_chain_update_blocking(&client, &cp_tip, &mock_anchors, new_blocks)?; let update_blocks = chain_update .tip .iter()