From ac19c19f21fce43a99ecf0c4f95ae818b620558c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 23 Jul 2022 07:44:39 +0800 Subject: [PATCH] New `RpcBlockchain` implementation with various fixes The new implementation fixes the following: * We can track more than 100 scriptPubKeys * We can obtain more than 1000 transactions per sync * `TransactionDetails` for already-synced transactions are updated when new scriptPubKeys are introduced (fixing the missing balance/coins issue of supposedly tracked scriptPubKeys) `RpcConfig` changes: * Introduce `RpcSyncParams`. * Remove `RpcConfig::skip_blocks` (this is replaced by `RpcSyncParams::start_time`). --- CHANGELOG.md | 1 + examples/rpcwallet.rs | 2 +- src/blockchain/rpc.rs | 798 ++++++++++++++++++++---------- src/testutils/blockchain_tests.rs | 4 +- 4 files changed, 547 insertions(+), 258 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa369a47..3fda3e06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add the ability to specify whether a taproot transaction should be signed using the internal key or not, using `sign_with_tap_internal_key` in `SignOptions` - Consolidate params `fee_amount` and `amount_needed` in `target_amount` in `CoinSelectionAlgorithm::coin_select` signature. - Change the meaning of the `fee_amount` field inside `CoinSelectionResult`: from now on the `fee_amount` will represent only the fees asociated with the utxos in the `selected` field of `CoinSelectionResult`. +- New `RpcBlockchain` implementation with various fixes. ## [v0.20.0] - [v0.19.0] diff --git a/examples/rpcwallet.rs b/examples/rpcwallet.rs index 3178af6b..24a55591 100644 --- a/examples/rpcwallet.rs +++ b/examples/rpcwallet.rs @@ -103,7 +103,7 @@ fn main() -> Result<(), Box> { auth: bitcoind_auth, network: Network::Regtest, wallet_name, - skip_blocks: None, + sync_params: None, }; // Use the above configuration to create a RPC blockchain backend diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 1d0d884c..410e92f9 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -26,21 +26,23 @@ //! }, //! network: bdk::bitcoin::Network::Testnet, //! wallet_name: "wallet_name".to_string(), -//! skip_blocks: None, +//! sync_params: None, //! }; //! let blockchain = RpcBlockchain::from_config(&config); //! 
```

-use crate::bitcoin::consensus::deserialize;
 use crate::bitcoin::hashes::hex::ToHex;
-use crate::bitcoin::{Address, Network, OutPoint, Transaction, TxOut, Txid};
+use crate::bitcoin::{Network, OutPoint, Transaction, TxOut, Txid};
 use crate::blockchain::*;
-use crate::database::{BatchDatabase, DatabaseUtils};
+use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
 use crate::descriptor::get_checksum;
+use crate::error::MissingCachedScripts;
 use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
+use bitcoin::Script;
 use bitcoincore_rpc::json::{
-    GetAddressInfoResultLabel, ImportMultiOptions, ImportMultiRequest,
-    ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
+    GetTransactionResult, GetTransactionResultDetailCategory, ImportMultiOptions,
+    ImportMultiRequest, ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
+    ListTransactionResult, ScanningDetails,
 };
 use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
 use bitcoincore_rpc::Auth as RpcAuth;
@@ -49,7 +51,8 @@ use log::debug;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
-use std::str::FromStr;
+use std::thread;
+use std::time::Duration;
 
 /// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
 #[derive(Debug)]
@@ -60,11 +63,8 @@ pub struct RpcBlockchain {
     is_descriptors: bool,
     /// Blockchain capabilities, cached here at startup
     capabilities: HashSet<Capability>,
-    /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
-    skip_blocks: Option<u32>,
-
-    /// This is a fixed Address used as a hack key to store information on the node
-    _storage_address: Address,
+    /// Sync parameters.
+    sync_params: RpcSyncParams,
 }
 
 /// RpcBlockchain configuration options
@@ -78,8 +78,33 @@ pub struct RpcConfig {
     pub network: Network,
     /// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this
     pub wallet_name: String,
-    /// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
-    pub skip_blocks: Option<u32>,
+    /// Sync parameters
+    pub sync_params: Option<RpcSyncParams>,
+}
+
+/// Sync parameters for Bitcoin Core RPC.
+///
+/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with
+/// `scriptPubKey`s imported in the Bitcoin Core Wallet. These parameters are used to determine
+/// how the `importdescriptors` (or `importmulti`) RPC calls are made.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+pub struct RpcSyncParams {
+    /// The minimum number of scripts to scan for on initial sync.
+    pub start_script_count: usize,
+    /// Time in unix seconds from which the initial sync will start scanning (0 to start from genesis).
+    pub start_time: u64,
+    /// RPC poll rate (in seconds) to get state updates.
+ pub poll_rate_sec: u64, +} + +impl Default for RpcSyncParams { + fn default() -> Self { + Self { + start_script_count: 100, + start_time: 0, + poll_rate_sec: 3, + } + } } /// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize] @@ -115,27 +140,6 @@ impl From for RpcAuth { } } -impl RpcBlockchain { - fn get_node_synced_height(&self) -> Result { - let info = self.client.get_address_info(&self._storage_address)?; - if let Some(GetAddressInfoResultLabel::Simple(label)) = info.labels.first() { - Ok(label - .parse::() - .unwrap_or_else(|_| self.skip_blocks.unwrap_or(0))) - } else { - Ok(self.skip_blocks.unwrap_or(0)) - } - } - - /// Set the synced height in the core node by using a label of a fixed address so that - /// another client with the same descriptor doesn't rescan the blockchain - fn set_node_synced_height(&self, height: u32) -> Result<(), Error> { - Ok(self - .client - .set_label(&self._storage_address, &height.to_string())?) - } -} - impl Blockchain for RpcBlockchain { fn get_capabilities(&self) -> HashSet { self.capabilities.clone() @@ -178,224 +182,52 @@ impl GetBlockHash for RpcBlockchain { impl WalletSync for RpcBlockchain { fn wallet_setup( &self, - database: &mut D, + db: &mut D, progress_update: Box, ) -> Result<(), Error> { - let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?; - scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?); - debug!( - "importing {} script_pubkeys (some maybe already imported)", - scripts_pubkeys.len() - ); + let db_scripts = db.iter_script_pubkeys(None)?; + // this is a hack to check whether the scripts are coming from a derivable descriptor + // we assume for non-derivable descriptors, the initial script count is always 1 + let is_derivable = db_scripts.len() > 1; + + // ensure db scripts meet start script count requirements + if is_derivable && db_scripts.len() < self.sync_params.start_script_count { + return Err(Error::MissingCachedScripts(MissingCachedScripts { + last_count: db_scripts.len(), + missing_count: self.sync_params.start_script_count - db_scripts.len(), + })); + } + + // this tells Core wallet where to sync from for imported scripts + let start_epoch = db + .get_sync_time()? 
+ .map_or(self.sync_params.start_time, |st| st.block_time.timestamp); + + // import all scripts from db into Core wallet if self.is_descriptors { - // Core still doesn't support complex descriptors like BDK, but when the wallet type is - // "descriptors" we should import individual addresses using `importdescriptors` rather - // than `importmulti`, using the `raw()` descriptor which allows us to specify an - // arbitrary script - let requests = Value::Array( - scripts_pubkeys - .iter() - .map(|s| { - let desc = format!("raw({})", s.to_hex()); - json!({ - "timestamp": "now", - "desc": format!("{}#{}", desc, get_checksum(&desc).unwrap()), - }) - }) - .collect(), - ); - - let res: Vec = self.client.call("importdescriptors", &[requests])?; - res.into_iter() - .map(|v| match v["success"].as_bool() { - Some(true) => Ok(()), - Some(false) => Err(Error::Generic( - v["error"]["message"] - .as_str() - .unwrap_or("Unknown error") - .to_string(), - )), - _ => Err(Error::Generic("Unexpected response from Core".to_string())), - }) - .collect::, _>>()?; + import_descriptors(&self.client, start_epoch, db_scripts.iter())?; } else { - let requests: Vec<_> = scripts_pubkeys - .iter() - .map(|s| ImportMultiRequest { - timestamp: ImportMultiRescanSince::Timestamp(0), - script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(s)), - watchonly: Some(true), - ..Default::default() - }) - .collect(); - let options = ImportMultiOptions { - rescan: Some(false), - }; - self.client.import_multi(&requests, Some(&options))?; + import_multi(&self.client, start_epoch, db_scripts.iter())?; } - loop { - let current_height = self.get_height()?; + // await sync (TODO: Maybe make this async) + await_wallet_scan( + &self.client, + self.sync_params.poll_rate_sec, + &*progress_update, + )?; - // min because block invalidate may cause height to go down - let node_synced = self.get_node_synced_height()?.min(current_height); + // begin db batch updates + let mut db_batch = db.begin_batch(); - let sync_up_to = node_synced.saturating_add(10_000).min(current_height); + // update batch: obtain db state then update state with core txids + DbState::from_db(db)? + .update_state(&self.client, db)? + .update_batch::(&mut db_batch)?; - debug!("rescan_blockchain from:{} to:{}", node_synced, sync_up_to); - self.client - .rescan_blockchain(Some(node_synced as usize), Some(sync_up_to as usize))?; - progress_update.update((sync_up_to as f32) / (current_height as f32), None)?; - - self.set_node_synced_height(sync_up_to)?; - - if sync_up_to == current_height { - break; - } - } - - self.wallet_sync(database, progress_update) - } - - fn wallet_sync( - &self, - db: &mut D, - _progress_update: Box, - ) -> Result<(), Error> { - let mut indexes = HashMap::new(); - for keykind in &[KeychainKind::External, KeychainKind::Internal] { - indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0)); - } - - let mut known_txs: HashMap<_, _> = db - .iter_txs(true)? 
- .into_iter() - .map(|tx| (tx.txid, tx)) - .collect(); - let known_utxos: HashSet<_> = db.iter_utxos()?.into_iter().collect(); - - //TODO list_since_blocks would be more efficient - let current_utxo = self - .client - .list_unspent(Some(0), None, None, Some(true), None)?; - debug!("current_utxo len {}", current_utxo.len()); - - //TODO supported up to 1_000 txs, should use since_blocks or do paging - let list_txs = self - .client - .list_transactions(None, Some(1_000), None, Some(true))?; - let mut list_txs_ids = HashSet::new(); - - for tx_result in list_txs.iter().filter(|t| { - // list_txs returns all conflicting txs, we want to - // filter out replaced tx => unconfirmed and not in the mempool - t.info.confirmations > 0 || self.client.get_mempool_entry(&t.info.txid).is_ok() - }) { - let txid = tx_result.info.txid; - list_txs_ids.insert(txid); - if let Some(mut known_tx) = known_txs.get_mut(&txid) { - let confirmation_time = - BlockTime::new(tx_result.info.blockheight, tx_result.info.blocktime); - if confirmation_time != known_tx.confirmation_time { - // reorg may change tx height - debug!( - "updating tx({}) confirmation time to: {:?}", - txid, confirmation_time - ); - known_tx.confirmation_time = confirmation_time; - db.set_tx(known_tx)?; - } - } else { - //TODO check there is already the raw tx in db? - let tx_result = self.client.get_transaction(&txid, Some(true))?; - let tx: Transaction = deserialize(&tx_result.hex)?; - let mut received = 0u64; - let mut sent = 0u64; - for output in tx.output.iter() { - if let Ok(Some((kind, index))) = - db.get_path_from_script_pubkey(&output.script_pubkey) - { - if index > *indexes.get(&kind).unwrap() { - indexes.insert(kind, index); - } - received += output.value; - } - } - - for input in tx.input.iter() { - if let Some(previous_output) = db.get_previous_output(&input.previous_output)? { - if db.is_mine(&previous_output.script_pubkey)? { - sent += previous_output.value; - } - } - } - - let td = TransactionDetails { - transaction: Some(tx), - txid: tx_result.info.txid, - confirmation_time: BlockTime::new( - tx_result.info.blockheight, - tx_result.info.blocktime, - ), - received, - sent, - fee: tx_result.fee.map(|f| f.as_sat().unsigned_abs()), - }; - debug!( - "saving tx: {} tx_result.fee:{:?} td.fees:{:?}", - td.txid, tx_result.fee, td.fee - ); - db.set_tx(&td)?; - } - } - - for known_txid in known_txs.keys() { - if !list_txs_ids.contains(known_txid) { - debug!("removing tx: {}", known_txid); - db.del_tx(known_txid, false)?; - } - } - - // Filter out trasactions that are for script pubkeys that aren't in this wallet. 
- let current_utxos = current_utxo - .into_iter() - .filter_map( - |u| match db.get_path_from_script_pubkey(&u.script_pub_key) { - Err(e) => Some(Err(e)), - Ok(None) => None, - Ok(Some(path)) => Some(Ok(LocalUtxo { - outpoint: OutPoint::new(u.txid, u.vout), - keychain: path.0, - txout: TxOut { - value: u.amount.as_sat(), - script_pubkey: u.script_pub_key, - }, - is_spent: false, - })), - }, - ) - .collect::, Error>>()?; - - let spent: HashSet<_> = known_utxos.difference(¤t_utxos).collect(); - for utxo in spent { - debug!("setting as spent utxo: {:?}", utxo); - let mut spent_utxo = utxo.clone(); - spent_utxo.is_spent = true; - db.set_utxo(&spent_utxo)?; - } - let received: HashSet<_> = current_utxos.difference(&known_utxos).collect(); - for utxo in received { - debug!("adding utxo: {:?}", utxo); - db.set_utxo(utxo)?; - } - - for (keykind, index) in indexes { - debug!("{:?} max {}", keykind, index); - db.set_last_index(keykind, index)?; - } - - Ok(()) + // apply batch updates to db + db.commit_batch(db_batch) } } @@ -464,17 +296,11 @@ impl ConfigurableBlockchain for RpcBlockchain { } } - // this is just a fixed address used only to store a label containing the synced height in the node - let mut storage_address = - Address::from_str("bc1qst0rewf0wm4kw6qn6kv0e5tc56nkf9yhcxlhqv").unwrap(); - storage_address.network = network; - Ok(RpcBlockchain { client, capabilities, is_descriptors, - _storage_address: storage_address, - skip_blocks: config.skip_blocks, + sync_params: config.sync_params.clone().unwrap_or_default(), }) } } @@ -495,6 +321,461 @@ fn list_wallet_dir(client: &Client) -> Result, Error> { Ok(result.wallets.into_iter().map(|n| n.name).collect()) } +/// Represents the state of the [`crate::database::Database`]. +struct DbState { + txs: HashMap, + utxos: HashSet, + last_indexes: HashMap, + + // "deltas" to apply to database + retained_txs: HashSet, // txs to retain (everything else should be deleted) + updated_txs: HashSet, // txs to update + updated_utxos: HashSet, // utxos to update + updated_last_indexes: HashSet, +} + +impl DbState { + /// Obtain [DbState] from [crate::database::Database]. + fn from_db(db: &D) -> Result { + let txs = db + .iter_txs(true)? 
+ .into_iter() + .map(|tx| (tx.txid, tx)) + .collect::>(); + let utxos = db.iter_utxos()?.into_iter().collect::>(); + let last_indexes = [KeychainKind::External, KeychainKind::Internal] + .iter() + .filter_map(|keychain| { + db.get_last_index(*keychain) + .map(|v| v.map(|i| (*keychain, i))) + .transpose() + }) + .collect::, Error>>()?; + + let retained_txs = HashSet::with_capacity(txs.len()); + let updated_txs = HashSet::with_capacity(txs.len()); + let updated_utxos = HashSet::with_capacity(utxos.len()); + let updated_last_indexes = HashSet::with_capacity(last_indexes.len()); + + Ok(Self { + txs, + utxos, + last_indexes, + retained_txs, + updated_txs, + updated_utxos, + updated_last_indexes, + }) + } + + /// Update [DbState] with Core wallet state + fn update_state(&mut self, client: &Client, db: &D) -> Result<&mut Self, Error> + where + D: BatchDatabase, + { + let tx_iter = CoreTxIter::new(client, 10); + + for tx_res in tx_iter { + let tx_res = tx_res?; + + let mut updated = false; + + let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| { + updated = true; + TransactionDetails { + txid: tx_res.info.txid, + ..Default::default() + } + }); + + // update raw tx (if needed) + let raw_tx = + match &db_tx.transaction { + Some(raw_tx) => raw_tx, + None => { + updated = true; + db_tx.transaction.insert(client.get_raw_transaction( + &tx_res.info.txid, + tx_res.info.blockhash.as_ref(), + )?) + } + }; + + // update fee (if needed) + if let (None, Some(new_fee)) = (db_tx.fee, tx_res.detail.fee) { + updated = true; + db_tx.fee = Some(new_fee.as_sat().unsigned_abs()); + } + + // update confirmation time (if needed) + let conf_time = BlockTime::new(tx_res.info.blockheight, tx_res.info.blocktime); + if db_tx.confirmation_time != conf_time { + updated = true; + db_tx.confirmation_time = conf_time; + } + + // update received (if needed) + let received = Self::_received_from_raw_tx(db, raw_tx)?; + if db_tx.received != received { + updated = true; + db_tx.received = received; + } + + // check if tx has an immature coinbase output (add to updated UTXOs) + // this is required because `listunspent` does not include immature coinbase outputs + if tx_res.detail.category == GetTransactionResultDetailCategory::Immature { + // let vout = tx_res.detail.vout; + // let txout = raw_tx.output.get(vout as usize).cloned().ok_or_else(|| { + // Error::Generic(format!( + // "Core RPC returned detail with invalid vout '{}' for tx '{}'", + // vout, tx_res.info.txid, + // )) + // })?; + // println!("got immature detail!"); + + // if let Some((keychain, _)) = db.get_path_from_script_pubkey(&txout.script_pubkey)? { + // let utxo = LocalUtxo { + // outpoint: OutPoint::new(tx_res.info.txid, d.vout), + // txout, + // keychain, + // is_spent: false, + // }; + // self.updated_utxos.insert(utxo); + // } + } + + // update tx deltas + self.retained_txs.insert(tx_res.info.txid); + if updated { + self.updated_txs.insert(tx_res.info.txid); + } + } + + // update sent from tx inputs + let sent_updates = self + .txs + .values() + .filter_map(|db_tx| { + let txid = self.retained_txs.get(&db_tx.txid)?; + self._sent_from_raw_tx(db, db_tx.transaction.as_ref()?) 
+ .map(|sent| { + if db_tx.sent != sent { + Some((*txid, sent)) + } else { + None + } + }) + .transpose() + }) + .collect::, _>>()?; + + // record send updates + sent_updates.into_iter().for_each(|(txid, sent)| { + self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent); + self.updated_txs.insert(txid); + }); + + // obtain UTXOs from Core wallet + let core_utxos = client + .list_unspent(Some(0), None, None, Some(true), None)? + .into_iter() + .filter_map(|utxo_res| { + db.get_path_from_script_pubkey(&utxo_res.script_pub_key) + .transpose() + .map(|v| { + v.map(|(keychain, index)| { + // update last index if needed + self._update_last_index(keychain, index); + + LocalUtxo { + outpoint: OutPoint::new(utxo_res.txid, utxo_res.vout), + keychain, + txout: TxOut { + value: utxo_res.amount.as_sat(), + script_pubkey: utxo_res.script_pub_key, + }, + is_spent: false, + } + }) + }) + }) + .collect::, Error>>()?; + + // mark "spent utxos" to be updated in database + let spent_utxos = self.utxos.difference(&core_utxos).cloned().map(|mut utxo| { + utxo.is_spent = true; + utxo + }); + + // mark new utxos to be added in database + let new_utxos = core_utxos.difference(&self.utxos).cloned(); + + // add to updated utxos + self.updated_utxos = spent_utxos.chain(new_utxos).collect(); + + Ok(self) + } + + /// We want to filter out conflicting transactions. + /// Only accept transactions that are already confirmed, or existing in mempool. + fn _filter_tx(client: &Client, res: GetTransactionResult) -> Option { + if res.info.confirmations > 0 || client.get_mempool_entry(&res.info.txid).is_ok() { + Some(res) + } else { + debug!("tx filtered: {}", res.info.txid); + None + } + } + + /// Calculates received amount from raw tx. + fn _received_from_raw_tx(db: &D, raw_tx: &Transaction) -> Result { + raw_tx.output.iter().try_fold(0_u64, |recv, txo| { + let v = if db.is_mine(&txo.script_pubkey)? { + txo.value + } else { + 0 + }; + Ok(recv + v) + }) + } + + /// Calculates sent from raw tx. + fn _sent_from_raw_tx( + &self, + db: &D, + raw_tx: &Transaction, + ) -> Result { + raw_tx.input.iter().try_fold(0_u64, |sent, txin| { + let v = match self._previous_output(&txin.previous_output) { + Some(prev_txo) => { + if db.is_mine(&prev_txo.script_pubkey)? { + prev_txo.value + } else { + 0 + } + } + None => 0_u64, + }; + Ok(sent + v) + }) + } + + fn _previous_output(&self, outpoint: &OutPoint) -> Option<&TxOut> { + let prev_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?; + prev_tx.output.get(outpoint.vout as usize) + } + + fn _update_last_index(&mut self, keychain: KeychainKind, index: u32) { + let mut updated = false; + + self.last_indexes + .entry(keychain) + .and_modify(|last| { + if *last < index { + updated = true; + *last = index; + } + }) + .or_insert_with(|| { + updated = true; + index + }); + + if updated { + self.updated_last_indexes.insert(keychain); + } + } + + /// Prepare db batch operations. 
+ fn update_batch(&self, batch: &mut D::Batch) -> Result<(), Error> { + // delete stale txs from db + // stale = not retained + self.txs + .keys() + .filter(|&txid| !self.retained_txs.contains(txid)) + .try_for_each(|txid| batch.del_tx(txid, false).map(|_| ()))?; + + // update txs + self.updated_txs + .iter() + .filter_map(|txid| self.txs.get(txid)) + .try_for_each(|txd| batch.set_tx(txd))?; + + // update utxos + self.updated_utxos + .iter() + .try_for_each(|utxo| batch.set_utxo(utxo))?; + + // update last indexes + self.updated_last_indexes + .iter() + .map(|keychain| self.last_indexes.get_key_value(keychain).unwrap()) + .try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?; + + Ok(()) + } +} + +fn import_descriptors<'a, S>( + client: &Client, + start_epoch: u64, + scripts_iter: S, +) -> Result<(), Error> +where + S: Iterator, +{ + let requests = Value::Array( + scripts_iter + .map(|script| { + let desc = descriptor_from_script_pubkey(script); + json!({ "timestamp": start_epoch, "desc": desc }) + }) + .collect(), + ); + for v in client.call::>("importdescriptors", &[requests])? { + match v["success"].as_bool() { + Some(true) => continue, + Some(false) => { + return Err(Error::Generic( + v["error"]["message"] + .as_str() + .map_or("unknown error".into(), ToString::to_string), + )) + } + _ => return Err(Error::Generic("Unexpected response form Core".to_string())), + } + } + Ok(()) +} + +fn import_multi<'a, S>(client: &Client, start_epoch: u64, scripts_iter: S) -> Result<(), Error> +where + S: Iterator, +{ + let requests = scripts_iter + .map(|script| ImportMultiRequest { + timestamp: ImportMultiRescanSince::Timestamp(start_epoch), + script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(script)), + watchonly: Some(true), + ..Default::default() + }) + .collect::>(); + let options = ImportMultiOptions { rescan: Some(true) }; + for v in client.import_multi(&requests, Some(&options))? { + if let Some(err) = v.error { + return Err(Error::Generic(format!( + "{} (code: {})", + err.message, err.code + ))); + } + } + Ok(()) +} + +struct CoreTxIter<'a> { + client: &'a Client, + page_size: usize, + page_index: usize, + + stack: Vec, + done: bool, +} + +impl<'a> CoreTxIter<'a> { + fn new(client: &'a Client, page_size: usize) -> Self { + Self { + client, + page_size, + page_index: 0, + stack: Vec::with_capacity(page_size), + done: false, + } + } + + /// We want to filter out conflicting transactions. + /// Only accept transactions that are already confirmed, or existing in mempool. 
+ fn tx_ok(&self, item: &ListTransactionResult) -> bool { + item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok() + } +} + +impl<'a> Iterator for CoreTxIter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + if self.done { + return None; + } + + if let Some(item) = self.stack.pop() { + if self.tx_ok(&item) { + return Some(Ok(item)); + } + } + + let res = self + .client + .list_transactions( + None, + Some(self.page_size), + Some(self.page_size * self.page_index), + Some(true), + ) + .map_err(Error::Rpc); + + self.page_index += 1; + + let list = match res { + Ok(list) => list, + Err(err) => { + self.done = true; + return Some(Err(err)); + } + }; + + if list.is_empty() { + self.done = true; + return None; + } + + self.stack = list; + } + } +} + +fn get_scanning_details(client: &Client) -> Result { + #[derive(Deserialize)] + struct CallResult { + scanning: ScanningDetails, + } + let result: CallResult = client.call("getwalletinfo", &[])?; + Ok(result.scanning) +} + +fn await_wallet_scan( + client: &Client, + poll_rate_sec: u64, + progress_update: &dyn Progress, +) -> Result<(), Error> { + let dur = Duration::from_secs(poll_rate_sec); + loop { + match get_scanning_details(client)? { + ScanningDetails::Scanning { duration, progress } => { + println!("scanning: duration={}, progress={}", duration, progress); + progress_update + .update(progress, Some(format!("elapsed for {} seconds", duration)))?; + thread::sleep(dur); + } + ScanningDetails::NotScanning(_) => { + progress_update.update(1.0, None)?; + println!("scanning: done!"); + return Ok(()); + } + }; + } +} + /// Returns whether a wallet is legacy or descriptors by calling `getwalletinfo`. /// /// This API is mapped by bitcoincore_rpc, but it doesn't have the fields we need (either @@ -509,6 +790,11 @@ fn is_wallet_descriptor(client: &Client) -> Result { Ok(result.descriptors.unwrap_or(false)) } +fn descriptor_from_script_pubkey(script: &Script) -> String { + let desc = format!("raw({})", script.to_hex()); + format!("{}#{}", desc, get_checksum(&desc).unwrap()) +} + /// Factory of [`RpcBlockchain`] instances, implements [`BlockchainFactory`] /// /// Internally caches the node url and authentication params and allows getting many different [`RpcBlockchain`] @@ -529,6 +815,7 @@ fn is_wallet_descriptor(client: &Client) -> Result { /// network: Network::Testnet, /// wallet_name_prefix: Some("prefix-".to_string()), /// default_skip_blocks: 100_000, +/// sync_params: None, /// }; /// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?; /// # Ok(()) @@ -546,6 +833,8 @@ pub struct RpcBlockchainFactory { pub wallet_name_prefix: Option, /// Default number of blocks to skip which will be inherited by blockchain unless overridden pub default_skip_blocks: u32, + /// Sync parameters + pub sync_params: Option, } impl BlockchainFactory for RpcBlockchainFactory { @@ -554,7 +843,7 @@ impl BlockchainFactory for RpcBlockchainFactory { fn build( &self, checksum: &str, - override_skip_blocks: Option, + _override_skip_blocks: Option, ) -> Result { RpcBlockchain::from_config(&RpcConfig { url: self.url.clone(), @@ -565,7 +854,7 @@ impl BlockchainFactory for RpcBlockchainFactory { self.wallet_name_prefix.as_ref().unwrap_or(&String::new()), checksum ), - skip_blocks: Some(override_skip_blocks.unwrap_or(self.default_skip_blocks)), + sync_params: self.sync_params.clone(), }) } } @@ -586,7 +875,7 @@ mod test { auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() }, network: 
Network::Regtest, wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ), - skip_blocks: None, + sync_params: None, }; RpcBlockchain::from_config(&config).unwrap() } @@ -603,6 +892,7 @@ mod test { network: Network::Regtest, wallet_name_prefix: Some("prefix-".into()), default_skip_blocks: 0, + sync_params: None, }; (test_client, factory) @@ -613,7 +903,6 @@ mod test { let (_test_client, factory) = get_factory(); let a = factory.build("aaaaaa", None).unwrap(); - assert_eq!(a.skip_blocks, Some(0)); assert_eq!( a.client .get_wallet_info() @@ -623,7 +912,6 @@ mod test { ); let b = factory.build("bbbbbb", Some(100)).unwrap(); - assert_eq!(b.skip_blocks, Some(100)); assert_eq!( b.client .get_wallet_info() diff --git a/src/testutils/blockchain_tests.rs b/src/testutils/blockchain_tests.rs index 89e09133..ed73a299 100644 --- a/src/testutils/blockchain_tests.rs +++ b/src/testutils/blockchain_tests.rs @@ -1057,6 +1057,7 @@ macro_rules! bdk_blockchain_tests { let (wallet, blockchain, _, mut test_client) = init_single_sig(); let wallet_addr = wallet.get_address($crate::wallet::AddressIndex::New).unwrap().address; + println!("wallet addr: {}", wallet_addr); wallet.sync(&blockchain, SyncOptions::default()).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 0, "incorrect balance"); @@ -1070,7 +1071,6 @@ macro_rules! bdk_blockchain_tests { test_client.generate(100, Some(node_addr)); } - wallet.sync(&blockchain, SyncOptions::default()).unwrap(); assert!(wallet.get_balance().unwrap() > 0, "incorrect balance after receiving coinbase"); } @@ -1267,7 +1267,7 @@ macro_rules! bdk_blockchain_tests { wallet.sync(&blockchain, SyncOptions::default()).unwrap(); let _ = test_client.receive(testutils! { - @tx ( (@external descriptors, 0) => 50_000 ) + @tx ( (@external descriptors, 0) => 50_000 ) }); wallet.sync(&blockchain, SyncOptions::default()).unwrap();
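
A rough usage sketch of the new configuration surface, not part of the patch itself: it constructs `RpcConfig` with explicit `RpcSyncParams` and mirrors the module docs above. The URL, cookie path and timestamp are placeholders, and the `bdk::blockchain::rpc` re-export paths plus the `rpc` feature flag are assumptions.

```rust
use bdk::bitcoin::Network;
use bdk::blockchain::rpc::{Auth, RpcBlockchain, RpcConfig, RpcSyncParams};
use bdk::blockchain::ConfigurableBlockchain;

fn main() -> Result<(), bdk::Error> {
    // Start from the defaults (start_script_count: 100, start_time: 0, poll_rate_sec: 3)
    // and only override the initial-scan timestamp.
    let sync_params = RpcSyncParams {
        start_time: 1_650_000_000, // placeholder unix timestamp
        ..Default::default()
    };

    let config = RpcConfig {
        url: "127.0.0.1:18332".to_string(),
        auth: Auth::Cookie {
            file: "/home/user/.bitcoin/testnet3/.cookie".into(), // placeholder path
        },
        network: Network::Testnet,
        // `wallet_name_from_descriptor` can derive a deterministic name instead.
        wallet_name: "wallet_name".to_string(),
        sync_params: Some(sync_params),
    };

    let _blockchain = RpcBlockchain::from_config(&config)?;
    Ok(())
}
```

Note the shift in meaning: the removed `skip_blocks` told the node how many blocks to skip before rescanning, while `start_time` hands Core a timestamp to rescan from when the scripts are imported.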
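Because the rewritten `wallet_setup` now reports progress by polling `getwalletinfo` (see `await_wallet_scan` above), a caller-supplied `Progress` receives a fraction plus an "elapsed for N seconds" message. Below is a minimal sketch of such a sink, assuming the single-method `Progress` trait implied by the `update` calls above, the blocking (default) feature, and the `bdk::blockchain::Progress` path; `PrintProgress` is a hypothetical name.

```rust
use bdk::blockchain::Progress;
use bdk::Error;

/// Hypothetical progress sink that just prints the scan updates emitted by
/// `await_wallet_scan` (a fraction in 0.0..=1.0 plus an optional message).
#[derive(Debug)]
struct PrintProgress;

impl Progress for PrintProgress {
    fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error> {
        println!(
            "wallet scan: {:>5.1}% {}",
            progress * 100.0,
            message.unwrap_or_default()
        );
        Ok(())
    }
}

fn main() -> Result<(), Error> {
    // Boxed, as `wallet_setup` receives it; driven manually here for illustration.
    let progress: Box<dyn Progress> = Box::new(PrintProgress);
    progress.update(0.42, Some("elapsed for 12 seconds".to_string()))?;
    progress.update(1.0, None)?;
    Ok(())
}
```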
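`wallet_setup` now refuses to start the initial import when the database caches fewer scriptPubKeys than `start_script_count`, returning `Error::MissingCachedScripts` with the counts shown above. `Wallet::sync` may already deal with this internally by caching more addresses and retrying, but code driving `WalletSync` directly could react along these lines. This is only a sketch: the helper name is hypothetical, the blocking feature is assumed, and it assumes the `last_count`/`missing_count` fields are publicly readable.

```rust
use bdk::blockchain::{Progress, WalletSync};
use bdk::database::BatchDatabase;
use bdk::Error;

/// Hypothetical helper: run the initial import and report how many more
/// scriptPubKeys must be cached before it can proceed, instead of failing.
fn setup_or_report_missing<B, D>(
    blockchain: &B,
    db: &mut D,
    progress: Box<dyn Progress>,
) -> Result<(), Error>
where
    B: WalletSync,
    D: BatchDatabase,
{
    match blockchain.wallet_setup(db, progress) {
        Err(Error::MissingCachedScripts(info)) => {
            // `last_count` scriptPubKeys are cached in the database; `missing_count`
            // more are needed to reach the configured `start_script_count`.
            println!(
                "cached {} scriptPubKeys, need {} more before the initial import",
                info.last_count, info.missing_count
            );
            Ok(())
        }
        other => other,
    }
}
```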