Merge branch 'release/0.21.0', commit 'refs/pull/704/head' of github.com:bitcoindevkit/bdk into release/0.21.0

This commit is contained in:
Alekos Filini 2022-08-09 11:54:23 +02:00
commit 2db881519a
No known key found for this signature in database
GPG Key ID: 431401E4A4530061

View File

@ -201,27 +201,26 @@ impl ConfigurableBlockchain for RpcBlockchain {
/// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
/// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let wallet_name = config.wallet_name.clone();
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let rpc_version = client.version()?;
let loaded_wallets = client.list_wallets()?;
if loaded_wallets.contains(&wallet_name) {
debug!("wallet already loaded {:?}", wallet_name);
} else if list_wallet_dir(&client)?.contains(&wallet_name) {
client.load_wallet(&wallet_name)?;
debug!("wallet loaded {:?}", wallet_name);
info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
if client.list_wallets()?.contains(&config.wallet_name) {
info!("wallet already loaded: {}", config.wallet_name);
} else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
client.load_wallet(&config.wallet_name)?;
info!("wallet loaded: {}", config.wallet_name);
} else {
// pre-0.21 use legacy wallets
if rpc_version < 210_000 {
client.create_wallet(&wallet_name, Some(true), None, None, None)?;
client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
} else {
// TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
let args = [
Value::String(wallet_name.clone()),
Value::String(config.wallet_name.clone()),
Value::Bool(true),
Value::Bool(false),
Value::Null,
@ -231,7 +230,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
let _: Value = client.call("createwallet", &args)?;
}
debug!("wallet created {:?}", wallet_name);
info!("wallet created: {}", config.wallet_name);
}
let is_descriptors = is_wallet_descriptor(&client)?;
@ -386,9 +385,16 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
// wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
// loop through results of Core RPC method `listtransactions`
for tx_res in CoreTxIter::new(client, 100) {
let tx_res = tx_res?;
// obtain iterator of pagenated `listtransactions` RPC calls
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
// filter out conflicting transactions - only accept transactions that are already
// confirmed, or exists in mempool
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
});
// iterate through chronological results of `listtransactions`
for tx_res in tx_iter {
let mut updated = false;
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
@ -695,81 +701,53 @@ where
Ok(())
}
/// Iterates through results of multiple `listtransactions` calls.
struct CoreTxIter<'a> {
client: &'a Client,
/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
/// in chronological order.
///
/// `page_size` cannot be less than 1 and cannot be greater than 1000.
fn list_transactions(
client: &Client,
page_size: usize,
page_index: usize,
stack: Vec<ListTransactionResult>,
done: bool,
}
impl<'a> CoreTxIter<'a> {
fn new(client: &'a Client, mut page_size: usize) -> Self {
if page_size > 1000 {
page_size = 1000;
}
Self {
client,
page_size,
page_index: 0,
stack: Vec::with_capacity(page_size),
done: false,
}
) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
if !(1..=1000).contains(&page_size) {
return Err(Error::Generic(format!(
"Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
page_size
)));
}
/// We want to filter out conflicting transactions.
/// Only accept transactions that are already confirmed, or existing in mempool.
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
}
}
// `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
let mut got_err = false;
impl<'a> Iterator for CoreTxIter<'a> {
type Item = Result<ListTransactionResult, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.done {
return None;
// obtain results in batches (of `page_size`)
let nested_list = (0_usize..)
.map(|page_index| {
client.list_transactions(
None,
Some(page_size),
Some(page_size * page_index),
Some(true),
)
})
// take until returned rpc call is empty or until error
// TODO: replace with the following when MSRV is 1.57.0:
// `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
.take_while(|res| {
if got_err || matches!(res, Ok(list) if list.is_empty()) {
// break if last iteration was an error, or if the current result is empty
false
} else {
// record whether result is error or not
got_err = res.is_err();
// continue on non-empty result or first error
true
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::Rpc)?;
if let Some(item) = self.stack.pop() {
if self.keep_tx(&item) {
return Some(Ok(item));
}
}
let res = self
.client
.list_transactions(
None,
Some(self.page_size),
Some(self.page_size * self.page_index),
Some(true),
)
.map_err(Error::Rpc);
self.page_index += 1;
let list = match res {
Ok(list) => list,
Err(err) => {
self.done = true;
return Some(Err(err));
}
};
if list.is_empty() {
self.done = true;
return None;
}
self.stack = list;
}
}
// reverse here to have txs in chronological order
Ok(nested_list.into_iter().rev().flatten())
}
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
@ -885,10 +863,16 @@ impl BlockchainFactory for RpcBlockchainFactory {
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
mod test {
use super::*;
use crate::testutils::blockchain_tests::TestClient;
use crate::{
descriptor::{into_wallet_descriptor_checked, AsDerived},
testutils::blockchain_tests::TestClient,
wallet::utils::SecpCtx,
};
use bitcoin::Network;
use bitcoin::{Address, Network};
use bitcoincore_rpc::RpcApi;
use log::LevelFilter;
use miniscript::DescriptorTrait;
crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
@ -942,4 +926,63 @@ mod test {
"prefix-bbbbbb"
);
}
/// This test ensures that [list_transactions] always iterates through transactions in
/// chronological order, independent of the `page_size`.
#[test]
fn test_list_transactions() {
let _ = env_logger::builder()
.filter_level(LevelFilter::Info)
.default_format()
.try_init();
const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
const AMOUNT_PER_TX: u64 = 10_000;
const TX_COUNT: u32 = 50;
let secp = SecpCtx::default();
let network = Network::Regtest;
let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();
let (mut test_client, factory) = get_factory();
let bc = factory.build("itertest", None).unwrap();
// generate scripts (1 tx per script)
let scripts = (0..TX_COUNT)
.map(|index| desc.as_derived(index, &secp).script_pubkey())
.collect::<Vec<_>>();
// import scripts and wait
if bc.is_descriptors {
import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
} else {
import_multi(&bc.client, 0, scripts.iter()).unwrap();
}
await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();
// create and broadcast txs
let expected_txids = scripts
.iter()
.map(|script| {
let addr = Address::from_script(script, network).unwrap();
let txid =
test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
test_client.generate(1, None);
txid
})
.collect::<Vec<_>>();
// iterate through different page sizes - should always return txs in chronological order
[1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| {
println!("trying with page_size: {}", page_size);
let txids = list_transactions(&bc.client, *page_size)
.unwrap()
.map(|res| res.info.txid)
.collect::<Vec<_>>();
assert_eq!(txids.len(), expected_txids.len());
assert_eq!(txids, expected_txids);
});
}
}