From 5f34df8489fedae2aa3fd001036cc9ef6abe9a7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 7 Oct 2023 00:56:01 +0800 Subject: [PATCH] bitcoind_rpc!: bring back `CheckPoint`s to `Emitter` * `bdk_chain` dependency is added. In the future, we will introduce a separate `bdk_core` crate to contain shared types. * replace `Emitter::new` with `from_height` and `from_checkpoint` * `from_height` emits from the given start height * `from_checkpoint` uses the provided cp to find agreement point * introduce logic that ensures emitted blocks can connect with receiver's `LocalChain` * in our rpc example, we can now `expect()` chain updates to always since we are using checkpoints and receiving blocks in order --- crates/bitcoind_rpc/Cargo.toml | 6 +- crates/bitcoind_rpc/src/lib.rs | 76 ++++++++++++++----- crates/bitcoind_rpc/tests/test_emitter.rs | 16 ++-- .../example_bitcoind_rpc_polling/src/main.rs | 29 +++---- 4 files changed, 84 insertions(+), 43 deletions(-) diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml index eeb9de58..f04469d2 100644 --- a/crates/bitcoind_rpc/Cargo.toml +++ b/crates/bitcoind_rpc/Cargo.toml @@ -9,13 +9,13 @@ edition = "2021" # For no-std, remember to enable the bitcoin/no-std feature bitcoin = { version = "0.30", default-features = false } bitcoincore-rpc = { version = "0.17" } +bdk_chain = { path = "../chain", version = "0.5", default-features = false } [dev-dependencies] -bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] } bitcoind = { version = "0.33", features = ["25_0"] } anyhow = { version = "1" } [features] default = ["std"] -std = ["bitcoin/std"] -serde = ["bitcoin/serde"] +std = ["bitcoin/std", "bdk_chain/std"] +serde = ["bitcoin/serde", "bdk_chain/serde"] diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 8ed646c8..f200550b 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -9,8 +9,7 @@ //! mempool. #![warn(missing_docs)] -use std::collections::BTreeMap; - +use bdk_chain::{local_chain::CheckPoint, BlockId}; use bitcoin::{block::Header, Block, BlockHash, Transaction}; pub use bitcoincore_rpc; use bitcoincore_rpc::bitcoincore_rpc_json; @@ -24,7 +23,7 @@ pub struct Emitter<'c, C> { client: &'c C, start_height: u32, - emitted_blocks: BTreeMap, + last_cp: Option, last_block: Option, /// The latest first-seen epoch of emitted mempool transactions. This is used to determine @@ -37,14 +36,29 @@ pub struct Emitter<'c, C> { } impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { - /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. + /// Construct a new [`Emitter`] with the given RPC `client` and `start_height`. /// /// `start_height` is the block height to start emitting blocks from. - pub fn new(client: &'c C, start_height: u32) -> Self { + pub fn from_height(client: &'c C, start_height: u32) -> Self { Self { client, start_height, - emitted_blocks: BTreeMap::new(), + last_cp: None, + last_block: None, + last_mempool_time: 0, + last_mempool_tip: None, + } + } + + /// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`. + /// + /// `checkpoint` is used to find the latest block which is still part of the best chain. The + /// [`Emitter`] will emit blocks starting right above this block. + pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self { + Self { + client, + start_height: 0, + last_cp: Some(checkpoint), last_block: None, last_mempool_time: 0, last_mempool_tip: None, @@ -114,7 +128,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { .collect::, _>>()?; self.last_mempool_time = latest_time; - self.last_mempool_tip = self.emitted_blocks.iter().last().map(|(&height, _)| height); + self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height()); Ok(txs_to_emit) } @@ -135,7 +149,7 @@ enum PollResponse { NoMoreBlocks, /// Fetched block is not in the best chain. BlockNotInBestChain, - AgreementFound(bitcoincore_rpc_json::GetBlockResult), + AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint), AgreementPointNotFound, } @@ -146,7 +160,10 @@ where let client = emitter.client; if let Some(last_res) = &emitter.last_block { - assert!(!emitter.emitted_blocks.is_empty()); + assert!( + emitter.last_cp.is_some(), + "must not have block result without last cp" + ); let next_hash = match last_res.nextblockhash { None => return Ok(PollResponse::NoMoreBlocks), @@ -160,7 +177,7 @@ where return Ok(PollResponse::Block(res)); } - if emitter.emitted_blocks.is_empty() { + if emitter.last_cp.is_none() { let hash = client.get_block_hash(emitter.start_height as _)?; let res = client.get_block_info(&hash)?; @@ -170,15 +187,15 @@ where return Ok(PollResponse::Block(res)); } - for (&_, hash) in emitter.emitted_blocks.iter().rev() { - let res = client.get_block_info(hash)?; + for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) { + let res = client.get_block_info(&cp.hash())?; if res.confirmations < 0 { // block is not in best chain continue; } // agreement point found - return Ok(PollResponse::AgreementFound(res)); + return Ok(PollResponse::AgreementFound(res, cp)); } Ok(PollResponse::AgreementPointNotFound) @@ -196,9 +213,28 @@ where match poll_once(emitter)? { PollResponse::Block(res) => { let height = res.height as u32; - let item = get_item(&res.hash)?; - assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None); + let hash = res.hash; + let item = get_item(&hash)?; + + let this_id = BlockId { height, hash }; + let prev_id = res.previousblockhash.map(|prev_hash| BlockId { + height: height - 1, + hash: prev_hash, + }); + + match (&mut emitter.last_cp, prev_id) { + (Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"), + (last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)), + // When the receiver constructs a local_chain update from a block, the previous + // checkpoint is also included in the update. We need to reflect this state in + // `Emitter::last_cp` as well. + (last_cp, Some(prev_id)) => { + *last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push")) + } + } + emitter.last_block = Some(res); + return Ok(Some((height, item))); } PollResponse::NoMoreBlocks => { @@ -209,11 +245,11 @@ where emitter.last_block = None; continue; } - PollResponse::AgreementFound(res) => { + PollResponse::AgreementFound(res, cp) => { let agreement_h = res.height as u32; // get rid of evicted blocks - emitter.emitted_blocks.split_off(&(agreement_h + 1)); + emitter.last_cp = Some(cp); // The tip during the last mempool emission needs to in the best chain, we reduce // it if it is not. @@ -226,7 +262,11 @@ where continue; } PollResponse::AgreementPointNotFound => { - emitter.emitted_blocks.clear(); + // We want to clear `last_cp` and set `start_height` to the first checkpoint's + // height. This way, the first checkpoint in `LocalChain` can be replaced. + if let Some(last_cp) = emitter.last_cp.take() { + emitter.start_height = last_cp.height(); + } emitter.last_block = None; continue; } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 5d57bde1..f0bbd3d1 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -189,7 +189,7 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up pub fn test_sync_local_chain() -> anyhow::Result<()> { let env = TestEnv::new()?; let mut local_chain = LocalChain::default(); - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine some blocks and returned the actual block hashes let exp_hashes = { @@ -305,7 +305,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { index }); - let emitter = &mut Emitter::new(&env.client, 0); + let emitter = &mut Emitter::from_height(&env.client, 0); while let Some((height, block)) = emitter.next_block()? { let _ = chain.apply_update(block_to_chain_update(&block, height))?; @@ -393,7 +393,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { const CHAIN_TIP_HEIGHT: usize = 110; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _); + let mut emitter = Emitter::from_height(&env.client, EMITTER_START_HEIGHT as _); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; while emitter.next_header()?.is_some() {} @@ -461,7 +461,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { const SEND_AMOUNT: Amount = Amount::from_sat(10_000); let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // setup addresses let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked(); @@ -542,7 +542,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { const MEMPOOL_TX_COUNT: usize = 2; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine blocks and sync up emitter let addr = env.client.get_new_address(None, None)?.assume_checked(); @@ -597,7 +597,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<() const MEMPOOL_TX_COUNT: usize = 21; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine blocks to get initial balance, sync emitter up to tip let addr = env.client.get_new_address(None, None)?.assume_checked(); @@ -674,7 +674,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; let env = TestEnv::new()?; - let mut emitter = Emitter::new(&env.client, 0); + let mut emitter = Emitter::from_height(&env.client, 0); // mine blocks to get initial balance let addr = env.client.get_new_address(None, None)?.assume_checked(); @@ -789,7 +789,7 @@ fn no_agreement_point() -> anyhow::Result<()> { let env = TestEnv::new()?; // start height is 99 - let mut emitter = Emitter::new(&env.client, (PREMINE_COUNT - 2) as u32); + let mut emitter = Emitter::from_height(&env.client, (PREMINE_COUNT - 2) as u32); // mine 101 blocks env.mine_blocks(PREMINE_COUNT, None)?; diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index c9bcc972..ad77030a 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -27,8 +27,6 @@ const DB_MAGIC: &[u8] = b"bdk_example_rpc"; const DB_PATH: &str = ".bdk_example_rpc.db"; const CHANNEL_BOUND: usize = 10; -/// The block depth which we assume no reorgs can happen at. -const ASSUME_FINAL_DEPTH: u32 = 6; /// Delay for printing status to stdout. const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6); /// Delay between mempool emissions. @@ -160,13 +158,12 @@ fn main() -> anyhow::Result<()> { let mut db = db.lock().unwrap(); graph.index.set_lookahead_for_all(lookahead); - // we start at a height lower than last-seen tip in case of reorgs - let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| { - cp.height().saturating_sub(ASSUME_FINAL_DEPTH) - }); let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, start_height); + let mut emitter = match chain.tip() { + Some(cp) => Emitter::from_checkpoint(&rpc_client, cp), + None => Emitter::from_height(&rpc_client, fallback_height), + }; let mut last_db_commit = Instant::now(); let mut last_print = Instant::now(); @@ -174,7 +171,9 @@ fn main() -> anyhow::Result<()> { while let Some((height, block)) = emitter.next_block()? { let chain_update = CheckPoint::from_header(&block.header, height).into_update(false); - let chain_changeset = chain.apply_update(chain_update)?; + let chain_changeset = chain + .apply_update(chain_update) + .expect("must always apply as we recieve blocks in order from emitter"); let graph_changeset = graph.apply_block_relevant(block, height); db.stage((chain_changeset, graph_changeset)); @@ -227,17 +226,17 @@ fn main() -> anyhow::Result<()> { let sigterm_flag = start_ctrlc_handler(); graph.lock().unwrap().index.set_lookahead_for_all(lookahead); - // we start at a height lower than last-seen tip in case of reorgs - let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| { - cp.height().saturating_sub(ASSUME_FINAL_DEPTH) - }); + let last_cp = chain.lock().unwrap().tip(); let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { println!("emitter thread started..."); let rpc_client = rpc_args.new_client()?; - let mut emitter = Emitter::new(&rpc_client, start_height); + let mut emitter = match last_cp { + Some(cp) => Emitter::from_checkpoint(&rpc_client, cp), + None => Emitter::from_height(&rpc_client, fallback_height), + }; let mut block_count = rpc_client.get_block_count()? as u32; tx.send(Emission::Tip(block_count))?; @@ -284,7 +283,9 @@ fn main() -> anyhow::Result<()> { Emission::Block { height, block } => { let chain_update = CheckPoint::from_header(&block.header, height).into_update(false); - let chain_changeset = chain.apply_update(chain_update)?; + let chain_changeset = chain + .apply_update(chain_update) + .expect("must always apply as we recieve blocks in order from emitter"); let graph_changeset = graph.apply_block_relevant(block, height); (chain_changeset, graph_changeset) }