use std::collections::{BTreeMap, BTreeSet}; use bdk_bitcoind_rpc::Emitter; use bdk_chain::{ bitcoin::{Address, Amount, BlockHash, Txid}, indexed_tx_graph::InsertTxItem, keychain::Balance, local_chain::{self, CheckPoint, LocalChain}, Append, BlockId, IndexedTxGraph, SpkTxOutIndex, }; use bitcoin::{ address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, secp256k1::rand::random, Block, CompactTarget, OutPoint, ScriptBuf, ScriptHash, Transaction, TxIn, TxOut, WScriptHash, }; use bitcoincore_rpc::{ bitcoincore_rpc_json::{GetBlockTemplateModes, GetBlockTemplateRules}, RpcApi, }; struct TestEnv { #[allow(dead_code)] daemon: bitcoind::BitcoinD, client: bitcoincore_rpc::Client, } impl TestEnv { fn new() -> anyhow::Result { let daemon = match std::env::var_os("TEST_BITCOIND") { Some(bitcoind_path) => bitcoind::BitcoinD::new(bitcoind_path), None => bitcoind::BitcoinD::from_downloaded(), }?; let client = bitcoincore_rpc::Client::new( &daemon.rpc_url(), bitcoincore_rpc::Auth::CookieFile(daemon.params.cookie_file.clone()), )?; Ok(Self { daemon, client }) } fn mine_blocks( &self, count: usize, address: Option
, ) -> anyhow::Result> { let coinbase_address = match address { Some(address) => address, None => self.client.get_new_address(None, None)?.assume_checked(), }; let block_hashes = self .client .generate_to_address(count as _, &coinbase_address)?; Ok(block_hashes) } fn mine_empty_block(&self) -> anyhow::Result<(usize, BlockHash)> { let bt = self.client.get_block_template( GetBlockTemplateModes::Template, &[GetBlockTemplateRules::SegWit], &[], )?; let txdata = vec![Transaction { version: 1, lock_time: bitcoin::absolute::LockTime::from_height(0)?, input: vec![TxIn { previous_output: bitcoin::OutPoint::default(), script_sig: ScriptBuf::builder() .push_int(bt.height as _) // randomn number so that re-mining creates unique block .push_int(random()) .into_script(), sequence: bitcoin::Sequence::default(), witness: bitcoin::Witness::new(), }], output: vec![TxOut { value: 0, script_pubkey: ScriptBuf::new_p2sh(&ScriptHash::all_zeros()), }], }]; let bits: [u8; 4] = bt .bits .clone() .try_into() .expect("rpc provided us with invalid bits"); let mut block = Block { header: Header { version: bitcoin::block::Version::default(), prev_blockhash: bt.previous_block_hash, merkle_root: TxMerkleNode::all_zeros(), time: Ord::max(bt.min_time, std::time::UNIX_EPOCH.elapsed()?.as_secs()) as u32, bits: CompactTarget::from_consensus(u32::from_be_bytes(bits)), nonce: 0, }, txdata, }; block.header.merkle_root = block.compute_merkle_root().expect("must compute"); for nonce in 0..=u32::MAX { block.header.nonce = nonce; if block.header.target().is_met_by(block.block_hash()) { break; } } self.client.submit_block(&block)?; Ok((bt.height as usize, block.block_hash())) } fn invalidate_blocks(&self, count: usize) -> anyhow::Result<()> { let mut hash = self.client.get_best_block_hash()?; for _ in 0..count { let prev_hash = self.client.get_block_info(&hash)?.previousblockhash; self.client.invalidate_block(&hash)?; match prev_hash { Some(prev_hash) => hash = prev_hash, None => break, } } Ok(()) } fn 
reorg(&self, count: usize) -> anyhow::Result> { let start_height = self.client.get_block_count()?; self.invalidate_blocks(count)?; let res = self.mine_blocks(count, None); assert_eq!( self.client.get_block_count()?, start_height, "reorg should not result in height change" ); res } fn reorg_empty_blocks(&self, count: usize) -> anyhow::Result> { let start_height = self.client.get_block_count()?; self.invalidate_blocks(count)?; let res = (0..count) .map(|_| self.mine_empty_block()) .collect::, _>>()?; assert_eq!( self.client.get_block_count()?, start_height, "reorg should not result in height change" ); Ok(res) } fn send(&self, address: &Address, amount: Amount) -> anyhow::Result { let txid = self .client .send_to_address(address, amount, None, None, None, None, None, None)?; Ok(txid) } } fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update { let this_id = BlockId { height, hash: block.block_hash(), }; let tip = if block.header.prev_blockhash == BlockHash::all_zeros() { CheckPoint::new(this_id) } else { CheckPoint::new(BlockId { height: height - 1, hash: block.header.prev_blockhash, }) .extend(core::iter::once(this_id)) .expect("must construct checkpoint") }; local_chain::Update { tip, introduce_older_blocks: false, } } fn block_to_tx_graph_update( block: &bitcoin::Block, height: u32, ) -> impl Iterator>> { let anchor = BlockId { hash: block.block_hash(), height, }; block.txdata.iter().map(move |tx| (tx, Some(anchor), None)) } /// Ensure that blocks are emitted in order even after reorg. /// /// 1. Mine 101 blocks. /// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`]. /// 3. Reorg highest 6 blocks. /// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`]. 
#[test]
pub fn test_sync_local_chain() -> anyhow::Result<()> {
    let env = TestEnv::new()?;
    let mut local_chain = LocalChain::default();
    let mut emitter = Emitter::new(&env.client, 0);

    // mine some blocks and return the actual block hashes
    let exp_hashes = {
        let mut hashes = vec![env.client.get_block_hash(0)?]; // include genesis block
        hashes.extend(env.mine_blocks(101, None)?);
        hashes
    };

    // see if the emitter outputs the right blocks
    println!("first sync:");
    while let Some((height, block)) = emitter.next_block()? {
        assert_eq!(
            block.block_hash(),
            exp_hashes[height as usize],
            "emitted block hash is unexpected"
        );

        let chain_update = block_to_chain_update(&block, height);
        assert_eq!(
            local_chain.apply_update(chain_update)?,
            BTreeMap::from([(height, Some(block.block_hash()))]),
            "chain update changeset is unexpected",
        );
    }

    assert_eq!(
        local_chain.blocks(),
        &exp_hashes
            .iter()
            .enumerate()
            .map(|(i, hash)| (i as u32, *hash))
            .collect(),
        "final local_chain state is unexpected",
    );

    // perform reorg
    let reorged_blocks = env.reorg(6)?;
    let exp_hashes = exp_hashes
        .iter()
        .take(exp_hashes.len() - reorged_blocks.len())
        .chain(&reorged_blocks)
        .cloned()
        .collect::<Vec<_>>();

    // see if the emitter outputs the right blocks
    println!("after reorg:");
    // height of the lowest replaced block; the first emission after the reorg must be here
    let reorg_height = exp_hashes.len() - reorged_blocks.len();
    let mut exp_height = reorg_height;
    while let Some((height, block)) = emitter.next_block()? {
        assert_eq!(
            height, exp_height as u32,
            "emitted block has unexpected height"
        );

        assert_eq!(
            block.block_hash(),
            exp_hashes[height as usize],
            "emitted block is unexpected"
        );

        let chain_update = block_to_chain_update(&block, height);
        assert_eq!(
            local_chain.apply_update(chain_update)?,
            if exp_height == reorg_height {
                // the first update after the reorg replaces the block at the reorg height and
                // removes every block above it
                core::iter::once((height, Some(block.block_hash())))
                    .chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None)))
                    .collect::<BTreeMap<_, _>>()
            } else {
                BTreeMap::from([(height, Some(block.block_hash()))])
            },
            "chain update changeset is unexpected",
        );

        exp_height += 1;
    }

    assert_eq!(
        local_chain.blocks(),
        &exp_hashes
            .iter()
            .enumerate()
            .map(|(i, hash)| (i as u32, *hash))
            .collect(),
        "final local_chain state is unexpected after reorg",
    );

    Ok(())
}

/// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and
/// block updates.
///
/// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update
#[test]
fn test_into_tx_graph() -> anyhow::Result<()> {
    let env = TestEnv::new()?;

    println!("getting new addresses!");
    let addr_0 = env.client.get_new_address(None, None)?.assume_checked();
    let addr_1 = env.client.get_new_address(None, None)?.assume_checked();
    let addr_2 = env.client.get_new_address(None, None)?.assume_checked();
    println!("got new addresses!");

    println!("mining block!");
    env.mine_blocks(101, None)?;
    println!("mined blocks!");

    let mut chain = LocalChain::default();
    let mut indexed_tx_graph = IndexedTxGraph::<BlockId, _>::new({
        let mut index = SpkTxOutIndex::<usize>::default();
        index.insert_spk(0, addr_0.script_pubkey());
        index.insert_spk(1, addr_1.script_pubkey());
        index.insert_spk(2, addr_2.script_pubkey());
        index
    });

    let emitter = &mut Emitter::new(&env.client, 0);

    // the blocks mined so far must not produce any relevant additions
    while let Some((height, block)) = emitter.next_block()? {
        let _ = chain.apply_update(block_to_chain_update(&block, height))?;
        let indexed_additions =
            indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height));
        assert!(indexed_additions.is_empty());
    }

    // send 3 txs to a tracked address, these txs will be in the mempool
    let exp_txids = {
        let mut txids = BTreeSet::new();
        for _ in 0..3 {
            txids.insert(env.send(&addr_0, Amount::from_sat(10_000))?);
        }
        txids
    };

    // expect that the next block should be none and we should get 3 txs from mempool
    {
        // next block should be `None`
        assert!(emitter.next_block()?.is_none());

        let mempool_txs = emitter.mempool()?;
        let indexed_additions = indexed_tx_graph
            .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
        assert_eq!(
            indexed_additions
                .graph
                .txs
                .iter()
                .map(|tx| tx.txid())
                .collect::<BTreeSet<Txid>>(),
            exp_txids,
            "changeset should have the 3 mempool transactions",
        );
        assert!(indexed_additions.graph.anchors.is_empty());
    }

    // mine a block that confirms the 3 txs
    let exp_block_hash = env.mine_blocks(1, None)?[0];
    let exp_block_height = env.client.get_block_info(&exp_block_hash)?.height as u32;
    let exp_anchors = exp_txids
        .iter()
        .map({
            let anchor = BlockId {
                height: exp_block_height,
                hash: exp_block_hash,
            };
            move |&txid| (anchor, txid)
        })
        .collect::<BTreeSet<_>>();

    // must receive mined block which will confirm the transactions.
    {
        let (height, block) = emitter.next_block()?.expect("must get mined block");
        let _ = chain.apply_update(block_to_chain_update(&block, height))?;
        let indexed_additions =
            indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height));
        // the txs themselves were already inserted from the mempool; only anchors are new
        assert!(indexed_additions.graph.txs.is_empty());
        assert!(indexed_additions.graph.txouts.is_empty());
        assert_eq!(indexed_additions.graph.anchors, exp_anchors);
    }

    Ok(())
}

