Compare commits

..

7 Commits

Author SHA1 Message Date
codeShark149
d2da3755f4 Add Peer Manager
The Peer Manager structure is responsible for maintaining a live
directory of connected CBF/Non-CBF peers. It can send and receive
messages from one or multiple peers.

It manages an underlying address manager to fetch network addresses,
and it maintains a ban score for each peer. A threshold ban score (100)
signals the banning of a peer. Once a peer is banned, it will not be
connected to in subsequent runs.

The Peer Manager will be used to handle parallel fetching of filter data
from multiple peers.
2021-07-22 00:00:55 +05:30
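The ban-score mechanism described in this commit message can be sketched roughly as follows. `BanTracker`, its fields, and the point values are hypothetical stand-ins rather than the actual Peer Manager internals; only the threshold of 100 comes from the message above.

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

// Threshold taken from the commit message above; everything else is a sketch.
const BAN_THRESHOLD: u32 = 100;

#[derive(Default)]
struct BanTracker {
    scores: HashMap<SocketAddr, u32>,
    banned: HashSet<SocketAddr>,
}

impl BanTracker {
    /// Add `points` to a peer's ban score; returns true if the peer
    /// just crossed the threshold and is now banned.
    fn penalize(&mut self, peer: SocketAddr, points: u32) -> bool {
        let score = *self
            .scores
            .entry(peer)
            .and_modify(|s| *s += points)
            .or_insert(points);
        if score >= BAN_THRESHOLD {
            self.banned.insert(peer)
        } else {
            false
        }
    }

    fn is_banned(&self, peer: &SocketAddr) -> bool {
        self.banned.contains(peer)
    }
}

fn main() {
    let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap();
    let mut tracker = BanTracker::default();
    assert!(!tracker.penalize(peer, 40));
    assert!(tracker.penalize(peer, 60)); // score reaches 100: banned
    assert!(tracker.is_banned(&peer));
    println!("banned: {}", tracker.is_banned(&peer));
}
```

In a persistent implementation the `banned` set would also be written to disk, so a banned peer stays excluded across runs as the message states.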
Steve Myers
6acb4d9796 Ignore wip test 2021-07-20 09:45:34 -07:00
codeShark149
377e5cdd49 Update address manager test
The two tests have been combined into one. Added extra log points for
easier debugging if CI gets stuck.
2021-07-20 19:55:16 +05:30
rajarshimaitra
70d2a0ee6b Return Option when getting addresses
When getting addresses, return an Option: Some with the vector of
addresses, or None in case of an empty vector.

This makes handling an empty result easier.
2021-07-20 19:55:16 +05:30
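A minimal sketch of the convention this commit describes; the `non_empty` helper name is illustrative, not the crate's API:

```rust
use std::net::SocketAddr;

// Hypothetical helper mirroring the convention above: an empty result
// becomes None instead of an empty Vec.
fn non_empty(addresses: Vec<SocketAddr>) -> Option<Vec<SocketAddr>> {
    if addresses.is_empty() {
        None
    } else {
        Some(addresses)
    }
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8333".parse().unwrap();
    assert_eq!(non_empty(vec![]), None);
    assert_eq!(non_empty(vec![addr]), Some(vec![addr]));
    // Callers use `if let` instead of checking len() against zero:
    if let Some(addrs) = non_empty(vec![addr]) {
        println!("got {} address(es)", addrs.len());
    }
}
```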
codeShark149
de1fc2a677 Update compact_filters module
Compact filters module is updated to include the new Address Manager.

The new PeerError structure is added into CompactFiltersError to keep
compatibility with existing APIs.

Minor cargo fmt nit fixes
2021-07-20 19:55:16 +05:30
codeShark149
671d90e57c Add Address Manager
Add the final address manager structures. The Address Manager is
responsible for finding new peers from the network. It maintains a
directory with cbf/non_cbf categories of peers. A Peer Manager will
query this Address Manager to provide it with fresh addresses.

Currently the Address Manager is single threaded. When a user calls it
to fetch from the network, the call blocks until the fetch completes.

It knows when to stop fetching via the config variables MIN_CBF_BUFFER
and MIN_NONCBF_BUFFER: it stops fetching once the buffer requirement is
met.

Co-authored-by: John Cantrell <johncantrell97@protonmail.com>
2021-07-20 19:55:15 +05:30
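The stopping rule described above ("stops fetching when the buffer requirement is met") can be sketched as a simple predicate. The constants mirror the defaults named in the message; the function and its arguments are hypothetical:

```rust
// Sketch of the stopping rule: fetching continues while either address
// category is below its minimum buffer. `buffer_satisfied` is a
// hypothetical stand-in for the manager's internal check.
const MIN_CBF_BUFFER: usize = 5;
const MIN_NONCBF_BUFFER: usize = 5;

fn buffer_satisfied(cbf_buffer: usize, non_cbf_buffer: usize) -> bool {
    cbf_buffer >= MIN_CBF_BUFFER && non_cbf_buffer >= MIN_NONCBF_BUFFER
}

fn main() {
    // Keep fetching while either category is short...
    assert!(!buffer_satisfied(5, 3));
    assert!(!buffer_satisfied(2, 7));
    // ...and stop once both minimums are met.
    assert!(buffer_satisfied(5, 5));
    println!("buffer requirement met");
}
```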
codeShark149
9480faa5d3 Add PeerError structure in peer module
This adds a new PeerError structure in the peer module to handle all
peer-related errors. PeerError contains all the mempool errors too, for
now. Later, if we have a more complex mempool, we might give it its own
dedicated error.

PeerError is to be included in the global CompactFiltersError type.
2021-07-20 19:55:15 +05:30
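Including PeerError in the global CompactFiltersError type presumably relies on a `From` conversion, so that `?` propagates peer errors across the boundary. A self-contained sketch, with illustrative variants rather than the crate's real definitions:

```rust
// Hedged sketch of folding a peer-level error into a broader error enum
// via From; the variant sets here are illustrative only.
#[derive(Debug)]
enum PeerError {
    Timeout,
    #[allow(dead_code)]
    Mempool(String),
}

#[derive(Debug)]
enum CompactFiltersError {
    Peer(PeerError),
    #[allow(dead_code)]
    Db(String),
}

impl From<PeerError> for CompactFiltersError {
    fn from(e: PeerError) -> Self {
        CompactFiltersError::Peer(e)
    }
}

fn connect() -> Result<(), PeerError> {
    Err(PeerError::Timeout)
}

// `?` auto-converts the PeerError into CompactFiltersError through From.
fn sync() -> Result<(), CompactFiltersError> {
    connect()?;
    Ok(())
}

fn main() {
    match sync() {
        Err(CompactFiltersError::Peer(PeerError::Timeout)) => println!("peer timed out"),
        other => println!("unexpected: {:?}", other),
    }
}
```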
31 changed files with 1967 additions and 1115 deletions

View File

@@ -24,7 +24,7 @@ jobs:
- name: Update toolchain
run: rustup update
- name: Test
run: cargo test --features all-keys,compiler,esplora,ureq,compact_filters --no-default-features
run: cargo test --features all-keys,compiler,esplora,compact_filters --no-default-features
- id: coverage
name: Generate coverage

View File

@@ -16,16 +16,14 @@ jobs:
- default
- minimal
- all-keys
- minimal,use-esplora-ureq
- minimal,esplora
- key-value-db
- electrum
- compact_filters
- esplora,ureq,key-value-db,electrum
- esplora,key-value-db,electrum
- compiler
- rpc
- verify
- async-interface
- use-esplora-reqwest
steps:
- name: checkout
uses: actions/checkout@v2
@@ -95,6 +93,8 @@ jobs:
with:
path: |
~/.cargo/registry
~/.cargo/bitcoin
~/.cargo/electrs
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ github.job }}-${{ hashFiles('**/Cargo.toml','**/Cargo.lock') }}
@@ -137,8 +137,7 @@ jobs:
- name: Update toolchain
run: rustup update
- name: Check
run: cargo check --target wasm32-unknown-unknown --features use-esplora-reqwest --no-default-features
run: cargo check --target wasm32-unknown-unknown --features esplora --no-default-features
fmt:
name: Rust fmt

View File

@@ -24,7 +24,7 @@ jobs:
- name: Update toolchain
run: rustup update
- name: Build docs
run: cargo rustdoc --verbose --features=compiler,electrum,esplora,ureq,compact_filters,key-value-db,all-keys -- --cfg docsrs -Dwarnings
run: cargo rustdoc --verbose --features=compiler,electrum,esplora,compact_filters,key-value-db,all-keys -- --cfg docsrs -Dwarnings
- name: Upload artifact
uses: actions/upload-artifact@v2
with:

View File

@@ -6,23 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [v0.11.0] - [v0.10.0]
- Added `flush` method to the `Database` trait to explicitly flush to disk latest changes on the db.
## [v0.10.0] - [v0.9.0]
- Added `RpcBlockchain` in the `AnyBlockchain` struct to allow using Rpc backend where `AnyBlockchain` is used (eg `bdk-cli`)
- Removed hard dependency on `tokio`.
### Wallet
- Removed and replaced `set_single_recipient` with more general `drain_to` and replaced `maintain_single_recipient` with `allow_shrinking`.
### Blockchain
- Removed `stop_gap` from `Blockchain` trait and added it to only `ElectrumBlockchain` and `EsploraBlockchain` structs.
- Added a `ureq` backend for use when not using feature `async-interface` or target WASM. `ureq` is a blocking HTTP client.
- Removed `stop_gap` from `Blockchain` trait and added it to only `ElectrumBlockchain` and `EsploraBlockchain` structs
## [v0.9.0] - [v0.8.0]
@@ -364,7 +354,7 @@ final transaction is created by calling `finish` on the builder.
- Use `MemoryDatabase` in the compiler example
- Make the REPL return JSON
[unreleased]: https://github.com/bitcoindevkit/bdk/compare/v0.11.0...HEAD
[unreleased]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...HEAD
[0.1.0-beta.1]: https://github.com/bitcoindevkit/bdk/compare/96c87ea5...0.1.0-beta.1
[v0.2.0]: https://github.com/bitcoindevkit/bdk/compare/0.1.0-beta.1...v0.2.0
[v0.3.0]: https://github.com/bitcoindevkit/bdk/compare/v0.2.0...v0.3.0
@@ -375,5 +365,3 @@ final transaction is created by calling `finish` on the builder.
[v0.7.0]: https://github.com/bitcoindevkit/bdk/compare/v0.6.0...v0.7.0
[v0.8.0]: https://github.com/bitcoindevkit/bdk/compare/v0.7.0...v0.8.0
[v0.9.0]: https://github.com/bitcoindevkit/bdk/compare/v0.8.0...v0.9.0
[v0.10.0]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...v0.10.0
[v0.11.0]: https://github.com/bitcoindevkit/bdk/compare/v0.10.0...v0.11.0

View File

@@ -57,21 +57,6 @@ comment suggesting that you're working on it. If someone is already assigned,
don't hesitate to ask if the assigned party or previous commenters are still
working on it if it has been awhile.
Deprecation policy
------------------
Where possible, breaking existing APIs should be avoided. Instead, add new APIs and
use [`#[deprecated]`](https://github.com/rust-lang/rfcs/blob/master/text/1270-deprecation.md)
to discourage use of the old one.
Deprecated APIs are typically maintained for one release cycle. In other words, an
API that has been deprecated with the 0.10 release can be expected to be removed in the
0.11 release. This allows for smoother upgrades without incurring too much technical
debt inside this library.
If you deprecated an API as part of a contribution, we encourage you to "own" that API
and send a follow-up to remove it as part of the next release cycle.
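The policy above leans on Rust's built-in `#[deprecated]` attribute. A small sketch, borrowing the `set_single_recipient`/`drain_to` rename from the changelog as the example (the stub bodies are illustrative):

```rust
// Old API kept for one release cycle, pointing callers at the replacement.
#[deprecated(since = "0.10.0", note = "use `drain_to` instead")]
pub fn set_single_recipient() -> &'static str {
    drain_to()
}

pub fn drain_to() -> &'static str {
    "drain_to"
}

fn main() {
    // Calling the deprecated API still works but emits a compiler warning;
    // the #[allow] silences it for this demonstration.
    #[allow(deprecated)]
    let old = set_single_recipient();
    assert_eq!(old, drain_to());
    println!("{}", old);
}
```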
Peer review
-----------

View File

@@ -1,6 +1,6 @@
[package]
name = "bdk"
version = "0.11.0"
version = "0.9.1-dev"
edition = "2018"
authors = ["Alekos Filini <alekos.filini@gmail.com>", "Riccardo Casatta <riccardo@casatta.it>"]
homepage = "https://bitcoindevkit.org"
@@ -12,22 +12,21 @@ readme = "README.md"
license = "MIT OR Apache-2.0"
[dependencies]
bdk-macros = "0.5"
bdk-macros = "^0.4"
log = "^0.4"
miniscript = "^6.0"
bitcoin = { version = "^0.27", features = ["use-serde", "base64"] }
miniscript = "5.1"
bitcoin = { version = "~0.26.2", features = ["use-serde", "base64"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = { version = "^1.0" }
rand = "^0.7"
# Optional dependencies
sled = { version = "0.34", optional = true }
electrum-client = { version = "0.8", optional = true }
electrum-client = { version = "0.7", optional = true }
reqwest = { version = "0.11", optional = true, features = ["json"] }
ureq = { version = "2.1", default-features = false, features = ["json"], optional = true }
futures = { version = "0.3", optional = true }
async-trait = { version = "0.1", optional = true }
rocksdb = { version = "0.14", default-features = false, features = ["snappy"], optional = true }
rocksdb = { version = "0.14", optional = true }
cc = { version = ">=1.0.64", optional = true }
socks = { version = "0.3", optional = true }
lazy_static = { version = "1.4", optional = true }
@@ -36,7 +35,11 @@ zeroize = { version = "<1.4.0", optional = true }
bitcoinconsensus = { version = "0.19.0-3", optional = true }
# Needed by bdk_blockchain_tests macro
core-rpc = { version = "0.14", optional = true }
bitcoincore-rpc = { version = "0.13", optional = true }
# Platform-specific dependencies
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1", features = ["rt"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
async-trait = "0.1"
@@ -48,46 +51,28 @@ minimal = []
compiler = ["miniscript/compiler"]
verify = ["bitcoinconsensus"]
default = ["key-value-db", "electrum"]
electrum = ["electrum-client"]
esplora = ["reqwest", "futures"]
compact_filters = ["rocksdb", "socks", "lazy_static", "cc"]
key-value-db = ["sled"]
async-interface = ["async-trait"]
all-keys = ["keys-bip39"]
keys-bip39 = ["tiny-bip39", "zeroize"]
rpc = ["core-rpc"]
# We currently provide multiple implementations of `Blockchain`, all are
# blocking except for the `EsploraBlockchain` which can be either async or
# blocking, depending on the HTTP client in use.
#
# - Users wanting asynchronous HTTP calls should enable `async-interface` to get
# access to the asynchronous method implementations. Then, if Esplora is wanted,
# enable `esplora` AND `reqwest` (`--features=use-esplora-reqwest`).
# - Users wanting blocking HTTP calls can use any of the other blockchain
# implementations (`compact_filters`, `electrum`, or `esplora`). Users wanting to
# use Esplora should enable `esplora` AND `ureq` (`--features=use-esplora-ureq`).
#
# WARNING: Please take care with the features below, various combinations will
# fail to build. We cannot currently build `bdk` with `--all-features`.
async-interface = ["async-trait"]
electrum = ["electrum-client"]
# MUST ALSO USE `--no-default-features`.
use-esplora-reqwest = ["async-interface", "esplora", "reqwest", "futures"]
use-esplora-ureq = ["esplora", "ureq"]
# Typical configurations will not need to use `esplora` feature directly.
esplora = []
rpc = ["bitcoincore-rpc"]
# Debug/Test features
test-blockchains = ["core-rpc", "electrum-client"]
test-blockchains = ["bitcoincore-rpc", "electrum-client"]
test-electrum = ["electrum", "electrsd/electrs_0_8_10", "test-blockchains"]
test-rpc = ["rpc", "electrsd/electrs_0_8_10", "test-blockchains"]
test-esplora = ["esplora", "ureq", "electrsd/legacy", "electrsd/esplora_a33e97e1", "test-blockchains"]
test-esplora = ["esplora", "electrsd/legacy", "electrsd/esplora_a33e97e1", "test-blockchains"]
test-md-docs = ["electrum"]
[dev-dependencies]
lazy_static = "1.4"
env_logger = "0.7"
clap = "2.33"
electrsd = { version= "0.10", features = ["trigger", "bitcoind_0_21_1"] }
electrsd = { version= "0.6", features = ["trigger", "bitcoind_0_21_1"] }
[[example]]
name = "address_validator"
@@ -103,6 +88,6 @@ required-features = ["compiler"]
[workspace]
members = ["macros"]
[package.metadata.docs.rs]
features = ["compiler", "electrum", "esplora", "ureq", "compact_filters", "rpc", "key-value-db", "all-keys", "verify"]
features = ["compiler", "electrum", "esplora", "compact_filters", "rpc", "key-value-db", "all-keys", "verify"]
# defines the configuration attribute `docsrs`
rustdoc-args = ["--cfg", "docsrs"]

View File

@@ -1,6 +1,6 @@
[package]
name = "bdk-macros"
version = "0.5.0"
version = "0.4.0"
authors = ["Alekos Filini <alekos.filini@gmail.com>"]
edition = "2018"
homepage = "https://bitcoindevkit.org"

View File

@@ -121,3 +121,26 @@ pub fn maybe_await(expr: TokenStream) -> TokenStream {
quoted.into()
}
/// Awaits if target_arch is "wasm32" or the "async-interface" feature is enabled, uses `tokio::Runtime::block_on()` otherwise
///
/// Requires the `tokio` crate as a dependency with the `rt` feature to build on non-wasm32 platforms.
#[proc_macro]
pub fn await_or_block(expr: TokenStream) -> TokenStream {
let expr: proc_macro2::TokenStream = expr.into();
let quoted = quote! {
{
#[cfg(all(not(target_arch = "wasm32"), not(feature = "async-interface")))]
{
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(#expr)
}
#[cfg(any(target_arch = "wasm32", feature = "async-interface"))]
{
#expr.await
}
}
};
quoted.into()
}

View File

@@ -37,9 +37,9 @@
//! )?;
//! # }
//!
//! # #[cfg(all(feature = "esplora", feature = "ureq"))]
//! # #[cfg(feature = "esplora")]
//! # {
//! let esplora_blockchain = EsploraBlockchain::new("...", 20);
//! let esplora_blockchain = EsploraBlockchain::new("...", None, 20);
//! let wallet_esplora: Wallet<AnyBlockchain, _> = Wallet::new(
//! "...",
//! None,
@@ -60,8 +60,6 @@
//! # use bdk::blockchain::*;
//! # use bdk::database::MemoryDatabase;
//! # use bdk::Wallet;
//! # #[cfg(all(feature = "esplora", feature = "ureq"))]
//! # {
//! let config = serde_json::from_str("...")?;
//! let blockchain = AnyBlockchain::from_config(&config)?;
//! let wallet = Wallet::new(
@@ -71,7 +69,6 @@
//! MemoryDatabase::default(),
//! blockchain,
//! )?;
//! # }
//! # Ok::<(), bdk::Error>(())
//! ```
@@ -97,8 +94,6 @@ macro_rules! impl_inner_method {
AnyBlockchain::Esplora(inner) => inner.$name( $($args, )* ),
#[cfg(feature = "compact_filters")]
AnyBlockchain::CompactFilters(inner) => inner.$name( $($args, )* ),
#[cfg(feature = "rpc")]
AnyBlockchain::Rpc(inner) => inner.$name( $($args, )* ),
}
}
}
@@ -121,10 +116,6 @@ pub enum AnyBlockchain {
#[cfg_attr(docsrs, doc(cfg(feature = "compact_filters")))]
/// Compact filters client
CompactFilters(compact_filters::CompactFiltersBlockchain),
#[cfg(feature = "rpc")]
#[cfg_attr(docsrs, doc(cfg(feature = "rpc")))]
/// RPC client
Rpc(rpc::RpcBlockchain),
}
#[maybe_async]
@@ -166,7 +157,6 @@ impl Blockchain for AnyBlockchain {
impl_from!(electrum::ElectrumBlockchain, AnyBlockchain, Electrum, #[cfg(feature = "electrum")]);
impl_from!(esplora::EsploraBlockchain, AnyBlockchain, Esplora, #[cfg(feature = "esplora")]);
impl_from!(compact_filters::CompactFiltersBlockchain, AnyBlockchain, CompactFilters, #[cfg(feature = "compact_filters")]);
impl_from!(rpc::RpcBlockchain, AnyBlockchain, Rpc, #[cfg(feature = "rpc")]);
/// Type that can contain any of the blockchain configurations defined by the library
///
@@ -216,10 +206,6 @@ pub enum AnyBlockchainConfig {
#[cfg_attr(docsrs, doc(cfg(feature = "compact_filters")))]
/// Compact filters client
CompactFilters(compact_filters::CompactFiltersBlockchainConfig),
#[cfg(feature = "rpc")]
#[cfg_attr(docsrs, doc(cfg(feature = "rpc")))]
/// RPC client configuration
Rpc(rpc::RpcConfig),
}
impl ConfigurableBlockchain for AnyBlockchain {
@@ -239,10 +225,6 @@ impl ConfigurableBlockchain for AnyBlockchain {
AnyBlockchainConfig::CompactFilters(inner) => AnyBlockchain::CompactFilters(
compact_filters::CompactFiltersBlockchain::from_config(inner)?,
),
#[cfg(feature = "rpc")]
AnyBlockchainConfig::Rpc(inner) => {
AnyBlockchain::Rpc(rpc::RpcBlockchain::from_config(inner)?)
}
})
}
}
@@ -250,4 +232,3 @@ impl ConfigurableBlockchain for AnyBlockchain {
impl_from!(electrum::ElectrumBlockchainConfig, AnyBlockchainConfig, Electrum, #[cfg(feature = "electrum")]);
impl_from!(esplora::EsploraBlockchainConfig, AnyBlockchainConfig, Esplora, #[cfg(feature = "esplora")]);
impl_from!(compact_filters::CompactFiltersBlockchainConfig, AnyBlockchainConfig, CompactFilters, #[cfg(feature = "compact_filters")]);
impl_from!(rpc::RpcConfig, AnyBlockchainConfig, Rpc, #[cfg(feature = "rpc")]);

View File

@@ -0,0 +1,764 @@
// Bitcoin Dev Kit
// Written in 2021 by Rajarshi Maitra <rajarshi149@gmail.com>
// John Cantrell <johncantrell97@protonmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
use std::fs::File;
use std::io::prelude::*;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::{
mpsc::{channel, Receiver, SendError, Sender},
Arc, RwLock,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::PoisonError;
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
use serde::{Deserialize, Serialize};
use serde_json::Error as SerdeError;
use super::{Mempool, Peer, PeerError};
use bitcoin::network::{
constants::{Network, ServiceFlags},
message::NetworkMessage,
Address,
};
/// Default address pool minimums
const MIN_CBF_BUFFER: usize = 5;
const MIN_NONCBF_BUFFER: usize = 5;
/// A Discovery structure used by workers
///
/// Discovery can be initiated via a cache,
/// or it will start with default hardcoded seeds
pub struct AddressDiscovery {
pending: VecDeque<SocketAddr>,
visited: HashSet<SocketAddr>,
}
impl AddressDiscovery {
fn new(network: Network, seeds: VecDeque<SocketAddr>) -> AddressDiscovery {
let mut network_seeds = AddressDiscovery::seeds(network);
let mut total_seeds = seeds;
total_seeds.append(&mut network_seeds);
AddressDiscovery {
pending: total_seeds,
visited: HashSet::new(),
}
}
fn add_pendings(&mut self, addresses: Vec<SocketAddr>) {
for addr in addresses {
if !self.pending.contains(&addr) && !self.visited.contains(&addr) {
self.pending.push_back(addr);
}
}
}
fn get_next(&mut self) -> Option<SocketAddr> {
match self.pending.pop_front() {
None => None,
Some(next) => {
self.visited.insert(next);
Some(next)
}
}
}
fn seeds(network: Network) -> VecDeque<SocketAddr> {
let mut seeds = VecDeque::new();
let port: u16 = match network {
Network::Bitcoin => 8333,
Network::Testnet => 18333,
Network::Regtest => 18444,
Network::Signet => 38333,
};
let seedhosts: &[&str] = match network {
Network::Bitcoin => &[
"seed.bitcoin.sipa.be",
"dnsseed.bluematt.me",
"dnsseed.bitcoin.dashjr.org",
"seed.bitcoinstats.com",
"seed.bitcoin.jonasschnelli.ch",
"seed.btc.petertodd.org",
"seed.bitcoin.sprovoost.nl",
"dnsseed.emzy.de",
"seed.bitcoin.wiz.biz",
],
Network::Testnet => &[
"testnet-seed.bitcoin.jonasschnelli.ch",
"seed.tbtc.petertodd.org",
"seed.testnet.bitcoin.sprovoost.nl",
"testnet-seed.bluematt.me",
],
Network::Regtest => &[],
Network::Signet => &[],
};
for seedhost in seedhosts.iter() {
if let Ok(lookup) = (*seedhost, port).to_socket_addrs() {
for host in lookup {
if host.is_ipv4() {
seeds.push_back(host);
}
}
}
}
seeds
}
}
/// Crawler structure that will interface with Discovery and public bitcoin network
///
/// Address manager will spawn multiple crawlers in separate threads to discover new addresses.
struct AddressWorker {
discovery: Arc<RwLock<AddressDiscovery>>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
}
impl AddressWorker {
fn new(
discovery: Arc<RwLock<AddressDiscovery>>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
) -> AddressWorker {
AddressWorker {
discovery,
sender,
network,
}
}
fn try_receive_addr(&mut self, peer: &Peer) -> Result<(), AddressManagerError> {
if let Some(NetworkMessage::Addr(new_addresses)) =
peer.recv("addr", Some(Duration::from_secs(1)))?
{
self.consume_addr(new_addresses)?;
}
Ok(())
}
fn consume_addr(&mut self, addrs: Vec<(u32, Address)>) -> Result<(), AddressManagerError> {
let mut discovery_lock = self.discovery.write().map_err(PeerError::from)?;
let mut addresses = Vec::new();
for network_addrs in addrs {
if let Ok(socket_addrs) = network_addrs.1.socket_addr() {
addresses.push(socket_addrs);
}
}
discovery_lock.add_pendings(addresses);
Ok(())
}
fn work(&mut self) -> Result<(), AddressManagerError> {
loop {
let next_address = {
let mut address_discovery = self.discovery.write()?;
address_discovery.get_next()
};
match next_address {
Some(address) => {
let potential_peer = Peer::connect_with_timeout(
address,
Duration::from_secs(1),
Arc::new(Mempool::default()),
self.network,
);
if let Ok(peer) = potential_peer {
peer.send(NetworkMessage::GetAddr)?;
self.try_receive_addr(&peer)?;
self.try_receive_addr(&peer)?;
self.sender.send((address, peer.get_version().services))?;
// TODO: Investigate why close is being called on non-existent connections;
// currently the errors are ignored
peer.close().unwrap_or(());
}
}
None => continue,
}
}
}
}
/// A dedicated cache structure, with cbf/non_cbf separation
///
/// [AddressCache] will interface with file I/O
/// and can be turned into seeds. Seed generation puts previously cached
/// cbf addresses at the front of the vec, to speed up cbf node discovery
#[derive(Serialize, Deserialize)]
struct AddressCache {
banned_peers: HashSet<SocketAddr>,
cbf: HashSet<SocketAddr>,
non_cbf: HashSet<SocketAddr>,
}
impl AddressCache {
fn empty() -> Self {
Self {
banned_peers: HashSet::new(),
cbf: HashSet::new(),
non_cbf: HashSet::new(),
}
}
fn from_file(path: &str) -> Result<Option<Self>, AddressManagerError> {
let serialized: Result<String, _> = std::fs::read_to_string(path);
let serialized = match serialized {
Ok(contents) => contents,
Err(_) => return Ok(None),
};
let address_cache = serde_json::from_str(&serialized)?;
Ok(Some(address_cache))
}
fn write_to_file(&self, path: &str) -> Result<(), AddressManagerError> {
let serialized = serde_json::to_string_pretty(&self)?;
let mut cache_file = File::create(path)?;
cache_file.write_all(serialized.as_bytes())?;
Ok(())
}
fn make_seeds(&self) -> VecDeque<SocketAddr> {
self.cbf
.iter()
.chain(self.non_cbf.iter())
.copied()
.collect()
}
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) -> bool {
if cbf {
self.cbf.remove(addrs)
} else {
self.non_cbf.remove(addrs)
}
}
fn add_address(&mut self, addrs: SocketAddr, cbf: bool) -> bool {
if cbf {
self.cbf.insert(addrs)
} else {
self.non_cbf.insert(addrs)
}
}
fn add_to_banlist(&mut self, addrs: SocketAddr, cbf: bool) {
if self.banned_peers.insert(addrs) {
self.remove_address(&addrs, cbf);
}
}
}
/// A live directory, maintained by [AddressManager], of cbf and non_cbf nodes freshly found by workers
///
/// Each new [AddressManager] instance will have a fresh [AddressDirectory].
/// This is independent of the cache and acts as an in-memory database from
/// which addresses are fetched for the user.
struct AddressDirectory {
cbf_nodes: HashSet<SocketAddr>,
non_cbf_nodes: HashSet<SocketAddr>,
// List of addresses it has previously provided to the caller (PeerManager)
previously_sent: HashSet<SocketAddr>,
}
impl AddressDirectory {
fn new() -> AddressDirectory {
AddressDirectory {
cbf_nodes: HashSet::new(),
non_cbf_nodes: HashSet::new(),
previously_sent: HashSet::new(),
}
}
fn add_address(&mut self, addr: SocketAddr, cbf: bool) {
if cbf {
self.cbf_nodes.insert(addr);
} else {
self.non_cbf_nodes.insert(addr);
}
}
fn get_new_address(&mut self, cbf: bool) -> Option<SocketAddr> {
if cbf {
if let Some(new_addresses) = self
.cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.collect::<Vec<&SocketAddr>>()
.pop()
{
self.previously_sent.insert(*new_addresses);
Some(*new_addresses)
} else {
None
}
} else if let Some(new_addresses) = self
.non_cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.collect::<Vec<&SocketAddr>>()
.pop()
{
self.previously_sent.insert(*new_addresses);
Some(*new_addresses)
} else {
None
}
}
fn get_cbf_address_count(&self) -> usize {
self.cbf_nodes.len()
}
fn get_non_cbf_address_count(&self) -> usize {
self.non_cbf_nodes.len()
}
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) {
if cbf {
self.cbf_nodes.remove(addrs);
} else {
self.non_cbf_nodes.remove(addrs);
}
}
fn get_cbf_buffer(&self) -> usize {
self.cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.count()
}
fn get_non_cbf_buffer(&self) -> usize {
self.non_cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.count()
}
}
/// Discovery statistics, useful for logging
#[derive(Clone, Copy)]
pub struct DiscoveryData {
queued: usize,
visited: usize,
non_cbf_count: usize,
cbf_count: usize,
}
/// Progress trait for discovery statistics logging
pub trait DiscoveryProgress {
/// Update progress
fn update(&self, data: DiscoveryData);
}
/// Used when progress updates are not desired
#[derive(Clone)]
pub struct NoDiscoveryProgress;
impl DiscoveryProgress for NoDiscoveryProgress {
fn update(&self, _data: DiscoveryData) {}
}
/// Used to log progress update
#[derive(Clone)]
pub struct LogDiscoveryProgress;
impl DiscoveryProgress for LogDiscoveryProgress {
fn update(&self, data: DiscoveryData) {
log::trace!(
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
data.queued,
data.visited,
data.non_cbf_count,
data.cbf_count
);
#[cfg(test)]
println!(
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
data.queued, data.visited, data.non_cbf_count, data.cbf_count
);
}
}
/// A manager structure managing address discovery
///
/// The manager will try to maintain a given address buffer in its directory:
/// buffer = len(existing addresses) - len(previously provided addresses).
/// The manager will crawl the network until the buffer criteria are satisfied.
/// It will bootstrap workers from a cache, to speed up discovery in
/// subsequent calls after the first crawl, and will keep track of the cache,
/// only updating it when previously unknown addresses are found.
pub struct AddressManager<P: DiscoveryProgress> {
directory: AddressDirectory,
cache_filename: String,
discovery: Arc<RwLock<AddressDiscovery>>,
threads: usize,
receiver: Receiver<(SocketAddr, ServiceFlags)>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
cbf_buffer: usize,
non_cbf_buffer: usize,
progress: P,
}
impl<P: DiscoveryProgress> AddressManager<P> {
/// Create a new manager. Initiate Discovery seeds from the cache
/// if it exists, else start with hardcoded seeds
pub fn new(
network: Network,
cache_filename: String,
threads: usize,
cbf_buffer: Option<usize>,
non_cbf_buffer: Option<usize>,
progress: P,
) -> Result<AddressManager<P>, AddressManagerError> {
let (sender, receiver) = channel();
let seeds = match AddressCache::from_file(&cache_filename)? {
Some(cache) => cache.make_seeds(),
None => VecDeque::new(),
};
let min_cbf = cbf_buffer.unwrap_or(MIN_CBF_BUFFER);
let min_non_cbf = non_cbf_buffer.unwrap_or(MIN_NONCBF_BUFFER);
let discovery = AddressDiscovery::new(network, seeds);
Ok(AddressManager {
cache_filename,
directory: AddressDirectory::new(),
discovery: Arc::new(RwLock::new(discovery)),
sender,
receiver,
network,
threads,
cbf_buffer: min_cbf,
non_cbf_buffer: min_non_cbf,
progress,
})
}
/// Get running address discovery progress
fn get_progress(&self) -> Result<DiscoveryData, AddressManagerError> {
let (queued_count, visited_count) = {
let address_discovery = self.discovery.read()?;
(
address_discovery.pending.len(),
address_discovery.visited.len(),
)
};
let cbf_node_count = self.directory.get_cbf_address_count();
let other_node_count = self.directory.get_non_cbf_address_count();
Ok(DiscoveryData {
queued: queued_count,
visited: visited_count,
non_cbf_count: cbf_node_count + other_node_count,
cbf_count: cbf_node_count,
})
}
/// Spawn `self.threads` worker threads
fn spawn_workers(&mut self) -> Vec<JoinHandle<()>> {
let mut worker_handles: Vec<JoinHandle<()>> = vec![];
for _ in 0..self.threads {
let sender = self.sender.clone();
let discovery = self.discovery.clone();
let network = self.network;
let worker_handle = thread::spawn(move || {
let mut worker = AddressWorker::new(discovery, sender, network);
worker.work().unwrap();
});
worker_handles.push(worker_handle);
}
worker_handles
}
/// Crawl the Bitcoin network until the required number of cbf/non_cbf nodes are found
///
/// - This will start a bunch of crawlers.
/// - Load up the existing cache.
/// - Update the cache with newly found peers.
/// - Check if an address is in the banlist.
/// - Run crawlers until the buffer requirement is matched.
/// - Flush the current cache to disk.
pub fn fetch(&mut self) -> Result<(), AddressManagerError> {
self.spawn_workers();
// Get already existing cache
let mut cache = match AddressCache::from_file(&self.cache_filename)? {
Some(cache) => cache,
None => AddressCache::empty(),
};
while self.directory.get_cbf_buffer() < self.cbf_buffer
|| self.directory.get_non_cbf_buffer() < self.non_cbf_buffer
{
if let Ok(message) = self.receiver.recv() {
let (addr, flag) = message;
if !cache.banned_peers.contains(&addr) {
let cbf = flag.has(ServiceFlags::COMPACT_FILTERS);
self.directory.add_address(addr, cbf);
cache.add_address(addr, cbf);
}
}
}
self.progress.update(self.get_progress()?);
// When completed, flush the cache
cache.write_to_file(&self.cache_filename)?;
Ok(())
}
/// Get a new address not previously provided
pub fn get_new_cbf_address(&mut self) -> Option<SocketAddr> {
self.directory.get_new_address(true)
}
/// Get a new non_cbf address
pub fn get_new_non_cbf_address(&mut self) -> Option<SocketAddr> {
self.directory.get_new_address(false)
}
/// Ban an address
pub fn ban_peer(&mut self, addrs: &SocketAddr, cbf: bool) -> Result<(), AddressManagerError> {
let mut cache = AddressCache::from_file(&self.cache_filename)?.ok_or_else(|| {
AddressManagerError::Generic("Address Cache file not found".to_string())
})?;
cache.add_to_banlist(*addrs, cbf);
// When completed, flush the cache
cache.write_to_file(&self.cache_filename).unwrap();
self.directory.remove_address(addrs, cbf);
Ok(())
}
/// Get all the known CBF addresses
pub fn get_known_cbfs(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.cbf_nodes
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
/// Get all the known regular addresses
pub fn get_known_non_cbfs(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.non_cbf_nodes
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
/// Get previously tried addresses
pub fn get_previously_tried(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.previously_sent
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
}
#[derive(Debug)]
pub enum AddressManagerError {
/// Std I/O Error
Io(std::io::Error),
/// Internal Peer error
Peer(PeerError),
/// Internal Mutex poisoning error
MutexPoisoned,
/// Internal Mutex wait timed out
MutexTimedOut,
/// Internal RW read lock poisoned
RwReadLockPoisined,
/// Internal RW write lock poisoned
RwWriteLockPoisoned,
/// Internal MPSC sending error
MpscSendError,
/// Serde Json Error
SerdeJson(SerdeError),
/// Generic Errors
Generic(String),
}
impl_error!(PeerError, Peer, AddressManagerError);
impl_error!(std::io::Error, Io, AddressManagerError);
impl_error!(SerdeError, SerdeJson, AddressManagerError);
impl<T> From<PoisonError<MutexGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
AddressManagerError::MutexPoisoned
}
}
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
AddressManagerError::RwWriteLockPoisoned
}
}
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
AddressManagerError::RwReadLockPoisined
}
}
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for AddressManagerError {
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
let (_, wait_result) = err.into_inner();
if wait_result.timed_out() {
AddressManagerError::MutexTimedOut
} else {
AddressManagerError::MutexPoisoned
}
}
}
impl<T> From<SendError<T>> for AddressManagerError {
fn from(_: SendError<T>) -> Self {
AddressManagerError::MpscSendError
}
}
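These `From<PoisonError<…>>` impls are what allow lock acquisitions to use `?` instead of `.unwrap()`. A self-contained sketch of the pattern, using a trimmed-down stand-in enum (`DemoError` and `read_value` are illustrative names, not part of the source):

```rust
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::thread;

/// Trimmed-down stand-in for the error enum above.
#[derive(Debug, PartialEq)]
enum DemoError {
    MutexPoisoned,
}

// Same shape as the impls in the source: a poisoned guard maps
// to a single variant, so `?` works directly on `Mutex::lock`.
impl<T> From<PoisonError<MutexGuard<'_, T>>> for DemoError {
    fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
        DemoError::MutexPoisoned
    }
}

fn read_value(m: &Mutex<u32>) -> Result<u32, DemoError> {
    let guard = m.lock()?; // PoisonError converts via `?`
    Ok(*guard)
}

fn main() {
    let m = Arc::new(Mutex::new(7));
    assert_eq!(read_value(&m), Ok(7));

    // Poison the mutex by panicking while the lock is held.
    let m2 = Arc::clone(&m);
    let _ = thread::spawn(move || {
        let _g = m2.lock().unwrap();
        panic!("poison the lock");
    })
    .join();

    assert_eq!(read_value(&m), Err(DemoError::MutexPoisoned));
}
```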
#[cfg(test)]
mod test {
use super::*;
#[test]
#[ignore]
fn test_address_manager() {
// Initiate a manager with a nonexistent cache file name.
// It will create a new cache file.
let mut manager = AddressManager::new(
Network::Bitcoin,
"addr_cache".to_string(),
20,
None,
None,
LogDiscoveryProgress,
)
.unwrap();
// start the crawlers and time them
println!("Starting manager and initial fetch");
let start = std::time::Instant::now();
manager.fetch().unwrap();
let duration1 = start.elapsed();
println!("Completed Initial fetch");
// Create a new manager from existing cache and fetch again
let mut manager = AddressManager::new(
Network::Bitcoin,
"addr_cache".to_string(),
20,
None,
None,
LogDiscoveryProgress,
)
.unwrap();
// start the crawlers and time them
println!("Starting new fetch with previous cache");
let start = std::time::Instant::now();
manager.fetch().unwrap();
let duration2 = start.elapsed();
println!("Completed new fetch()");
println!("Time taken for initial crawl: {:#?}", duration1);
println!("Time taken for next crawl: {:#?}", duration2);
// Check Buffer Management
println!("Checking buffer management");
// Fetch a few new addresses and ensure the buffer goes to zero
let mut addrs_list = Vec::new();
for _ in 0..5 {
let addr_cbf = manager.get_new_cbf_address().unwrap();
let addr_non_cbf = manager.get_new_non_cbf_address().unwrap();
addrs_list.push(addr_cbf);
addrs_list.push(addr_non_cbf);
}
assert_eq!(addrs_list.len(), 10);
// This should exhaust the cbf buffer
assert_eq!(manager.directory.get_cbf_buffer(), 0);
// Calling fetch again should start crawlers until buffer
// requirements are matched.
println!("Address buffer exhausted, starting new fetch");
manager.fetch().unwrap();
println!("Fetch Complete");
// It should again have a cbf buffer of 5
assert_eq!(manager.directory.get_cbf_buffer(), 5);
println!("Buffer management passed");
}
}


@@ -63,7 +63,9 @@ use bitcoin::{Network, OutPoint, Transaction, Txid};
use rocksdb::{Options, SliceTransform, DB};
mod address_manager;
mod peer;
mod peermngr;
mod store;
mod sync;
@@ -77,8 +79,11 @@ use peer::*;
use store::*;
use sync::*;
// Only added to avoid unused warnings in the address_manager module
pub use address_manager::{
AddressManager, DiscoveryProgress, LogDiscoveryProgress, NoDiscoveryProgress,
};
pub use peer::{Mempool, Peer};
const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
@@ -371,10 +376,10 @@ impl Blockchain for CompactFiltersBlockchain {
database.commit_batch(updates)?;
match first_peer.ask_for_mempool() {
Err(CompactFiltersError::PeerBloomDisabled) => {
Err(PeerError::PeerBloomDisabled(_)) => {
log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
}
e => e?,
e => e.map_err(CompactFiltersError::from)?,
};
let mut internal_max_deriv = None;
@@ -392,7 +397,12 @@ impl Blockchain for CompactFiltersBlockchain {
)?;
}
}
for tx in first_peer.get_mempool().iter_txs().iter() {
for tx in first_peer
.get_mempool()
.iter_txs()
.map_err(CompactFiltersError::from)?
.iter()
{
self.process_tx(
database,
tx,
@@ -435,11 +445,14 @@ impl Blockchain for CompactFiltersBlockchain {
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
Ok(self.peers[0]
.get_mempool()
.get_tx(&Inventory::Transaction(*txid)))
.get_tx(&Inventory::Transaction(*txid))
.map_err(CompactFiltersError::from)?)
}
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
self.peers[0].broadcast_tx(tx.clone())?;
self.peers[0]
.broadcast_tx(tx.clone())
.map_err(CompactFiltersError::from)?;
Ok(())
}
@@ -487,7 +500,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
.peers
.iter()
.map(|peer_conf| match &peer_conf.socks5 {
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network)
.map_err(CompactFiltersError::from),
Some(proxy) => Peer::connect_proxy(
peer_conf.address.as_str(),
proxy,
@@ -497,7 +511,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
.map(|(a, b)| (a.as_str(), b.as_str())),
Arc::clone(&mempool),
config.network,
),
)
.map_err(CompactFiltersError::from),
})
.collect::<Result<_, _>>()?;
@@ -546,6 +561,9 @@ pub enum CompactFiltersError {
/// Wrapper for [`crate::error::Error`]
Global(Box<crate::error::Error>),
/// Internal Peer Error
Peer(PeerError),
}
impl fmt::Display for CompactFiltersError {
@@ -560,6 +578,7 @@ impl_error!(rocksdb::Error, Db, CompactFiltersError);
impl_error!(std::io::Error, Io, CompactFiltersError);
impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError);
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);
impl_error!(PeerError, Peer, CompactFiltersError);
impl From<crate::error::Error> for CompactFiltersError {
fn from(err: crate::error::Error) -> Self {


@@ -10,11 +10,15 @@
// licenses.
use std::collections::HashMap;
use std::net::{TcpStream, ToSocketAddrs};
use std::fmt;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::PoisonError;
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
use socks::{Socks5Stream, ToTargetAddr};
use rand::{thread_rng, Rng};
@@ -30,8 +34,6 @@ use bitcoin::network::stream_reader::StreamReader;
use bitcoin::network::Address;
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
use super::CompactFiltersError;
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
pub(crate) const TIMEOUT_SECS: u64 = 30;
@@ -65,17 +67,18 @@ impl Mempool {
///
/// Note that this doesn't propagate the transaction to other
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
pub fn add_tx(&self, tx: Transaction) {
let mut guard = self.0.write().unwrap();
pub fn add_tx(&self, tx: Transaction) -> Result<(), PeerError> {
let mut guard = self.0.write()?;
guard.wtxids.insert(tx.wtxid(), tx.txid());
guard.txs.insert(tx.txid(), tx);
Ok(())
}
/// Look-up a transaction in the mempool given an [`Inventory`] request
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
pub fn get_tx(&self, inventory: &Inventory) -> Result<Option<Transaction>, PeerError> {
let identifier = match inventory {
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return Ok(None),
Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
@@ -85,27 +88,34 @@ impl Mempool {
inv_type,
hash
);
return None;
return Ok(None);
}
};
let txid = match identifier {
TxIdentifier::Txid(txid) => Some(txid),
TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
TxIdentifier::Wtxid(wtxid) => self.0.read()?.wtxids.get(&wtxid).cloned(),
};
txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
.flatten()
let result = match txid {
Some(txid) => {
let read_lock = self.0.read()?;
read_lock.txs.get(&txid).cloned()
}
None => None,
};
Ok(result)
}
/// Return whether or not the mempool contains a transaction with a given txid
pub fn has_tx(&self, txid: &Txid) -> bool {
self.0.read().unwrap().txs.contains_key(txid)
pub fn has_tx(&self, txid: &Txid) -> Result<bool, PeerError> {
Ok(self.0.read()?.txs.contains_key(txid))
}
/// Return the list of transactions contained in the mempool
pub fn iter_txs(&self) -> Vec<Transaction> {
self.0.read().unwrap().txs.values().cloned().collect()
pub fn iter_txs(&self) -> Result<Vec<Transaction>, PeerError> {
Ok(self.0.read()?.txs.values().cloned().collect())
}
}
@@ -133,12 +143,31 @@ impl Peer {
address: A,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let stream = TcpStream::connect(address)?;
Peer::from_stream(stream, mempool, network)
}
/// Connect to a peer over a plaintext TCP connection with a timeout
///
/// This function behaves the same as `connect`, with two differences:
/// 1) It assumes the given `ToSocketAddrs` resolves to a single address
/// 2) It lets you specify a connection timeout
pub fn connect_with_timeout<A: ToSocketAddrs>(
address: A,
timeout: Duration,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, PeerError> {
let socket_addr = address
.to_socket_addrs()?
.next()
.ok_or(PeerError::AddresseResolution)?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
Peer::from_stream(stream, mempool, network)
}
/// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
/// as a tuple of `(username, password)`
///
@@ -150,7 +179,7 @@ impl Peer {
credentials: Option<(&str, &str)>,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let socks_stream = if let Some((username, password)) = credentials {
Socks5Stream::connect_with_password(proxy, target, username, password)?
} else {
@@ -165,12 +194,12 @@ impl Peer {
stream: TcpStream,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let writer = Arc::new(Mutex::new(stream.try_clone()?));
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
let connected = Arc::new(RwLock::new(true));
let mut locked_writer = writer.lock().unwrap();
let mut locked_writer = writer.lock()?;
let reader_thread_responses = Arc::clone(&responses);
let reader_thread_writer = Arc::clone(&writer);
@@ -185,6 +214,7 @@ impl Peer {
reader_thread_mempool,
reader_thread_connected,
)
.unwrap()
});
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
@@ -209,18 +239,20 @@ impl Peer {
0,
)),
)?;
let version = if let NetworkMessage::Version(version) =
Self::_recv(&responses, "version", None).unwrap()
{
version
} else {
return Err(CompactFiltersError::InvalidResponse);
let version = match Self::_recv(&responses, "version", Some(Duration::from_secs(1)))? {
Some(NetworkMessage::Version(version)) => version,
_ => {
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
}
};
if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None).unwrap() {
if let Some(NetworkMessage::Verack) =
Self::_recv(&responses, "verack", Some(Duration::from_secs(1)))?
{
Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
} else {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
}
std::mem::drop(locked_writer);
@@ -236,19 +268,26 @@ impl Peer {
})
}
/// Close the peer connection
// Consumes self
pub fn close(self) -> Result<(), PeerError> {
let locked_writer = self.writer.lock()?;
Ok((*locked_writer).shutdown(std::net::Shutdown::Both)?)
}
/// Get the socket address of the remote peer
pub fn get_address(&self) -> Result<SocketAddr, PeerError> {
let locked_writer = self.writer.lock()?;
Ok(locked_writer.peer_addr()?)
}
/// Send a Bitcoin network message
fn _send(
writer: &mut TcpStream,
magic: u32,
payload: NetworkMessage,
) -> Result<(), CompactFiltersError> {
fn _send(writer: &mut TcpStream, magic: u32, payload: NetworkMessage) -> Result<(), PeerError> {
log::trace!("==> {:?}", payload);
let raw_message = RawNetworkMessage { magic, payload };
raw_message
.consensus_encode(writer)
.map_err(|_| CompactFiltersError::DataCorruption)?;
raw_message.consensus_encode(writer)?;
Ok(())
}
@@ -258,30 +297,30 @@ impl Peer {
responses: &Arc<RwLock<ResponsesMap>>,
wait_for: &'static str,
timeout: Option<Duration>,
) -> Option<NetworkMessage> {
) -> Result<Option<NetworkMessage>, PeerError> {
let message_resp = {
let mut lock = responses.write().unwrap();
let mut lock = responses.write()?;
let message_resp = lock.entry(wait_for).or_default();
Arc::clone(&message_resp)
};
let (lock, cvar) = &*message_resp;
let mut messages = lock.lock().unwrap();
let mut messages = lock.lock()?;
while messages.is_empty() {
match timeout {
None => messages = cvar.wait(messages).unwrap(),
None => messages = cvar.wait(messages)?,
Some(t) => {
let result = cvar.wait_timeout(messages, t).unwrap();
let result = cvar.wait_timeout(messages, t)?;
if result.1.timed_out() {
return None;
return Ok(None);
}
messages = result.0;
}
}
}
messages.pop()
Ok(messages.pop())
}
/// Return the [`VersionMessage`] sent by the peer
@@ -300,8 +339,8 @@ impl Peer {
}
/// Return whether or not the peer is still connected
pub fn is_connected(&self) -> bool {
*self.connected.read().unwrap()
pub fn is_connected(&self) -> Result<bool, PeerError> {
Ok(*self.connected.read()?)
}
/// Internal function called once the `reader_thread` is spawned
@@ -312,14 +351,14 @@ impl Peer {
reader_thread_writer: Arc<Mutex<TcpStream>>,
reader_thread_mempool: Arc<Mempool>,
reader_thread_connected: Arc<RwLock<bool>>,
) {
) -> Result<(), PeerError> {
macro_rules! check_disconnect {
($call:expr) => {
match $call {
Ok(good) => good,
Err(e) => {
log::debug!("Error {:?}", e);
*reader_thread_connected.write().unwrap() = false;
*reader_thread_connected.write()? = false;
break;
}
@@ -328,7 +367,7 @@ impl Peer {
}
let mut reader = StreamReader::new(connection, None);
loop {
while *reader_thread_connected.read()? {
let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
let in_message = if raw_message.magic != network.magic() {
@@ -342,7 +381,7 @@ impl Peer {
match in_message {
NetworkMessage::Ping(nonce) => {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::Pong(nonce),
));
@@ -353,19 +392,21 @@ impl Peer {
NetworkMessage::GetData(ref inv) => {
let (found, not_found): (Vec<_>, Vec<_>) = inv
.iter()
.map(|item| (*item, reader_thread_mempool.get_tx(item)))
.map(|item| (*item, reader_thread_mempool.get_tx(item).unwrap()))
.partition(|(_, d)| d.is_some());
for (_, found_tx) in found {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::Tx(found_tx.unwrap()),
NetworkMessage::Tx(found_tx.ok_or_else(|| PeerError::Generic(
"Got None while expecting Transaction".to_string()
))?),
));
}
if !not_found.is_empty() {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::NotFound(
not_found.into_iter().map(|(i, _)| i).collect(),
@@ -377,21 +418,23 @@ impl Peer {
}
let message_resp = {
let mut lock = reader_thread_responses.write().unwrap();
let mut lock = reader_thread_responses.write()?;
let message_resp = lock.entry(in_message.cmd()).or_default();
Arc::clone(&message_resp)
};
let (lock, cvar) = &*message_resp;
let mut messages = lock.lock().unwrap();
let mut messages = lock.lock()?;
messages.push(in_message);
cvar.notify_all();
}
Ok(())
}
/// Send a raw Bitcoin message to the peer
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
let mut writer = self.writer.lock().unwrap();
pub fn send(&self, payload: NetworkMessage) -> Result<(), PeerError> {
let mut writer = self.writer.lock()?;
Self::_send(&mut writer, self.network.magic(), payload)
}
@@ -400,30 +443,27 @@ impl Peer {
&self,
wait_for: &'static str,
timeout: Option<Duration>,
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
Ok(Self::_recv(&self.responses, wait_for, timeout))
) -> Result<Option<NetworkMessage>, PeerError> {
Self::_recv(&self.responses, wait_for, timeout)
}
}
pub trait CompactFiltersPeer {
fn get_cf_checkpt(
&self,
filter_type: u8,
stop_hash: BlockHash,
) -> Result<CFCheckpt, CompactFiltersError>;
fn get_cf_checkpt(&self, filter_type: u8, stop_hash: BlockHash)
-> Result<CFCheckpt, PeerError>;
fn get_cf_headers(
&self,
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<CFHeaders, CompactFiltersError>;
) -> Result<CFHeaders, PeerError>;
fn get_cf_filters(
&self,
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<(), CompactFiltersError>;
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
) -> Result<(), PeerError>;
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError>;
}
impl CompactFiltersPeer for Peer {
@@ -431,22 +471,20 @@ impl CompactFiltersPeer for Peer {
&self,
filter_type: u8,
stop_hash: BlockHash,
) -> Result<CFCheckpt, CompactFiltersError> {
) -> Result<CFCheckpt, PeerError> {
self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
filter_type,
stop_hash,
}))?;
let response = self
.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = self.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFCheckpt(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFCheckpt(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
if response.filter_type != filter_type {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(self.get_address()?));
}
Ok(response)
@@ -457,35 +495,31 @@ impl CompactFiltersPeer for Peer {
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<CFHeaders, CompactFiltersError> {
) -> Result<CFHeaders, PeerError> {
self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
filter_type,
start_height,
stop_hash,
}))?;
let response = self
.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = self.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFHeaders(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFHeaders(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
if response.filter_type != filter_type {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(self.get_address()?));
}
Ok(response)
}
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
let response = self
.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError> {
let response = self.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFilter(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFilter(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
Ok(response)
@@ -496,7 +530,7 @@ impl CompactFiltersPeer for Peer {
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<(), CompactFiltersError> {
) -> Result<(), PeerError> {
self.send(NetworkMessage::GetCFilters(GetCFilters {
filter_type,
start_height,
@@ -508,13 +542,13 @@ impl CompactFiltersPeer for Peer {
}
pub trait InvPeer {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError>;
fn ask_for_mempool(&self) -> Result<(), PeerError>;
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError>;
}
impl InvPeer for Peer {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError> {
self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
block_hash,
)]))?;
@@ -522,51 +556,126 @@ impl InvPeer for Peer {
match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
None => Ok(None),
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
_ => Err(CompactFiltersError::InvalidResponse),
_ => Err(PeerError::InvalidResponse(self.get_address()?)),
}
}
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
fn ask_for_mempool(&self) -> Result<(), PeerError> {
if !self.version.services.has(ServiceFlags::BLOOM) {
return Err(CompactFiltersError::PeerBloomDisabled);
return Err(PeerError::PeerBloomDisabled(self.get_address()?));
}
self.send(NetworkMessage::MemPool)?;
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
None => return Ok(()), // empty mempool
Some(NetworkMessage::Inv(inv)) => inv,
_ => return Err(CompactFiltersError::InvalidResponse),
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
let getdata = inv
.iter()
.cloned()
.filter(
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid)),
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid).unwrap()),
)
.collect::<Vec<_>>();
let num_txs = getdata.len();
self.send(NetworkMessage::GetData(getdata))?;
for _ in 0..num_txs {
let tx = self
.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let tx = self.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let tx = match tx {
NetworkMessage::Tx(tx) => tx,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::Tx(tx)) => tx,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
self.mempool.add_tx(tx);
self.mempool.add_tx(tx)?;
}
Ok(())
}
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
self.mempool.add_tx(tx.clone());
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError> {
self.mempool.add_tx(tx.clone())?;
self.send(NetworkMessage::Tx(tx))?;
Ok(())
}
}
/// Peer Errors
#[derive(Debug)]
pub enum PeerError {
/// Internal I/O error
Io(std::io::Error),
/// Internal system time error
Time(std::time::SystemTimeError),
/// A peer sent an invalid or unexpected response
InvalidResponse(SocketAddr),
/// Peer had bloom filter disabled
PeerBloomDisabled(SocketAddr),
/// Internal Mutex poisoning error
MutexPoisoned,
/// Internal Mutex wait timed out
MutexTimedout,
/// Internal RW read lock poisoned
RwReadLockPoisined,
/// Internal RW write lock poisoned
RwWriteLockPoisoned,
/// Mempool Mutex poisoned
MempoolPoisoned,
/// Network address resolution Error
AddresseResolution,
/// Generic Errors
Generic(String),
}
impl std::fmt::Display for PeerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for PeerError {}
impl_error!(std::io::Error, Io, PeerError);
impl_error!(std::time::SystemTimeError, Time, PeerError);
impl<T> From<PoisonError<MutexGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
PeerError::MutexPoisoned
}
}
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
PeerError::RwWriteLockPoisoned
}
}
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
PeerError::RwReadLockPoisined
}
}
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for PeerError {
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
let (_, wait_result) = err.into_inner();
if wait_result.timed_out() {
PeerError::MutexTimedout
} else {
PeerError::MutexPoisoned
}
}
}
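The `MutexTimedout`/`MutexPoisoned` split above mirrors how `Peer::_recv` uses `Condvar::wait_timeout`, which hands back the guard together with a `WaitTimeoutResult`. A minimal runnable sketch of that wait loop (function and variable names are illustrative):

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Wait for a message or give up after `t`, distinguishing a timeout
/// (return `None`) from an actual pop, like `Peer::_recv` does.
fn recv_once(pair: &(Mutex<Vec<u32>>, Condvar), t: Duration) -> Option<u32> {
    let (lock, cvar) = pair;
    let mut messages = lock.lock().unwrap();
    while messages.is_empty() {
        // wait_timeout returns (guard, WaitTimeoutResult)
        let (guard, wait_result) = cvar.wait_timeout(messages, t).unwrap();
        if wait_result.timed_out() {
            return None; // corresponds to Ok(None) in Peer::_recv
        }
        messages = guard;
    }
    messages.pop()
}

fn main() {
    let pair = (Mutex::new(Vec::new()), Condvar::new());
    // Nobody notifies, so the short wait times out.
    assert_eq!(recv_once(&pair, Duration::from_millis(10)), None);

    pair.0.lock().unwrap().push(42);
    assert_eq!(recv_once(&pair, Duration::from_millis(10)), Some(42));
}
```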


@@ -0,0 +1,578 @@
use super::address_manager::{AddressManager, AddressManagerError, DiscoveryProgress};
use super::peer::{Mempool, Peer, PeerError, TIMEOUT_SECS};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time;
use bitcoin::network::constants::{Network, ServiceFlags};
use bitcoin::network::message::NetworkMessage;
use std::path::PathBuf;
use std::collections::BTreeMap;
// Peer Manager Configuration constants
const MIN_CBF_PEERS: usize = 2;
const MIN_TOTAL_PEERS: usize = 5;
const MIN_CRAWLER_THREADS: usize = 20;
const BAN_SCORE_THRESHOLD: usize = 100;
const RECEIVE_TIMEOUT: time::Duration = time::Duration::from_secs(TIMEOUT_SECS);
/// An Error structure describing Peer Management errors
#[allow(dead_code)]
#[derive(Debug)]
pub enum PeerManagerError {
/// Internal Peer Error
Peer(PeerError),
/// Internal AddressManager Error
AddrsManager(AddressManagerError),
/// OsString conversion Error
OsString(std::ffi::OsString),
/// Peer not found in directory
PeerNotFound,
/// Generic Internal Error
Generic(String),
}
impl_error!(PeerError, Peer, PeerManagerError);
impl_error!(AddressManagerError, AddrsManager, PeerManagerError);
impl_error!(std::ffi::OsString, OsString, PeerManagerError);
/// Peer Data stored in the manager's directory
#[derive(Debug)]
struct PeerData {
peer: Peer,
is_cbf: bool,
ban_score: usize,
}
/// A Directory structure to hold live Peers.
/// All peers in the directory have a live, ongoing connection.
/// Banning a peer removes it from the directory.
#[allow(dead_code)]
#[derive(Default, Debug)]
struct PeerDirectory {
peers: BTreeMap<SocketAddr, PeerData>,
}
#[allow(dead_code)]
impl PeerDirectory {
fn new() -> Self {
Self::default()
}
fn get_cbf_peers(&self) -> Option<Vec<&PeerData>> {
let cbf_peers = self
.peers
.iter()
.filter(|(_, peer)| peer.is_cbf)
.map(|(_, peer)| peer)
.collect::<Vec<&PeerData>>();
match cbf_peers.len() {
0 => None,
_ => Some(cbf_peers),
}
}
fn get_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
let cbf_addresses = self
.peers
.iter()
.filter_map(
|(addrs, peerdata)| {
if peerdata.is_cbf {
Some(addrs)
} else {
None
}
},
)
.copied()
.collect::<Vec<SocketAddr>>();
match cbf_addresses.len() {
0 => None,
_ => Some(cbf_addresses),
}
}
fn get_non_cbf_peers(&self) -> Option<Vec<&PeerData>> {
let non_cbf_peers = self
.peers
.iter()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&PeerData>>();
match non_cbf_peers.len() {
0 => None,
_ => Some(non_cbf_peers),
}
}
fn get_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.peers
.iter()
.filter_map(
|(addrs, peerdata)| {
if !peerdata.is_cbf {
Some(addrs)
} else {
None
}
},
)
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
fn get_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
let peers = self
.peers
.iter_mut()
.filter(|(_, peerdata)| peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&mut PeerData>>();
match peers.len() {
0 => None,
_ => Some(peers),
}
}
fn get_non_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
let peers = self
.peers
.iter_mut()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&mut PeerData>>();
match peers.len() {
0 => None,
_ => Some(peers),
}
}
fn get_cbf_count(&self) -> usize {
self.peers
.iter()
.filter(|(_, peerdata)| peerdata.is_cbf)
.count()
}
fn get_non_cbf_count(&self) -> usize {
self.peers
.iter()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.count()
}
fn insert_peer(&mut self, peerdata: PeerData) -> Result<(), PeerManagerError> {
let addrs = peerdata.peer.get_address()?;
self.peers.entry(addrs).or_insert(peerdata);
Ok(())
}
fn remove_peer(&mut self, addrs: &SocketAddr) -> Option<PeerData> {
self.peers.remove(addrs)
}
fn get_peer_banscore(&self, addrs: &SocketAddr) -> Option<usize> {
self.peers.get(addrs).map(|peerdata| peerdata.ban_score)
}
fn get_peerdata_mut(&mut self, address: &SocketAddr) -> Option<&mut PeerData> {
self.peers.get_mut(address)
}
fn get_peerdata(&self, address: &SocketAddr) -> Option<&PeerData> {
self.peers.get(address)
}
fn is_cbf(&self, addrs: &SocketAddr) -> Option<bool> {
self.peers.get(addrs).map(|peer| peer.is_cbf)
}
}
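The directory getters above all follow one shape: filter the `BTreeMap` by the `is_cbf` flag and return `None` for an empty result. A reduced model of that shape, with `u32` keys standing in for `SocketAddr` and a bare `bool` standing in for `PeerData`:

```rust
use std::collections::BTreeMap;

/// Collect the keys whose flag is set, returning `None` when nothing
/// matches, like `PeerDirectory::get_cbf_addresses`.
fn cbf_keys(dir: &BTreeMap<u32, bool>) -> Option<Vec<u32>> {
    let keys: Vec<u32> = dir
        .iter()
        .filter_map(|(k, &is_cbf)| if is_cbf { Some(*k) } else { None })
        .collect();
    if keys.is_empty() {
        None
    } else {
        Some(keys)
    }
}

fn main() {
    let mut dir = BTreeMap::new();
    dir.insert(1, true);
    dir.insert(2, false);
    dir.insert(3, true);
    // BTreeMap iteration is key-ordered, so the result is sorted.
    assert_eq!(cbf_keys(&dir), Some(vec![1, 3]));
    dir.clear();
    assert_eq!(cbf_keys(&dir), None);
}
```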
#[allow(dead_code)]
pub struct PeerManager<P: DiscoveryProgress> {
addrs_mngr: AddressManager<P>,
directory: PeerDirectory,
mempool: Arc<Mempool>,
min_cbf: usize,
min_total: usize,
network: Network,
}
#[allow(dead_code)]
impl<P: DiscoveryProgress> PeerManager<P> {
pub fn init(
network: Network,
cache_dir: &str,
crawler_threads: Option<usize>,
progress: P,
cbf_peers: Option<usize>,
total_peers: Option<usize>,
) -> Result<Self, PeerManagerError> {
let mut cache_filename = PathBuf::from(cache_dir);
cache_filename.push("addr_cache");
// Determine the minimum peer requirements, either from user input or from the defaults
let min_cbf = cbf_peers.unwrap_or(MIN_CBF_PEERS);
let min_total = total_peers.unwrap_or(MIN_TOTAL_PEERS);
let cbf_buff = min_cbf * 2;
let non_cbf_buff = (min_total - min_cbf) * 2;
// Create internal items
let addrs_mngr = AddressManager::new(
network,
cache_filename.into_os_string().into_string()?,
crawler_threads.unwrap_or(MIN_CRAWLER_THREADS),
Some(cbf_buff),
Some(non_cbf_buff),
progress,
)?;
let mempool = Arc::new(Mempool::new());
let peer_dir = PeerDirectory::new();
// Create the manager and populate its peer directory
let mut manager = Self {
addrs_mngr,
directory: peer_dir,
mempool,
min_cbf,
min_total,
network,
};
manager.update_directory()?;
Ok(manager)
}
fn update_directory(&mut self) -> Result<(), PeerManagerError> {
while self.directory.get_cbf_count() < self.min_cbf
|| self.directory.get_non_cbf_count() < (self.min_total - self.min_cbf)
{
// First connect with cbf peers, then with non_cbf
let cbf_fetch = self.directory.get_cbf_count() < self.min_cbf;
// Try to get an address;
// if none is available, start the crawlers
let target_addrs = match cbf_fetch {
true => {
if let Some(addrs) = self.addrs_mngr.get_new_cbf_address() {
addrs
} else {
self.addrs_mngr.fetch()?;
continue;
}
}
false => {
if let Some(addrs) = self.addrs_mngr.get_new_non_cbf_address() {
addrs
} else {
self.addrs_mngr.fetch()?;
continue;
}
}
};
if let Ok(peer) = Peer::connect(target_addrs, Arc::clone(&self.mempool), self.network) {
let address = peer.get_address()?;
assert_eq!(address, target_addrs);
let is_cbf = peer
.get_version()
.services
.has(ServiceFlags::COMPACT_FILTERS);
let peerdata = PeerData {
peer,
is_cbf,
ban_score: 0,
};
self.directory.insert_peer(peerdata)?;
} else {
continue;
}
}
Ok(())
}
pub fn set_banscore(
&mut self,
increase_by: usize,
address: &SocketAddr,
) -> Result<(), PeerManagerError> {
let current_score = if let Some(peer) = self.directory.get_peerdata_mut(address) {
// Persist the increased score on the directory entry itself
peer.ban_score += increase_by;
peer.ban_score
} else {
return Err(PeerManagerError::PeerNotFound);
};
let mut banned = false;
if current_score >= BAN_SCORE_THRESHOLD {
match (
self.directory.is_cbf(address),
self.directory.remove_peer(address),
) {
(Some(true), Some(_)) => {
self.addrs_mngr.ban_peer(address, true)?;
banned = true;
}
(Some(false), Some(_)) => {
self.addrs_mngr.ban_peer(address, false)?;
banned = true;
}
_ => {
return Err(PeerManagerError::Generic(
"data inconsistency in directory, should not happen".to_string(),
))
}
}
}
if banned {
self.update_directory()?;
}
Ok(())
}
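The threshold rule in `set_banscore` can be isolated as follows; `ScoredPeer` and `bump` are hypothetical names, with the threshold of 100 taken from `BAN_SCORE_THRESHOLD` in the source:

```rust
/// Threshold above which a peer is banned (100 in the source).
const BAN_SCORE_THRESHOLD: usize = 100;

struct ScoredPeer {
    ban_score: usize,
}

impl ScoredPeer {
    /// Accumulate a score increase; returns true once the updated
    /// score reaches the threshold, meaning the peer should be banned.
    fn bump(&mut self, increase_by: usize) -> bool {
        self.ban_score += increase_by;
        self.ban_score >= BAN_SCORE_THRESHOLD
    }
}

fn main() {
    let mut peer = ScoredPeer { ban_score: 0 };
    assert!(!peer.bump(40)); // 40: still fine
    assert!(!peer.bump(40)); // 80: still fine
    assert!(peer.bump(40)); // 120 >= 100: ban and refill the directory
}
```

Note that the score must be stored back on the peer entry for repeated small offences to accumulate, which is why `set_banscore` updates `ban_score` through `get_peerdata_mut`.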
pub fn send_to(
&self,
address: &SocketAddr,
message: NetworkMessage,
) -> Result<(), PeerManagerError> {
if let Some(peerdata) = self.directory.get_peerdata(address) {
peerdata.peer.send(message)?;
Ok(())
} else {
Err(PeerManagerError::PeerNotFound)
}
}
pub fn receive_from(
&self,
address: &SocketAddr,
wait_for: &'static str,
) -> Result<Option<NetworkMessage>, PeerManagerError> {
if let Some(peerdata) = self.directory.get_peerdata(address) {
if let Some(response) = peerdata.peer.recv(wait_for, Some(RECEIVE_TIMEOUT))? {
Ok(Some(response))
} else {
Ok(None)
}
} else {
Err(PeerManagerError::PeerNotFound)
}
}
pub fn connected_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.directory.get_cbf_addresses()
}
pub fn connected_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.directory.get_non_cbf_addresses()
}
pub fn known_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_known_cbfs()
}
pub fn known_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_known_non_cbfs()
}
pub fn previously_tried_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_previously_tried()
}
}
#[cfg(test)]
mod test {
use super::super::LogDiscoveryProgress;
use super::*;
#[test]
#[ignore]
fn test_ban() {
let mut manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let connected_cbfs = manager.connected_cbf_addresses().unwrap();
let connected_non_cbfs = manager.connected_non_cbf_addresses().unwrap();
println!("Currently Connected CBFs: {:#?}", connected_cbfs);
assert_eq!(connected_cbfs.len(), 2);
assert_eq!(connected_non_cbfs.len(), 3);
let to_ban = &connected_cbfs[0];
println!("Banning address: {}", to_ban);
manager.set_banscore(100, to_ban).unwrap();
let newly_connected = manager.connected_cbf_addresses().unwrap();
println!("Newly Connected CBFs: {:#?}", newly_connected);
assert_eq!(newly_connected.len(), 2);
assert_ne!(newly_connected, connected_cbfs);
}
#[test]
#[ignore]
fn test_send_recv() {
let manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let target_address = manager.connected_cbf_addresses().unwrap()[0];
let ping = NetworkMessage::Ping(30);
println!("Asking peer {}", target_address);
manager.send_to(&target_address, ping).unwrap();
let response = manager
.receive_from(&target_address, "pong")
.unwrap()
.unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
let value = value.unwrap();
println!("Got value {:#?}", value);
}
#[test]
#[ignore]
fn test_connect_all() {
let manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let cbf_pings = vec![100u64; manager.min_cbf];
let non_cbf_pings = vec![200u64; manager.min_total - manager.min_cbf];
let cbf_peers = manager.connected_cbf_addresses().unwrap();
let non_cbf_peers = manager.connected_non_cbf_addresses().unwrap();
let sent_cbf: Vec<bool> = cbf_pings
.iter()
.zip(cbf_peers.iter())
.map(|(ping, address)| {
let message = NetworkMessage::Ping(*ping);
manager.send_to(address, message).unwrap();
true
})
.collect();
assert_eq!(sent_cbf, vec![true; manager.min_cbf]);
println!("Sent pings to cbf peers");
let sent_noncbf: Vec<bool> = non_cbf_pings
.iter()
.zip(non_cbf_peers.iter())
.map(|(ping, address)| {
let message = NetworkMessage::Ping(*ping);
manager.send_to(address, message).unwrap();
true
})
.collect();
assert_eq!(sent_noncbf, vec![true; manager.min_total - manager.min_cbf]);
println!("Sent pings to non-cbf peers");
let cbf_received: Vec<u64> = cbf_peers
.iter()
.map(|address| {
let response = manager.receive_from(address, "pong").unwrap().unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
value.unwrap()
})
.collect();
let non_cbf_received: Vec<u64> = non_cbf_peers
.iter()
.map(|address| {
let response = manager.receive_from(address, "pong").unwrap().unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
value.unwrap()
})
.collect();
assert_eq!(cbf_pings, cbf_received);
assert_eq!(non_cbf_pings, non_cbf_received);
}
}

View File

@@ -9,30 +9,41 @@
// You may not use this file except in accordance with one or both of these
// licenses.
//! Esplora by way of `reqwest` HTTP client.
//! Esplora
//!
//! This module defines a [`EsploraBlockchain`] struct that can query an Esplora
//! backend and populate the wallet's [database](crate::database::Database).
//!
//! ## Example
//!
//! ```no_run
//! # use bdk::blockchain::esplora::EsploraBlockchain;
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/api", None, 20);
//! # Ok::<(), bdk::Error>(())
//! ```
use std::collections::{HashMap, HashSet};
use std::fmt;
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::consensus::{self, deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::{sha256, Hash};
use bitcoin::{BlockHeader, Script, Transaction, Txid};
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use reqwest::{Client, StatusCode};
use serde::Deserialize;
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
use ::reqwest::{Client, StatusCode};
use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
use crate::blockchain::*;
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::wallet::utils::ChunksIterator;
use crate::FeeRate;
use super::*;
use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
#[derive(Debug)]
@@ -64,23 +75,17 @@ impl std::convert::From<UrlClient> for EsploraBlockchain {
}
impl EsploraBlockchain {
/// Create a new instance of the client from a base URL and `stop_gap`.
pub fn new(base_url: &str, stop_gap: usize) -> Self {
/// Create a new instance of the client from a base URL
pub fn new(base_url: &str, concurrency: Option<u8>, stop_gap: usize) -> Self {
EsploraBlockchain {
url_client: UrlClient {
url: base_url.to_string(),
client: Client::new(),
concurrency: DEFAULT_CONCURRENT_REQUESTS,
concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENT_REQUESTS),
},
stop_gap,
}
}
/// Set the concurrency to use when doing batch queries against the Esplora instance.
pub fn with_concurrency(mut self, concurrency: u8) -> Self {
self.url_client.concurrency = concurrency;
self
}
}
#[maybe_async]
@@ -106,20 +111,32 @@ impl Blockchain for EsploraBlockchain {
}
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
Ok(self.url_client._get_tx(txid).await?)
Ok(await_or_block!(self.url_client._get_tx(txid))?)
}
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
Ok(self.url_client._broadcast(tx).await?)
Ok(await_or_block!(self.url_client._broadcast(tx))?)
}
fn get_height(&self) -> Result<u32, Error> {
Ok(self.url_client._get_height().await?)
Ok(await_or_block!(self.url_client._get_height())?)
}
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
let estimates = self.url_client._get_fee_estimates().await?;
super::into_fee_rate(target, estimates)
let estimates = await_or_block!(self.url_client._get_fee_estimates())?;
let fee_val = estimates
.into_iter()
.map(|(k, v)| Ok::<_, std::num::ParseIntError>((k.parse::<usize>()?, v)))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| Error::Generic(e.to_string()))?
.into_iter()
.take_while(|(k, _)| k <= &target)
.map(|(_, v)| v)
.last()
.unwrap_or(1.0);
Ok(FeeRate::from_sat_per_vb(fee_val as f32))
}
}
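The fee-rate selection inlined into `estimate_fee` above parses each map key as a confirmation target, keeps the rate of the largest target not exceeding the request, and falls back to 1 sat/vB. A self-contained sketch (sorting explicitly, since `HashMap` iteration order is unspecified, and using `filter_map` in place of the module's error propagation):

```rust
/// Pick a fee rate (sat/vB) for a confirmation `target` from Esplora-style
/// `/fee-estimates` data: pairs of (confirmation target, sat/vB).
fn pick_fee_rate(estimates: &[(&str, f64)], target: usize) -> f64 {
    let mut parsed: Vec<(usize, f64)> = estimates
        .iter()
        .filter_map(|(k, v)| k.parse::<usize>().ok().map(|k| (k, *v)))
        .collect();
    // Ascending by target, so `take_while(..).last()` yields the rate of
    // the largest target that is still <= the requested one.
    parsed.sort_by_key(|(k, _)| *k);
    parsed
        .into_iter()
        .take_while(|(k, _)| *k <= target)
        .map(|(_, v)| v)
        .last()
        .unwrap_or(1.0) // fall back to 1 sat/vB, as the module does
}

fn main() {
    // An Esplora backend might return e.g. {"1": 30.0, "6": 10.0, "25": 5.0}.
    let estimates = [("1", 30.0), ("6", 10.0), ("25", 5.0)];
    println!("{}", pick_fee_rate(&estimates, 6)); // largest target <= 6
}
```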
@@ -281,51 +298,74 @@ impl ElectrumLikeSync for UrlClient {
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for script in chunk {
futs.push(self._script_get_history(script));
let future = async {
let mut results = vec![];
for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for script in chunk {
futs.push(self._script_get_history(&script));
}
let partial_results: Vec<Vec<ElsGetHistoryRes>> = futs.try_collect().await?;
results.extend(partial_results);
}
let partial_results: Vec<Vec<ElsGetHistoryRes>> = futs.try_collect().await?;
results.extend(partial_results);
}
Ok(stream::iter(results).collect().await)
Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
}
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for txid in chunk {
futs.push(self._get_tx_no_opt(txid));
let future = async {
let mut results = vec![];
for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for txid in chunk {
futs.push(self._get_tx_no_opt(&txid));
}
let partial_results: Vec<Transaction> = futs.try_collect().await?;
results.extend(partial_results);
}
let partial_results: Vec<Transaction> = futs.try_collect().await?;
results.extend(partial_results);
}
Ok(stream::iter(results).collect().await)
Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
}
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for height in chunk {
futs.push(self._get_header(height));
let future = async {
let mut results = vec![];
for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for height in chunk {
futs.push(self._get_header(height));
}
let partial_results: Vec<BlockHeader> = futs.try_collect().await?;
results.extend(partial_results);
}
let partial_results: Vec<BlockHeader> = futs.try_collect().await?;
results.extend(partial_results);
}
Ok(stream::iter(results).collect().await)
Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
}
}
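The three `els_batch_*` methods above share one pattern: split the request list into chunks of `concurrency`, run each chunk through a `FuturesOrdered` so results come back in submission order, and extend a single result vector. A synchronous sketch of that order-preserving batching (names are illustrative):

```rust
/// Order-preserving batched fetch: process requests in chunks of `chunk`,
/// extending one result vector. Synchronous stand-in for the
/// FuturesOrdered-based original.
fn batched<T, R>(items: Vec<T>, chunk: usize, fetch: impl Fn(&T) -> R) -> Vec<R> {
    let mut results = Vec::new();
    for group in items.chunks(chunk.max(1)) {
        // In the async original each group's requests run concurrently;
        // FuturesOrdered keeps their results in submission order.
        results.extend(group.iter().map(&fetch));
    }
    results
}

fn main() {
    let out = batched(vec![1, 2, 3, 4, 5], 2, |x| x * 10);
    assert_eq!(out, vec![10, 20, 30, 40, 50]);
    println!("{:?}", out);
}
```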
#[derive(Deserialize)]
struct EsploraGetHistoryStatus {
block_height: Option<usize>,
}
#[derive(Deserialize)]
struct EsploraGetHistory {
txid: Txid,
status: EsploraGetHistoryStatus,
}
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
@@ -335,7 +375,7 @@ pub struct EsploraBlockchainConfig {
pub base_url: String,
/// Number of parallel requests sent to the esplora service (default: 4)
pub concurrency: Option<u8>,
/// Stop searching addresses for transactions after finding an unused gap of this length.
/// Stop searching addresses for transactions after finding an unused gap of this length
pub stop_gap: usize,
}
@@ -343,14 +383,47 @@ impl ConfigurableBlockchain for EsploraBlockchain {
type Config = EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let mut blockchain = EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap);
if let Some(concurrency) = config.concurrency {
blockchain.url_client.concurrency = concurrency;
};
Ok(blockchain)
Ok(EsploraBlockchain::new(
config.base_url.as_str(),
config.concurrency,
config.stop_gap,
))
}
}
/// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)]
pub enum EsploraError {
/// Error with the HTTP call
Reqwest(reqwest::Error),
/// Invalid number returned
Parsing(std::num::ParseIntError),
/// Invalid Bitcoin data returned
BitcoinEncoding(bitcoin::consensus::encode::Error),
/// Invalid Hex data returned
Hex(bitcoin::hashes::hex::Error),
/// Transaction not found
TransactionNotFound(Txid),
/// Header height not found
HeaderHeightNotFound(u32),
/// Header hash not found
HeaderHashNotFound(BlockHash),
}
impl fmt::Display for EsploraError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for EsploraError {}
impl_error!(reqwest::Error, Reqwest, EsploraError);
impl_error!(std::num::ParseIntError, Parsing, EsploraError);
impl_error!(consensus::encode::Error, BitcoinEncoding, EsploraError);
impl_error!(bitcoin::hashes::hex::Error, Hex, EsploraError);
#[cfg(test)]
#[cfg(feature = "test-esplora")]
crate::bdk_blockchain_tests! {

View File

@@ -1,143 +0,0 @@
//! Esplora
//!
//! This module defines a [`EsploraBlockchain`] struct that can query an Esplora
//! backend and populate the wallet's [database](crate::database::Database).
//!
//! ## Example
//!
//! ```no_run
//! # use bdk::blockchain::esplora::EsploraBlockchain;
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/api", 20);
//! # Ok::<(), bdk::Error>(())
//! ```
//!
//! Esplora blockchain can use either `ureq` or `reqwest` for the HTTP client
//! depending on your needs (blocking or async respectively).
//!
//! Please note, to configure the Esplora HTTP client correctly use one of:
//! Blocking: --features='esplora,ureq'
//! Async: --features='async-interface,esplora,reqwest' --no-default-features
use std::collections::HashMap;
use std::fmt;
use std::io;
use serde::Deserialize;
use bitcoin::consensus;
use bitcoin::{BlockHash, Txid};
use crate::error::Error;
use crate::FeeRate;
#[cfg(all(
feature = "esplora",
feature = "reqwest",
any(feature = "async-interface", target_arch = "wasm32"),
))]
mod reqwest;
#[cfg(all(
feature = "esplora",
feature = "reqwest",
any(feature = "async-interface", target_arch = "wasm32"),
))]
pub use self::reqwest::*;
#[cfg(all(
feature = "esplora",
not(any(
feature = "async-interface",
feature = "reqwest",
target_arch = "wasm32"
)),
))]
mod ureq;
#[cfg(all(
feature = "esplora",
not(any(
feature = "async-interface",
feature = "reqwest",
target_arch = "wasm32"
)),
))]
pub use self::ureq::*;
fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRate, Error> {
let fee_val = estimates
.into_iter()
.map(|(k, v)| Ok::<_, std::num::ParseIntError>((k.parse::<usize>()?, v)))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| Error::Generic(e.to_string()))?
.into_iter()
.take_while(|(k, _)| k <= &target)
.map(|(_, v)| v)
.last()
.unwrap_or(1.0);
Ok(FeeRate::from_sat_per_vb(fee_val as f32))
}
/// Data type used when fetching transaction history from Esplora.
#[derive(Deserialize)]
pub struct EsploraGetHistory {
txid: Txid,
status: EsploraGetHistoryStatus,
}
#[derive(Deserialize)]
struct EsploraGetHistoryStatus {
block_height: Option<usize>,
}
/// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)]
pub enum EsploraError {
/// Error during ureq HTTP request
#[cfg(feature = "ureq")]
Ureq(::ureq::Error),
/// Transport error during the ureq HTTP call
#[cfg(feature = "ureq")]
UreqTransport(::ureq::Transport),
/// Error during reqwest HTTP request
#[cfg(feature = "reqwest")]
Reqwest(::reqwest::Error),
/// HTTP response error
HttpResponse(u16),
/// IO error during ureq response read
Io(io::Error),
/// No header found in ureq response
NoHeader,
/// Invalid number returned
Parsing(std::num::ParseIntError),
/// Invalid Bitcoin data returned
BitcoinEncoding(bitcoin::consensus::encode::Error),
/// Invalid Hex data returned
Hex(bitcoin::hashes::hex::Error),
/// Transaction not found
TransactionNotFound(Txid),
/// Header height not found
HeaderHeightNotFound(u32),
/// Header hash not found
HeaderHashNotFound(BlockHash),
}
impl fmt::Display for EsploraError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for EsploraError {}
#[cfg(feature = "ureq")]
impl_error!(::ureq::Error, Ureq, EsploraError);
#[cfg(feature = "ureq")]
impl_error!(::ureq::Transport, UreqTransport, EsploraError);
#[cfg(feature = "reqwest")]
impl_error!(::reqwest::Error, Reqwest, EsploraError);
impl_error!(io::Error, Io, EsploraError);
impl_error!(std::num::ParseIntError, Parsing, EsploraError);
impl_error!(consensus::encode::Error, BitcoinEncoding, EsploraError);
impl_error!(bitcoin::hashes::hex::Error, Hex, EsploraError);

View File

@@ -1,379 +0,0 @@
// Bitcoin Dev Kit
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
//! Esplora by way of `ureq` HTTP client.
use std::collections::{HashMap, HashSet};
use std::io;
use std::io::Read;
use std::time::Duration;
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use ureq::{Agent, Response};
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::{sha256, Hash};
use bitcoin::{BlockHeader, Script, Transaction, Txid};
use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
use crate::blockchain::*;
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
#[derive(Debug)]
struct UrlClient {
url: String,
agent: Agent,
}
/// Structure that implements the logic to sync with Esplora
///
/// ## Example
/// See the [`blockchain::esplora`](crate::blockchain::esplora) module for a usage example.
#[derive(Debug)]
pub struct EsploraBlockchain {
url_client: UrlClient,
stop_gap: usize,
}
impl std::convert::From<UrlClient> for EsploraBlockchain {
fn from(url_client: UrlClient) -> Self {
EsploraBlockchain {
url_client,
stop_gap: 20,
}
}
}
impl EsploraBlockchain {
/// Create a new instance of the client from a base URL and `stop_gap`.
pub fn new(base_url: &str, stop_gap: usize) -> Self {
EsploraBlockchain {
url_client: UrlClient {
url: base_url.to_string(),
agent: Agent::new(),
},
stop_gap,
}
}
/// Set the inner `ureq` agent.
pub fn with_agent(mut self, agent: Agent) -> Self {
self.url_client.agent = agent;
self
}
}
impl Blockchain for EsploraBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
vec![
Capability::FullHistory,
Capability::GetAnyTx,
Capability::AccurateFees,
]
.into_iter()
.collect()
}
fn setup<D: BatchDatabase, P: Progress>(
&self,
database: &mut D,
progress_update: P,
) -> Result<(), Error> {
self.url_client
.electrum_like_setup(self.stop_gap, database, progress_update)
}
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
Ok(self.url_client._get_tx(txid)?)
}
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
let _txid = self.url_client._broadcast(tx)?;
Ok(())
}
fn get_height(&self) -> Result<u32, Error> {
Ok(self.url_client._get_height()?)
}
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
let estimates = self.url_client._get_fee_estimates()?;
super::into_fee_rate(target, estimates)
}
}
impl UrlClient {
fn script_to_scripthash(script: &Script) -> String {
sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
}
fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
let resp = self
.agent
.get(&format!("{}/tx/{}/raw", self.url, txid))
.call();
match resp {
Ok(resp) => Ok(Some(deserialize(&into_bytes(resp)?)?)),
Err(ureq::Error::Status(code, _)) => {
if is_status_not_found(code) {
return Ok(None);
}
Err(EsploraError::HttpResponse(code))
}
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _get_tx_no_opt(&self, txid: &Txid) -> Result<Transaction, EsploraError> {
match self._get_tx(txid) {
Ok(Some(tx)) => Ok(tx),
Ok(None) => Err(EsploraError::TransactionNotFound(*txid)),
Err(e) => Err(e),
}
}
fn _get_header(&self, block_height: u32) -> Result<BlockHeader, EsploraError> {
let resp = self
.agent
.get(&format!("{}/block-height/{}", self.url, block_height))
.call();
let bytes = match resp {
Ok(resp) => Ok(into_bytes(resp)?),
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
let hash = std::str::from_utf8(&bytes)
.map_err(|_| EsploraError::HeaderHeightNotFound(block_height))?;
let resp = self
.agent
.get(&format!("{}/block/{}/header", self.url, hash))
.call();
match resp {
Ok(resp) => Ok(deserialize(&Vec::from_hex(&resp.into_string()?)?)?),
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _broadcast(&self, transaction: &Transaction) -> Result<(), EsploraError> {
let resp = self
.agent
.post(&format!("{}/tx", self.url))
.send_string(&serialize(transaction).to_hex());
match resp {
Ok(_) => Ok(()), // We do not return the txid?
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _get_height(&self) -> Result<u32, EsploraError> {
let resp = self
.agent
.get(&format!("{}/blocks/tip/height", self.url))
.call();
match resp {
Ok(resp) => Ok(resp.into_string()?.parse()?),
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _script_get_history(&self, script: &Script) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
let mut result = Vec::new();
let scripthash = Self::script_to_scripthash(script);
// Add the unconfirmed transactions first
let resp = self
.agent
.get(&format!(
"{}/scripthash/{}/txs/mempool",
self.url, scripthash
))
.call();
let v = match resp {
Ok(resp) => {
let v: Vec<EsploraGetHistory> = resp.into_json()?;
Ok(v)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
debug!(
"Found {} mempool txs for {} - {:?}",
result.len(),
scripthash,
script
);
// Then go through all the pages of confirmed transactions
let mut last_txid = String::new();
loop {
let resp = self
.agent
.get(&format!(
"{}/scripthash/{}/txs/chain/{}",
self.url, scripthash, last_txid
))
.call();
let v = match resp {
Ok(resp) => {
let v: Vec<EsploraGetHistory> = resp.into_json()?;
Ok(v)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
let len = v.len();
if let Some(elem) = v.last() {
last_txid = elem.txid.to_hex();
}
debug!("... adding {} confirmed transactions", len);
result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
if len < 25 {
break;
}
}
Ok(result)
}
fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
let resp = self
.agent
.get(&format!("{}/fee-estimates", self.url,))
.call();
let map = match resp {
Ok(resp) => {
let map: HashMap<String, f64> = resp.into_json()?;
Ok(map)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
Ok(map)
}
}
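The confirmed-history loop in `_script_get_history` above is cursor-based pagination: each `/scripthash/{hash}/txs/chain/{last_txid}` call returns at most 25 transactions, the last txid of a page becomes the next cursor, and a short page ends the loop. A sketch with a mock page fetcher standing in for the HTTP call:

```rust
const PAGE_SIZE: usize = 25;

/// Mock of one `/txs/chain/{last_txid}` call: return up to PAGE_SIZE items
/// after `cursor` (None means the first page). Uses u32 "txids" for brevity.
fn fetch_page(all: &[u32], cursor: Option<u32>) -> Vec<u32> {
    let start = match cursor {
        Some(c) => all
            .iter()
            .position(|&x| x == c)
            .map(|i| i + 1)
            .unwrap_or(all.len()),
        None => 0,
    };
    all[start..].iter().take(PAGE_SIZE).copied().collect()
}

fn main() {
    let chain: Vec<u32> = (0..60).collect(); // 60 confirmed "txids"
    let mut result = Vec::new();
    let mut cursor = None;
    loop {
        let page = fetch_page(&chain, cursor);
        let len = page.len();
        cursor = page.last().copied(); // last txid becomes the next cursor
        result.extend(page);
        if len < PAGE_SIZE {
            break; // a short page means no more history
        }
    }
    assert_eq!(result, chain);
    println!("fetched {} txs in pages of {}", result.len(), PAGE_SIZE);
}
```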
fn is_status_not_found(status: u16) -> bool {
status == 404
}
fn into_bytes(resp: Response) -> Result<Vec<u8>, io::Error> {
const BYTES_LIMIT: usize = 10 * 1_024 * 1_024;
let mut buf: Vec<u8> = vec![];
resp.into_reader()
.take((BYTES_LIMIT + 1) as u64)
.read_to_end(&mut buf)?;
if buf.len() > BYTES_LIMIT {
return Err(io::Error::new(
io::ErrorKind::Other,
"response too big for into_bytes",
));
}
Ok(buf)
}
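`into_bytes` above caps the response body by reading `BYTES_LIMIT + 1` bytes: if the extra byte arrives, the body was too large, and the read is rejected rather than buffered without bound. The same pattern in isolation:

```rust
use std::io::{self, Read};

/// Read at most `limit` bytes from `r`; error out if the source holds more.
/// Reading one byte past the limit is what makes the overflow detectable.
fn read_capped(r: impl Read, limit: usize) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    let mut limited = r.take((limit as u64) + 1);
    limited.read_to_end(&mut buf)?;
    if buf.len() > limit {
        return Err(io::Error::new(io::ErrorKind::Other, "response too big"));
    }
    Ok(buf)
}

fn main() {
    let data = [0u8; 10];
    assert_eq!(read_capped(&data[..], 16).unwrap().len(), 10); // under the cap
    assert!(read_capped(&data[..], 8).is_err()); // over the cap
    println!("ok");
}
```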
impl ElectrumLikeSync for UrlClient {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
let mut results = vec![];
for script in scripts.into_iter() {
let v = self._script_get_history(script)?;
results.push(v);
}
Ok(results)
}
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
let mut results = vec![];
for txid in txids.into_iter() {
let tx = self._get_tx_no_opt(txid)?;
results.push(tx);
}
Ok(results)
}
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let mut results = vec![];
for height in heights.into_iter() {
let header = self._get_header(height)?;
results.push(header);
}
Ok(results)
}
}
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service, e.g. `https://blockstream.info/api/`
pub base_url: String,
/// Socket read timeout.
pub timeout_read: u64,
/// Socket write timeout.
pub timeout_write: u64,
/// Stop searching addresses for transactions after finding an unused gap of this length.
pub stop_gap: usize,
}
impl ConfigurableBlockchain for EsploraBlockchain {
type Config = EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let agent: Agent = ureq::AgentBuilder::new()
.timeout_read(Duration::from_secs(config.timeout_read))
.timeout_write(Duration::from_secs(config.timeout_write))
.build();
Ok(EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap).with_agent(agent))
}
}

View File

@@ -30,19 +30,9 @@ use crate::FeeRate;
#[cfg(any(feature = "electrum", feature = "esplora"))]
pub(crate) mod utils;
#[cfg(any(
feature = "electrum",
feature = "esplora",
feature = "compact_filters",
feature = "rpc"
))]
#[cfg(any(feature = "electrum", feature = "esplora", feature = "compact_filters"))]
pub mod any;
#[cfg(any(
feature = "electrum",
feature = "esplora",
feature = "compact_filters",
feature = "rpc"
))]
#[cfg(any(feature = "electrum", feature = "esplora", feature = "compact_filters"))]
pub use any::{AnyBlockchain, AnyBlockchainConfig};
#[cfg(feature = "electrum")]

View File

@@ -18,12 +18,10 @@
//! ## Example
//!
//! ```no_run
//! # use bdk::blockchain::{RpcConfig, RpcBlockchain, ConfigurableBlockchain, rpc::Auth};
//! # use bdk::blockchain::{RpcConfig, RpcBlockchain, ConfigurableBlockchain};
//! let config = RpcConfig {
//! url: "127.0.0.1:18332".to_string(),
//! auth: Auth::Cookie {
//! file: "/home/user/.bitcoin/.cookie".into(),
//! },
//! auth: bitcoincore_rpc::Auth::CookieFile("/home/user/.bitcoin/.cookie".into()),
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! skip_blocks: None,
@@ -38,17 +36,15 @@ use crate::database::{BatchDatabase, DatabaseUtils};
use crate::descriptor::{get_checksum, IntoWalletDescriptor};
use crate::wallet::utils::SecpCtx;
use crate::{ConfirmationTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
use core_rpc::json::{
use bitcoincore_rpc::json::{
GetAddressInfoResultLabel, ImportMultiOptions, ImportMultiRequest,
ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
};
use core_rpc::jsonrpc::serde_json::Value;
use core_rpc::Auth as RpcAuth;
use core_rpc::{Client, RpcApi};
use bitcoincore_rpc::jsonrpc::serde_json::Value;
use bitcoincore_rpc::{Auth, Client, RpcApi};
use log::debug;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::str::FromStr;
/// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
@@ -68,7 +64,7 @@ pub struct RpcBlockchain {
}
/// RpcBlockchain configuration options
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Debug)]
pub struct RpcConfig {
/// The bitcoin node url
pub url: String,
@@ -82,39 +78,6 @@ pub struct RpcConfig {
pub skip_blocks: Option<u32>,
}
/// This struct is equivalent to [core_rpc::Auth] but it implements [serde::Serialize]
/// To be removed once upstream equivalent is implementing Serialize (json serialization format
/// should be the same) https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181
#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(untagged)]
pub enum Auth {
/// None authentication
None,
/// Authentication with username and password, usually [Auth::Cookie] should be preferred
UserPass {
/// Username
username: String,
/// Password
password: String,
},
/// Authentication with a cookie file
Cookie {
/// Cookie file
file: PathBuf,
},
}
impl From<Auth> for RpcAuth {
fn from(auth: Auth) -> Self {
match auth {
Auth::None => RpcAuth::None,
Auth::UserPass { username, password } => RpcAuth::UserPass(username, password),
Auth::Cookie { file } => RpcAuth::CookieFile(file),
}
}
}
impl RpcBlockchain {
fn get_node_synced_height(&self) -> Result<u32, Error> {
let info = self.client.get_address_info(&self._storage_address)?;
@@ -357,7 +320,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let client = Client::new(wallet_url, config.auth.clone())?;
let loaded_wallets = client.list_wallets()?;
if loaded_wallets.contains(&wallet_name) {
debug!("wallet already loaded {:?}", wallet_name);
@@ -464,7 +427,7 @@ crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
let config = RpcConfig {
url: test_client.bitcoind.rpc_url(),
auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
auth: Auth::CookieFile(test_client.bitcoind.params.cookie_file.clone()),
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{:?}", std::time::SystemTime::now() ),
skip_blocks: None,

View File

@@ -369,7 +369,7 @@ fn save_transaction_details_and_utxos<D: BatchDatabase>(
}
/// returns utxo dependency as the inputs needed for the utxo to exist
/// `tx_raw_in_db` must contains utxo's generating txs or errors with [crate::Error::TransactionNotFound]
/// `tx_raw_in_db` must contains utxo's generating txs or errors witt [crate::Error::TransactionNotFound]
fn utxos_deps<D: BatchDatabase>(
db: &mut D,
tx_raw_in_db: &HashMap<Txid, Transaction>,

View File

@@ -233,10 +233,6 @@ impl Database for AnyDatabase {
fn increment_last_index(&mut self, keychain: KeychainKind) -> Result<u32, Error> {
impl_inner_method!(AnyDatabase, self, increment_last_index, keychain)
}
fn flush(&mut self) -> Result<(), Error> {
impl_inner_method!(AnyDatabase, self, flush)
}
}
impl BatchOperations for AnyBatch {

View File

@@ -367,10 +367,6 @@ impl Database for Tree {
Ok(val)
})
}
fn flush(&mut self) -> Result<(), Error> {
Ok(Tree::flush(self).map(|_| ())?)
}
}
impl BatchDatabase for Tree {

View File

@@ -419,10 +419,6 @@ impl Database for MemoryDatabase {
Ok(*value)
}
fn flush(&mut self) -> Result<(), Error> {
Ok(())
}
}
impl BatchDatabase for MemoryDatabase {

View File

@@ -134,9 +134,6 @@ pub trait Database: BatchOperations {
///
/// It should insert and return `0` if not present in the database
fn increment_last_index(&mut self, keychain: KeychainKind) -> Result<u32, Error>;
/// Force changes to be written to disk
fn flush(&mut self) -> Result<(), Error>;
}
/// Trait for a database that supports batch operations

View File

@@ -130,7 +130,7 @@ pub enum Error {
Electrum(electrum_client::Error),
#[cfg(feature = "esplora")]
/// Esplora client error
Esplora(Box<crate::blockchain::esplora::EsploraError>),
Esplora(crate::blockchain::esplora::EsploraError),
#[cfg(feature = "compact_filters")]
/// Compact filters client error
CompactFilters(crate::blockchain::compact_filters::CompactFiltersError),
@@ -139,7 +139,7 @@ pub enum Error {
Sled(sled::Error),
#[cfg(feature = "rpc")]
/// Rpc client error
Rpc(core_rpc::Error),
Rpc(bitcoincore_rpc::Error),
}
impl fmt::Display for Error {
@@ -190,10 +190,12 @@ impl_error!(bitcoin::util::psbt::PsbtParseError, PsbtParse);
#[cfg(feature = "electrum")]
impl_error!(electrum_client::Error, Electrum);
#[cfg(feature = "esplora")]
impl_error!(crate::blockchain::esplora::EsploraError, Esplora);
#[cfg(feature = "key-value-db")]
impl_error!(sled::Error, Sled);
#[cfg(feature = "rpc")]
impl_error!(core_rpc::Error, Rpc);
impl_error!(bitcoincore_rpc::Error, Rpc);
#[cfg(feature = "compact_filters")]
impl From<crate::blockchain::compact_filters::CompactFiltersError> for Error {
@@ -214,10 +216,3 @@ impl From<crate::wallet::verify::VerifyError> for Error {
}
}
}
#[cfg(feature = "esplora")]
impl From<crate::blockchain::esplora::EsploraError> for Error {
fn from(other: crate::blockchain::esplora::EsploraError) -> Self {
Error::Esplora(Box::new(other))
}
}

View File

@@ -40,7 +40,7 @@
//! interact with the bitcoin P2P network.
//!
//! ```toml
//! bdk = "0.11.0"
//! bdk = "0.9.0"
//! ```
#![cfg_attr(
feature = "electrum",
@@ -205,24 +205,11 @@ extern crate serde;
#[macro_use]
extern crate serde_json;
#[cfg(all(feature = "reqwest", feature = "ureq"))]
compile_error!("Features reqwest and ureq are mutually exclusive and cannot be enabled together");
#[cfg(all(feature = "async-interface", feature = "electrum"))]
compile_error!(
"Features async-interface and electrum are mutually exclusive and cannot be enabled together"
);
#[cfg(all(feature = "async-interface", feature = "ureq"))]
compile_error!(
"Features async-interface and ureq are mutually exclusive and cannot be enabled together"
);
#[cfg(all(feature = "async-interface", feature = "compact_filters"))]
compile_error!(
"Features async-interface and compact_filters are mutually exclusive and cannot be enabled together"
);
#[cfg(feature = "keys-bip39")]
extern crate bip39;
@@ -236,11 +223,14 @@ extern crate bdk_macros;
extern crate lazy_static;
#[cfg(feature = "rpc")]
pub extern crate core_rpc;
pub extern crate bitcoincore_rpc;
#[cfg(feature = "electrum")]
pub extern crate electrum_client;
#[cfg(feature = "esplora")]
pub extern crate reqwest;
#[cfg(feature = "key-value-db")]
pub extern crate sled;

@@ -43,8 +43,8 @@ impl PsbtUtils for Psbt {
mod test {
use crate::bitcoin::TxIn;
use crate::psbt::Psbt;
use crate::wallet::test::{get_funded_wallet, get_test_wpkh};
use crate::wallet::AddressIndex;
use crate::wallet::{get_funded_wallet, test::get_test_wpkh};
use crate::SignOptions;
use std::str::FromStr;

@@ -3,14 +3,14 @@ use bitcoin::consensus::encode::{deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::sha256d;
use bitcoin::{Address, Amount, Script, Transaction, Txid};
pub use bitcoincore_rpc::bitcoincore_rpc_json::AddressType;
pub use bitcoincore_rpc::{Auth, Client as RpcClient, RpcApi};
use core::str::FromStr;
pub use core_rpc::core_rpc_json::AddressType;
pub use core_rpc::{Auth, Client as RpcClient, RpcApi};
use electrsd::bitcoind::BitcoinD;
use electrsd::{bitcoind, Conf, ElectrsD};
use electrsd::{bitcoind, ElectrsD};
pub use electrum_client::{Client as ElectrumClient, ElectrumApi};
#[allow(unused_imports)]
use log::{debug, error, info, log_enabled, trace, Level};
use log::{debug, error, info, trace};
use std::collections::HashMap;
use std::env;
use std::ops::Deref;
@@ -24,20 +24,11 @@ pub struct TestClient {
impl TestClient {
pub fn new(bitcoind_exe: String, electrs_exe: String) -> Self {
debug!("launching {} and {}", &bitcoind_exe, &electrs_exe);
let conf = bitcoind::Conf {
view_stdout: log_enabled!(Level::Debug),
..Default::default()
};
let bitcoind = BitcoinD::with_conf(bitcoind_exe, &conf).unwrap();
let bitcoind = BitcoinD::new(bitcoind_exe).unwrap();
let http_enabled = cfg!(feature = "test-esplora");
let conf = Conf {
http_enabled,
view_stderr: log_enabled!(Level::Debug),
..Default::default()
};
let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &conf).unwrap();
let electrsd = ElectrsD::new(electrs_exe, &bitcoind, false, http_enabled).unwrap();
let node_address = bitcoind.client.get_new_address(None, None).unwrap();
bitcoind

@@ -10,7 +10,6 @@
// licenses.
use std::convert::AsRef;
use std::ops::Sub;
use bitcoin::blockdata::transaction::{OutPoint, Transaction, TxOut};
use bitcoin::{hash_types::Txid, util::psbt};
@@ -66,31 +65,10 @@ impl FeeRate {
FeeRate(1.0)
}
/// Calculate fee rate from `fee` and weight units (`wu`).
pub fn from_wu(fee: u64, wu: usize) -> FeeRate {
Self::from_vb(fee, wu.vbytes())
}
/// Calculate fee rate from `fee` and `vbytes`.
pub fn from_vb(fee: u64, vbytes: usize) -> FeeRate {
let rate = fee as f32 / vbytes as f32;
Self::from_sat_per_vb(rate)
}
/// Return the value as satoshi/vbyte
pub fn as_sat_vb(&self) -> f32 {
self.0
}
/// Calculate absolute fee in Satoshis using size in weight units.
pub fn fee_wu(&self, wu: usize) -> u64 {
self.fee_vb(wu.vbytes())
}
/// Calculate absolute fee in Satoshis using size in virtual bytes.
pub fn fee_vb(&self, vbytes: usize) -> u64 {
(self.as_sat_vb() * vbytes as f32).ceil() as u64
}
}
impl std::default::Default for FeeRate {
@@ -99,27 +77,6 @@ impl std::default::Default for FeeRate {
}
}
impl Sub for FeeRate {
type Output = Self;
fn sub(self, other: FeeRate) -> Self::Output {
FeeRate(self.0 - other.0)
}
}
/// Trait implemented by types that can be used to measure weight units.
pub trait Vbytes {
/// Convert weight units to virtual bytes.
fn vbytes(self) -> usize;
}
impl Vbytes for usize {
fn vbytes(self) -> usize {
// ref: https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki#transaction-size-calculations
(self as f32 / 4.0).ceil() as usize
}
}
/// An unspent output owned by a [`Wallet`].
///
/// [`Wallet`]: crate::Wallet
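The hunk above appears to drop the `from_wu`/`from_vb`/`fee_wu`/`fee_vb` helpers and the `Vbytes` trait from `FeeRate`. A rough standalone sketch of the arithmetic they encode (per BIP-141, one virtual byte is four weight units, rounded up) — `FeeRate` here is a stand-in mirroring the diff, not the bdk type itself:

```rust
// Stand-in mirroring the FeeRate helpers shown in the diff.
#[derive(Debug, Clone, Copy)]
struct FeeRate(f32); // satoshi per virtual byte

// Convert weight units to virtual bytes, rounding up (BIP-141).
fn vbytes(wu: usize) -> usize {
    (wu as f32 / 4.0).ceil() as usize
}

impl FeeRate {
    // Rate from an absolute fee and a size in virtual bytes.
    fn from_vb(fee: u64, vb: usize) -> FeeRate {
        FeeRate(fee as f32 / vb as f32)
    }
    // Rate from an absolute fee and a size in weight units.
    fn from_wu(fee: u64, wu: usize) -> FeeRate {
        Self::from_vb(fee, vbytes(wu))
    }
    // Absolute fee in satoshis for a size in virtual bytes, rounded up.
    fn fee_vb(&self, vb: usize) -> u64 {
        (self.0 * vb as f32).ceil() as u64
    }
}

fn main() {
    assert_eq!(vbytes(561), 141);          // 561 wu -> 140.25 -> 141 vb
    let rate = FeeRate::from_wu(141, 561); // 141 sats over 141 vb = 1 sat/vb
    assert_eq!(rate.fee_vb(141), 141);
}
```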

@@ -115,8 +115,8 @@ mod test {
use std::sync::Arc;
use super::*;
use crate::wallet::test::{get_funded_wallet, get_test_wpkh};
use crate::wallet::AddressIndex::New;
use crate::wallet::{get_funded_wallet, test::get_test_wpkh};
#[derive(Debug)]
struct TestValidator;

@@ -26,7 +26,7 @@
//! ```
//! # use std::str::FromStr;
//! # use bitcoin::*;
//! # use bdk::wallet::{self, coin_selection::*};
//! # use bdk::wallet::coin_selection::*;
//! # use bdk::database::Database;
//! # use bdk::*;
//! # const TXIN_BASE_WEIGHT: usize = (32 + 4 + 4 + 1) * 4;
@@ -41,7 +41,7 @@
//! optional_utxos: Vec<WeightedUtxo>,
//! fee_rate: FeeRate,
//! amount_needed: u64,
//! fee_amount: u64,
//! fee_amount: f32,
//! ) -> Result<CoinSelectionResult, bdk::Error> {
//! let mut selected_amount = 0;
//! let mut additional_weight = 0;
@@ -57,8 +57,9 @@
//! },
//! )
//! .collect::<Vec<_>>();
//! let additional_fees = fee_rate.fee_wu(additional_weight);
//! let amount_needed_with_fees = (fee_amount + additional_fees) + amount_needed;
//! let additional_fees = additional_weight as f32 * fee_rate.as_sat_vb() / 4.0;
//! let amount_needed_with_fees =
//! (fee_amount + additional_fees).ceil() as u64 + amount_needed;
//! if amount_needed_with_fees > selected_amount {
//! return Err(bdk::Error::InsufficientFunds {
//! needed: amount_needed_with_fees,
@@ -89,6 +90,7 @@
//! ```
use crate::types::FeeRate;
use crate::wallet::Vbytes;
use crate::{database::Database, WeightedUtxo};
use crate::{error::Error, Utxo};
@@ -116,7 +118,7 @@ pub struct CoinSelectionResult {
/// List of outputs selected for use as inputs
pub selected: Vec<Utxo>,
/// Total fee amount in satoshi
pub fee_amount: u64,
pub fee_amount: f32,
}
impl CoinSelectionResult {
@@ -163,7 +165,7 @@ pub trait CoinSelectionAlgorithm<D: Database>: std::fmt::Debug {
optional_utxos: Vec<WeightedUtxo>,
fee_rate: FeeRate,
amount_needed: u64,
fee_amount: u64,
fee_amount: f32,
) -> Result<CoinSelectionResult, Error>;
}
@@ -182,8 +184,10 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
mut optional_utxos: Vec<WeightedUtxo>,
fee_rate: FeeRate,
amount_needed: u64,
mut fee_amount: u64,
mut fee_amount: f32,
) -> Result<CoinSelectionResult, Error> {
let calc_fee_bytes = |wu| (wu as f32) * fee_rate.as_sat_vb() / 4.0;
log::debug!(
"amount_needed = `{}`, fee_amount = `{}`, fee_rate = `{:?}`",
amount_needed,
@@ -208,9 +212,9 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
.scan(
(&mut selected_amount, &mut fee_amount),
|(selected_amount, fee_amount), (must_use, weighted_utxo)| {
if must_use || **selected_amount < amount_needed + **fee_amount {
if must_use || **selected_amount < amount_needed + (fee_amount.ceil() as u64) {
**fee_amount +=
fee_rate.fee_wu(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight);
calc_fee_bytes(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight);
**selected_amount += weighted_utxo.utxo.txout().value;
log::debug!(
@@ -227,7 +231,7 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
)
.collect::<Vec<_>>();
let amount_needed_with_fees = amount_needed + fee_amount;
let amount_needed_with_fees = amount_needed + (fee_amount.ceil() as u64);
if selected_amount < amount_needed_with_fees {
return Err(Error::InsufficientFunds {
needed: amount_needed_with_fees,
@@ -247,15 +251,16 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
struct OutputGroup {
weighted_utxo: WeightedUtxo,
// Amount of fees for spending a certain utxo, calculated using a certain FeeRate
fee: u64,
fee: f32,
// The effective value of the UTXO, i.e., the utxo value minus the fee for spending it
effective_value: i64,
}
impl OutputGroup {
fn new(weighted_utxo: WeightedUtxo, fee_rate: FeeRate) -> Self {
let fee = fee_rate.fee_wu(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight);
let effective_value = weighted_utxo.utxo.txout().value as i64 - fee as i64;
let fee =
(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight).vbytes() * fee_rate.as_sat_vb();
let effective_value = weighted_utxo.utxo.txout().value as i64 - fee.ceil() as i64;
OutputGroup {
weighted_utxo,
fee,
@@ -298,7 +303,7 @@ impl<D: Database> CoinSelectionAlgorithm<D> for BranchAndBoundCoinSelection {
optional_utxos: Vec<WeightedUtxo>,
fee_rate: FeeRate,
amount_needed: u64,
fee_amount: u64,
fee_amount: f32,
) -> Result<CoinSelectionResult, Error> {
// Mapping every (UTXO, usize) to an output group
let required_utxos: Vec<OutputGroup> = required_utxos
@@ -320,7 +325,7 @@ impl<D: Database> CoinSelectionAlgorithm<D> for BranchAndBoundCoinSelection {
.iter()
.fold(0, |acc, x| acc + x.effective_value);
let actual_target = fee_amount + amount_needed;
let actual_target = fee_amount.ceil() as u64 + amount_needed;
let cost_of_change = self.size_of_change as f32 * fee_rate.as_sat_vb();
let expected = (curr_available_value + curr_value)
@@ -340,14 +345,6 @@ impl<D: Database> CoinSelectionAlgorithm<D> for BranchAndBoundCoinSelection {
.try_into()
.expect("Bitcoin amount to fit into i64");
if curr_value > actual_target {
return Ok(BranchAndBoundCoinSelection::calculate_cs_result(
vec![],
required_utxos,
fee_amount,
));
}
Ok(self
.bnb(
required_utxos.clone(),
@@ -381,7 +378,7 @@ impl BranchAndBoundCoinSelection {
mut curr_value: i64,
mut curr_available_value: i64,
actual_target: i64,
fee_amount: u64,
fee_amount: f32,
cost_of_change: f32,
) -> Result<CoinSelectionResult, Error> {
// current_selection[i] will contain true if we are using optional_utxos[i],
@@ -489,7 +486,7 @@ impl BranchAndBoundCoinSelection {
mut optional_utxos: Vec<OutputGroup>,
curr_value: i64,
actual_target: i64,
fee_amount: u64,
fee_amount: f32,
) -> CoinSelectionResult {
#[cfg(not(test))]
optional_utxos.shuffle(&mut thread_rng());
@@ -518,10 +515,10 @@ impl BranchAndBoundCoinSelection {
fn calculate_cs_result(
mut selected_utxos: Vec<OutputGroup>,
mut required_utxos: Vec<OutputGroup>,
mut fee_amount: u64,
mut fee_amount: f32,
) -> CoinSelectionResult {
selected_utxos.append(&mut required_utxos);
fee_amount += selected_utxos.iter().map(|u| u.fee).sum::<u64>();
fee_amount += selected_utxos.iter().map(|u| u.fee).sum::<f32>();
let selected = selected_utxos
.into_iter()
.map(|u| u.weighted_utxo.utxo)
@@ -543,7 +540,6 @@ mod test {
use super::*;
use crate::database::MemoryDatabase;
use crate::types::*;
use crate::wallet::Vbytes;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
@@ -551,33 +547,52 @@ mod test {
const P2WPKH_WITNESS_SIZE: usize = 73 + 33 + 2;
const FEE_AMOUNT: u64 = 50;
fn utxo(value: u64, index: u32) -> WeightedUtxo {
assert!(index < 10);
let outpoint = OutPoint::from_str(&format!(
"000000000000000000000000000000000000000000000000000000000000000{}:0",
index
))
.unwrap();
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint,
txout: TxOut {
value,
script_pubkey: Script::new(),
},
keychain: KeychainKind::External,
}),
}
}
const FEE_AMOUNT: f32 = 50.0;
fn get_test_utxos() -> Vec<WeightedUtxo> {
vec![
utxo(100_000, 0),
utxo(FEE_AMOUNT as u64 - 40, 1),
utxo(200_000, 2),
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint: OutPoint::from_str(
"0000000000000000000000000000000000000000000000000000000000000000:0",
)
.unwrap(),
txout: TxOut {
value: 100_000,
script_pubkey: Script::new(),
},
keychain: KeychainKind::External,
}),
},
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint: OutPoint::from_str(
"0000000000000000000000000000000000000000000000000000000000000001:0",
)
.unwrap(),
txout: TxOut {
value: FEE_AMOUNT as u64 - 40,
script_pubkey: Script::new(),
},
keychain: KeychainKind::External,
}),
},
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint: OutPoint::from_str(
"0000000000000000000000000000000000000000000000000000000000000002:0",
)
.unwrap(),
txout: TxOut {
value: 200_000,
script_pubkey: Script::new(),
},
keychain: KeychainKind::Internal,
}),
},
]
}
@@ -641,13 +656,13 @@ mod test {
vec![],
FeeRate::from_sat_per_vb(1.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert_eq!(result.fee_amount, 254)
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -662,13 +677,13 @@ mod test {
vec![],
FeeRate::from_sat_per_vb(1.0),
20_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert_eq!(result.fee_amount, 254);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -683,13 +698,13 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
20_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 1);
assert_eq!(result.selected_amount(), 200_000);
assert_eq!(result.fee_amount, 118);
assert!((result.fee_amount - 118.0).abs() < f32::EPSILON);
}
#[test]
@@ -705,7 +720,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
500_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -723,7 +738,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1000.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -743,13 +758,13 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_000);
assert_eq!(result.fee_amount, 254);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -770,7 +785,7 @@ mod test {
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert_eq!(result.fee_amount, 254);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -791,38 +806,7 @@ mod test {
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300010);
assert_eq!(result.fee_amount, 254);
}
#[test]
fn test_bnb_coin_selection_required_not_enough() {
let utxos = get_test_utxos();
let database = MemoryDatabase::default();
let required = vec![utxos[0].clone()];
let mut optional = utxos[1..].to_vec();
optional.push(utxo(500_000, 3));
// Defensive assertions, for sanity and in case someone changes the test utxos vector.
let amount: u64 = required.iter().map(|u| u.utxo.txout().value).sum();
assert_eq!(amount, 100_000);
let amount: u64 = optional.iter().map(|u| u.utxo.txout().value).sum();
assert!(amount > 150_000);
let result = BranchAndBoundCoinSelection::default()
.coin_select(
&database,
required,
optional,
FeeRate::from_sat_per_vb(1.0),
150_000,
FEE_AMOUNT,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert!((result.fee_amount as f32 - 254.0).abs() < f32::EPSILON);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -838,7 +822,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
500_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -856,7 +840,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1000.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -873,7 +857,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
99932, // first utxo's effective value
0,
0.0,
)
.unwrap();
@@ -881,7 +865,7 @@ mod test {
assert_eq!(result.selected_amount(), 100_000);
let input_size = (TXIN_BASE_WEIGHT + P2WPKH_WITNESS_SIZE).vbytes();
let epsilon = 0.5;
assert!((1.0 - (result.fee_amount as f32 / input_size as f32)).abs() < epsilon);
assert!((1.0 - (result.fee_amount / input_size)).abs() < epsilon);
}
#[test]
@@ -900,7 +884,7 @@ mod test {
optional_utxos,
FeeRate::from_sat_per_vb(0.0),
target_amount,
0,
0.0,
)
.unwrap();
assert_eq!(result.selected_amount(), target_amount);
@@ -927,7 +911,7 @@ mod test {
0,
curr_available_value,
20_000,
FEE_AMOUNT,
50.0,
cost_of_change,
)
.unwrap();
@@ -954,7 +938,7 @@ mod test {
0,
curr_available_value,
20_000,
FEE_AMOUNT,
50.0,
cost_of_change,
)
.unwrap();
@@ -966,6 +950,7 @@ mod test {
let fee_rate = FeeRate::from_sat_per_vb(1.0);
let size_of_change = 31;
let cost_of_change = size_of_change as f32 * fee_rate.as_sat_vb();
let fee_amount = 50.0;
let utxos: Vec<_> = generate_same_value_utxos(50_000, 10)
.into_iter()
@@ -987,12 +972,12 @@ mod test {
curr_value,
curr_available_value,
target_amount,
FEE_AMOUNT,
fee_amount,
cost_of_change,
)
.unwrap();
assert!((result.fee_amount - 186.0).abs() < f32::EPSILON);
assert_eq!(result.selected_amount(), 100_000);
assert_eq!(result.fee_amount, 186);
}
// TODO: bnb() function should be optimized, and this test should be done with more utxos
@@ -1024,7 +1009,7 @@ mod test {
curr_value,
curr_available_value,
target_amount,
0,
0.0,
0.0,
)
.unwrap();
@@ -1050,10 +1035,12 @@ mod test {
utxos,
0,
target_amount as i64,
FEE_AMOUNT,
50.0,
);
assert!(result.selected_amount() > target_amount);
assert_eq!(result.fee_amount, (50 + result.selected.len() * 68) as u64);
assert!(
(result.fee_amount - (50.0 + result.selected.len() as f32 * 68.0)).abs() < f32::EPSILON
);
}
}
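The coin-selection hunks above move `fee_amount` between `u64` and `f32`, trading per-input integer rounding for a single `ceil` at the end. A self-contained sketch of that difference — the weights and rate are hypothetical, with `TXIN_BASE_WEIGHT` taken from the doc-comment in the diff:

```rust
// Non-witness txin base weight: outpoint (36) + nSequence (4) + script len (1),
// each non-witness byte counting 4 weight units.
const TXIN_BASE_WEIGHT: usize = (32 + 4 + 4 + 1) * 4;

// f32 path: accumulate fractional per-input fees, round once at the end.
fn fee_round_once(weights: &[usize], rate_sat_vb: f32) -> u64 {
    let total: f32 = weights
        .iter()
        .map(|wu| *wu as f32 * rate_sat_vb / 4.0)
        .sum();
    total.ceil() as u64
}

// u64 path: round each input's fee up independently, then sum.
fn fee_round_each(weights: &[usize], rate_sat_vb: f32) -> u64 {
    weights
        .iter()
        .map(|wu| (*wu as f32 * rate_sat_vb / 4.0).ceil() as u64)
        .sum()
}

fn main() {
    // three inputs of 273 wu each = 68.25 vb each
    let weights = [TXIN_BASE_WEIGHT + 109; 3];
    assert_eq!(fee_round_once(&weights, 1.0), 205); // ceil(3 * 68.25)
    assert_eq!(fee_round_each(&weights, 1.0), 207); // 3 * ceil(68.25)
    // per-input rounding can only overpay relative to rounding once
    assert!(fee_round_each(&weights, 1.0) >= fee_round_once(&weights, 1.0));
}
```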

@@ -18,7 +18,6 @@ use std::collections::HashMap;
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use bitcoin::secp256k1::Secp256k1;
@@ -56,7 +55,6 @@ use tx_builder::{BumpFee, CreateTx, FeePolicy, TxBuilder, TxParams};
use utils::{check_nlocktime, check_nsequence_rbf, After, Older, SecpCtx, DUST_LIMIT_SATOSHI};
use crate::blockchain::{Blockchain, Progress};
use crate::database::memory::MemoryDatabase;
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::descriptor::derived::AsDerived;
use crate::descriptor::policy::BuildSatisfaction;
@@ -68,7 +66,6 @@ use crate::descriptor::{
use crate::error::Error;
use crate::psbt::PsbtUtils;
use crate::signer::SignerError;
use crate::testutils;
use crate::types::*;
const CACHE_ADDR_BATCH_SIZE: u32 = 100;
@@ -170,11 +167,6 @@ where
secp,
})
}
/// Get the Bitcoin network the wallet is using.
pub fn network(&self) -> Network {
self.network
}
}
/// The address index selection strategy to use to derived an address from the wallet's external
@@ -551,7 +543,7 @@ where
});
}
}
(FeeRate::from_sat_per_vb(0.0), *fee)
(FeeRate::from_sat_per_vb(0.0), *fee as f32)
}
FeePolicy::FeeRate(rate) => {
if let Some(previous_fee) = params.bumping_fee {
@@ -562,7 +554,7 @@ where
});
}
}
(*rate, 0)
(*rate, 0.0)
}
};
@@ -581,7 +573,8 @@ where
let mut outgoing: u64 = 0;
let mut received: u64 = 0;
fee_amount += fee_rate.fee_wu(tx.get_weight());
let calc_fee_bytes = |wu| (wu as f32) * fee_rate.as_sat_vb() / 4.0;
fee_amount += calc_fee_bytes(tx.get_weight());
let recipients = params.recipients.iter().map(|(r, v)| (r, *v));
@@ -598,7 +591,7 @@ where
script_pubkey: script_pubkey.clone(),
value,
};
fee_amount += fee_rate.fee_vb(serialize(&new_out).len());
fee_amount += calc_fee_bytes(serialize(&new_out).len() * 4);
tx.output.push(new_out);
@@ -656,8 +649,9 @@ where
}
};
fee_amount += fee_rate.fee_vb(serialize(&drain_output).len());
fee_amount += calc_fee_bytes(serialize(&drain_output).len() * 4);
let mut fee_amount = fee_amount.ceil() as u64;
let drain_val = (coin_selection.selected_amount() - outgoing).saturating_sub(fee_amount);
if tx.output.is_empty() {
@@ -760,10 +754,8 @@ where
return Err(Error::IrreplaceableTransaction);
}
let feerate = FeeRate::from_wu(
details.fee.ok_or(Error::FeeRateUnavailable)?,
tx.get_weight(),
);
let vbytes = tx.get_weight().vbytes();
let feerate = details.fee.ok_or(Error::FeeRateUnavailable)? as f32 / vbytes;
// remove the inputs from the tx and process them
let original_txin = tx.input.drain(..).collect::<Vec<_>>();
@@ -840,7 +832,7 @@ where
utxos: original_utxos,
bumping_fee: Some(tx_builder::PreviousFee {
absolute: details.fee.ok_or(Error::FeeRateUnavailable)?,
rate: feerate.as_sat_vb(),
rate: feerate,
}),
..Default::default()
};
@@ -1542,6 +1534,11 @@ where
&self.client
}
/// Get the Bitcoin network the wallet is using.
pub fn network(&self) -> Network {
self.network
}
/// Broadcast a transaction to the network
#[maybe_async]
pub fn broadcast(&self, tx: Transaction) -> Result<Txid, Error> {
@@ -1551,60 +1548,31 @@ where
}
}
/// Return a fake wallet that appears to be funded for testing.
pub fn get_funded_wallet(
descriptor: &str,
) -> (
Wallet<(), MemoryDatabase>,
(String, Option<String>),
bitcoin::Txid,
) {
let descriptors = testutils!(@descriptors (descriptor));
let wallet = Wallet::new_offline(
&descriptors.0,
None,
Network::Regtest,
MemoryDatabase::new(),
)
.unwrap();
/// Trait implemented by types that can be used to measure weight units.
pub trait Vbytes {
/// Convert weight units to virtual bytes.
fn vbytes(self) -> f32;
}
let funding_address_kix = 0;
let tx_meta = testutils! {
@tx ( (@external descriptors, funding_address_kix) => 50_000 ) (@confirmations 1)
};
wallet
.database
.borrow_mut()
.set_script_pubkey(
&bitcoin::Address::from_str(&tx_meta.output.get(0).unwrap().to_address)
.unwrap()
.script_pubkey(),
KeychainKind::External,
funding_address_kix,
)
.unwrap();
wallet
.database
.borrow_mut()
.set_last_index(KeychainKind::External, funding_address_kix)
.unwrap();
let txid = crate::populate_test_db!(wallet.database.borrow_mut(), tx_meta, Some(100));
(wallet, descriptors, txid)
impl Vbytes for usize {
fn vbytes(self) -> f32 {
self as f32 / 4.0
}
}
#[cfg(test)]
pub(crate) mod test {
use std::str::FromStr;
use bitcoin::{util::psbt, Network};
use crate::database::memory::MemoryDatabase;
use crate::database::Database;
use crate::types::KeychainKind;
use super::*;
use crate::signer::{SignOptions, SignerError};
use crate::testutils;
use crate::wallet::AddressIndex::{LastUnused, New, Peek, Reset};
#[test]
@@ -1716,6 +1684,50 @@ pub(crate) mod test {
"wsh(and_v(v:pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW),after(100000)))"
}
pub(crate) fn get_funded_wallet(
descriptor: &str,
) -> (
Wallet<(), MemoryDatabase>,
(String, Option<String>),
bitcoin::Txid,
) {
let descriptors = testutils!(@descriptors (descriptor));
let wallet = Wallet::new_offline(
&descriptors.0,
None,
Network::Regtest,
MemoryDatabase::new(),
)
.unwrap();
let funding_address_kix = 0;
let tx_meta = testutils! {
@tx ( (@external descriptors, funding_address_kix) => 50_000 ) (@confirmations 1)
};
wallet
.database
.borrow_mut()
.set_script_pubkey(
&bitcoin::Address::from_str(&tx_meta.output.get(0).unwrap().to_address)
.unwrap()
.script_pubkey(),
KeychainKind::External,
funding_address_kix,
)
.unwrap();
wallet
.database
.borrow_mut()
.set_last_index(KeychainKind::External, funding_address_kix)
.unwrap();
let txid = crate::populate_test_db!(wallet.database.borrow_mut(), tx_meta, Some(100));
(wallet, descriptors, txid)
}
macro_rules! assert_fee_rate {
($tx:expr, $fees:expr, $fee_rate:expr $( ,@dust_change $( $dust_change:expr )* )* $( ,@add_signature $( $add_signature:expr )* )* ) => ({
let mut tx = $tx.clone();
@@ -1734,13 +1746,13 @@ pub(crate) mod test {
dust_change = true;
)*
let tx_fee_rate = FeeRate::from_wu($fees, tx.get_weight());
let fee_rate = $fee_rate;
let tx_fee_rate = $fees as f32 / (tx.get_weight().vbytes());
let fee_rate = $fee_rate.as_sat_vb();
if !dust_change {
assert!((tx_fee_rate - fee_rate).as_sat_vb().abs() < 0.5, "Expected fee rate of {:?}, the tx has {:?}", fee_rate, tx_fee_rate);
assert!((tx_fee_rate - fee_rate).abs() < 0.5, "Expected fee rate of {}, the tx has {}", fee_rate, tx_fee_rate);
} else {
assert!(tx_fee_rate >= fee_rate, "Expected fee rate of at least {:?}, the tx has {:?}", fee_rate, tx_fee_rate);
assert!(tx_fee_rate >= fee_rate, "Expected fee rate of at least {}, the tx has {}", fee_rate, tx_fee_rate);
}
});
}
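The `assert_fee_rate!` change above compares fee rates as plain `f32`, deriving the transaction's rate as the absolute fee divided by weight/4. A small standalone sketch of that check, using the same 0.5 sat/vb tolerance as the macro's non-dust-change branch — the fee and weight values are hypothetical:

```rust
// Derive sat/vbyte from an absolute fee and a transaction weight,
// mirroring `$fees as f32 / tx.get_weight().vbytes()` in the macro.
fn rate_from_weight(fee_sats: u64, weight_wu: usize) -> f32 {
    fee_sats as f32 / (weight_wu as f32 / 4.0)
}

fn main() {
    let fee = 565_u64;  // absolute fee in satoshis
    let weight = 2260;  // tx weight in weight units -> 565 vbytes
    let tx_rate = rate_from_weight(fee, weight);
    let requested = 1.0_f32;
    // same 0.5 sat/vb tolerance as the macro's non-dust-change branch
    assert!((tx_rate - requested).abs() < 0.5);
}
```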