feat(esplora): greatly simplify update_local_chain
This commit is contained in:
parent
25653d71b8
commit
f05e8502e6
@ -2,14 +2,14 @@ use async_trait::async_trait;
|
|||||||
use bdk_chain::collections::btree_map;
|
use bdk_chain::collections::btree_map;
|
||||||
use bdk_chain::{
|
use bdk_chain::{
|
||||||
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
|
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
|
||||||
collections::{BTreeMap, BTreeSet},
|
collections::BTreeMap,
|
||||||
local_chain::{self, CheckPoint},
|
local_chain::{self, CheckPoint},
|
||||||
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
|
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
|
||||||
};
|
};
|
||||||
use esplora_client::{Error, TxStatus};
|
use esplora_client::{Error, TxStatus};
|
||||||
use futures::{stream::FuturesOrdered, TryStreamExt};
|
use futures::{stream::FuturesOrdered, TryStreamExt};
|
||||||
|
|
||||||
use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
|
use crate::anchor_from_status;
|
||||||
|
|
||||||
/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
|
/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
|
||||||
///
|
///
|
||||||
@ -85,10 +85,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
|
|||||||
local_tip: CheckPoint,
|
local_tip: CheckPoint,
|
||||||
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
|
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
|
||||||
) -> Result<local_chain::Update, Error> {
|
) -> Result<local_chain::Update, Error> {
|
||||||
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
|
|
||||||
let new_tip_height = self.get_height().await?;
|
let new_tip_height = self.get_height().await?;
|
||||||
|
|
||||||
// atomically fetch blocks from esplora
|
// Atomically fetch latest blocks from Esplora. This way, we avoid creating an update with
|
||||||
|
// an inconsistent set of blocks (assuming that a reorg depth cannot be greater than the
|
||||||
|
// latest blocks fetched).
|
||||||
let mut fetched_blocks = {
|
let mut fetched_blocks = {
|
||||||
let heights = (0..=new_tip_height).rev();
|
let heights = (0..=new_tip_height).rev();
|
||||||
let hashes = self
|
let hashes = self
|
||||||
@ -99,7 +100,8 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
|
|||||||
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
|
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
|
||||||
};
|
};
|
||||||
|
|
||||||
// fetch heights that the caller is interested in
|
// fetch blocks of heights that the caller is interested in, reusing latest blocks that are
|
||||||
|
// already fetched.
|
||||||
for height in request_heights {
|
for height in request_heights {
|
||||||
// do not fetch blocks higher than remote tip
|
// do not fetch blocks higher than remote tip
|
||||||
if height > new_tip_height {
|
if height > new_tip_height {
|
||||||
@ -107,81 +109,32 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
|
|||||||
}
|
}
|
||||||
// only fetch what is missing
|
// only fetch what is missing
|
||||||
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
||||||
let hash = self.get_block_hash(height).await?;
|
entry.insert(self.get_block_hash(height).await?);
|
||||||
entry.insert(hash);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// find the earliest point of agreement between local chain and fetched chain
|
// Ensure `fetched_blocks` can create an update that connects with the original chain.
|
||||||
let earliest_agreement_cp = {
|
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
|
||||||
let mut earliest_agreement_cp = Option::<CheckPoint>::None;
|
if height > new_tip_height {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let local_tip_height = local_tip.height();
|
let fetched_hash = match fetched_blocks.entry(height) {
|
||||||
for local_cp in local_tip.iter() {
|
|
||||||
let local_block = local_cp.block_id();
|
|
||||||
|
|
||||||
// the updated hash (block hash at this height after the update), can either be:
|
|
||||||
// 1. a block that already existed in `fetched_blocks`
|
|
||||||
// 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH
|
|
||||||
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
|
|
||||||
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
|
|
||||||
// remote tip
|
|
||||||
let updated_hash = match fetched_blocks.entry(local_block.height) {
|
|
||||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||||
btree_map::Entry::Vacant(entry) => *entry.insert(
|
btree_map::Entry::Vacant(entry) => {
|
||||||
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
|
*entry.insert(self.get_block_hash(height).await?)
|
||||||
local_block.hash
|
}
|
||||||
} else {
|
|
||||||
self.get_block_hash(local_block.height).await?
|
|
||||||
},
|
|
||||||
),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// since we may introduce blocks below the point of agreement, we cannot break
|
// We have found point of agreement so the update will connect!
|
||||||
// here unconditionally - we only break if we guarantee there are no new heights
|
if fetched_hash == local_hash {
|
||||||
// below our current local checkpoint
|
|
||||||
if local_block.hash == updated_hash {
|
|
||||||
earliest_agreement_cp = Some(local_cp);
|
|
||||||
|
|
||||||
let first_new_height = *fetched_blocks
|
|
||||||
.keys()
|
|
||||||
.next()
|
|
||||||
.expect("must have at least one new block");
|
|
||||||
if first_new_height >= local_block.height {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
earliest_agreement_cp
|
|
||||||
};
|
|
||||||
|
|
||||||
let tip = {
|
|
||||||
// first checkpoint to use for the update chain
|
|
||||||
let first_cp = match earliest_agreement_cp {
|
|
||||||
Some(cp) => cp,
|
|
||||||
None => {
|
|
||||||
let (&height, &hash) = fetched_blocks
|
|
||||||
.iter()
|
|
||||||
.next()
|
|
||||||
.expect("must have at least one new block");
|
|
||||||
CheckPoint::new(BlockId { height, hash })
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// transform fetched chain into the update chain
|
|
||||||
fetched_blocks
|
|
||||||
// we exclude anything at or below the first cp of the update chain otherwise
|
|
||||||
// building the chain will fail
|
|
||||||
.split_off(&(first_cp.height() + 1))
|
|
||||||
.into_iter()
|
|
||||||
.map(|(height, hash)| BlockId { height, hash })
|
|
||||||
.fold(first_cp, |prev_cp, block| {
|
|
||||||
prev_cp.push(block).expect("must extend checkpoint")
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(local_chain::Update {
|
Ok(local_chain::Update {
|
||||||
tip,
|
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
|
||||||
|
.expect("must be in height order"),
|
||||||
introduce_older_blocks: true,
|
introduce_older_blocks: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
|
|
||||||
use bdk_chain::collections::btree_map;
|
use bdk_chain::collections::btree_map;
|
||||||
use bdk_chain::collections::{BTreeMap, BTreeSet};
|
use bdk_chain::collections::BTreeMap;
|
||||||
use bdk_chain::{
|
use bdk_chain::{
|
||||||
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
|
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
|
||||||
local_chain::{self, CheckPoint},
|
local_chain::{self, CheckPoint},
|
||||||
@ -9,7 +9,7 @@ use bdk_chain::{
|
|||||||
};
|
};
|
||||||
use esplora_client::{Error, TxStatus};
|
use esplora_client::{Error, TxStatus};
|
||||||
|
|
||||||
use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
|
use crate::anchor_from_status;
|
||||||
|
|
||||||
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
|
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
|
||||||
///
|
///
|
||||||
@ -78,10 +78,11 @@ impl EsploraExt for esplora_client::BlockingClient {
|
|||||||
local_tip: CheckPoint,
|
local_tip: CheckPoint,
|
||||||
request_heights: impl IntoIterator<Item = u32>,
|
request_heights: impl IntoIterator<Item = u32>,
|
||||||
) -> Result<local_chain::Update, Error> {
|
) -> Result<local_chain::Update, Error> {
|
||||||
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
|
|
||||||
let new_tip_height = self.get_height()?;
|
let new_tip_height = self.get_height()?;
|
||||||
|
|
||||||
// atomically fetch blocks from esplora
|
// Atomically fetch latest blocks from Esplora. This way, we avoid creating an update with
|
||||||
|
// an inconsistent set of blocks (assuming that a reorg depth cannot be greater than the
|
||||||
|
// latest blocks fetched).
|
||||||
let mut fetched_blocks = {
|
let mut fetched_blocks = {
|
||||||
let heights = (0..=new_tip_height).rev();
|
let heights = (0..=new_tip_height).rev();
|
||||||
let hashes = self
|
let hashes = self
|
||||||
@ -91,7 +92,8 @@ impl EsploraExt for esplora_client::BlockingClient {
|
|||||||
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
|
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
|
||||||
};
|
};
|
||||||
|
|
||||||
// fetch heights that the caller is interested in
|
// fetch blocks of heights that the caller is interested in, reusing latest blocks that are
|
||||||
|
// already fetched.
|
||||||
for height in request_heights {
|
for height in request_heights {
|
||||||
// do not fetch blocks higher than remote tip
|
// do not fetch blocks higher than remote tip
|
||||||
if height > new_tip_height {
|
if height > new_tip_height {
|
||||||
@ -99,81 +101,30 @@ impl EsploraExt for esplora_client::BlockingClient {
|
|||||||
}
|
}
|
||||||
// only fetch what is missing
|
// only fetch what is missing
|
||||||
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
||||||
let hash = self.get_block_hash(height)?;
|
entry.insert(self.get_block_hash(height)?);
|
||||||
entry.insert(hash);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// find the earliest point of agreement between local chain and fetched chain
|
// Ensure `fetched_blocks` can create an update that connects with the original chain.
|
||||||
let earliest_agreement_cp = {
|
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
|
||||||
let mut earliest_agreement_cp = Option::<CheckPoint>::None;
|
if height > new_tip_height {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let local_tip_height = local_tip.height();
|
let fetched_hash = match fetched_blocks.entry(height) {
|
||||||
for local_cp in local_tip.iter() {
|
|
||||||
let local_block = local_cp.block_id();
|
|
||||||
|
|
||||||
// the updated hash (block hash at this height after the update), can either be:
|
|
||||||
// 1. a block that already existed in `fetched_blocks`
|
|
||||||
// 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH
|
|
||||||
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
|
|
||||||
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
|
|
||||||
// remote tip
|
|
||||||
let updated_hash = match fetched_blocks.entry(local_block.height) {
|
|
||||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||||
btree_map::Entry::Vacant(entry) => *entry.insert(
|
btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?),
|
||||||
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
|
|
||||||
local_block.hash
|
|
||||||
} else {
|
|
||||||
self.get_block_hash(local_block.height)?
|
|
||||||
},
|
|
||||||
),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// since we may introduce blocks below the point of agreement, we cannot break
|
// We have found point of agreement so the update will connect!
|
||||||
// here unconditionally - we only break if we guarantee there are no new heights
|
if fetched_hash == local_hash {
|
||||||
// below our current local checkpoint
|
|
||||||
if local_block.hash == updated_hash {
|
|
||||||
earliest_agreement_cp = Some(local_cp);
|
|
||||||
|
|
||||||
let first_new_height = *fetched_blocks
|
|
||||||
.keys()
|
|
||||||
.next()
|
|
||||||
.expect("must have at least one new block");
|
|
||||||
if first_new_height >= local_block.height {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
earliest_agreement_cp
|
|
||||||
};
|
|
||||||
|
|
||||||
let tip = {
|
|
||||||
// first checkpoint to use for the update chain
|
|
||||||
let first_cp = match earliest_agreement_cp {
|
|
||||||
Some(cp) => cp,
|
|
||||||
None => {
|
|
||||||
let (&height, &hash) = fetched_blocks
|
|
||||||
.iter()
|
|
||||||
.next()
|
|
||||||
.expect("must have at least one new block");
|
|
||||||
CheckPoint::new(BlockId { height, hash })
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// transform fetched chain into the update chain
|
|
||||||
fetched_blocks
|
|
||||||
// we exclude anything at or below the first cp of the update chain otherwise
|
|
||||||
// building the chain will fail
|
|
||||||
.split_off(&(first_cp.height() + 1))
|
|
||||||
.into_iter()
|
|
||||||
.map(|(height, hash)| BlockId { height, hash })
|
|
||||||
.fold(first_cp, |prev_cp, block| {
|
|
||||||
prev_cp.push(block).expect("must extend checkpoint")
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(local_chain::Update {
|
Ok(local_chain::Update {
|
||||||
tip,
|
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
|
||||||
|
.expect("must be in height order"),
|
||||||
introduce_older_blocks: true,
|
introduce_older_blocks: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -31,8 +31,6 @@ mod async_ext;
|
|||||||
#[cfg(feature = "async")]
|
#[cfg(feature = "async")]
|
||||||
pub use async_ext::*;
|
pub use async_ext::*;
|
||||||
|
|
||||||
const ASSUME_FINAL_DEPTH: u32 = 15;
|
|
||||||
|
|
||||||
fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeHeightAnchor> {
|
fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeHeightAnchor> {
|
||||||
if let TxStatus {
|
if let TxStatus {
|
||||||
block_height: Some(height),
|
block_height: Some(height),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user