/// Ensure next block emitted after reorg is at reorg height.
/// /// After a reorg, if the last-emitted block height is equal or greater than the reorg height, and /// the fallback height is equal to or lower than the reorg height, the next block/header emission /// should be at the reorg height. /// /// TODO: If the reorg height is lower than the fallback height, how do we find a block height to /// emit that can connect with our receiver chain? #[test] fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { const EMITTER_START_HEIGHT: usize = 100; const CHAIN_TIP_HEIGHT: usize = 110; let env = TestEnv::new()?; let mut emitter = Emitter::new(&env.client, EMITTER_START_HEIGHT as _); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; while emitter.next_header()?.is_some() {} for reorg_count in 1..=10 { let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg"); assert_eq!( (height as usize, next_header.block_hash()), replaced_blocks[0], "block emitted after reorg should be at the reorg height" ); while emitter.next_header()?.is_some() {} } Ok(()) } fn process_block( recv_chain: &mut LocalChain, recv_graph: &mut IndexedTxGraph>, block: Block, block_height: u32, ) -> anyhow::Result<()> { recv_chain .apply_update(CheckPoint::from_header(&block.header, block_height).into_update(false))?; let _ = recv_graph.apply_block(block, block_height); Ok(()) } fn sync_from_emitter( recv_chain: &mut LocalChain, recv_graph: &mut IndexedTxGraph>, emitter: &mut Emitter, ) -> anyhow::Result<()> where C: bitcoincore_rpc::RpcApi, { while let Some((height, block)) = emitter.next_block()? 
{ process_block(recv_chain, recv_graph, block, height)?; } Ok(()) } fn get_balance( recv_chain: &LocalChain, recv_graph: &IndexedTxGraph>, ) -> anyhow::Result { let chain_tip = recv_chain .tip() .map_or(BlockId::default(), |cp| cp.block_id()); let outpoints = recv_graph.index.outpoints().clone(); let balance = recv_graph .graph() .balance(recv_chain, chain_tip, outpoints, |_, _| true); Ok(balance) } /// If a block is reorged out, ensure that containing transactions that do not exist in the /// replacement block(s) become unconfirmed. #[test] fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; const ADDITIONAL_COUNT: usize = 11; const SEND_AMOUNT: Amount = Amount::from_sat(10_000); let env = TestEnv::new()?; let mut emitter = Emitter::new(&env.client, 0); // setup addresses let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked(); let spk_to_track = ScriptBuf::new_v0_p2wsh(&WScriptHash::all_zeros()); let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?; // setup receiver let mut recv_chain = LocalChain::default(); let mut recv_graph = IndexedTxGraph::::new({ let mut recv_index = SpkTxOutIndex::default(); recv_index.insert_spk((), spk_to_track.clone()); recv_index }); // mine and sync receiver up to tip env.mine_blocks(PREMINE_COUNT, Some(addr_to_mine))?; // create transactions that are tracked by our receiver for _ in 0..ADDITIONAL_COUNT { let txid = env.send(&addr_to_track, SEND_AMOUNT)?; // lock outputs that send to `addr_to_track` let outpoints_to_lock = env .client .get_transaction(&txid, None)? .transaction()? 
.output .into_iter() .enumerate() .filter(|(_, txo)| txo.script_pubkey == spk_to_track) .map(|(vout, _)| OutPoint::new(txid, vout as _)) .collect::>(); env.client.lock_unspent(&outpoints_to_lock)?; let _ = env.mine_blocks(1, None)?; } // get emitter up to tip sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?; assert_eq!( get_balance(&recv_chain, &recv_graph)?, Balance { confirmed: SEND_AMOUNT.to_sat() * ADDITIONAL_COUNT as u64, ..Balance::default() }, "initial balance must be correct", ); // perform reorgs with different depths for reorg_count in 1..=ADDITIONAL_COUNT { env.reorg_empty_blocks(reorg_count)?; sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?; assert_eq!( get_balance(&recv_chain, &recv_graph)?, Balance { confirmed: SEND_AMOUNT.to_sat() * (ADDITIONAL_COUNT - reorg_count) as u64, trusted_pending: SEND_AMOUNT.to_sat() * reorg_count as u64, ..Balance::default() }, "reorg_count: {}", reorg_count, ); } Ok(()) } /// Ensure avoid-re-emission-logic is sound when [`Emitter`] is synced to tip. /// /// The receiver (bdk_chain structures) is synced to the chain tip, and there is txs in the mempool. /// When we call Emitter::mempool multiple times, mempool txs should not be re-emitted, even if the /// chain tip is extended. #[test] fn mempool_avoids_re_emission() -> anyhow::Result<()> { const BLOCKS_TO_MINE: usize = 101; const MEMPOOL_TX_COUNT: usize = 2; let env = TestEnv::new()?; let mut emitter = Emitter::new(&env.client, 0); // mine blocks and sync up emitter let addr = env.client.get_new_address(None, None)?.assume_checked(); env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?; while emitter.next_header()?.is_some() {} // have some random txs in mempool let exp_txids = (0..MEMPOOL_TX_COUNT) .map(|_| env.send(&addr, Amount::from_sat(2100))) .collect::, _>>()?; // the first emission should include all transactions let emitted_txids = emitter .mempool()? 
.into_iter() .map(|(tx, _)| tx.txid()) .collect::>(); assert_eq!( emitted_txids, exp_txids, "all mempool txs should be emitted" ); // second emission should be empty assert!( emitter.mempool()?.is_empty(), "second emission should be empty" ); // mine empty blocks + sync up our emitter -> we should still not re-emit for _ in 0..BLOCKS_TO_MINE { env.mine_empty_block()?; } while emitter.next_header()?.is_some() {} assert!( emitter.mempool()?.is_empty(), "third emission, after chain tip is extended, should also be empty" ); Ok(()) } /// Ensure mempool tx is still re-emitted if [`Emitter`] has not reached the tx's introduction /// height. /// /// We introduce a mempool tx after each block, where blocks are empty (does not confirm previous /// mempool txs). Then we emit blocks from [`Emitter`] (intertwining `mempool` calls). We check /// that `mempool` should always re-emit txs that have introduced at a height greater than the last /// emitted block height. #[test] fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; const MEMPOOL_TX_COUNT: usize = 21; let env = TestEnv::new()?; let mut emitter = Emitter::new(&env.client, 0); // mine blocks to get initial balance, sync emitter up to tip let addr = env.client.get_new_address(None, None)?.assume_checked(); env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; while emitter.next_header()?.is_some() {} // mine blocks to introduce txs to mempool at different heights let tx_introductions = (0..MEMPOOL_TX_COUNT) .map(|_| -> anyhow::Result<_> { let (height, _) = env.mine_empty_block()?; let txid = env.send(&addr, Amount::from_sat(2100))?; Ok((height, txid)) }) .collect::>>()?; assert_eq!( emitter .mempool()? .into_iter() .map(|(tx, _)| tx.txid()) .collect::>(), tx_introductions.iter().map(|&(_, txid)| txid).collect(), "first mempool emission should include all txs", ); assert_eq!( emitter .mempool()? 
.into_iter() .map(|(tx, _)| tx.txid()) .collect::>(), tx_introductions.iter().map(|&(_, txid)| txid).collect(), "second mempool emission should still include all txs", ); // At this point, the emitter has seen all mempool transactions. It should only re-emit those // that have introduction heights less than the emitter's last-emitted block tip. while let Some((height, _)) = emitter.next_header()? { // We call `mempool()` twice. // The second call (at height `h`) should skip the tx introduced at height `h`. for try_index in 0..2 { let exp_txids = tx_introductions .range((height as usize + try_index, Txid::all_zeros())..) .map(|&(_, txid)| txid) .collect::>(); let emitted_txids = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.txid()) .collect::>(); assert_eq!( emitted_txids, exp_txids, "\n emission {} (try {}) must only contain txs introduced at that height or lower: \n\t missing: {:?} \n\t extra: {:?}", height, try_index, exp_txids .difference(&emitted_txids) .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) .collect::>(), emitted_txids .difference(&exp_txids) .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) .collect::>(), ); } } Ok(()) } /// Ensure we force re-emit all mempool txs after reorg. 
#[test]
fn mempool_during_reorg() -> anyhow::Result<()> {
    const TIP_DIFF: usize = 10;
    const PREMINE_COUNT: usize = 101;

    let env = TestEnv::new()?;
    let mut emitter = Emitter::new(&env.client, 0);

    // mine blocks to get initial balance
    let addr = env.client.get_new_address(None, None)?.assume_checked();
    env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;

    // introduce mempool tx at each block extension
    for _ in 0..TIP_DIFF {
        env.mine_empty_block()?;
        env.send(&addr, Amount::from_sat(2100))?;
    }

    // perform reorgs at different heights
    for reorg_count in 1..TIP_DIFF {
        // sync emitter to tip
        while emitter.next_header()?.is_some() {}

        println!("REORG COUNT: {}", reorg_count);
        env.reorg_empty_blocks(reorg_count)?;

        // we recalculate this at every loop as reorgs may evict transactions from mempool
        let tx_introductions = env
            .client
            .get_raw_mempool_verbose()?
            .into_iter()
            .map(|(txid, entry)| (txid, entry.height as usize))
            .collect::<BTreeMap<_, _>>();

        // The emitter was synced to the tip before the reorg, so it must emit a header now.
        // Failing loudly here keeps the assertions below from being skipped silently.
        let (height, _) = emitter
            .next_header()?
            .expect("must emit header after reorg");

        // the mempool emission (that follows the first block emission after reorg) should return
        // the entire mempool contents
        let mempool = emitter
            .mempool()?
            .into_iter()
            .map(|(tx, _)| tx.txid())
            .collect::<BTreeSet<Txid>>();
        let exp_mempool = tx_introductions.keys().copied().collect::<BTreeSet<_>>();
        assert_eq!(
            mempool, exp_mempool,
            "the first mempool emission after reorg should include all mempool txs"
        );

        // a repeated call only re-emits txs introduced above the last emitted block height
        let mempool = emitter
            .mempool()?
            .into_iter()
            .map(|(tx, _)| tx.txid())
            .collect::<BTreeSet<Txid>>();
        let exp_mempool = tx_introductions
            .iter()
            .filter(|&(_, &intro_height)| intro_height > (height as usize))
            .map(|(&txid, _)| txid)
            .collect::<BTreeSet<_>>();
        assert_eq!(
            mempool, exp_mempool,
            "following mempool emissions after reorg should exclude mempool introduction heights <= last emitted block height: \n\t missing: {:?} \n\t extra: {:?}",
            exp_mempool
                .difference(&mempool)
                .map(|txid| (txid, tx_introductions.get(txid).unwrap()))
                .collect::<Vec<_>>(),
            mempool
                .difference(&exp_mempool)
                .map(|txid| (txid, tx_introductions.get(txid).unwrap()))
                .collect::<Vec<_>>(),
        );
    }

    Ok(())
}