Merge bitcoindevkit/bdk#704: Fix rpc::CoreTxIter
logic.
74e2c477f124489a2357921ca879cc82e24da5fd Replace `rpc::CoreTxIter` with `list_transactions` fn. (志宇) Pull request description: ### Description This fixes a bug where `CoreTxIter` attempts to call `listtransactions` immediately after a tx result is filtered (instead of being returned), when in fact, the correct logic will be to pop another tx result. The new logic also ensures that tx results are returned in chonological order. The test `test_list_transactions` verifies this. We also now ensure that `page_size` is between the range `[0 to 1000]` otherwise an error is returned. Some needless cloning is removed from `from_config` as well as logging improvements. ### Notes to the reviewers This is an oversight by me (sorry) for PR #683 ### Checklists #### All Submissions: * [x] I've signed all my commits * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md) * [x] I ran `cargo fmt` and `cargo clippy` before committing #### Bugfixes: ~* [ ] This pull request breaks the existing API~ * [x] I've added tests to reproduce the issue which are now passing * [x] I'm linking the issue being fixed by this PR ACKs for top commit: afilini: ACK 74e2c477f124489a2357921ca879cc82e24da5fd Tree-SHA512: f32314a9947067673d19d95da8cde36b350c0bb0ebe0924405ad50602c14590f7ccb09a3e03cdfdd227f938dccd0f556f3a2b4dd7fdd6eba1591c0f8d3e65182
This commit is contained in:
commit
d9adfbe047
@ -201,27 +201,26 @@ impl ConfigurableBlockchain for RpcBlockchain {
|
||||
/// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
|
||||
/// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
|
||||
fn from_config(config: &Self::Config) -> Result<Self, Error> {
|
||||
let wallet_name = config.wallet_name.clone();
|
||||
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
|
||||
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
|
||||
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
|
||||
|
||||
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
|
||||
let rpc_version = client.version()?;
|
||||
|
||||
let loaded_wallets = client.list_wallets()?;
|
||||
if loaded_wallets.contains(&wallet_name) {
|
||||
debug!("wallet already loaded {:?}", wallet_name);
|
||||
} else if list_wallet_dir(&client)?.contains(&wallet_name) {
|
||||
client.load_wallet(&wallet_name)?;
|
||||
debug!("wallet loaded {:?}", wallet_name);
|
||||
info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
|
||||
|
||||
if client.list_wallets()?.contains(&config.wallet_name) {
|
||||
info!("wallet already loaded: {}", config.wallet_name);
|
||||
} else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
|
||||
client.load_wallet(&config.wallet_name)?;
|
||||
info!("wallet loaded: {}", config.wallet_name);
|
||||
} else {
|
||||
// pre-0.21 use legacy wallets
|
||||
if rpc_version < 210_000 {
|
||||
client.create_wallet(&wallet_name, Some(true), None, None, None)?;
|
||||
client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
|
||||
} else {
|
||||
// TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
|
||||
let args = [
|
||||
Value::String(wallet_name.clone()),
|
||||
Value::String(config.wallet_name.clone()),
|
||||
Value::Bool(true),
|
||||
Value::Bool(false),
|
||||
Value::Null,
|
||||
@ -231,7 +230,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
|
||||
let _: Value = client.call("createwallet", &args)?;
|
||||
}
|
||||
|
||||
debug!("wallet created {:?}", wallet_name);
|
||||
info!("wallet created: {}", config.wallet_name);
|
||||
}
|
||||
|
||||
let is_descriptors = is_wallet_descriptor(&client)?;
|
||||
@ -386,9 +385,16 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
|
||||
// wait for Core wallet to rescan (TODO: maybe make this async)
|
||||
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
|
||||
|
||||
// loop through results of Core RPC method `listtransactions`
|
||||
for tx_res in CoreTxIter::new(client, 100) {
|
||||
let tx_res = tx_res?;
|
||||
// obtain iterator of pagenated `listtransactions` RPC calls
|
||||
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
|
||||
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
|
||||
// filter out conflicting transactions - only accept transactions that are already
|
||||
// confirmed, or exists in mempool
|
||||
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
|
||||
});
|
||||
|
||||
// iterate through chronological results of `listtransactions`
|
||||
for tx_res in tx_iter {
|
||||
let mut updated = false;
|
||||
|
||||
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
|
||||
@ -695,81 +701,53 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Iterates through results of multiple `listtransactions` calls.
|
||||
struct CoreTxIter<'a> {
|
||||
client: &'a Client,
|
||||
/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
|
||||
/// in chronological order.
|
||||
///
|
||||
/// `page_size` cannot be less than 1 and cannot be greater than 1000.
|
||||
fn list_transactions(
|
||||
client: &Client,
|
||||
page_size: usize,
|
||||
page_index: usize,
|
||||
|
||||
stack: Vec<ListTransactionResult>,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
impl<'a> CoreTxIter<'a> {
|
||||
fn new(client: &'a Client, mut page_size: usize) -> Self {
|
||||
if page_size > 1000 {
|
||||
page_size = 1000;
|
||||
}
|
||||
|
||||
Self {
|
||||
client,
|
||||
page_size,
|
||||
page_index: 0,
|
||||
stack: Vec::with_capacity(page_size),
|
||||
done: false,
|
||||
}
|
||||
) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
|
||||
if !(1..=1000).contains(&page_size) {
|
||||
return Err(Error::Generic(format!(
|
||||
"Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
|
||||
page_size
|
||||
)));
|
||||
}
|
||||
|
||||
/// We want to filter out conflicting transactions.
|
||||
/// Only accept transactions that are already confirmed, or existing in mempool.
|
||||
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
|
||||
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
|
||||
}
|
||||
}
|
||||
// `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
|
||||
let mut got_err = false;
|
||||
|
||||
impl<'a> Iterator for CoreTxIter<'a> {
|
||||
type Item = Result<ListTransactionResult, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
loop {
|
||||
if self.done {
|
||||
return None;
|
||||
// obtain results in batches (of `page_size`)
|
||||
let nested_list = (0_usize..)
|
||||
.map(|page_index| {
|
||||
client.list_transactions(
|
||||
None,
|
||||
Some(page_size),
|
||||
Some(page_size * page_index),
|
||||
Some(true),
|
||||
)
|
||||
})
|
||||
// take until returned rpc call is empty or until error
|
||||
// TODO: replace with the following when MSRV is 1.57.0:
|
||||
// `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
|
||||
.take_while(|res| {
|
||||
if got_err || matches!(res, Ok(list) if list.is_empty()) {
|
||||
// break if last iteration was an error, or if the current result is empty
|
||||
false
|
||||
} else {
|
||||
// record whether result is error or not
|
||||
got_err = res.is_err();
|
||||
// continue on non-empty result or first error
|
||||
true
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(Error::Rpc)?;
|
||||
|
||||
if let Some(item) = self.stack.pop() {
|
||||
if self.keep_tx(&item) {
|
||||
return Some(Ok(item));
|
||||
}
|
||||
}
|
||||
|
||||
let res = self
|
||||
.client
|
||||
.list_transactions(
|
||||
None,
|
||||
Some(self.page_size),
|
||||
Some(self.page_size * self.page_index),
|
||||
Some(true),
|
||||
)
|
||||
.map_err(Error::Rpc);
|
||||
|
||||
self.page_index += 1;
|
||||
|
||||
let list = match res {
|
||||
Ok(list) => list,
|
||||
Err(err) => {
|
||||
self.done = true;
|
||||
return Some(Err(err));
|
||||
}
|
||||
};
|
||||
|
||||
if list.is_empty() {
|
||||
self.done = true;
|
||||
return None;
|
||||
}
|
||||
|
||||
self.stack = list;
|
||||
}
|
||||
}
|
||||
// reverse here to have txs in chronological order
|
||||
Ok(nested_list.into_iter().rev().flatten())
|
||||
}
|
||||
|
||||
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
|
||||
@ -885,10 +863,16 @@ impl BlockchainFactory for RpcBlockchainFactory {
|
||||
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::testutils::blockchain_tests::TestClient;
|
||||
use crate::{
|
||||
descriptor::{into_wallet_descriptor_checked, AsDerived},
|
||||
testutils::blockchain_tests::TestClient,
|
||||
wallet::utils::SecpCtx,
|
||||
};
|
||||
|
||||
use bitcoin::Network;
|
||||
use bitcoin::{Address, Network};
|
||||
use bitcoincore_rpc::RpcApi;
|
||||
use log::LevelFilter;
|
||||
use miniscript::DescriptorTrait;
|
||||
|
||||
crate::bdk_blockchain_tests! {
|
||||
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
|
||||
@ -942,4 +926,63 @@ mod test {
|
||||
"prefix-bbbbbb"
|
||||
);
|
||||
}
|
||||
|
||||
/// This test ensures that [list_transactions] always iterates through transactions in
|
||||
/// chronological order, independent of the `page_size`.
|
||||
#[test]
|
||||
fn test_list_transactions() {
|
||||
let _ = env_logger::builder()
|
||||
.filter_level(LevelFilter::Info)
|
||||
.default_format()
|
||||
.try_init();
|
||||
|
||||
const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
|
||||
const AMOUNT_PER_TX: u64 = 10_000;
|
||||
const TX_COUNT: u32 = 50;
|
||||
|
||||
let secp = SecpCtx::default();
|
||||
let network = Network::Regtest;
|
||||
let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();
|
||||
|
||||
let (mut test_client, factory) = get_factory();
|
||||
let bc = factory.build("itertest", None).unwrap();
|
||||
|
||||
// generate scripts (1 tx per script)
|
||||
let scripts = (0..TX_COUNT)
|
||||
.map(|index| desc.as_derived(index, &secp).script_pubkey())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// import scripts and wait
|
||||
if bc.is_descriptors {
|
||||
import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
|
||||
} else {
|
||||
import_multi(&bc.client, 0, scripts.iter()).unwrap();
|
||||
}
|
||||
await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();
|
||||
|
||||
// create and broadcast txs
|
||||
let expected_txids = scripts
|
||||
.iter()
|
||||
.map(|script| {
|
||||
let addr = Address::from_script(script, network).unwrap();
|
||||
let txid =
|
||||
test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
|
||||
test_client.generate(1, None);
|
||||
txid
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// iterate through different page sizes - should always return txs in chronological order
|
||||
[1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| {
|
||||
println!("trying with page_size: {}", page_size);
|
||||
|
||||
let txids = list_transactions(&bc.client, *page_size)
|
||||
.unwrap()
|
||||
.map(|res| res.info.txid)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(txids.len(), expected_txids.len());
|
||||
assert_eq!(txids, expected_txids);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user