Merge bitcoindevkit/bdk#461: Restructure electrum/esplora sync logic

9c5770831de58760066dbbb26c0785632dde4991 Make stop_gap a parameter to EsploraBlockchainConfig::new (LLFourn)
0f0a01a742448fa9db67752722fed7d1c028117b s/vin/vout/ (LLFourn)
1a64fd9c9595566366cfe96cc20d8ba2f4da1bd3 Delete src/blockchain/utils.rs (LLFourn)
d3779fac73321f41b82897e6ab69708c957212b2 Fix comments (LLFourn)
d39401162fb72a9a02999908ac0347685d8861a5 Less intermediary data states in sync (LLFourn)
dfb63d389b2804cb55da60de0d8fcad985d50e5c s/observed_txs/finished_txs/g (LLFourn)
188d9a4a8b502b3f681f4c83fd9933b9a952e42c Make variable names consistent (LLFourn)
5eadf5ccf9722aa331008304221e981d01bca1c1 Add some logging to script_sync (LLFourn)
aaad560a91872318890208c4b3d5cb73a63029a8 Always get up to chunk_size heights to request headers for (LLFourn)
e7c13575c88be4fce1f407eae3710cb06c5db039 Don't request conftime during tx request (LLFourn)
808d7d8463c56cbcda0bd55a5eba422cf5080ce7 Update changelog (LLFourn)
732166fcb689ad666eab02a4bb28c33018f7b819 Fix feerate calculation for esplora (LLFourn)
3f5cb6997f8bf0c889747badb9498a402ecf96a4 Invert dependencies in electrum sync (LLFourn)

Pull request description:

  ## Description

  This PR applies dependency inversion to the previous sync logic for electrum and esplora, which was captured in the trait `ElectrumLikeSync`. This means that the sync logic does not reference the blockchain at all. Instead, the blockchain asks the sync logic (in `script_sync.rs`) what it needs to continue the sync and tries to retrieve it.

  The initial purpose of doing this is to remove invocations of `maybe_await` in the abstract sync logic, in preparation for completely removing `maybe_await` in the future. The other major benefit is that it gives the esplora logic a lot more freedom to use the rich data from the responses to complete the sync with fewer HTTP requests than it made previously.
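
  To make the shape of this concrete, here is a minimal sketch of the loop each backend now runs. It assumes crate-internal access (`script_sync` is a private module), and the `fetch_*` helpers are hypothetical stand-ins for the batched/concurrent network calls each backend actually implements:

  ```rust
  use bitcoin::{Script, Transaction, TxOut, Txid};

  use crate::blockchain::script_sync::{self, Request};
  use crate::database::BatchDatabase;
  use crate::{BlockTime, Error};

  // Hypothetical backend hooks: in the real code these are the electrum or
  // esplora clients doing batched/concurrent network I/O.
  fn fetch_history<'a>(
      _scripts: impl Iterator<Item = &'a Script>,
  ) -> Vec<Vec<(Txid, Option<u32>)>> {
      unimplemented!("backend-specific")
  }
  fn fetch_conftimes<'a>(_txids: impl Iterator<Item = &'a Txid>) -> Vec<Option<BlockTime>> {
      unimplemented!("backend-specific")
  }
  fn fetch_full_txs<'a>(
      _txids: impl Iterator<Item = &'a Txid>,
  ) -> Vec<(Vec<Option<TxOut>>, Transaction)> {
      unimplemented!("backend-specific")
  }

  fn sync<D: BatchDatabase>(database: &mut D, stop_gap: usize) -> Result<(), Error> {
      let mut request = script_sync::start(database, stop_gap)?;
      let batch_update = loop {
          request = match request {
              // "which txids (and at what height) touch these script pubkeys?"
              Request::Script(req) => {
                  let txids_per_script = fetch_history(req.request());
                  req.satisfy(txids_per_script)?
              }
              // "when did these txids confirm?"
              Request::Conftime(req) => {
                  let conftimes = fetch_conftimes(req.request());
                  req.satisfy(conftimes)?
              }
              // "give me these full transactions plus their previous outputs"
              Request::Tx(req) => {
                  let full_txs = fetch_full_txs(req.request());
                  req.satisfy(full_txs)?
              }
              // all requests satisfied: one batch update for the database
              Request::Finish(batch_update) => break batch_update,
          }
      };
      database.commit_batch(batch_update)?;
      Ok(())
  }
  ```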

  ## List of changes

  - sync logic moved to `script_sync.rs` and `ElectrumLikeSync` is gone.
  - esplora makes one http request per sync address. This means it makes half the number of http requests for a fully synced wallet, and N*M fewer requests for a wallet that has N new transactions with M unique input transactions.
  - electrum and esplora save fewer raw transactions in the database. Electrum still requests the input transactions for each of its transactions to calculate the fee, but it no longer saves them to the database.
  - The ureq and reqwest blockchain configuration is now unified into the same struct. This is the only API change. `timeout_read` and `timeout_write` have been removed in favor of a single `timeout` option which is applied to both ureq and reqwest (see the config sketch after this list).
  - ureq now does concurrent (parallel) requests using threads.
  - A previously unnoticed bug has been fixed: by sending a lot of double-spending transactions to the same address you could trick a bdk Esplora wallet into thinking it had a lot of unconfirmed coins. This is because esplora doesn't delete double-spent transactions from its indexes immediately (not sure if this is a bug or a feature). A blockchain test is added for this.
  - BONUS: The second commit in this PR fixes the feerate calculation for esplora and adds a test (the previous algorithm didn't work at all). I could have made a separate PR but since I was touching this file a lot I decided to fix it here.
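
  Here is a sketch of the unified configuration (assuming the `bdk::blockchain::esplora` re-exports; the values are arbitrary examples):

  ```rust
  use bdk::blockchain::esplora::{EsploraBlockchain, EsploraBlockchainConfig};
  use bdk::blockchain::ConfigurableBlockchain;

  fn main() -> Result<(), bdk::Error> {
      // The same struct now configures both the ureq and reqwest backends.
      let config = EsploraBlockchainConfig {
          base_url: "https://blockstream.info/api/".to_string(),
          proxy: None,
          concurrency: Some(8), // parallel requests (defaults to 4 when None)
          stop_gap: 20,
          timeout: Some(30), // seconds; replaces timeout_read/timeout_write
      };
      let _blockchain = EsploraBlockchain::from_config(&config)?;

      // Or take the defaults given just a base url and stop gap:
      let _default_config =
          EsploraBlockchainConfig::new("https://blockstream.info/api/".to_string(), 20);
      Ok(())
  }
  ```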

  ## Notes to the reviewers

  - The most important thing to review is that the logic in `script_sync.rs` is sound.
  - Look at the two commits separately.
  - I think CI is failing because of MSRV problems again!
  - It would be cool to measure how much sync time is improved for your existing wallets/projects. For `gun` the speed improvements are modest, but it is at least hammering the esplora server much less.
  - I noticed the performance of blocking reqwest is much worse in this patch than previously. This is because somehow reqwest is not re-using the connection for each request in this new code. I have no idea why. The plan is to get rid of the blocking reqwest implementation in a follow-up PR.

  ### Checklists

  #### All Submissions:

  * [x] I've signed all my commits

ACKs for top commit:
  rajarshimaitra:
    Retested ACK a630685a0a

Tree-SHA512: de74981e9d1f80758a9f20a3314ed7381c6b7c635f7ede80b177651fe2f9e9468064fae26bf80d4254098accfacfe50326ae0968e915186e13313f05bf77990b
Steve Myers 2021-11-24 19:48:19 -08:00
commit b2ac4a0dfd
GPG Key ID: 8105A46B22C2D051
11 changed files with 1127 additions and 819 deletions


@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- BIP39 implementation dependency, in `keys::bip39` changed from tiny-bip39 to rust-bip39. - BIP39 implementation dependency, in `keys::bip39` changed from tiny-bip39 to rust-bip39.
- Add new method on the `TxBuilder` to embed data in the transaction via `OP_RETURN`. To allow that a fix to check the dust only on spendable output has been introduced. - Add new method on the `TxBuilder` to embed data in the transaction via `OP_RETURN`. To allow that a fix to check the dust only on spendable output has been introduced.
- Overhauled sync logic for electrum and esplora.
- Unify ureq and reqwest esplora backends to have the same configuration parameters. This means reqwest now has a timeout parameter and ureq has a concurrency parameter.
- Fixed esplora fee estimation.
- Update the `Database` trait to store the last sync timestamp and block height - Update the `Database` trait to store the last sync timestamp and block height
- Rename `ConfirmationTime` to `BlockTime` - Rename `ConfirmationTime` to `BlockTime`
@ -393,4 +396,4 @@ final transaction is created by calling `finish` on the builder.
[v0.10.0]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...v0.10.0 [v0.10.0]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...v0.10.0
[v0.11.0]: https://github.com/bitcoindevkit/bdk/compare/v0.10.0...v0.11.0 [v0.11.0]: https://github.com/bitcoindevkit/bdk/compare/v0.10.0...v0.11.0
[v0.12.0]: https://github.com/bitcoindevkit/bdk/compare/v0.11.0...v0.12.0 [v0.12.0]: https://github.com/bitcoindevkit/bdk/compare/v0.11.0...v0.12.0
[v0.13.0]: https://github.com/bitcoindevkit/bdk/compare/v0.12.0...v0.13.0 [v0.13.0]: https://github.com/bitcoindevkit/bdk/compare/v0.12.0...v0.13.0


@ -24,20 +24,20 @@
//! # Ok::<(), bdk::Error>(()) //! # Ok::<(), bdk::Error>(())
//! ``` //! ```
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
#[allow(unused_imports)] #[allow(unused_imports)]
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use bitcoin::{BlockHeader, Script, Transaction, Txid}; use bitcoin::{Transaction, Txid};
use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config}; use electrum_client::{Client, ConfigBuilder, ElectrumApi, Socks5Config};
use self::utils::{ElectrumLikeSync, ElsGetHistoryRes}; use super::script_sync::Request;
use super::*; use super::*;
use crate::database::BatchDatabase; use crate::database::{BatchDatabase, Database};
use crate::error::Error; use crate::error::Error;
use crate::FeeRate; use crate::{BlockTime, FeeRate};
/// Wrapper over an Electrum Client that implements the required blockchain traits /// Wrapper over an Electrum Client that implements the required blockchain traits
/// ///
@ -71,10 +71,139 @@ impl Blockchain for ElectrumBlockchain {
fn setup<D: BatchDatabase, P: Progress>( fn setup<D: BatchDatabase, P: Progress>(
&self, &self,
database: &mut D, database: &mut D,
progress_update: P, _progress_update: P,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.client let mut request = script_sync::start(database, self.stop_gap)?;
.electrum_like_setup(self.stop_gap, database, progress_update) let mut block_times = HashMap::<u32, u32>::new();
let mut txid_to_height = HashMap::<Txid, u32>::new();
let mut tx_cache = TxCache::new(database, &self.client);
let chunk_size = self.stop_gap;
// The electrum server has been inconsistent somehow in its responses during sync. For
// example, we do a batch request of transactions and the response contains fewer
// transactions than in the request. This should never happen but we don't want to panic.
let electrum_goof = || Error::Generic("electrum server misbehaving".to_string());
let batch_update = loop {
request = match request {
Request::Script(script_req) => {
let scripts = script_req.request().take(chunk_size);
let txids_per_script: Vec<Vec<_>> = self
.client
.batch_script_get_history(scripts)
.map_err(Error::Electrum)?
.into_iter()
.map(|txs| {
txs.into_iter()
.map(|tx| {
let tx_height = match tx.height {
none if none <= 0 => None,
height => {
txid_to_height.insert(tx.tx_hash, height as u32);
Some(height as u32)
}
};
(tx.tx_hash, tx_height)
})
.collect()
})
.collect();
script_req.satisfy(txids_per_script)?
}
Request::Conftime(conftime_req) => {
// collect up to chunk_size heights to fetch from electrum
let needs_block_height = {
let mut needs_block_height_iter = conftime_req
.request()
.filter_map(|txid| txid_to_height.get(txid).cloned())
.filter(|height| block_times.get(height).is_none());
let mut needs_block_height = HashSet::new();
while needs_block_height.len() < chunk_size {
match needs_block_height_iter.next() {
Some(height) => needs_block_height.insert(height),
None => break,
};
}
needs_block_height
};
let new_block_headers = self
.client
.batch_block_header(needs_block_height.iter().cloned())?;
for (height, header) in needs_block_height.into_iter().zip(new_block_headers) {
block_times.insert(height, header.time);
}
let conftimes = conftime_req
.request()
.take(chunk_size)
.map(|txid| {
let confirmation_time = txid_to_height
.get(txid)
.map(|height| {
let timestamp =
*block_times.get(height).ok_or_else(electrum_goof)?;
Result::<_, Error>::Ok(BlockTime {
height: *height,
timestamp: timestamp.into(),
})
})
.transpose()?;
Ok(confirmation_time)
})
.collect::<Result<_, Error>>()?;
conftime_req.satisfy(conftimes)?
}
Request::Tx(tx_req) => {
let needs_full = tx_req.request().take(chunk_size);
tx_cache.save_txs(needs_full.clone())?;
let full_transactions = needs_full
.map(|txid| tx_cache.get(*txid).ok_or_else(electrum_goof))
.collect::<Result<Vec<_>, _>>()?;
let input_txs = full_transactions.iter().flat_map(|tx| {
tx.input
.iter()
.filter(|input| !input.previous_output.is_null())
.map(|input| &input.previous_output.txid)
});
tx_cache.save_txs(input_txs)?;
let full_details = full_transactions
.into_iter()
.map(|tx| {
let prev_outputs = tx
.input
.iter()
.map(|input| {
if input.previous_output.is_null() {
return Ok(None);
}
let prev_tx = tx_cache
.get(input.previous_output.txid)
.ok_or_else(electrum_goof)?;
let txout = prev_tx
.output
.get(input.previous_output.vout as usize)
.ok_or_else(electrum_goof)?;
Ok(Some(txout.clone()))
})
.collect::<Result<Vec<_>, Error>>()?;
Ok((prev_outputs, tx))
})
.collect::<Result<Vec<_>, Error>>()?;
tx_req.satisfy(full_details)?
}
Request::Finish(batch_update) => break batch_update,
}
};
database.commit_batch(batch_update)?;
Ok(())
} }
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> { fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@ -101,43 +230,48 @@ impl Blockchain for ElectrumBlockchain {
} }
} }
impl ElectrumLikeSync for Client { struct TxCache<'a, 'b, D> {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>( db: &'a D,
&self, client: &'b Client,
scripts: I, cache: HashMap<Txid, Transaction>,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> { }
self.batch_script_get_history(scripts)
.map(|v| { impl<'a, 'b, D: Database> TxCache<'a, 'b, D> {
v.into_iter() fn new(db: &'a D, client: &'b Client) -> Self {
.map(|v| { TxCache {
v.into_iter() db,
.map( client,
|electrum_client::GetHistoryRes { cache: HashMap::default(),
height, tx_hash, .. }
}| ElsGetHistoryRes { }
height, fn save_txs<'c>(&mut self, txids: impl Iterator<Item = &'c Txid>) -> Result<(), Error> {
tx_hash, let mut need_fetch = vec![];
}, for txid in txids {
) if self.cache.get(txid).is_some() {
.collect() continue;
}) } else if let Some(transaction) = self.db.get_raw_tx(txid)? {
.collect() self.cache.insert(*txid, transaction);
}) } else {
.map_err(Error::Electrum) need_fetch.push(txid);
}
}
if !need_fetch.is_empty() {
let txs = self
.client
.batch_transaction_get(need_fetch.clone())
.map_err(Error::Electrum)?;
for (tx, _txid) in txs.into_iter().zip(need_fetch) {
debug_assert_eq!(*_txid, tx.txid());
self.cache.insert(tx.txid(), tx);
}
}
Ok(())
} }
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>( fn get(&self, txid: Txid) -> Option<Transaction> {
&self, self.cache.get(&txid).map(Clone::clone)
txids: I,
) -> Result<Vec<Transaction>, Error> {
self.batch_transaction_get(txids).map_err(Error::Electrum)
}
fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
self.batch_block_header(heights).map_err(Error::Electrum)
} }
} }


@ -0,0 +1,117 @@
//! structs from the esplora API
//!
//! see: <https://github.com/Blockstream/esplora/blob/master/API.md>
use crate::BlockTime;
use bitcoin::{OutPoint, Script, Transaction, TxIn, TxOut, Txid};
#[derive(serde::Deserialize, Clone, Debug)]
pub struct PrevOut {
pub value: u64,
pub scriptpubkey: Script,
}
#[derive(serde::Deserialize, Clone, Debug)]
pub struct Vin {
pub txid: Txid,
pub vout: u32,
// None if coinbase
pub prevout: Option<PrevOut>,
pub scriptsig: Script,
#[serde(deserialize_with = "deserialize_witness")]
pub witness: Vec<Vec<u8>>,
pub sequence: u32,
pub is_coinbase: bool,
}
#[derive(serde::Deserialize, Clone, Debug)]
pub struct Vout {
pub value: u64,
pub scriptpubkey: Script,
}
#[derive(serde::Deserialize, Clone, Debug)]
pub struct TxStatus {
pub confirmed: bool,
pub block_height: Option<u32>,
pub block_time: Option<u64>,
}
#[derive(serde::Deserialize, Clone, Debug)]
pub struct Tx {
pub txid: Txid,
pub version: i32,
pub locktime: u32,
pub vin: Vec<Vin>,
pub vout: Vec<Vout>,
pub status: TxStatus,
pub fee: u64,
}
impl Tx {
pub fn to_tx(&self) -> Transaction {
Transaction {
version: self.version,
lock_time: self.locktime,
input: self
.vin
.iter()
.cloned()
.map(|vin| TxIn {
previous_output: OutPoint {
txid: vin.txid,
vout: vin.vout,
},
script_sig: vin.scriptsig,
sequence: vin.sequence,
witness: vin.witness,
})
.collect(),
output: self
.vout
.iter()
.cloned()
.map(|vout| TxOut {
value: vout.value,
script_pubkey: vout.scriptpubkey,
})
.collect(),
}
}
pub fn confirmation_time(&self) -> Option<BlockTime> {
match self.status {
TxStatus {
confirmed: true,
block_height: Some(height),
block_time: Some(timestamp),
} => Some(BlockTime { timestamp, height }),
_ => None,
}
}
pub fn previous_outputs(&self) -> Vec<Option<TxOut>> {
self.vin
.iter()
.cloned()
.map(|vin| {
vin.prevout.map(|po| TxOut {
script_pubkey: po.scriptpubkey,
value: po.value,
})
})
.collect()
}
}
fn deserialize_witness<'de, D>(d: D) -> Result<Vec<Vec<u8>>, D::Error>
where
D: serde::de::Deserializer<'de>,
{
use crate::serde::Deserialize;
use bitcoin::hashes::hex::FromHex;
let list = Vec::<String>::deserialize(d)?;
list.into_iter()
.map(|hex_str| Vec::<u8>::from_hex(&hex_str))
.collect::<Result<Vec<Vec<u8>>, _>>()
.map_err(serde::de::Error::custom)
}


@ -21,8 +21,6 @@ use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::io; use std::io;
use serde::Deserialize;
use bitcoin::consensus; use bitcoin::consensus;
use bitcoin::{BlockHash, Txid}; use bitcoin::{BlockHash, Txid};
@ -41,33 +39,24 @@ mod ureq;
#[cfg(feature = "ureq")] #[cfg(feature = "ureq")]
pub use self::ureq::*; pub use self::ureq::*;
mod api;
fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRate, Error> { fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRate, Error> {
let fee_val = estimates let fee_val = {
.into_iter() let mut pairs = estimates
.map(|(k, v)| Ok::<_, std::num::ParseIntError>((k.parse::<usize>()?, v))) .into_iter()
.collect::<Result<Vec<_>, _>>() .filter_map(|(k, v)| Some((k.parse::<usize>().ok()?, v)))
.map_err(|e| Error::Generic(e.to_string()))? .collect::<Vec<_>>();
.into_iter() pairs.sort_unstable_by_key(|(k, _)| std::cmp::Reverse(*k));
.take_while(|(k, _)| k <= &target) pairs
.map(|(_, v)| v) .into_iter()
.last() .find(|(k, _)| k <= &target)
.unwrap_or(1.0); .map(|(_, v)| v)
.unwrap_or(1.0)
};
Ok(FeeRate::from_sat_per_vb(fee_val as f32)) Ok(FeeRate::from_sat_per_vb(fee_val as f32))
} }
/// Data type used when fetching transaction history from Esplora.
#[derive(Deserialize)]
pub struct EsploraGetHistory {
txid: Txid,
status: EsploraGetHistoryStatus,
}
#[derive(Deserialize)]
struct EsploraGetHistoryStatus {
block_height: Option<usize>,
}
/// Errors that can happen during a sync with [`EsploraBlockchain`] /// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)] #[derive(Debug)]
pub enum EsploraError { pub enum EsploraError {
@ -107,10 +96,50 @@ impl fmt::Display for EsploraError {
} }
} }
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service
///
/// eg. `https://blockstream.info/api/`
pub base_url: String,
/// Optional URL of the proxy to use to make requests to the Esplora server
///
/// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
///
/// Note that the format of this value and the supported protocols change slightly between the
/// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
/// details check with the documentation of the two crates. Both of them are compiled with
/// the `socks` feature enabled.
///
/// The proxy is ignored when targeting `wasm32`.
#[serde(skip_serializing_if = "Option::is_none")]
pub proxy: Option<String>,
/// Number of parallel requests sent to the esplora service (default: 4)
#[serde(skip_serializing_if = "Option::is_none")]
pub concurrency: Option<u8>,
/// Stop searching addresses for transactions after finding an unused gap of this length.
pub stop_gap: usize,
/// Socket timeout.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<u64>,
}
impl EsploraBlockchainConfig {
/// Create a config with default values given the base url and stop gap
pub fn new(base_url: String, stop_gap: usize) -> Self {
Self {
base_url,
proxy: None,
timeout: None,
stop_gap,
concurrency: None,
}
}
}
impl std::error::Error for EsploraError {} impl std::error::Error for EsploraError {}
#[cfg(feature = "ureq")]
impl_error!(::ureq::Error, Ureq, EsploraError);
#[cfg(feature = "ureq")] #[cfg(feature = "ureq")]
impl_error!(::ureq::Transport, UreqTransport, EsploraError); impl_error!(::ureq::Transport, UreqTransport, EsploraError);
#[cfg(feature = "reqwest")] #[cfg(feature = "reqwest")]
@ -127,3 +156,57 @@ crate::bdk_blockchain_tests! {
EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), 20) EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), 20)
} }
} }
const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
#[cfg(test)]
mod test {
use super::*;
#[test]
fn feerate_parsing() {
let esplora_fees = serde_json::from_str::<HashMap<String, f64>>(
r#"{
"25": 1.015,
"5": 2.3280000000000003,
"12": 2.0109999999999997,
"15": 1.018,
"17": 1.018,
"11": 2.0109999999999997,
"3": 3.01,
"2": 4.9830000000000005,
"6": 2.2359999999999998,
"21": 1.018,
"13": 1.081,
"7": 2.2359999999999998,
"8": 2.2359999999999998,
"16": 1.018,
"20": 1.018,
"22": 1.017,
"23": 1.017,
"504": 1,
"9": 2.2359999999999998,
"14": 1.018,
"10": 2.0109999999999997,
"24": 1.017,
"1008": 1,
"1": 4.9830000000000005,
"4": 2.3280000000000003,
"19": 1.018,
"144": 1,
"18": 1.018
}
"#,
)
.unwrap();
assert_eq!(
into_fee_rate(6, esplora_fees.clone()).unwrap(),
FeeRate::from_sat_per_vb(2.236)
);
assert_eq!(
into_fee_rate(26, esplora_fees).unwrap(),
FeeRate::from_sat_per_vb(1.015),
"should inherit from value for 25"
);
}
}


@ -21,20 +21,16 @@ use bitcoin::{BlockHeader, Script, Transaction, Txid};
#[allow(unused_imports)] #[allow(unused_imports)]
use log::{debug, error, info, trace}; use log::{debug, error, info, trace};
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
use ::reqwest::{Client, StatusCode}; use ::reqwest::{Client, StatusCode};
use futures::stream::{FuturesOrdered, TryStreamExt};
use crate::blockchain::esplora::{EsploraError, EsploraGetHistory}; use super::api::Tx;
use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes}; use crate::blockchain::esplora::EsploraError;
use crate::blockchain::*; use crate::blockchain::*;
use crate::database::BatchDatabase; use crate::database::BatchDatabase;
use crate::error::Error; use crate::error::Error;
use crate::wallet::utils::ChunksIterator;
use crate::FeeRate; use crate::FeeRate;
const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
#[derive(Debug)] #[derive(Debug)]
struct UrlClient { struct UrlClient {
url: String, url: String,
@ -70,7 +66,7 @@ impl EsploraBlockchain {
url_client: UrlClient { url_client: UrlClient {
url: base_url.to_string(), url: base_url.to_string(),
client: Client::new(), client: Client::new(),
concurrency: DEFAULT_CONCURRENT_REQUESTS, concurrency: super::DEFAULT_CONCURRENT_REQUESTS,
}, },
stop_gap, stop_gap,
} }
@ -98,11 +94,91 @@ impl Blockchain for EsploraBlockchain {
fn setup<D: BatchDatabase, P: Progress>( fn setup<D: BatchDatabase, P: Progress>(
&self, &self,
database: &mut D, database: &mut D,
progress_update: P, _progress_update: P,
) -> Result<(), Error> { ) -> Result<(), Error> {
maybe_await!(self use crate::blockchain::script_sync::Request;
.url_client let mut request = script_sync::start(database, self.stop_gap)?;
.electrum_like_setup(self.stop_gap, database, progress_update)) let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
let batch_update = loop {
request = match request {
Request::Script(script_req) => {
let futures: FuturesOrdered<_> = script_req
.request()
.take(self.url_client.concurrency as usize)
.map(|script| async move {
let mut related_txs: Vec<Tx> =
self.url_client._scripthash_txs(script, None).await?;
let n_confirmed =
related_txs.iter().filter(|tx| tx.status.confirmed).count();
// esplora pages on 25 confirmed transactions. If there are 25 or more we
// keep requesting to see if there are more.
if n_confirmed >= 25 {
loop {
let new_related_txs: Vec<Tx> = self
.url_client
._scripthash_txs(
script,
Some(related_txs.last().unwrap().txid),
)
.await?;
let n = new_related_txs.len();
related_txs.extend(new_related_txs);
// we've reached the end
if n < 25 {
break;
}
}
}
Result::<_, Error>::Ok(related_txs)
})
.collect();
let txs_per_script: Vec<Vec<Tx>> = await_or_block!(futures.try_collect())?;
let mut satisfaction = vec![];
for txs in txs_per_script {
satisfaction.push(
txs.iter()
.map(|tx| (tx.txid, tx.status.block_height))
.collect(),
);
for tx in txs {
tx_index.insert(tx.txid, tx);
}
}
script_req.satisfy(satisfaction)?
}
Request::Conftime(conftime_req) => {
let conftimes = conftime_req
.request()
.map(|txid| {
tx_index
.get(txid)
.expect("must be in index")
.confirmation_time()
})
.collect();
conftime_req.satisfy(conftimes)?
}
Request::Tx(tx_req) => {
let full_txs = tx_req
.request()
.map(|txid| {
let tx = tx_index.get(txid).expect("must be in index");
(tx.previous_outputs(), tx.to_tx())
})
.collect();
tx_req.satisfy(full_txs)?
}
Request::Finish(batch_update) => break batch_update,
}
};
database.commit_batch(batch_update)?;
Ok(())
} }
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> { fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@ -124,10 +200,6 @@ impl Blockchain for EsploraBlockchain {
} }
impl UrlClient { impl UrlClient {
fn script_to_scripthash(script: &Script) -> String {
sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
}
async fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> { async fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
let resp = self let resp = self
.client .client
@ -196,71 +268,27 @@ impl UrlClient {
Ok(req.error_for_status()?.text().await?.parse()?) Ok(req.error_for_status()?.text().await?.parse()?)
} }
async fn _script_get_history( async fn _scripthash_txs(
&self, &self,
script: &Script, script: &Script,
) -> Result<Vec<ElsGetHistoryRes>, EsploraError> { last_seen: Option<Txid>,
let mut result = Vec::new(); ) -> Result<Vec<Tx>, EsploraError> {
let scripthash = Self::script_to_scripthash(script); let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex();
let url = match last_seen {
// Add the unconfirmed transactions first Some(last_seen) => format!(
result.extend( "{}/scripthash/{}/txs/chain/{}",
self.client self.url, script_hash, last_seen
.get(&format!( ),
"{}/scripthash/{}/txs/mempool", None => format!("{}/scripthash/{}/txs", self.url, script_hash),
self.url, scripthash };
)) Ok(self
.send() .client
.await? .get(url)
.error_for_status()? .send()
.json::<Vec<EsploraGetHistory>>() .await?
.await? .error_for_status()?
.into_iter() .json::<Vec<Tx>>()
.map(|x| ElsGetHistoryRes { .await?)
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}),
);
debug!(
"Found {} mempool txs for {} - {:?}",
result.len(),
scripthash,
script
);
// Then go through all the pages of confirmed transactions
let mut last_txid = String::new();
loop {
let response = self
.client
.get(&format!(
"{}/scripthash/{}/txs/chain/{}",
self.url, scripthash, last_txid
))
.send()
.await?
.error_for_status()?
.json::<Vec<EsploraGetHistory>>()
.await?;
let len = response.len();
if let Some(elem) = response.last() {
last_txid = elem.txid.to_hex();
}
debug!("... adding {} confirmed transactions", len);
result.extend(response.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
if len < 25 {
break;
}
}
Ok(result)
} }
async fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> { async fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
@ -275,83 +303,8 @@ impl UrlClient {
} }
} }
#[maybe_async]
impl ElectrumLikeSync for UrlClient {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for script in chunk {
futs.push(self._script_get_history(script));
}
let partial_results: Vec<Vec<ElsGetHistoryRes>> = await_or_block!(futs.try_collect())?;
results.extend(partial_results);
}
Ok(await_or_block!(stream::iter(results).collect()))
}
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for txid in chunk {
futs.push(self._get_tx_no_opt(txid));
}
let partial_results: Vec<Transaction> = await_or_block!(futs.try_collect())?;
results.extend(partial_results);
}
Ok(await_or_block!(stream::iter(results).collect()))
}
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for height in chunk {
futs.push(self._get_header(height));
}
let partial_results: Vec<BlockHeader> = await_or_block!(futs.try_collect())?;
results.extend(partial_results);
}
Ok(await_or_block!(stream::iter(results).collect()))
}
}
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service
///
/// eg. `https://blockstream.info/api/`
pub base_url: String,
/// Optional URL of the proxy to use to make requests to the Esplora server
///
/// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
///
/// Note that the format of this value and the supported protocols change slightly between the
/// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
/// details check with the documentation of the two crates. Both of them are compiled with
/// the `socks` feature enabled.
///
/// The proxy is ignored when targeting `wasm32`.
pub proxy: Option<String>,
/// Number of parallel requests sent to the esplora service (default: 4)
pub concurrency: Option<u8>,
/// Stop searching addresses for transactions after finding an unused gap of this length.
pub stop_gap: usize,
}
impl ConfigurableBlockchain for EsploraBlockchain { impl ConfigurableBlockchain for EsploraBlockchain {
type Config = EsploraBlockchainConfig; type Config = super::EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> { fn from_config(config: &Self::Config) -> Result<Self, Error> {
let map_e = |e: reqwest::Error| Error::Esplora(Box::new(e.into())); let map_e = |e: reqwest::Error| Error::Esplora(Box::new(e.into()));
@ -360,13 +313,19 @@ impl ConfigurableBlockchain for EsploraBlockchain {
if let Some(concurrency) = config.concurrency { if let Some(concurrency) = config.concurrency {
blockchain.url_client.concurrency = concurrency; blockchain.url_client.concurrency = concurrency;
} }
let mut builder = Client::builder();
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
if let Some(proxy) = &config.proxy { if let Some(proxy) = &config.proxy {
blockchain.url_client.client = Client::builder() builder = builder.proxy(reqwest::Proxy::all(proxy).map_err(map_e)?);
.proxy(reqwest::Proxy::all(proxy).map_err(map_e)?)
.build()
.map_err(map_e)?;
} }
#[cfg(not(target_arch = "wasm32"))]
if let Some(timeout) = config.timeout {
builder = builder.timeout(core::time::Duration::from_secs(timeout));
}
blockchain.url_client.client = builder.build().map_err(map_e)?;
Ok(blockchain) Ok(blockchain)
} }
} }


@ -26,14 +26,14 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::{sha256, Hash}; use bitcoin::hashes::{sha256, Hash};
use bitcoin::{BlockHeader, Script, Transaction, Txid}; use bitcoin::{BlockHeader, Script, Transaction, Txid};
use crate::blockchain::esplora::{EsploraError, EsploraGetHistory}; use super::api::Tx;
use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes}; use crate::blockchain::esplora::EsploraError;
use crate::blockchain::*; use crate::blockchain::*;
use crate::database::BatchDatabase; use crate::database::BatchDatabase;
use crate::error::Error; use crate::error::Error;
use crate::FeeRate; use crate::FeeRate;
#[derive(Debug)] #[derive(Debug, Clone)]
struct UrlClient { struct UrlClient {
url: String, url: String,
agent: Agent, agent: Agent,
@ -47,15 +47,7 @@ struct UrlClient {
pub struct EsploraBlockchain { pub struct EsploraBlockchain {
url_client: UrlClient, url_client: UrlClient,
stop_gap: usize, stop_gap: usize,
} concurrency: u8,
impl std::convert::From<UrlClient> for EsploraBlockchain {
fn from(url_client: UrlClient) -> Self {
EsploraBlockchain {
url_client,
stop_gap: 20,
}
}
} }
impl EsploraBlockchain { impl EsploraBlockchain {
@ -66,6 +58,7 @@ impl EsploraBlockchain {
url: base_url.to_string(), url: base_url.to_string(),
agent: Agent::new(), agent: Agent::new(),
}, },
concurrency: super::DEFAULT_CONCURRENT_REQUESTS,
stop_gap, stop_gap,
} }
} }
@ -75,6 +68,12 @@ impl EsploraBlockchain {
self.url_client.agent = agent; self.url_client.agent = agent;
self self
} }
/// Set the number of parallel requests the client can make.
pub fn with_concurrency(mut self, concurrency: u8) -> Self {
self.concurrency = concurrency;
self
}
} }
impl Blockchain for EsploraBlockchain { impl Blockchain for EsploraBlockchain {
@ -91,10 +90,94 @@ impl Blockchain for EsploraBlockchain {
fn setup<D: BatchDatabase, P: Progress>( fn setup<D: BatchDatabase, P: Progress>(
&self, &self,
database: &mut D, database: &mut D,
progress_update: P, _progress_update: P,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.url_client use crate::blockchain::script_sync::Request;
.electrum_like_setup(self.stop_gap, database, progress_update) let mut request = script_sync::start(database, self.stop_gap)?;
let mut tx_index: HashMap<Txid, Tx> = HashMap::new();
let batch_update = loop {
request = match request {
Request::Script(script_req) => {
let scripts = script_req
.request()
.take(self.concurrency as usize)
.cloned();
let handles = scripts.map(move |script| {
let client = self.url_client.clone();
// make each request in its own thread.
std::thread::spawn(move || {
let mut related_txs: Vec<Tx> = client._scripthash_txs(&script, None)?;
let n_confirmed =
related_txs.iter().filter(|tx| tx.status.confirmed).count();
// esplora pages on 25 confirmed transactions. If there are 25 or more we
// keep requesting to see if there are more.
if n_confirmed >= 25 {
loop {
let new_related_txs: Vec<Tx> = client._scripthash_txs(
&script,
Some(related_txs.last().unwrap().txid),
)?;
let n = new_related_txs.len();
related_txs.extend(new_related_txs);
// we've reached the end
if n < 25 {
break;
}
}
}
Result::<_, Error>::Ok(related_txs)
})
});
let txs_per_script: Vec<Vec<Tx>> = handles
.map(|handle| handle.join().unwrap())
.collect::<Result<_, _>>()?;
let mut satisfaction = vec![];
for txs in txs_per_script {
satisfaction.push(
txs.iter()
.map(|tx| (tx.txid, tx.status.block_height))
.collect(),
);
for tx in txs {
tx_index.insert(tx.txid, tx);
}
}
script_req.satisfy(satisfaction)?
}
Request::Conftime(conftime_req) => {
let conftimes = conftime_req
.request()
.map(|txid| {
tx_index
.get(txid)
.expect("must be in index")
.confirmation_time()
})
.collect();
conftime_req.satisfy(conftimes)?
}
Request::Tx(tx_req) => {
let full_txs = tx_req
.request()
.map(|txid| {
let tx = tx_index.get(txid).expect("must be in index");
(tx.previous_outputs(), tx.to_tx())
})
.collect();
tx_req.satisfy(full_txs)?
}
Request::Finish(batch_update) => break batch_update,
}
};
database.commit_batch(batch_update)?;
Ok(())
} }
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> { fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
@ -117,10 +200,6 @@ impl Blockchain for EsploraBlockchain {
} }
impl UrlClient { impl UrlClient {
fn script_to_scripthash(script: &Script) -> String {
sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
}
fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> { fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
let resp = self let resp = self
.agent .agent
@ -200,81 +279,6 @@ impl UrlClient {
} }
} }
fn _script_get_history(&self, script: &Script) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
let mut result = Vec::new();
let scripthash = Self::script_to_scripthash(script);
// Add the unconfirmed transactions first
let resp = self
.agent
.get(&format!(
"{}/scripthash/{}/txs/mempool",
self.url, scripthash
))
.call();
let v = match resp {
Ok(resp) => {
let v: Vec<EsploraGetHistory> = resp.into_json()?;
Ok(v)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
debug!(
"Found {} mempool txs for {} - {:?}",
result.len(),
scripthash,
script
);
// Then go through all the pages of confirmed transactions
let mut last_txid = String::new();
loop {
let resp = self
.agent
.get(&format!(
"{}/scripthash/{}/txs/chain/{}",
self.url, scripthash, last_txid
))
.call();
let v = match resp {
Ok(resp) => {
let v: Vec<EsploraGetHistory> = resp.into_json()?;
Ok(v)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
let len = v.len();
if let Some(elem) = v.last() {
last_txid = elem.txid.to_hex();
}
debug!("... adding {} confirmed transactions", len);
result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
if len < 25 {
break;
}
}
Ok(result)
}
fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> { fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
let resp = self let resp = self
.agent .agent
@ -292,6 +296,22 @@ impl UrlClient {
Ok(map) Ok(map)
} }
fn _scripthash_txs(
&self,
script: &Script,
last_seen: Option<Txid>,
) -> Result<Vec<Tx>, EsploraError> {
let script_hash = sha256::Hash::hash(script.as_bytes()).into_inner().to_hex();
let url = match last_seen {
Some(last_seen) => format!(
"{}/scripthash/{}/txs/chain/{}",
self.url, script_hash, last_seen
),
None => format!("{}/scripthash/{}/txs", self.url, script_hash),
};
Ok(self.agent.get(&url).call()?.into_json()?)
}
} }
fn is_status_not_found(status: u16) -> bool { fn is_status_not_found(status: u16) -> bool {
@ -315,84 +335,37 @@ fn into_bytes(resp: Response) -> Result<Vec<u8>, io::Error> {
Ok(buf) Ok(buf)
} }
impl ElectrumLikeSync for UrlClient {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
let mut results = vec![];
for script in scripts.into_iter() {
let v = self._script_get_history(script)?;
results.push(v);
}
Ok(results)
}
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
let mut results = vec![];
for txid in txids.into_iter() {
let tx = self._get_tx_no_opt(txid)?;
results.push(tx);
}
Ok(results)
}
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let mut results = vec![];
for height in heights.into_iter() {
let header = self._get_header(height)?;
results.push(header);
}
Ok(results)
}
}
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service eg. `https://blockstream.info/api/`
pub base_url: String,
/// Optional URL of the proxy to use to make requests to the Esplora server
///
/// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
///
/// Note that the format of this value and the supported protocols change slightly between the
/// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
/// details check with the documentation of the two crates. Both of them are compiled with
/// the `socks` feature enabled.
///
/// The proxy is ignored when targeting `wasm32`.
pub proxy: Option<String>,
/// Socket read timeout.
pub timeout_read: u64,
/// Socket write timeout.
pub timeout_write: u64,
/// Stop searching addresses for transactions after finding an unused gap of this length.
pub stop_gap: usize,
}
impl ConfigurableBlockchain for EsploraBlockchain { impl ConfigurableBlockchain for EsploraBlockchain {
type Config = EsploraBlockchainConfig; type Config = super::EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> { fn from_config(config: &Self::Config) -> Result<Self, Error> {
let mut agent_builder = ureq::AgentBuilder::new() let mut agent_builder = ureq::AgentBuilder::new();
.timeout_read(Duration::from_secs(config.timeout_read))
.timeout_write(Duration::from_secs(config.timeout_write)); if let Some(timeout) = config.timeout {
agent_builder = agent_builder.timeout(Duration::from_secs(timeout));
}
if let Some(proxy) = &config.proxy { if let Some(proxy) = &config.proxy {
agent_builder = agent_builder agent_builder = agent_builder
.proxy(Proxy::new(proxy).map_err(|e| Error::Esplora(Box::new(e.into())))?); .proxy(Proxy::new(proxy).map_err(|e| Error::Esplora(Box::new(e.into())))?);
} }
Ok( let mut blockchain = EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap) .with_agent(agent_builder.build());
.with_agent(agent_builder.build()),
) if let Some(concurrency) = config.concurrency {
blockchain = blockchain.with_concurrency(concurrency);
}
Ok(blockchain)
}
}
impl From<ureq::Error> for EsploraError {
fn from(e: ureq::Error) -> Self {
match e {
ureq::Error::Status(code, _) => EsploraError::HttpResponse(code),
e => EsploraError::Ureq(e),
}
} }
} }


@ -27,9 +27,6 @@ use crate::database::BatchDatabase;
use crate::error::Error; use crate::error::Error;
use crate::FeeRate; use crate::FeeRate;
#[cfg(any(feature = "electrum", feature = "esplora"))]
pub(crate) mod utils;
#[cfg(any( #[cfg(any(
feature = "electrum", feature = "electrum",
feature = "esplora", feature = "esplora",
@ -37,6 +34,8 @@ pub(crate) mod utils;
feature = "rpc" feature = "rpc"
))] ))]
pub mod any; pub mod any;
mod script_sync;
#[cfg(any( #[cfg(any(
feature = "electrum", feature = "electrum",
feature = "esplora", feature = "esplora",


@ -0,0 +1,394 @@
/*!
This models how a sync happens when you have a server that you send your script pubkeys to and it
returns associated transactions, i.e. electrum.
*/
#![allow(dead_code)]
use crate::{
database::{BatchDatabase, BatchOperations, DatabaseUtils},
wallet::time::Instant,
BlockTime, Error, KeychainKind, LocalUtxo, TransactionDetails,
};
use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
use log::*;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
/// A request for on-chain information
pub enum Request<'a, D: BatchDatabase> {
/// A request for transactions related to script pubkeys.
Script(ScriptReq<'a, D>),
/// A request for confirmation times for some transactions.
Conftime(ConftimeReq<'a, D>),
/// A request for full transaction details of some transactions.
Tx(TxReq<'a, D>),
/// Requests are finished; here's a batch database update reflecting the data gathered.
Finish(D::Batch),
}
/// Starts a sync.
pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>, Error> {
use rand::seq::SliceRandom;
let mut keychains = vec![KeychainKind::Internal, KeychainKind::External];
// shuffling improves privacy: the server doesn't know whether my first request is for my internal or external addresses
keychains.shuffle(&mut rand::thread_rng());
let keychain = keychains.pop().unwrap();
let scripts_needed = db
.iter_script_pubkeys(Some(keychain))?
.into_iter()
.collect();
let state = State::new(db);
Ok(Request::Script(ScriptReq {
state,
scripts_needed,
script_index: 0,
stop_gap,
keychain,
next_keychains: keychains,
}))
}
pub struct ScriptReq<'a, D: BatchDatabase> {
state: State<'a, D>,
script_index: usize,
scripts_needed: VecDeque<Script>,
stop_gap: usize,
keychain: KeychainKind,
next_keychains: Vec<KeychainKind>,
}
/// The sync starts by returning script pubkeys we are interested in.
impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
pub fn request(&self) -> impl Iterator<Item = &Script> + Clone {
self.scripts_needed.iter()
}
pub fn satisfy(
mut self,
// we want to know the txids associated with the script and their height
txids: Vec<Vec<(Txid, Option<u32>)>>,
) -> Result<Request<'a, D>, Error> {
for (txid_list, script) in txids.iter().zip(self.scripts_needed.iter()) {
debug!(
"found {} transactions for script pubkey {}",
txid_list.len(),
script
);
if !txid_list.is_empty() {
// the address is active
self.state
.last_active_index
.insert(self.keychain, self.script_index);
}
for (txid, height) in txid_list {
// have we seen this txid already?
match self.state.db.get_tx(txid, true)? {
Some(mut details) => {
let old_height = details.confirmation_time.as_ref().map(|x| x.height);
match (old_height, height) {
(None, Some(_)) => {
// It looks like the tx has confirmed since we last saw it -- we
// need to know the confirmation time.
self.state.tx_missing_conftime.insert(*txid, details);
}
(Some(old_height), Some(new_height)) if old_height != *new_height => {
// The height of the tx has changed!? -- It's a reorg; get the new confirmation time.
self.state.tx_missing_conftime.insert(*txid, details);
}
(Some(_), None) => {
// A re-org where the tx is not in the chain anymore.
details.confirmation_time = None;
self.state.finished_txs.push(details);
}
_ => self.state.finished_txs.push(details),
}
}
None => {
// we've never seen it let's get the whole thing
self.state.tx_needed.insert(*txid);
}
};
}
self.script_index += 1;
}
for _ in txids {
self.scripts_needed.pop_front();
}
let last_active_index = self
.state
.last_active_index
.get(&self.keychain)
.map(|x| x + 1)
.unwrap_or(0); // so no active addresses maps to 0
Ok(
if self.script_index > last_active_index + self.stop_gap
|| self.scripts_needed.is_empty()
{
debug!(
"finished scanning for transactions for keychain {:?} at index {}",
self.keychain, last_active_index
);
// we're done here -- check if we need to do the next keychain
if let Some(keychain) = self.next_keychains.pop() {
self.keychain = keychain;
self.script_index = 0;
self.scripts_needed = self
.state
.db
.iter_script_pubkeys(Some(keychain))?
.into_iter()
.collect();
Request::Script(self)
} else {
Request::Tx(TxReq { state: self.state })
}
} else {
Request::Script(self)
},
)
}
}
/// Then we get full transactions
pub struct TxReq<'a, D> {
state: State<'a, D>,
}
impl<'a, D: BatchDatabase> TxReq<'a, D> {
pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
self.state.tx_needed.iter()
}
pub fn satisfy(
mut self,
tx_details: Vec<(Vec<Option<TxOut>>, Transaction)>,
) -> Result<Request<'a, D>, Error> {
let tx_details: Vec<TransactionDetails> = tx_details
.into_iter()
.zip(self.state.tx_needed.iter())
.map(|((vout, tx), txid)| {
debug!("found tx_details for {}", txid);
assert_eq!(tx.txid(), *txid);
let mut sent: u64 = 0;
let mut received: u64 = 0;
let mut inputs_sum: u64 = 0;
let mut outputs_sum: u64 = 0;
for (txout, input) in vout.into_iter().zip(tx.input.iter()) {
let txout = match txout {
Some(txout) => txout,
None => {
// skip coinbase inputs
debug_assert!(
input.previous_output.is_null(),
"prevout should only be missing for coinbase"
);
continue;
}
};
inputs_sum += txout.value;
if self.state.db.is_mine(&txout.script_pubkey)? {
sent += txout.value;
}
}
for out in &tx.output {
outputs_sum += out.value;
if self.state.db.is_mine(&out.script_pubkey)? {
received += out.value;
}
}
// we need a saturating sub since we want coinbase txs to map to 0 fee, and
// this subtraction would be negative for coinbase txs.
let fee = inputs_sum.saturating_sub(outputs_sum);
Result::<_, Error>::Ok(TransactionDetails {
txid: *txid,
transaction: Some(tx),
received,
sent,
// we're going to fill this in later
confirmation_time: None,
fee: Some(fee),
verified: false,
})
})
.collect::<Result<Vec<_>, _>>()?;
for tx_detail in tx_details {
self.state.tx_needed.remove(&tx_detail.txid);
self.state
.tx_missing_conftime
.insert(tx_detail.txid, tx_detail);
}
if !self.state.tx_needed.is_empty() {
Ok(Request::Tx(self))
} else {
Ok(Request::Conftime(ConftimeReq { state: self.state }))
}
}
}
/// Final step is to get confirmation times
pub struct ConftimeReq<'a, D> {
state: State<'a, D>,
}
impl<'a, D: BatchDatabase> ConftimeReq<'a, D> {
pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
self.state.tx_missing_conftime.keys()
}
pub fn satisfy(
mut self,
confirmation_times: Vec<Option<BlockTime>>,
) -> Result<Request<'a, D>, Error> {
let conftime_needed = self
.request()
.cloned()
.take(confirmation_times.len())
.collect::<Vec<_>>();
for (confirmation_time, txid) in confirmation_times.into_iter().zip(conftime_needed.iter())
{
debug!("confirmation time for {} was {:?}", txid, confirmation_time);
if let Some(mut tx_details) = self.state.tx_missing_conftime.remove(txid) {
tx_details.confirmation_time = confirmation_time;
self.state.finished_txs.push(tx_details);
}
}
if self.state.tx_missing_conftime.is_empty() {
Ok(Request::Finish(self.state.into_db_update()?))
} else {
Ok(Request::Conftime(self))
}
}
}
struct State<'a, D> {
db: &'a D,
last_active_index: HashMap<KeychainKind, usize>,
/// Transactions where we need to get the full details
tx_needed: BTreeSet<Txid>,
/// Transactions that we know everything about
finished_txs: Vec<TransactionDetails>,
/// Transactions into which discovered conftimes should be inserted
tx_missing_conftime: BTreeMap<Txid, TransactionDetails>,
/// The start of the sync
start_time: Instant,
}
impl<'a, D: BatchDatabase> State<'a, D> {
fn new(db: &'a D) -> Self {
State {
db,
last_active_index: HashMap::default(),
finished_txs: vec![],
tx_needed: BTreeSet::default(),
tx_missing_conftime: BTreeMap::default(),
start_time: Instant::new(),
}
}
fn into_db_update(self) -> Result<D::Batch, Error> {
debug_assert!(self.tx_needed.is_empty() && self.tx_missing_conftime.is_empty());
let existing_txs = self.db.iter_txs(false)?;
let existing_txids: HashSet<Txid> = existing_txs.iter().map(|tx| tx.txid).collect();
let finished_txs = make_txs_consistent(&self.finished_txs);
let observed_txids: HashSet<Txid> = finished_txs.iter().map(|tx| tx.txid).collect();
let txids_to_delete = existing_txids.difference(&observed_txids);
let mut batch = self.db.begin_batch();
// Delete old txs that no longer exist
for txid in txids_to_delete {
if let Some(raw_tx) = self.db.get_raw_tx(txid)? {
for i in 0..raw_tx.output.len() {
// Also delete any utxos from the txs that no longer exist.
let _ = batch.del_utxo(&OutPoint {
txid: *txid,
vout: i as u32,
})?;
}
} else {
unreachable!("we should always have the raw tx");
}
batch.del_tx(txid, true)?;
}
// Set every tx we observed
for finished_tx in &finished_txs {
let tx = finished_tx
.transaction
.as_ref()
.expect("transaction will always be present here");
for (i, output) in tx.output.iter().enumerate() {
if let Some((keychain, _)) =
self.db.get_path_from_script_pubkey(&output.script_pubkey)?
{
// add utxos we own from the new transactions we've seen.
batch.set_utxo(&LocalUtxo {
outpoint: OutPoint {
txid: finished_tx.txid,
vout: i as u32,
},
txout: output.clone(),
keychain,
})?;
}
}
batch.set_tx(finished_tx)?;
}
// we don't do this in the loop above since we may want to delete some of the utxos we
// just added in case there are new transactions that spend from each other.
for finished_tx in &finished_txs {
let tx = finished_tx
.transaction
.as_ref()
.expect("transaction will always be present here");
for input in &tx.input {
// Delete any spent utxos
batch.del_utxo(&input.previous_output)?;
}
}
for (keychain, last_active_index) in self.last_active_index {
batch.set_last_index(keychain, last_active_index as u32)?;
}
info!(
"finished setup, elapsed {:?}ms",
self.start_time.elapsed().as_millis()
);
Ok(batch)
}
}
/// Remove conflicting transactions -- tie breaking them by fee.
fn make_txs_consistent(txs: &[TransactionDetails]) -> Vec<&TransactionDetails> {
let mut utxo_index: HashMap<OutPoint, &TransactionDetails> = HashMap::default();
for tx in txs {
for input in &tx.transaction.as_ref().unwrap().input {
utxo_index
.entry(input.previous_output)
.and_modify(|existing| match (tx.fee, existing.fee) {
(Some(fee), Some(existing_fee)) if fee > existing_fee => *existing = tx,
(Some(_), None) => *existing = tx,
_ => { /* leave it the same */ }
})
.or_insert(tx);
}
}
utxo_index
.into_iter()
.map(|(_, tx)| (tx.txid, tx))
.collect::<HashMap<_, _>>()
.into_iter()
.map(|(_, tx)| tx)
.collect()
}


@ -1,388 +0,0 @@
// Bitcoin Dev Kit
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
use std::collections::{HashMap, HashSet};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use rand::seq::SliceRandom;
use rand::thread_rng;
use bitcoin::{BlockHeader, OutPoint, Script, Transaction, Txid};
use super::*;
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::error::Error;
use crate::types::{BlockTime, KeychainKind, LocalUtxo, TransactionDetails};
use crate::wallet::time::Instant;
use crate::wallet::utils::ChunksIterator;
#[derive(Debug)]
pub struct ElsGetHistoryRes {
pub height: i32,
pub tx_hash: Txid,
}
/// Implements the synchronization logic for an Electrum-like client.
#[maybe_async]
pub trait ElectrumLikeSync {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script> + Clone>(
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error>;
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid> + Clone>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error>;
fn els_batch_block_header<I: IntoIterator<Item = u32> + Clone>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error>;
// Provided methods down here...
fn electrum_like_setup<D: BatchDatabase, P: Progress>(
&self,
stop_gap: usize,
db: &mut D,
_progress_update: P,
) -> Result<(), Error> {
// TODO: progress
let start = Instant::new();
debug!("start setup");
let chunk_size = stop_gap;
let mut history_txs_id = HashSet::new();
let mut txid_height = HashMap::new();
let mut max_indexes = HashMap::new();
let mut wallet_chains = vec![KeychainKind::Internal, KeychainKind::External];
// shuffling improves privacy: the server doesn't know whether my first request is for my internal or external addresses
wallet_chains.shuffle(&mut thread_rng());
// download history of our internal and external script_pubkeys
for keychain in wallet_chains.iter() {
let script_iter = db.iter_script_pubkeys(Some(*keychain))?.into_iter();
for (i, chunk) in ChunksIterator::new(script_iter, stop_gap).enumerate() {
// TODO if i == last, should create another chunk of addresses in db
let call_result: Vec<Vec<ElsGetHistoryRes>> =
maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
let max_index = call_result
.iter()
.enumerate()
.filter_map(|(i, v)| v.first().map(|_| i as u32))
.max();
if let Some(max) = max_index {
max_indexes.insert(keychain, max + (i * chunk_size) as u32);
}
let flattened: Vec<ElsGetHistoryRes> = call_result.into_iter().flatten().collect();
debug!("#{} of {:?} results:{}", i, keychain, flattened.len());
if flattened.is_empty() {
// Didn't find anything in the last `stop_gap` script_pubkeys, breaking
break;
}
for el in flattened {
// el.height = -1 means unconfirmed with unconfirmed parents
// el.height = 0 means unconfirmed with confirmed parents
// but we treat those tx the same
if el.height <= 0 {
txid_height.insert(el.tx_hash, None);
} else {
txid_height.insert(el.tx_hash, Some(el.height as u32));
}
history_txs_id.insert(el.tx_hash);
}
}
}
// saving max indexes
info!("max indexes are: {:?}", max_indexes);
for keychain in wallet_chains.iter() {
if let Some(index) = max_indexes.get(keychain) {
db.set_last_index(*keychain, *index)?;
}
}
// get db status
let txs_details_in_db: HashMap<Txid, TransactionDetails> = db
.iter_txs(false)?
.into_iter()
.map(|tx| (tx.txid, tx))
.collect();
let txs_raw_in_db: HashMap<Txid, Transaction> = db
.iter_raw_txs()?
.into_iter()
.map(|tx| (tx.txid(), tx))
.collect();
let utxos_deps = utxos_deps(db, &txs_raw_in_db)?;
// download new txs and headers
let new_txs = maybe_await!(self.download_and_save_needed_raw_txs(
&history_txs_id,
&txs_raw_in_db,
chunk_size,
db
))?;
let new_timestamps = maybe_await!(self.download_needed_headers(
&txid_height,
&txs_details_in_db,
chunk_size
))?;
let mut batch = db.begin_batch();
// save any tx details not in db but in history_txs_id or with different height/timestamp
for txid in history_txs_id.iter() {
let height = txid_height.get(txid).cloned().flatten();
let timestamp = new_timestamps.get(txid).cloned();
if let Some(tx_details) = txs_details_in_db.get(txid) {
// check if tx height matches, otherwise update it. timestamp is not in the if clause
// because we are not asking for headers of confirmed txs we already know about
if tx_details.confirmation_time.as_ref().map(|c| c.height) != height {
let confirmation_time = BlockTime::new(height, timestamp);
let mut new_tx_details = tx_details.clone();
new_tx_details.confirmation_time = confirmation_time;
batch.set_tx(&new_tx_details)?;
}
} else {
save_transaction_details_and_utxos(
txid,
db,
timestamp,
height,
&mut batch,
&utxos_deps,
)?;
}
}
// remove any tx details in db but not in history_txs_id
for txid in txs_details_in_db.keys() {
if !history_txs_id.contains(txid) {
batch.del_tx(txid, false)?;
}
}
// remove any spent utxo
for new_tx in new_txs.iter() {
for input in new_tx.input.iter() {
batch.del_utxo(&input.previous_output)?;
}
}
db.commit_batch(batch)?;
info!("finish setup, elapsed {:?}ms", start.elapsed().as_millis());
Ok(())
}
/// download txs identified by `history_txs_id`, and their previous output txs, if not already present in db
fn download_and_save_needed_raw_txs<D: BatchDatabase>(
&self,
history_txs_id: &HashSet<Txid>,
txs_raw_in_db: &HashMap<Txid, Transaction>,
chunk_size: usize,
db: &mut D,
) -> Result<Vec<Transaction>, Error> {
let mut txs_downloaded = vec![];
let txids_raw_in_db: HashSet<Txid> = txs_raw_in_db.keys().cloned().collect();
let txids_to_download: Vec<&Txid> = history_txs_id.difference(&txids_raw_in_db).collect();
if !txids_to_download.is_empty() {
info!("got {} txs to download", txids_to_download.len());
txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
txids_to_download,
chunk_size,
db,
))?);
let mut prev_txids = HashSet::new();
let mut txids_downloaded = HashSet::new();
for tx in txs_downloaded.iter() {
txids_downloaded.insert(tx.txid());
// collect every input's previous txid, skipping coinbase inputs
for input in tx.input.iter().filter(|i| !i.previous_output.is_null()) {
prev_txids.insert(input.previous_output.txid);
}
}
let already_present: HashSet<Txid> =
txids_downloaded.union(&txids_raw_in_db).cloned().collect();
let prev_txs_to_download: Vec<&Txid> =
prev_txids.difference(&already_present).collect();
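// e.g. if a downloaded tx spends outputs of txs B and C, and only B is already
// known (in db or just downloaded), then only C is requested here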
info!("{} previous txs to download", prev_txs_to_download.len());
txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
prev_txs_to_download,
chunk_size,
db,
))?);
}
Ok(txs_downloaded)
}
/// download headers at the heights in `txid_height` for txs whose details are not already present; returns a map Txid -> timestamp
fn download_needed_headers(
&self,
txid_height: &HashMap<Txid, Option<u32>>,
txs_details_in_db: &HashMap<Txid, TransactionDetails>,
chunk_size: usize,
) -> Result<HashMap<Txid, u64>, Error> {
let mut txid_timestamp = HashMap::new();
let txid_in_db_with_conf: HashSet<_> = txs_details_in_db
.values()
.filter_map(|details| details.confirmation_time.as_ref().map(|_| details.txid))
.collect();
let needed_txid_height: HashMap<&Txid, u32> = txid_height
.iter()
.filter(|(t, _)| !txid_in_db_with_conf.contains(*t))
.filter_map(|(t, o)| o.map(|h| (t, h)))
.collect();
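// `needed_txid_height` now contains only confirmed txs whose details are
// missing from the db; their heights tell us which headers to fetch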
let needed_heights: HashSet<u32> = needed_txid_height.values().cloned().collect();
if !needed_heights.is_empty() {
info!("{} headers to download for timestamp", needed_heights.len());
let mut height_timestamp: HashMap<u32, u64> = HashMap::new();
for chunk in ChunksIterator::new(needed_heights.into_iter(), chunk_size) {
let call_result: Vec<BlockHeader> =
maybe_await!(self.els_batch_block_header(chunk.clone()))?;
height_timestamp.extend(
chunk
.into_iter()
.zip(call_result.iter().map(|h| h.time as u64)),
);
}
for (txid, height) in needed_txid_height {
let timestamp = height_timestamp
.get(&height)
.ok_or_else(|| Error::Generic("timestamp missing".to_string()))?;
txid_timestamp.insert(*txid, *timestamp);
}
}
Ok(txid_timestamp)
}
fn download_and_save_in_chunks<D: BatchDatabase>(
&self,
to_download: Vec<&Txid>,
chunk_size: usize,
db: &mut D,
) -> Result<Vec<Transaction>, Error> {
let mut txs_downloaded = vec![];
for chunk in ChunksIterator::new(to_download.into_iter(), chunk_size) {
let call_result: Vec<Transaction> =
maybe_await!(self.els_batch_transaction_get(chunk))?;
let mut batch = db.begin_batch();
for new_tx in call_result.iter() {
batch.set_raw_tx(new_tx)?;
}
db.commit_batch(batch)?;
txs_downloaded.extend(call_result);
}
Ok(txs_downloaded)
}
}
fn save_transaction_details_and_utxos<D: BatchDatabase>(
txid: &Txid,
db: &mut D,
timestamp: Option<u64>,
height: Option<u32>,
updates: &mut dyn BatchOperations,
utxo_deps: &HashMap<OutPoint, OutPoint>,
) -> Result<(), Error> {
let tx = db.get_raw_tx(txid)?.ok_or(Error::TransactionNotFound)?;
let mut incoming: u64 = 0;
let mut outgoing: u64 = 0;
let mut inputs_sum: u64 = 0;
let mut outputs_sum: u64 = 0;
// look for our own inputs
for input in tx.input.iter() {
// skip coinbase inputs
if input.previous_output.is_null() {
continue;
}
// We already downloaded all previous output txs in the previous step
if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
inputs_sum += previous_output.value;
if db.is_mine(&previous_output.script_pubkey)? {
outgoing += previous_output.value;
}
} else {
// The input is not ours, but we still need to count it for the fees
let tx = db
.get_raw_tx(&input.previous_output.txid)?
.ok_or(Error::TransactionNotFound)?;
inputs_sum += tx.output[input.previous_output.vout as usize].value;
}
// remove the conflicting utxo, if any (one generated from the same inputs, e.g. by RBF)
if let Some(outpoint) = utxo_deps.get(&input.previous_output) {
updates.del_utxo(outpoint)?;
}
}
for (i, output) in tx.output.iter().enumerate() {
// to compute the fees later
outputs_sum += output.value;
// this output is ours, we have a path to derive it
if let Some((keychain, _child)) = db.get_path_from_script_pubkey(&output.script_pubkey)? {
debug!("{} output #{} is mine, adding utxo", txid, i);
updates.set_utxo(&LocalUtxo {
outpoint: OutPoint::new(tx.txid(), i as u32),
txout: output.clone(),
keychain,
})?;
incoming += output.value;
}
}
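// worked example: a tx spending a single 50_000 sat input of ours and paying
// 49_000 sats to a foreign script yields sent = 50_000, received = 0 and
// fee = 50_000 - 49_000 = 1_000 sats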
let tx_details = TransactionDetails {
txid: tx.txid(),
transaction: Some(tx),
received: incoming,
sent: outgoing,
confirmation_time: BlockTime::new(height, timestamp),
fee: Some(inputs_sum.saturating_sub(outputs_sum)), /* saturating_sub: for a coinbase tx the fee would otherwise be negative */
verified: height.is_some(),
};
updates.set_tx(&tx_details)?;
Ok(())
}
/// returns utxo dependencies: maps each outpoint spent by a utxo's generating tx to that utxo's outpoint
/// `tx_raw_in_db` must contain the utxos' generating txs, or this errors with [crate::Error::TransactionNotFound]
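/// e.g. if utxo `o` was created by tx `t`, every outpoint spent by `t` maps to `o`,
/// so a later conflicting spend of any of those outpoints knows to delete `o`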
fn utxos_deps<D: BatchDatabase>(
db: &mut D,
tx_raw_in_db: &HashMap<Txid, Transaction>,
) -> Result<HashMap<OutPoint, OutPoint>, Error> {
let utxos = db.iter_utxos()?;
let mut utxos_deps = HashMap::new();
for utxo in utxos {
let from_tx = tx_raw_in_db
.get(&utxo.outpoint.txid)
.ok_or(Error::TransactionNotFound)?;
for input in from_tx.input.iter() {
utxos_deps.insert(input.previous_output, utxo.outpoint);
}
}
Ok(utxos_deps)
}

@@ -609,6 +609,74 @@ macro_rules! bdk_blockchain_tests {
assert_eq!(wallet.list_unspent().unwrap().len(), 1, "incorrect number of unspents");
}
/// Send two conflicting transactions to the same address twice in a row.
/// The coins should only be received once!
#[test]
fn test_sync_double_receive() {
let (wallet, descriptors, mut test_client) = init_single_sig();
let receiver_wallet = get_wallet_from_descriptors(&("wpkh(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW)".to_string(), None), &test_client);
// need to sync so rpc can start watching
receiver_wallet.sync(noop_progress(), None).unwrap();
test_client.receive(testutils! {
@tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
});
wallet.sync(noop_progress(), None).unwrap();
assert_eq!(wallet.get_balance().unwrap(), 75_000, "incorrect balance");
let target_addr = receiver_wallet.get_address($crate::wallet::AddressIndex::New).unwrap().address;
let tx1 = {
let mut builder = wallet.build_tx();
builder.add_recipient(target_addr.script_pubkey(), 49_000).enable_rbf();
let (mut psbt, _details) = builder.finish().unwrap();
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
assert!(finalized, "Cannot finalize transaction");
psbt.extract_tx()
};
let tx2 = {
let mut builder = wallet.build_tx();
builder.add_recipient(target_addr.script_pubkey(), 49_000).enable_rbf().fee_rate(FeeRate::from_sat_per_vb(5.0));
let (mut psbt, _details) = builder.finish().unwrap();
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
assert!(finalized, "Cannot finalize transaction");
psbt.extract_tx()
};
wallet.broadcast(&tx1).unwrap();
wallet.broadcast(&tx2).unwrap();
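// both txs pay the same address; a sync that naively trusted the server's
// stale index could credit the 49_000 sats twice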
receiver_wallet.sync(noop_progress(), None).unwrap();
assert_eq!(receiver_wallet.get_balance().unwrap(), 49_000, "should have received coins once and only once");
}
#[test]
fn test_sync_many_sends_to_a_single_address() {
let (wallet, descriptors, mut test_client) = init_single_sig();
for _ in 0..4 {
// split this up into multiple blocks so rpc doesn't get angry
for _ in 0..20 {
test_client.receive(testutils! {
@tx ( (@external descriptors, 0) => 1_000 )
});
}
test_client.generate(1, None);
}
// add some to the mempool as well.
for _ in 0..20 {
test_client.receive(testutils! {
@tx ( (@external descriptors, 0) => 1_000 )
});
}
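// 80 confirmed + 20 unconfirmed payments of 1_000 sats each => 100_000 sats total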
wallet.sync(noop_progress(), None).unwrap();
assert_eq!(wallet.get_balance().unwrap(), 100_000);
}
#[test]
fn test_update_confirmation_time_after_generate() {
let (wallet, descriptors, mut test_client) = init_single_sig();

@@ -138,40 +138,6 @@ impl<Pk: MiniscriptKey + ToPublicKey> Satisfier<Pk> for Older {
pub(crate) type SecpCtx = Secp256k1<All>;
#[cfg(any(feature = "electrum", feature = "esplora"))]
pub struct ChunksIterator<I: Iterator> {
iter: I,
size: usize,
}
#[cfg(any(feature = "electrum", feature = "esplora"))]
impl<I: Iterator> ChunksIterator<I> {
pub fn new(iter: I, size: usize) -> Self {
ChunksIterator { iter, size }
}
}
#[cfg(any(feature = "electrum", feature = "esplora"))]
impl<I: Iterator> Iterator for ChunksIterator<I> {
type Item = Vec<<I as std::iter::Iterator>::Item>;
fn next(&mut self) -> Option<Self::Item> {
let mut v = Vec::new();
for _ in 0..self.size {
let e = self.iter.next();
match e {
None => break,
Some(val) => v.push(val),
}
}
if v.is_empty() {
return None;
}
Some(v)
}
}
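// usage sketch (illustrative, not part of the diff):
// let chunks: Vec<Vec<u32>> = ChunksIterator::new(0u32..7, 3).collect();
// assert_eq!(chunks, vec![vec![0, 1, 2], vec![3, 4, 5], vec![6]]);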
#[cfg(test)]
mod test {
use super::{