Replace rpc::CoreTxIter with list_transactions fn.

This fixes a bug where `CoreTxIter` attempts to call `listtransactions`
immediately after a tx result is filtered (instead of being returned),
when in fact, the correct logic will be to pop another tx result.

The new logic also ensures that tx results are returned in chonological
order. The test `test_list_transactions` verifies this. We also now
ensure that `page_size` is between the range `[0 to 1000]` otherwise an
error is returned.

Some needless cloning is removed from `from_config` as well as logging
improvements.
This commit is contained in:
志宇 2022-08-06 10:58:04 +08:00
parent 5eeba6cced
commit 74e2c477f1
No known key found for this signature in database
GPG Key ID: F6345C9837C2BDE8

View File

@ -201,27 +201,26 @@ impl ConfigurableBlockchain for RpcBlockchain {
/// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum /// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
/// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded /// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
fn from_config(config: &Self::Config) -> Result<Self, Error> { fn from_config(config: &Self::Config) -> Result<Self, Error> {
let wallet_name = config.wallet_name.clone(); let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?; let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let rpc_version = client.version()?; let rpc_version = client.version()?;
let loaded_wallets = client.list_wallets()?; info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
if loaded_wallets.contains(&wallet_name) {
debug!("wallet already loaded {:?}", wallet_name); if client.list_wallets()?.contains(&config.wallet_name) {
} else if list_wallet_dir(&client)?.contains(&wallet_name) { info!("wallet already loaded: {}", config.wallet_name);
client.load_wallet(&wallet_name)?; } else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
debug!("wallet loaded {:?}", wallet_name); client.load_wallet(&config.wallet_name)?;
info!("wallet loaded: {}", config.wallet_name);
} else { } else {
// pre-0.21 use legacy wallets // pre-0.21 use legacy wallets
if rpc_version < 210_000 { if rpc_version < 210_000 {
client.create_wallet(&wallet_name, Some(true), None, None, None)?; client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
} else { } else {
// TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed // TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
let args = [ let args = [
Value::String(wallet_name.clone()), Value::String(config.wallet_name.clone()),
Value::Bool(true), Value::Bool(true),
Value::Bool(false), Value::Bool(false),
Value::Null, Value::Null,
@ -231,7 +230,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
let _: Value = client.call("createwallet", &args)?; let _: Value = client.call("createwallet", &args)?;
} }
debug!("wallet created {:?}", wallet_name); info!("wallet created: {}", config.wallet_name);
} }
let is_descriptors = is_wallet_descriptor(&client)?; let is_descriptors = is_wallet_descriptor(&client)?;
@ -386,9 +385,16 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
// wait for Core wallet to rescan (TODO: maybe make this async) // wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
// loop through results of Core RPC method `listtransactions` // obtain iterator of pagenated `listtransactions` RPC calls
for tx_res in CoreTxIter::new(client, 100) { const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
let tx_res = tx_res?; let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
// filter out conflicting transactions - only accept transactions that are already
// confirmed, or exists in mempool
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
});
// iterate through chronological results of `listtransactions`
for tx_res in tx_iter {
let mut updated = false; let mut updated = false;
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| { let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
@ -695,81 +701,53 @@ where
Ok(()) Ok(())
} }
/// Iterates through results of multiple `listtransactions` calls. /// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
struct CoreTxIter<'a> { /// in chronological order.
client: &'a Client, ///
/// `page_size` cannot be less than 1 and cannot be greater than 1000.
fn list_transactions(
client: &Client,
page_size: usize, page_size: usize,
page_index: usize, ) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
if !(1..=1000).contains(&page_size) {
stack: Vec<ListTransactionResult>, return Err(Error::Generic(format!(
done: bool, "Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
} page_size
)));
impl<'a> CoreTxIter<'a> {
fn new(client: &'a Client, mut page_size: usize) -> Self {
if page_size > 1000 {
page_size = 1000;
}
Self {
client,
page_size,
page_index: 0,
stack: Vec::with_capacity(page_size),
done: false,
}
} }
/// We want to filter out conflicting transactions. // `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
/// Only accept transactions that are already confirmed, or existing in mempool. let mut got_err = false;
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
}
}
impl<'a> Iterator for CoreTxIter<'a> { // obtain results in batches (of `page_size`)
type Item = Result<ListTransactionResult, Error>; let nested_list = (0_usize..)
.map(|page_index| {
fn next(&mut self) -> Option<Self::Item> { client.list_transactions(
loop { None,
if self.done { Some(page_size),
return None; Some(page_size * page_index),
Some(true),
)
})
// take until returned rpc call is empty or until error
// TODO: replace with the following when MSRV is 1.57.0:
// `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
.take_while(|res| {
if got_err || matches!(res, Ok(list) if list.is_empty()) {
// break if last iteration was an error, or if the current result is empty
false
} else {
// record whether result is error or not
got_err = res.is_err();
// continue on non-empty result or first error
true
} }
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::Rpc)?;
if let Some(item) = self.stack.pop() { // reverse here to have txs in chronological order
if self.keep_tx(&item) { Ok(nested_list.into_iter().rev().flatten())
return Some(Ok(item));
}
}
let res = self
.client
.list_transactions(
None,
Some(self.page_size),
Some(self.page_size * self.page_index),
Some(true),
)
.map_err(Error::Rpc);
self.page_index += 1;
let list = match res {
Ok(list) => list,
Err(err) => {
self.done = true;
return Some(Err(err));
}
};
if list.is_empty() {
self.done = true;
return None;
}
self.stack = list;
}
}
} }
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> { fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
@ -885,10 +863,16 @@ impl BlockchainFactory for RpcBlockchainFactory {
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))] #[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
mod test { mod test {
use super::*; use super::*;
use crate::testutils::blockchain_tests::TestClient; use crate::{
descriptor::{into_wallet_descriptor_checked, AsDerived},
testutils::blockchain_tests::TestClient,
wallet::utils::SecpCtx,
};
use bitcoin::Network; use bitcoin::{Address, Network};
use bitcoincore_rpc::RpcApi; use bitcoincore_rpc::RpcApi;
use log::LevelFilter;
use miniscript::DescriptorTrait;
crate::bdk_blockchain_tests! { crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> RpcBlockchain { fn test_instance(test_client: &TestClient) -> RpcBlockchain {
@ -942,4 +926,63 @@ mod test {
"prefix-bbbbbb" "prefix-bbbbbb"
); );
} }
/// This test ensures that [list_transactions] always iterates through transactions in
/// chronological order, independent of the `page_size`.
#[test]
fn test_list_transactions() {
let _ = env_logger::builder()
.filter_level(LevelFilter::Info)
.default_format()
.try_init();
const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
const AMOUNT_PER_TX: u64 = 10_000;
const TX_COUNT: u32 = 50;
let secp = SecpCtx::default();
let network = Network::Regtest;
let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();
let (mut test_client, factory) = get_factory();
let bc = factory.build("itertest", None).unwrap();
// generate scripts (1 tx per script)
let scripts = (0..TX_COUNT)
.map(|index| desc.as_derived(index, &secp).script_pubkey())
.collect::<Vec<_>>();
// import scripts and wait
if bc.is_descriptors {
import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
} else {
import_multi(&bc.client, 0, scripts.iter()).unwrap();
}
await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();
// create and broadcast txs
let expected_txids = scripts
.iter()
.map(|script| {
let addr = Address::from_script(script, network).unwrap();
let txid =
test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
test_client.generate(1, None);
txid
})
.collect::<Vec<_>>();
// iterate through different page sizes - should always return txs in chronological order
[1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| {
println!("trying with page_size: {}", page_size);
let txids = list_transactions(&bc.client, *page_size)
.unwrap()
.map(|res| res.info.txid)
.collect::<Vec<_>>();
assert_eq!(txids.len(), expected_txids.len());
assert_eq!(txids, expected_txids);
});
}
} }