diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 914d0375..42df6adc 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -201,27 +201,26 @@ impl ConfigurableBlockchain for RpcBlockchain { /// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum /// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded fn from_config(config: &Self::Config) -> Result { - let wallet_name = config.wallet_name.clone(); - let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name); - debug!("connecting to {} auth:{:?}", wallet_url, config.auth); + let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name); let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?; let rpc_version = client.version()?; - let loaded_wallets = client.list_wallets()?; - if loaded_wallets.contains(&wallet_name) { - debug!("wallet already loaded {:?}", wallet_name); - } else if list_wallet_dir(&client)?.contains(&wallet_name) { - client.load_wallet(&wallet_name)?; - debug!("wallet loaded {:?}", wallet_name); + info!("connected to '{}' with auth: {:?}", wallet_url, config.auth); + + if client.list_wallets()?.contains(&config.wallet_name) { + info!("wallet already loaded: {}", config.wallet_name); + } else if list_wallet_dir(&client)?.contains(&config.wallet_name) { + client.load_wallet(&config.wallet_name)?; + info!("wallet loaded: {}", config.wallet_name); } else { // pre-0.21 use legacy wallets if rpc_version < 210_000 { - client.create_wallet(&wallet_name, Some(true), None, None, None)?; + client.create_wallet(&config.wallet_name, Some(true), None, None, None)?; } else { // TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed let args = [ - Value::String(wallet_name.clone()), + Value::String(config.wallet_name.clone()), Value::Bool(true), Value::Bool(false), Value::Null, @@ -231,7 +230,7 @@ impl ConfigurableBlockchain for RpcBlockchain { let _: Value = client.call("createwallet", &args)?; } - debug!("wallet created {:?}", wallet_name); + info!("wallet created: {}", config.wallet_name); } let is_descriptors = is_wallet_descriptor(&client)?; @@ -386,9 +385,16 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { // wait for Core wallet to rescan (TODO: maybe make this async) await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; - // loop through results of Core RPC method `listtransactions` - for tx_res in CoreTxIter::new(client, 100) { - let tx_res = tx_res?; + // obtain iterator of pagenated `listtransactions` RPC calls + const LIST_TX_PAGE_SIZE: usize = 100; // item count per page + let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| { + // filter out conflicting transactions - only accept transactions that are already + // confirmed, or exists in mempool + item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok() + }); + + // iterate through chronological results of `listtransactions` + for tx_res in tx_iter { let mut updated = false; let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| { @@ -695,81 +701,53 @@ where Ok(()) } -/// Iterates through results of multiple `listtransactions` calls. -struct CoreTxIter<'a> { - client: &'a Client, +/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results +/// in chronological order. +/// +/// `page_size` cannot be less than 1 and cannot be greater than 1000. +fn list_transactions( + client: &Client, page_size: usize, - page_index: usize, - - stack: Vec, - done: bool, -} - -impl<'a> CoreTxIter<'a> { - fn new(client: &'a Client, mut page_size: usize) -> Self { - if page_size > 1000 { - page_size = 1000; - } - - Self { - client, - page_size, - page_index: 0, - stack: Vec::with_capacity(page_size), - done: false, - } +) -> Result, Error> { + if !(1..=1000).contains(&page_size) { + return Err(Error::Generic(format!( + "Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}", + page_size + ))); } - /// We want to filter out conflicting transactions. - /// Only accept transactions that are already confirmed, or existing in mempool. - fn keep_tx(&self, item: &ListTransactionResult) -> bool { - item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok() - } -} + // `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`) + let mut got_err = false; -impl<'a> Iterator for CoreTxIter<'a> { - type Item = Result; - - fn next(&mut self) -> Option { - loop { - if self.done { - return None; + // obtain results in batches (of `page_size`) + let nested_list = (0_usize..) + .map(|page_index| { + client.list_transactions( + None, + Some(page_size), + Some(page_size * page_index), + Some(true), + ) + }) + // take until returned rpc call is empty or until error + // TODO: replace with the following when MSRV is 1.57.0: + // `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())` + .take_while(|res| { + if got_err || matches!(res, Ok(list) if list.is_empty()) { + // break if last iteration was an error, or if the current result is empty + false + } else { + // record whether result is error or not + got_err = res.is_err(); + // continue on non-empty result or first error + true } + }) + .collect::, _>>() + .map_err(Error::Rpc)?; - if let Some(item) = self.stack.pop() { - if self.keep_tx(&item) { - return Some(Ok(item)); - } - } - - let res = self - .client - .list_transactions( - None, - Some(self.page_size), - Some(self.page_size * self.page_index), - Some(true), - ) - .map_err(Error::Rpc); - - self.page_index += 1; - - let list = match res { - Ok(list) => list, - Err(err) => { - self.done = true; - return Some(Err(err)); - } - }; - - if list.is_empty() { - self.done = true; - return None; - } - - self.stack = list; - } - } + // reverse here to have txs in chronological order + Ok(nested_list.into_iter().rev().flatten()) } fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> { @@ -885,10 +863,16 @@ impl BlockchainFactory for RpcBlockchainFactory { #[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))] mod test { use super::*; - use crate::testutils::blockchain_tests::TestClient; + use crate::{ + descriptor::{into_wallet_descriptor_checked, AsDerived}, + testutils::blockchain_tests::TestClient, + wallet::utils::SecpCtx, + }; - use bitcoin::Network; + use bitcoin::{Address, Network}; use bitcoincore_rpc::RpcApi; + use log::LevelFilter; + use miniscript::DescriptorTrait; crate::bdk_blockchain_tests! { fn test_instance(test_client: &TestClient) -> RpcBlockchain { @@ -942,4 +926,63 @@ mod test { "prefix-bbbbbb" ); } + + /// This test ensures that [list_transactions] always iterates through transactions in + /// chronological order, independent of the `page_size`. + #[test] + fn test_list_transactions() { + let _ = env_logger::builder() + .filter_level(LevelFilter::Info) + .default_format() + .try_init(); + + const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)"; + const AMOUNT_PER_TX: u64 = 10_000; + const TX_COUNT: u32 = 50; + + let secp = SecpCtx::default(); + let network = Network::Regtest; + let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap(); + + let (mut test_client, factory) = get_factory(); + let bc = factory.build("itertest", None).unwrap(); + + // generate scripts (1 tx per script) + let scripts = (0..TX_COUNT) + .map(|index| desc.as_derived(index, &secp).script_pubkey()) + .collect::>(); + + // import scripts and wait + if bc.is_descriptors { + import_descriptors(&bc.client, 0, scripts.iter()).unwrap(); + } else { + import_multi(&bc.client, 0, scripts.iter()).unwrap(); + } + await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap(); + + // create and broadcast txs + let expected_txids = scripts + .iter() + .map(|script| { + let addr = Address::from_script(script, network).unwrap(); + let txid = + test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) }); + test_client.generate(1, None); + txid + }) + .collect::>(); + + // iterate through different page sizes - should always return txs in chronological order + [1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| { + println!("trying with page_size: {}", page_size); + + let txids = list_transactions(&bc.client, *page_size) + .unwrap() + .map(|res| res.info.txid) + .collect::>(); + + assert_eq!(txids.len(), expected_txids.len()); + assert_eq!(txids, expected_txids); + }); + } }