diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs
index e790b8a8..ce5e863b 100644
--- a/crates/bitcoind_rpc/src/lib.rs
+++ b/crates/bitcoind_rpc/src/lib.rs
@@ -43,11 +43,13 @@ pub struct Emitter<'c, C> {
}
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
- /// Construct a new [`Emitter`] with the given RPC `client`, `last_cp` and `start_height`.
+ /// Construct a new [`Emitter`].
///
- /// * `last_cp` is the check point used to find the latest block which is still part of the best
- /// chain.
- /// * `start_height` is the block height to start emitting blocks from.
+ /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
+ /// can start emission from a block that connects to the original chain.
+ ///
+ /// `start_height` starts emission from a given height (if there are no conflicts with the
+ /// original chain).
pub fn new(client: &'c C, last_cp: CheckPoint, start_height: u32) -> Self {
Self {
client,
@@ -127,13 +129,58 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
}
/// Emit the next block height and header (if any).
- pub fn next_header(&mut self) -> Result, bitcoincore_rpc::Error> {
- poll(self, |hash| self.client.get_block_header(hash))
+ pub fn next_header(&mut self) -> Result >, bitcoincore_rpc::Error> {
+ Ok(poll(self, |hash| self.client.get_block_header(hash))?
+ .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
}
/// Emit the next block height and block (if any).
- pub fn next_block(&mut self) -> Result , bitcoincore_rpc::Error> {
- poll(self, |hash| self.client.get_block(hash))
+ pub fn next_block(&mut self) -> Result >, bitcoincore_rpc::Error> {
+ Ok(poll(self, |hash| self.client.get_block(hash))?
+ .map(|(checkpoint, block)| BlockEvent { block, checkpoint }))
+ }
+}
+
+/// A newly emitted block from [`Emitter`].
+#[derive(Debug)]
+pub struct BlockEvent {
+ /// Either a full [`Block`] or [`Header`] of the new block.
+ pub block: B,
+
+ /// The checkpoint of the new block.
+ ///
+ /// A [`CheckPoint`] is a node of a linked list of [`BlockId`]s. This checkpoint is linked to
+ /// all [`BlockId`]s originally passed in [`Emitter::new`] as well as emitted blocks since then.
+ /// These blocks are guaranteed to be of the same chain.
+ ///
+ /// This is important as BDK structures require block-to-apply to be connected with another
+ /// block in the original chain.
+ pub checkpoint: CheckPoint,
+}
+
+impl BlockEvent {
+ /// The block height of this new block.
+ pub fn block_height(&self) -> u32 {
+ self.checkpoint.height()
+ }
+
+ /// The block hash of this new block.
+ pub fn block_hash(&self) -> BlockHash {
+ self.checkpoint.hash()
+ }
+
+ /// The [`BlockId`] of a previous block that this block connects to.
+ ///
+ /// This either returns a [`BlockId`] of a previously emitted block or from the chain we started
+ /// with (passed in as `last_cp` in [`Emitter::new`]).
+ ///
+ /// This value is derived from [`BlockEvent::checkpoint`].
+ pub fn connected_to(&self) -> BlockId {
+ match self.checkpoint.prev() {
+ Some(prev_cp) => prev_cp.block_id(),
+ // there is no previous checkpoint, so just connect with itself
+ None => self.checkpoint.block_id(),
+ }
}
}
@@ -203,7 +250,7 @@ where
fn poll(
emitter: &mut Emitter,
get_item: F,
-) -> Result, bitcoincore_rpc::Error>
+) -> Result , bitcoincore_rpc::Error>
where
C: bitcoincore_rpc::RpcApi,
F: Fn(&BlockHash) -> Result,
@@ -215,13 +262,14 @@ where
let hash = res.hash;
let item = get_item(&hash)?;
- emitter.last_cp = emitter
+ let new_cp = emitter
.last_cp
.clone()
.push(BlockId { height, hash })
.expect("must push");
+ emitter.last_cp = new_cp.clone();
emitter.last_block = Some(res);
- return Ok(Some((height, item)));
+ return Ok(Some((new_cp, item)));
}
PollResponse::NoMoreBlocks => {
emitter.last_block = None;
diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs
index 521124e5..384df92d 100644
--- a/crates/bitcoind_rpc/tests/test_emitter.rs
+++ b/crates/bitcoind_rpc/tests/test_emitter.rs
@@ -157,28 +157,6 @@ impl TestEnv {
}
}
-fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update {
- let this_id = BlockId {
- height,
- hash: block.block_hash(),
- };
- let tip = if block.header.prev_blockhash == BlockHash::all_zeros() {
- CheckPoint::new(this_id)
- } else {
- CheckPoint::new(BlockId {
- height: height - 1,
- hash: block.header.prev_blockhash,
- })
- .extend(core::iter::once(this_id))
- .expect("must construct checkpoint")
- };
-
- local_chain::Update {
- tip,
- introduce_older_blocks: false,
- }
-}
-
/// Ensure that blocks are emitted in order even after reorg.
///
/// 1. Mine 101 blocks.
@@ -200,17 +178,21 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
// see if the emitter outputs the right blocks
println!("first sync:");
- while let Some((height, block)) = emitter.next_block()? {
+ while let Some(emission) = emitter.next_block()? {
+ let height = emission.block_height();
+ let hash = emission.block_hash();
assert_eq!(
- block.block_hash(),
+ emission.block_hash(),
exp_hashes[height as usize],
"emitted block hash is unexpected"
);
- let chain_update = block_to_chain_update(&block, height);
assert_eq!(
- local_chain.apply_update(chain_update)?,
- BTreeMap::from([(height, Some(block.block_hash()))]),
+ local_chain.apply_update(local_chain::Update {
+ tip: emission.checkpoint,
+ introduce_older_blocks: false,
+ })?,
+ BTreeMap::from([(height, Some(hash))]),
"chain update changeset is unexpected",
);
}
@@ -237,27 +219,30 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
// see if the emitter outputs the right blocks
println!("after reorg:");
let mut exp_height = exp_hashes.len() - reorged_blocks.len();
- while let Some((height, block)) = emitter.next_block()? {
+ while let Some(emission) = emitter.next_block()? {
+ let height = emission.block_height();
+ let hash = emission.block_hash();
assert_eq!(
height, exp_height as u32,
"emitted block has unexpected height"
);
assert_eq!(
- block.block_hash(),
- exp_hashes[height as usize],
+ hash, exp_hashes[height as usize],
"emitted block is unexpected"
);
- let chain_update = block_to_chain_update(&block, height);
assert_eq!(
- local_chain.apply_update(chain_update)?,
+ local_chain.apply_update(local_chain::Update {
+ tip: emission.checkpoint,
+ introduce_older_blocks: false,
+ })?,
if exp_height == exp_hashes.len() - reorged_blocks.len() {
- core::iter::once((height, Some(block.block_hash())))
+ core::iter::once((height, Some(hash)))
.chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None)))
.collect::()
} else {
- BTreeMap::from([(height, Some(block.block_hash()))])
+ BTreeMap::from([(height, Some(hash))])
},
"chain update changeset is unexpected",
);
@@ -307,9 +292,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
let emitter = &mut Emitter::new(&env.client, chain.tip(), 0);
- while let Some((height, block)) = emitter.next_block()? {
- let _ = chain.apply_update(block_to_chain_update(&block, height))?;
- let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
+ while let Some(emission) = emitter.next_block()? {
+ let height = emission.block_height();
+ let _ = chain.apply_update(local_chain::Update {
+ tip: emission.checkpoint,
+ introduce_older_blocks: false,
+ })?;
+ let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height);
assert!(indexed_additions.is_empty());
}
@@ -367,10 +356,13 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
// must receive mined block which will confirm the transactions.
{
- let (height, block) = emitter.next_block()?.expect("must get mined block");
- let _ = chain
- .apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
- let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
+ let emission = emitter.next_block()?.expect("must get mined block");
+ let height = emission.block_height();
+ let _ = chain.apply_update(local_chain::Update {
+ tip: emission.checkpoint,
+ introduce_older_blocks: false,
+ })?;
+ let indexed_additions = indexed_tx_graph.apply_block_relevant(emission.block, height);
assert!(indexed_additions.graph.txs.is_empty());
assert!(indexed_additions.graph.txouts.is_empty());
assert_eq!(indexed_additions.graph.anchors, exp_anchors);
@@ -407,9 +399,12 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
for reorg_count in 1..=10 {
let replaced_blocks = env.reorg_empty_blocks(reorg_count)?;
- let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg");
+ let next_emission = emitter.next_header()?.expect("must emit block after reorg");
assert_eq!(
- (height as usize, next_header.block_hash()),
+ (
+ next_emission.block_height() as usize,
+ next_emission.block_hash()
+ ),
replaced_blocks[0],
"block emitted after reorg should be at the reorg height"
);
@@ -439,8 +434,9 @@ fn sync_from_emitter(
where
C: bitcoincore_rpc::RpcApi,
{
- while let Some((height, block)) = emitter.next_block()? {
- process_block(recv_chain, recv_graph, block, height)?;
+ while let Some(emission) = emitter.next_block()? {
+ let height = emission.block_height();
+ process_block(recv_chain, recv_graph, emission.block, height)?;
}
Ok(())
}
@@ -660,7 +656,8 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
// At this point, the emitter has seen all mempool transactions. It should only re-emit those
// that have introduction heights less than the emitter's last-emitted block tip.
- while let Some((height, _)) = emitter.next_header()? {
+ while let Some(emission) = emitter.next_header()? {
+ let height = emission.block_height();
// We call `mempool()` twice.
// The second call (at height `h`) should skip the tx introduced at height `h`.
for try_index in 0..2 {
@@ -754,7 +751,8 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
.collect::>());
// `next_header` emits the replacement block of the reorg
- if let Some((height, _)) = emitter.next_header()? {
+ if let Some(emission) = emitter.next_header()? {
+ let height = emission.block_height();
println!("\t- replacement height: {}", height);
// the mempool emission (that follows the first block emission after reorg) should only
@@ -835,12 +833,12 @@ fn no_agreement_point() -> anyhow::Result<()> {
env.mine_blocks(PREMINE_COUNT, None)?;
// emit block 99a
- let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header");
+ let block_header_99a = emitter.next_header()?.expect("block 99a header").block;
let block_hash_99a = block_header_99a.block_hash();
let block_hash_98a = block_header_99a.prev_blockhash;
// emit block 100a
- let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header");
+ let block_header_100a = emitter.next_header()?.expect("block 100a header").block;
let block_hash_100a = block_header_100a.block_hash();
// get hash for block 101a
@@ -855,7 +853,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
env.mine_blocks(3, None)?;
// emit block header 99b
- let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header");
+ let block_header_99b = emitter.next_header()?.expect("block 99b header").block;
let block_hash_99b = block_header_99b.block_hash();
let block_hash_98b = block_header_99b.prev_blockhash;
diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs
index 449242e4..648962c2 100644
--- a/example-crates/example_bitcoind_rpc_polling/src/main.rs
+++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs
@@ -14,7 +14,7 @@ use bdk_bitcoind_rpc::{
use bdk_chain::{
bitcoin::{constants::genesis_block, Block, Transaction},
indexed_tx_graph, keychain,
- local_chain::{self, CheckPoint, LocalChain},
+ local_chain::{self, LocalChain},
ConfirmationTimeHeightAnchor, IndexedTxGraph,
};
use example_cli::{
@@ -42,7 +42,7 @@ type ChangeSet = (
#[derive(Debug)]
enum Emission {
- Block { height: u32, block: Block },
+ Block(bdk_bitcoind_rpc::BlockEvent),
Mempool(Vec<(Transaction, u64)>),
Tip(u32),
}
@@ -178,17 +178,20 @@ fn main() -> anyhow::Result<()> {
let mut last_db_commit = Instant::now();
let mut last_print = Instant::now();
- while let Some((height, block)) = emitter.next_block()? {
+ while let Some(emission) = emitter.next_block()? {
+ let height = emission.block_height();
+
let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut db = db.lock().unwrap();
- let chain_update =
- CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain
- .apply_update(chain_update)
+ .apply_update(local_chain::Update {
+ tip: emission.checkpoint,
+ introduce_older_blocks: false,
+ })
.expect("must always apply as we receive blocks in order from emitter");
- let graph_changeset = graph.apply_block_relevant(block, height);
+ let graph_changeset = graph.apply_block_relevant(emission.block, height);
db.stage((chain_changeset, graph_changeset));
// commit staged db changes in intervals
@@ -256,7 +259,8 @@ fn main() -> anyhow::Result<()> {
loop {
match emitter.next_block()? {
- Some((height, block)) => {
+ Some(block_emission) => {
+ let height = block_emission.block_height();
if sigterm_flag.load(Ordering::Acquire) {
break;
}
@@ -264,7 +268,7 @@ fn main() -> anyhow::Result<()> {
block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;
}
- tx.send(Emission::Block { height, block })?;
+ tx.send(Emission::Block(block_emission))?;
}
None => {
if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
@@ -293,13 +297,17 @@ fn main() -> anyhow::Result<()> {
let mut chain = chain.lock().unwrap();
let changeset = match emission {
- Emission::Block { height, block } => {
- let chain_update =
- CheckPoint::from_header(&block.header, height).into_update(false);
+ Emission::Block(block_emission) => {
+ let height = block_emission.block_height();
+ let chain_update = local_chain::Update {
+ tip: block_emission.checkpoint,
+ introduce_older_blocks: false,
+ };
let chain_changeset = chain
.apply_update(chain_update)
.expect("must always apply as we receive blocks in order from emitter");
- let graph_changeset = graph.apply_block_relevant(block, height);
+ let graph_changeset =
+ graph.apply_block_relevant(block_emission.block, height);
(chain_changeset, graph_changeset)
}
Emission::Mempool(mempool_txs) => {