Compare commits

..

7 Commits

Author SHA1 Message Date
codeShark149
d2da3755f4 Add Peer Manager
The Peer Manager structure is responsible for maintaining a live
directory of connected CBF/Non-CBF peers. It can send and receive
messages from one or multiple peers.

It manages an underlying address manager to fetch network addresses,
and it maintains a ban score for each peer. A threshold banscore (100)
is used to signal banning of a peer. Once a peer is banned it will not
be connected to in subsequent runs.

Peer manager will be used to handle parallel fetching of filter data
from multiple peers.
2021-07-22 00:00:55 +05:30
Steve Myers
6acb4d9796 Ignore wip test 2021-07-20 09:45:34 -07:00
codeShark149
377e5cdd49 Update address manager test
The two tests have been combined into a single one. Added extra
log points for better debugging in CI if it gets stuck.
2021-07-20 19:55:16 +05:30
rajarshimaitra
70d2a0ee6b Return Option when getting addresses
When getting addresses return Option with Some vectors, or None in
case of empty vectors.

This makes handling the case of getting empty result easier.
2021-07-20 19:55:16 +05:30
codeShark149
de1fc2a677 Update compact_filters module
Compact filters module is updated to include the new Address Manager.

The new PeerError structure is added into CompactFiltersError to have
compatibility with existing APIs.

Minor cargo fmt nit fixes
2021-07-20 19:55:16 +05:30
codeShark149
671d90e57c Add Address Manager
Add the final address manager structures. Address Manager is responsible
for finding new peers from the network. It maintains a directory with
cbf/non_cbf catagories of peer. A Peer Manager will query this
Address Manager to provide it with fresh addresses.

Currently Address Manager is single threaded. When a user calls it to
fetch the network, it will block the call until fetch completes.

It knows when to stop fetching via config variable MIN_CBF_BUFFER and
MIN_NONCBF_BUFFER. And it stops fetching when buffer requirement is met.

Co-authored-by: John Cantrell <johncantrell97@protonmail.com>
2021-07-20 19:55:15 +05:30
codeShark149
9480faa5d3 Add PeerError structure in peer module
This adds a new PeerError structure in the peer module. To handle all
the peer related errors. PeerErrors contains all the mempool errors too,
for now. Later if we have a more complex mempool, we might decide to
have its own dedicated error.

PeerError is to be included in the global CompactFiltersError type.
2021-07-20 19:55:15 +05:30
40 changed files with 2038 additions and 2316 deletions

View File

@@ -24,7 +24,7 @@ jobs:
- name: Update toolchain
run: rustup update
- name: Test
run: cargo test --features all-keys,compiler,esplora,ureq,compact_filters --no-default-features
run: cargo test --features all-keys,compiler,esplora,compact_filters --no-default-features
- id: coverage
name: Generate coverage

View File

@@ -10,23 +10,20 @@ jobs:
strategy:
matrix:
rust:
- 1.56.0 # STABLE
- 1.53.0 # STABLE
- 1.46.0 # MSRV
features:
- default
- minimal
- all-keys
- minimal,use-esplora-ureq
- minimal,esplora
- key-value-db
- electrum
- compact_filters
- esplora,ureq,key-value-db,electrum
- esplora,key-value-db,electrum
- compiler
- rpc
- verify
- async-interface
- use-esplora-reqwest
- sqlite
steps:
- name: checkout
uses: actions/checkout@v2
@@ -79,20 +76,15 @@ jobs:
run: cargo test --features test-md-docs --no-default-features -- doctest::ReadmeDoctests
test-blockchains:
name: Blockchain ${{ matrix.blockchain.features }}
name: Test ${{ matrix.blockchain.name }}
runs-on: ubuntu-20.04
strategy:
fail-fast: false
matrix:
blockchain:
- name: electrum
features: test-electrum
- name: rpc
features: test-rpc
- name: esplora
features: test-esplora,use-esplora-reqwest
- name: esplora
features: test-esplora,use-esplora-ureq
steps:
- name: Checkout
uses: actions/checkout@v2
@@ -101,6 +93,8 @@ jobs:
with:
path: |
~/.cargo/registry
~/.cargo/bitcoin
~/.cargo/electrs
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ github.job }}-${{ hashFiles('**/Cargo.toml','**/Cargo.lock') }}
@@ -110,11 +104,11 @@ jobs:
toolchain: stable
override: true
- name: Test
run: cargo test --no-default-features --features ${{ matrix.blockchain.features }} ${{ matrix.blockchain.name }}::bdk_blockchain_tests
run: cargo test --features test-${{ matrix.blockchain.name }} ${{ matrix.blockchain.name }}::bdk_blockchain_tests
check-wasm:
name: Check WASM
runs-on: ubuntu-20.04
runs-on: ubuntu-16.04
env:
CC: clang-10
CFLAGS: -I/usr/include
@@ -131,11 +125,11 @@ jobs:
key: ${{ runner.os }}-cargo-${{ github.job }}-${{ hashFiles('**/Cargo.toml','**/Cargo.lock') }}
# Install a recent version of clang that supports wasm32
- run: wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add - || exit 1
- run: sudo apt-add-repository "deb http://apt.llvm.org/focal/ llvm-toolchain-focal-10 main" || exit 1
- run: sudo apt-add-repository "deb http://apt.llvm.org/xenial/ llvm-toolchain-xenial-10 main" || exit 1
- run: sudo apt-get update || exit 1
- run: sudo apt-get install -y libclang-common-10-dev clang-10 libc6-dev-i386 || exit 1
- name: Set default toolchain
run: rustup default 1.56.0 # STABLE
run: rustup default 1.53.0 # STABLE
- name: Set profile
run: rustup set profile minimal
- name: Add target wasm32
@@ -143,7 +137,7 @@ jobs:
- name: Update toolchain
run: rustup update
- name: Check
run: cargo check --target wasm32-unknown-unknown --features use-esplora-reqwest --no-default-features
run: cargo check --target wasm32-unknown-unknown --features esplora --no-default-features
fmt:
name: Rust fmt

View File

@@ -24,7 +24,7 @@ jobs:
- name: Update toolchain
run: rustup update
- name: Build docs
run: cargo rustdoc --verbose --features=compiler,electrum,esplora,ureq,compact_filters,key-value-db,all-keys,sqlite -- --cfg docsrs -Dwarnings
run: cargo rustdoc --verbose --features=compiler,electrum,esplora,compact_filters,key-value-db,all-keys -- --cfg docsrs -Dwarnings
- name: Upload artifact
uses: actions/upload-artifact@v2
with:

View File

@@ -6,33 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [v0.13.0] - [v0.12.0]
- Exposed `get_tx()` method from `Database` to `Wallet`.
## [v0.12.0] - [v0.11.0]
- Activate `miniscript/use-serde` feature to allow consumers of the library to access it via the re-exported `miniscript` crate.
- Add support for proxies in `EsploraBlockchain`
- Added `SqliteDatabase` that implements `Database` backed by a sqlite database using `rusqlite` crate.
## [v0.11.0] - [v0.10.0]
- Added `flush` method to the `Database` trait to explicitly flush to disk latest changes on the db.
## [v0.10.0] - [v0.9.0]
- Added `RpcBlockchain` in the `AnyBlockchain` struct to allow using Rpc backend where `AnyBlockchain` is used (eg `bdk-cli`)
- Removed hard dependency on `tokio`.
### Wallet
- Removed and replaced `set_single_recipient` with more general `drain_to` and replaced `maintain_single_recipient` with `allow_shrinking`.
### Blockchain
- Removed `stop_gap` from `Blockchain` trait and added it to only `ElectrumBlockchain` and `EsploraBlockchain` structs.
- Added a `ureq` backend for use when not using feature `async-interface` or target WASM. `ureq` is a blocking HTTP client.
- Removed `stop_gap` from `Blockchain` trait and added it to only `ElectrumBlockchain` and `EsploraBlockchain` structs
## [v0.9.0] - [v0.8.0]
@@ -374,7 +354,7 @@ final transaction is created by calling `finish` on the builder.
- Use `MemoryDatabase` in the compiler example
- Make the REPL return JSON
[unreleased]: https://github.com/bitcoindevkit/bdk/compare/v0.11.0...HEAD
[unreleased]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...HEAD
[0.1.0-beta.1]: https://github.com/bitcoindevkit/bdk/compare/96c87ea5...0.1.0-beta.1
[v0.2.0]: https://github.com/bitcoindevkit/bdk/compare/0.1.0-beta.1...v0.2.0
[v0.3.0]: https://github.com/bitcoindevkit/bdk/compare/v0.2.0...v0.3.0
@@ -385,7 +365,3 @@ final transaction is created by calling `finish` on the builder.
[v0.7.0]: https://github.com/bitcoindevkit/bdk/compare/v0.6.0...v0.7.0
[v0.8.0]: https://github.com/bitcoindevkit/bdk/compare/v0.7.0...v0.8.0
[v0.9.0]: https://github.com/bitcoindevkit/bdk/compare/v0.8.0...v0.9.0
[v0.10.0]: https://github.com/bitcoindevkit/bdk/compare/v0.9.0...v0.10.0
[v0.11.0]: https://github.com/bitcoindevkit/bdk/compare/v0.10.0...v0.11.0
[v0.12.0]: https://github.com/bitcoindevkit/bdk/compare/v0.11.0...v0.12.0
[v0.13.0]: https://github.com/bitcoindevkit/bdk/compare/v0.12.0...v0.13.0

View File

@@ -57,21 +57,6 @@ comment suggesting that you're working on it. If someone is already assigned,
don't hesitate to ask if the assigned party or previous commenters are still
working on it if it has been awhile.
Deprecation policy
------------------
Where possible, breaking existing APIs should be avoided. Instead, add new APIs and
use [`#[deprecated]`](https://github.com/rust-lang/rfcs/blob/master/text/1270-deprecation.md)
to discourage use of the old one.
Deprecated APIs are typically maintained for one release cycle. In other words, an
API that has been deprecated with the 0.10 release can be expected to be removed in the
0.11 release. This allows for smoother upgrades without incurring too much technical
debt inside this library.
If you deprecated an API as part of a contribution, we encourage you to "own" that API
and send a follow-up to remove it as part of the next release cycle.
Peer review
-----------

View File

@@ -1,6 +1,6 @@
[package]
name = "bdk"
version = "0.13.1-dev"
version = "0.9.1-dev"
edition = "2018"
authors = ["Alekos Filini <alekos.filini@gmail.com>", "Riccardo Casatta <riccardo@casatta.it>"]
homepage = "https://bitcoindevkit.org"
@@ -12,40 +12,30 @@ readme = "README.md"
license = "MIT OR Apache-2.0"
[dependencies]
bdk-macros = "^0.6"
bdk-macros = "^0.4"
log = "^0.4"
miniscript = { version = "^6.0", features = ["use-serde"] }
bitcoin = { version = "^0.27", features = ["use-serde", "base64"] }
miniscript = "5.1"
bitcoin = { version = "~0.26.2", features = ["use-serde", "base64"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = { version = "^1.0" }
rand = "^0.7"
# Optional dependencies
sled = { version = "0.34", optional = true }
electrum-client = { version = "0.8", optional = true }
rusqlite = { version = "0.25.3", optional = true }
ahash = { version = "=0.7.4", optional = true }
electrum-client = { version = "0.7", optional = true }
reqwest = { version = "0.11", optional = true, features = ["json"] }
ureq = { version = "~2.2.0", features = ["json"], optional = true }
futures = { version = "0.3", optional = true }
async-trait = { version = "0.1", optional = true }
rocksdb = { version = "0.14", default-features = false, features = ["snappy"], optional = true }
rocksdb = { version = "0.14", optional = true }
cc = { version = ">=1.0.64", optional = true }
socks = { version = "0.3", optional = true }
lazy_static = { version = "1.4", optional = true }
# the latest 0.8 version of tiny-bip39 depends on zeroize_derive 1.2 which has MSRV 1.51 and our
# MSRV is 1.46, to fix this until we update our MSRV or replace the tiny-bip39
# dependency https://github.com/bitcoindevkit/bdk/issues/399 we can only use an older version
tiny-bip39 = { version = "< 0.8", optional = true }
# backtrace > 0.3.61 includes object v0.27 which doesn't compile on 1.46. this is used by
# tiny-bip39
backtrace = { version = "=0.3.61", optional = true }
tiny-bip39 = { version = "^0.8", optional = true }
zeroize = { version = "<1.4.0", optional = true }
bitcoinconsensus = { version = "0.19.0-3", optional = true }
# Needed by bdk_blockchain_tests macro
bitcoincore-rpc = { version = "0.14", optional = true }
bitcoincore-rpc = { version = "0.13", optional = true }
# Platform-specific dependencies
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@@ -61,47 +51,28 @@ minimal = []
compiler = ["miniscript/compiler"]
verify = ["bitcoinconsensus"]
default = ["key-value-db", "electrum"]
sqlite = ["rusqlite", "ahash"]
electrum = ["electrum-client"]
esplora = ["reqwest", "futures"]
compact_filters = ["rocksdb", "socks", "lazy_static", "cc"]
key-value-db = ["sled"]
all-keys = ["keys-bip39"]
keys-bip39 = ["tiny-bip39", "backtrace"]
rpc = ["bitcoincore-rpc"]
# We currently provide mulitple implementations of `Blockchain`, all are
# blocking except for the `EsploraBlockchain` which can be either async or
# blocking, depending on the HTTP client in use.
#
# - Users wanting asynchronous HTTP calls should enable `async-interface` to get
# access to the asynchronous method implementations. Then, if Esplora is wanted,
# enable `esplora` AND `reqwest` (`--features=use-esplora-reqwest`).
# - Users wanting blocking HTTP calls can use any of the other blockchain
# implementations (`compact_filters`, `electrum`, or `esplora`). Users wanting to
# use Esplora should enable `esplora` AND `ureq` (`--features=use-esplora-ureq`).
#
# WARNING: Please take care with the features below, various combinations will
# fail to build. We cannot currently build `bdk` with `--all-features`.
async-interface = ["async-trait"]
electrum = ["electrum-client"]
# MUST ALSO USE `--no-default-features`.
use-esplora-reqwest = ["esplora", "reqwest", "reqwest/socks", "futures"]
use-esplora-ureq = ["esplora", "ureq", "ureq/socks"]
# Typical configurations will not need to use `esplora` feature directly.
esplora = []
all-keys = ["keys-bip39"]
keys-bip39 = ["tiny-bip39", "zeroize"]
rpc = ["bitcoincore-rpc"]
# Debug/Test features
test-blockchains = ["bitcoincore-rpc", "electrum-client"]
test-electrum = ["electrum", "electrsd/electrs_0_8_10", "test-blockchains"]
test-rpc = ["rpc", "electrsd/electrs_0_8_10", "test-blockchains"]
test-esplora = ["electrsd/legacy", "electrsd/esplora_a33e97e1", "test-blockchains"]
test-esplora = ["esplora", "electrsd/legacy", "electrsd/esplora_a33e97e1", "test-blockchains"]
test-md-docs = ["electrum"]
[dev-dependencies]
lazy_static = "1.4"
env_logger = "0.7"
clap = "2.33"
electrsd = { version= "0.12", features = ["trigger", "bitcoind_0_21_1"] }
electrsd = { version= "0.6", features = ["trigger", "bitcoind_0_21_1"] }
[[example]]
name = "address_validator"
@@ -117,6 +88,6 @@ required-features = ["compiler"]
[workspace]
members = ["macros"]
[package.metadata.docs.rs]
features = ["compiler", "electrum", "esplora", "ureq", "compact_filters", "rpc", "key-value-db", "sqlite", "all-keys", "verify"]
features = ["compiler", "electrum", "esplora", "compact_filters", "rpc", "key-value-db", "all-keys", "verify"]
# defines the configuration attribute `docsrs`
rustdoc-args = ["--cfg", "docsrs"]

View File

@@ -32,14 +32,14 @@ Pre-`v1.0.0` our "major" releases only affect the "minor" semver value. Accordin
- If it's a minor issue you can just fix it in the release branch, since it will be merged back to `master` eventually
- For bigger issues you can fix them on `master` and then *cherry-pick* the commit to the release branch
6. Update the changelog with the new release version.
7. Update `src/lib.rs` with the new version (line ~43)
7. Update `src/lib.rs` with the new version (line ~59)
8. On release day, make a commit on the release branch to bump the version to `x.y.z`. The message should be "Bump version to x.y.z".
9. Add a tag to this commit. The tag name should be `vx.y.z` (for example `v0.5.0`), and the message "Release x.y.z". Make sure the tag is signed, for extra safety use the explicit `--sign` flag.
10. Push the new commits to the upstream release branch, wait for the CI to finish one last time.
11. Publish **all** the updated crates to crates.io.
12. Make a new commit to bump the version value to `x.y.(z+1)-dev`. The message should be "Bump version to x.y.(z+1)-dev".
13. Merge the release branch back into `master`.
14. If the `master` branch contains any unreleased changes to the `bdk-macros` crate, change the `bdk` Cargo.toml `[dependencies]` to point to the local path (ie. `bdk-macros = { path = "./macros"}`)
14. If the `master` branch contains any unreleased changes to the `bdk-macros`, `bdk-testutils`, or `bdk-testutils-macros` crates, change the `bdk` Cargo.toml `[dev-dependencies]` to point to the local path (ie. `bdk-testutils-macros = { path = "./testutils-macros"}`)
15. Create the release on GitHub: go to "tags", click on the dots on the right and select "Create Release". Then set the title to `vx.y.z` and write down some brief release notes.
16. Make sure the new release shows up on crates.io and that the docs are built correctly on docs.rs.
17. Announce the release on Twitter, Discord and Telegram.

View File

@@ -70,7 +70,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let policy_str = matches.value_of("POLICY").unwrap();
info!("Compiling policy: {}", policy_str);
let policy = Concrete::<String>::from_str(policy_str)?;
let policy = Concrete::<String>::from_str(&policy_str)?;
let descriptor = match matches.value_of("TYPE").unwrap() {
"sh" => Descriptor::new_sh(policy.compile()?)?,

View File

@@ -1,6 +1,6 @@
[package]
name = "bdk-macros"
version = "0.6.0"
version = "0.4.0"
authors = ["Alekos Filini <alekos.filini@gmail.com>"]
edition = "2018"
homepage = "https://bitcoindevkit.org"

View File

@@ -37,9 +37,9 @@
//! )?;
//! # }
//!
//! # #[cfg(all(feature = "esplora", feature = "ureq"))]
//! # #[cfg(feature = "esplora")]
//! # {
//! let esplora_blockchain = EsploraBlockchain::new("...", 20);
//! let esplora_blockchain = EsploraBlockchain::new("...", None, 20);
//! let wallet_esplora: Wallet<AnyBlockchain, _> = Wallet::new(
//! "...",
//! None,
@@ -60,8 +60,6 @@
//! # use bdk::blockchain::*;
//! # use bdk::database::MemoryDatabase;
//! # use bdk::Wallet;
//! # #[cfg(all(feature = "esplora", feature = "ureq"))]
//! # {
//! let config = serde_json::from_str("...")?;
//! let blockchain = AnyBlockchain::from_config(&config)?;
//! let wallet = Wallet::new(
@@ -71,7 +69,6 @@
//! MemoryDatabase::default(),
//! blockchain,
//! )?;
//! # }
//! # Ok::<(), bdk::Error>(())
//! ```
@@ -97,8 +94,6 @@ macro_rules! impl_inner_method {
AnyBlockchain::Esplora(inner) => inner.$name( $($args, )* ),
#[cfg(feature = "compact_filters")]
AnyBlockchain::CompactFilters(inner) => inner.$name( $($args, )* ),
#[cfg(feature = "rpc")]
AnyBlockchain::Rpc(inner) => inner.$name( $($args, )* ),
}
}
}
@@ -121,10 +116,6 @@ pub enum AnyBlockchain {
#[cfg_attr(docsrs, doc(cfg(feature = "compact_filters")))]
/// Compact filters client
CompactFilters(compact_filters::CompactFiltersBlockchain),
#[cfg(feature = "rpc")]
#[cfg_attr(docsrs, doc(cfg(feature = "rpc")))]
/// RPC client
Rpc(rpc::RpcBlockchain),
}
#[maybe_async]
@@ -166,7 +157,6 @@ impl Blockchain for AnyBlockchain {
impl_from!(electrum::ElectrumBlockchain, AnyBlockchain, Electrum, #[cfg(feature = "electrum")]);
impl_from!(esplora::EsploraBlockchain, AnyBlockchain, Esplora, #[cfg(feature = "esplora")]);
impl_from!(compact_filters::CompactFiltersBlockchain, AnyBlockchain, CompactFilters, #[cfg(feature = "compact_filters")]);
impl_from!(rpc::RpcBlockchain, AnyBlockchain, Rpc, #[cfg(feature = "rpc")]);
/// Type that can contain any of the blockchain configurations defined by the library
///
@@ -216,10 +206,6 @@ pub enum AnyBlockchainConfig {
#[cfg_attr(docsrs, doc(cfg(feature = "compact_filters")))]
/// Compact filters client
CompactFilters(compact_filters::CompactFiltersBlockchainConfig),
#[cfg(feature = "rpc")]
#[cfg_attr(docsrs, doc(cfg(feature = "rpc")))]
/// RPC client configuration
Rpc(rpc::RpcConfig),
}
impl ConfigurableBlockchain for AnyBlockchain {
@@ -239,10 +225,6 @@ impl ConfigurableBlockchain for AnyBlockchain {
AnyBlockchainConfig::CompactFilters(inner) => AnyBlockchain::CompactFilters(
compact_filters::CompactFiltersBlockchain::from_config(inner)?,
),
#[cfg(feature = "rpc")]
AnyBlockchainConfig::Rpc(inner) => {
AnyBlockchain::Rpc(rpc::RpcBlockchain::from_config(inner)?)
}
})
}
}
@@ -250,4 +232,3 @@ impl ConfigurableBlockchain for AnyBlockchain {
impl_from!(electrum::ElectrumBlockchainConfig, AnyBlockchainConfig, Electrum, #[cfg(feature = "electrum")]);
impl_from!(esplora::EsploraBlockchainConfig, AnyBlockchainConfig, Esplora, #[cfg(feature = "esplora")]);
impl_from!(compact_filters::CompactFiltersBlockchainConfig, AnyBlockchainConfig, CompactFilters, #[cfg(feature = "compact_filters")]);
impl_from!(rpc::RpcConfig, AnyBlockchainConfig, Rpc, #[cfg(feature = "rpc")]);

View File

@@ -0,0 +1,764 @@
// Bitcoin Dev Kit
// Written in 2021 by Rajarshi Maitra <rajarshi149@gmail.com>
// John Cantrell <johncantrell97@protonmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
use std::fs::File;
use std::io::prelude::*;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::{
mpsc::{channel, Receiver, SendError, Sender},
Arc, RwLock,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::PoisonError;
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
use serde::{Deserialize, Serialize};
use serde_json::Error as SerdeError;
use super::{Mempool, Peer, PeerError};
use bitcoin::network::{
constants::{Network, ServiceFlags},
message::NetworkMessage,
Address,
};
/// Default address pool minimums
const MIN_CBF_BUFFER: usize = 5;
const MIN_NONCBF_BUFFER: usize = 5;
/// A Discovery structure used by workers
///
/// Discovery can be initiated via a cache,
/// Or it will start with default hardcoded seeds
pub struct AddressDiscovery {
pending: VecDeque<SocketAddr>,
visited: HashSet<SocketAddr>,
}
impl AddressDiscovery {
fn new(network: Network, seeds: VecDeque<SocketAddr>) -> AddressDiscovery {
let mut network_seeds = AddressDiscovery::seeds(network);
let mut total_seeds = seeds;
total_seeds.append(&mut network_seeds);
AddressDiscovery {
pending: total_seeds,
visited: HashSet::new(),
}
}
fn add_pendings(&mut self, addresses: Vec<SocketAddr>) {
for addr in addresses {
if !self.pending.contains(&addr) && !self.visited.contains(&addr) {
self.pending.push_back(addr);
}
}
}
fn get_next(&mut self) -> Option<SocketAddr> {
match self.pending.pop_front() {
None => None,
Some(next) => {
self.visited.insert(next);
Some(next)
}
}
}
fn seeds(network: Network) -> VecDeque<SocketAddr> {
let mut seeds = VecDeque::new();
let port: u16 = match network {
Network::Bitcoin => 8333,
Network::Testnet => 18333,
Network::Regtest => 18444,
Network::Signet => 38333,
};
let seedhosts: &[&str] = match network {
Network::Bitcoin => &[
"seed.bitcoin.sipa.be",
"dnsseed.bluematt.me",
"dnsseed.bitcoin.dashjr.org",
"seed.bitcoinstats.com",
"seed.bitcoin.jonasschnelli.ch",
"seed.btc.petertodd.org",
"seed.bitcoin.sprovoost.nl",
"dnsseed.emzy.de",
"seed.bitcoin.wiz.biz",
],
Network::Testnet => &[
"testnet-seed.bitcoin.jonasschnelli.ch",
"seed.tbtc.petertodd.org",
"seed.testnet.bitcoin.sprovoost.nl",
"testnet-seed.bluematt.me",
],
Network::Regtest => &[],
Network::Signet => &[],
};
for seedhost in seedhosts.iter() {
if let Ok(lookup) = (*seedhost, port).to_socket_addrs() {
for host in lookup {
if host.is_ipv4() {
seeds.push_back(host);
}
}
}
}
seeds
}
}
/// Crawler structure that will interface with Discovery and public bitcoin network
///
/// Address manager will spawn multiple crawlers in separate threads to discover new addresses.
struct AddressWorker {
discovery: Arc<RwLock<AddressDiscovery>>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
}
impl AddressWorker {
fn new(
discovery: Arc<RwLock<AddressDiscovery>>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
) -> AddressWorker {
AddressWorker {
discovery,
sender,
network,
}
}
fn try_receive_addr(&mut self, peer: &Peer) -> Result<(), AddressManagerError> {
if let Some(NetworkMessage::Addr(new_addresses)) =
peer.recv("addr", Some(Duration::from_secs(1)))?
{
self.consume_addr(new_addresses)?;
}
Ok(())
}
fn consume_addr(&mut self, addrs: Vec<(u32, Address)>) -> Result<(), AddressManagerError> {
let mut discovery_lock = self.discovery.write().map_err(PeerError::from)?;
let mut addresses = Vec::new();
for network_addrs in addrs {
if let Ok(socket_addrs) = network_addrs.1.socket_addr() {
addresses.push(socket_addrs);
}
}
discovery_lock.add_pendings(addresses);
Ok(())
}
fn work(&mut self) -> Result<(), AddressManagerError> {
loop {
let next_address = {
let mut address_discovery = self.discovery.write()?;
address_discovery.get_next()
};
match next_address {
Some(address) => {
let potential_peer = Peer::connect_with_timeout(
address,
Duration::from_secs(1),
Arc::new(Mempool::default()),
self.network,
);
if let Ok(peer) = potential_peer {
peer.send(NetworkMessage::GetAddr)?;
self.try_receive_addr(&peer)?;
self.try_receive_addr(&peer)?;
self.sender.send((address, peer.get_version().services))?;
// TODO: Investigate why close is being called on non existent connections
// currently the errors are ignored
peer.close().unwrap_or(());
}
}
None => continue,
}
}
}
}
/// A dedicated cache structure, with cbf/non_cbf separation
///
/// [AddressCache] will interface with file i/o
/// And can te turned into seeds. Generation of seed will put previously cached
/// cbf addresses at front of the vec, to boost up cbf node findings
#[derive(Serialize, Deserialize)]
struct AddressCache {
banned_peers: HashSet<SocketAddr>,
cbf: HashSet<SocketAddr>,
non_cbf: HashSet<SocketAddr>,
}
impl AddressCache {
fn empty() -> Self {
Self {
banned_peers: HashSet::new(),
cbf: HashSet::new(),
non_cbf: HashSet::new(),
}
}
fn from_file(path: &str) -> Result<Option<Self>, AddressManagerError> {
let serialized: Result<String, _> = std::fs::read_to_string(path);
let serialized = match serialized {
Ok(contents) => contents,
Err(_) => return Ok(None),
};
let address_cache = serde_json::from_str(&serialized)?;
Ok(Some(address_cache))
}
fn write_to_file(&self, path: &str) -> Result<(), AddressManagerError> {
let serialized = serde_json::to_string_pretty(&self)?;
let mut cache_file = File::create(path)?;
cache_file.write_all(serialized.as_bytes())?;
Ok(())
}
fn make_seeds(&self) -> VecDeque<SocketAddr> {
self.cbf
.iter()
.chain(self.non_cbf.iter())
.copied()
.collect()
}
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) -> bool {
if cbf {
self.cbf.remove(addrs)
} else {
self.non_cbf.remove(addrs)
}
}
fn add_address(&mut self, addrs: SocketAddr, cbf: bool) -> bool {
if cbf {
self.cbf.insert(addrs)
} else {
self.non_cbf.insert(addrs)
}
}
fn add_to_banlist(&mut self, addrs: SocketAddr, cbf: bool) {
if self.banned_peers.insert(addrs) {
self.remove_address(&addrs, cbf);
}
}
}
/// A Live directory maintained by [AddressManager] of freshly found cbf and non_cbf nodes by workers
///
/// Each instance of new [AddressManager] with have fresh [AddressDirectory]
/// This is independent from the cache and will be an in-memory database to
/// fetch addresses to the user.
struct AddressDirectory {
cbf_nodes: HashSet<SocketAddr>,
non_cbf_nodes: HashSet<SocketAddr>,
// List of addresses it has previously provided to the caller (PeerManager)
previously_sent: HashSet<SocketAddr>,
}
impl AddressDirectory {
fn new() -> AddressDirectory {
AddressDirectory {
cbf_nodes: HashSet::new(),
non_cbf_nodes: HashSet::new(),
previously_sent: HashSet::new(),
}
}
fn add_address(&mut self, addr: SocketAddr, cbf: bool) {
if cbf {
self.cbf_nodes.insert(addr);
} else {
self.non_cbf_nodes.insert(addr);
}
}
fn get_new_address(&mut self, cbf: bool) -> Option<SocketAddr> {
if cbf {
if let Some(new_addresses) = self
.cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.collect::<Vec<&SocketAddr>>()
.pop()
{
self.previously_sent.insert(*new_addresses);
Some(*new_addresses)
} else {
None
}
} else if let Some(new_addresses) = self
.non_cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.collect::<Vec<&SocketAddr>>()
.pop()
{
self.previously_sent.insert(*new_addresses);
Some(*new_addresses)
} else {
None
}
}
fn get_cbf_address_count(&self) -> usize {
self.cbf_nodes.len()
}
fn get_non_cbf_address_count(&self) -> usize {
self.non_cbf_nodes.len()
}
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) {
if cbf {
self.cbf_nodes.remove(addrs);
} else {
self.non_cbf_nodes.remove(addrs);
}
}
fn get_cbf_buffer(&self) -> usize {
self.cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.count()
}
fn get_non_cbf_buffer(&self) -> usize {
self.non_cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.count()
}
}
/// Discovery statistics, useful for logging
#[derive(Clone, Copy)]
pub struct DiscoveryData {
queued: usize,
visited: usize,
non_cbf_count: usize,
cbf_count: usize,
}
/// Progress trait for discovery statistics logging
pub trait DiscoveryProgress {
/// Update progress
fn update(&self, data: DiscoveryData);
}
/// Used when progress updates are not desired
#[derive(Clone)]
pub struct NoDiscoveryProgress;
impl DiscoveryProgress for NoDiscoveryProgress {
fn update(&self, _data: DiscoveryData) {}
}
/// Used to log progress update
#[derive(Clone)]
pub struct LogDiscoveryProgress;
impl DiscoveryProgress for LogDiscoveryProgress {
fn update(&self, data: DiscoveryData) {
log::trace!(
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
data.queued,
data.visited,
data.non_cbf_count,
data.cbf_count
);
#[cfg(test)]
println!(
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
data.queued, data.visited, data.non_cbf_count, data.cbf_count
);
}
}
/// A manager structure managing address discovery
///
/// Manager will try to maintain a given address buffer in its directory
/// buffer = len(exiting addresses) - len(previously provided addresses)
/// Manager will crawl the network until buffer criteria is satisfied
/// Manager will bootstrap workers from a cache, to speed up discovery progress in
/// subsequent call after the first crawl.
/// Manager will keep track of the cache and only update it if previously
/// unknown addresses are found.
pub struct AddressManager<P: DiscoveryProgress> {
directory: AddressDirectory,
cache_filename: String,
discovery: Arc<RwLock<AddressDiscovery>>,
threads: usize,
receiver: Receiver<(SocketAddr, ServiceFlags)>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
cbf_buffer: usize,
non_cbf_buffer: usize,
progress: P,
}
impl<P: DiscoveryProgress> AddressManager<P> {
/// Create a new manager. Initiate Discovery seeds from the cache
/// if it exists, else start with hardcoded seeds
pub fn new(
network: Network,
cache_filename: String,
threads: usize,
cbf_buffer: Option<usize>,
non_cbf_buffer: Option<usize>,
progress: P,
) -> Result<AddressManager<P>, AddressManagerError> {
let (sender, receiver) = channel();
let seeds = match AddressCache::from_file(&cache_filename)? {
Some(cache) => cache.make_seeds(),
None => VecDeque::new(),
};
let min_cbf = cbf_buffer.unwrap_or(MIN_CBF_BUFFER);
let min_non_cbf = non_cbf_buffer.unwrap_or(MIN_NONCBF_BUFFER);
let discovery = AddressDiscovery::new(network, seeds);
Ok(AddressManager {
cache_filename,
directory: AddressDirectory::new(),
discovery: Arc::new(RwLock::new(discovery)),
sender,
receiver,
network,
threads,
cbf_buffer: min_cbf,
non_cbf_buffer: min_non_cbf,
progress,
})
}
/// Get running address discovery progress
fn get_progress(&self) -> Result<DiscoveryData, AddressManagerError> {
let (queued_count, visited_count) = {
let address_discovery = self.discovery.read()?;
(
address_discovery.pending.len(),
address_discovery.visited.len(),
)
};
let cbf_node_count = self.directory.get_cbf_address_count();
let other_node_count = self.directory.get_non_cbf_address_count();
Ok(DiscoveryData {
queued: queued_count,
visited: visited_count,
non_cbf_count: cbf_node_count + other_node_count,
cbf_count: cbf_node_count,
})
}
/// Spawn [self.thread] no. of worker threads
fn spawn_workers(&mut self) -> Vec<JoinHandle<()>> {
let mut worker_handles: Vec<JoinHandle<()>> = vec![];
for _ in 0..self.threads {
let sender = self.sender.clone();
let discovery = self.discovery.clone();
let network = self.network;
let worker_handle = thread::spawn(move || {
let mut worker = AddressWorker::new(discovery, sender, network);
worker.work().unwrap();
});
worker_handles.push(worker_handle);
}
worker_handles
}
/// Crawl the Bitcoin network until required number of cbf/non_cbf nodes are found
///
/// - This will start a bunch of crawlers.
/// - load up the existing cache.
/// - Update the cache with new found peers.
/// - check if address is in banlist
/// - run crawlers until buffer requirement is matched
/// - flush the current cache into disk
pub fn fetch(&mut self) -> Result<(), AddressManagerError> {
self.spawn_workers();
// Get already existing cache
let mut cache = match AddressCache::from_file(&self.cache_filename)? {
Some(cache) => cache,
None => AddressCache::empty(),
};
while self.directory.get_cbf_buffer() < self.cbf_buffer
|| self.directory.get_non_cbf_buffer() < self.non_cbf_buffer
{
if let Ok(message) = self.receiver.recv() {
let (addr, flag) = message;
if !cache.banned_peers.contains(&addr) {
let cbf = flag.has(ServiceFlags::COMPACT_FILTERS);
self.directory.add_address(addr, cbf);
cache.add_address(addr, cbf);
}
}
}
self.progress.update(self.get_progress()?);
// When completed, flush the cache
cache.write_to_file(&self.cache_filename)?;
Ok(())
}
/// Get a new addresses not previously provided
pub fn get_new_cbf_address(&mut self) -> Option<SocketAddr> {
self.directory.get_new_address(true)
}
/// Get a new non_cbf address
pub fn get_new_non_cbf_address(&mut self) -> Option<SocketAddr> {
self.directory.get_new_address(false)
}
/// Ban an address
pub fn ban_peer(&mut self, addrs: &SocketAddr, cbf: bool) -> Result<(), AddressManagerError> {
let mut cache = AddressCache::from_file(&self.cache_filename)?.ok_or_else(|| {
AddressManagerError::Generic("Address Cache file not found".to_string())
})?;
cache.add_to_banlist(*addrs, cbf);
// When completed, flush the cache
cache.write_to_file(&self.cache_filename).unwrap();
self.directory.remove_address(addrs, cbf);
Ok(())
}
/// Get all the known CBF addresses
pub fn get_known_cbfs(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.cbf_nodes
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
/// Get all the known regular addresses
pub fn get_known_non_cbfs(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.non_cbf_nodes
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
/// Get previously tried addresses
pub fn get_previously_tried(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.previously_sent
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
}
#[derive(Debug)]
pub enum AddressManagerError {
/// Std I/O Error
Io(std::io::Error),
/// Internal Peer error
Peer(PeerError),
/// Internal Mutex poisoning error
MutexPoisoned,
/// Internal Mutex wait timed out
MutexTimedOut,
/// Internal RW read lock poisoned
RwReadLockPoisined,
/// Internal RW write lock poisoned
RwWriteLockPoisoned,
/// Internal MPSC sending error
MpscSendError,
/// Serde Json Error
SerdeJson(SerdeError),
/// Generic Errors
Generic(String),
}
impl_error!(PeerError, Peer, AddressManagerError);
impl_error!(std::io::Error, Io, AddressManagerError);
impl_error!(SerdeError, SerdeJson, AddressManagerError);
impl<T> From<PoisonError<MutexGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
AddressManagerError::MutexPoisoned
}
}
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
AddressManagerError::RwWriteLockPoisoned
}
}
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
AddressManagerError::RwReadLockPoisined
}
}
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for AddressManagerError {
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
let (_, wait_result) = err.into_inner();
if wait_result.timed_out() {
AddressManagerError::MutexTimedOut
} else {
AddressManagerError::MutexPoisoned
}
}
}
impl<T> From<SendError<T>> for AddressManagerError {
fn from(_: SendError<T>) -> Self {
AddressManagerError::MpscSendError
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
#[ignore]
fn test_address_manager() {
// Initiate a manager with an non existent cache file name.
// It will create a new cache file
let mut manager = AddressManager::new(
Network::Bitcoin,
"addr_cache".to_string(),
20,
None,
None,
LogDiscoveryProgress,
)
.unwrap();
// start the crawlers and time them
println!("Starting manager and initial fetch");
let start = std::time::Instant::now();
manager.fetch().unwrap();
let duration1 = start.elapsed();
println!("Completed Initial fetch");
// Create a new manager from existing cache and fetch again
let mut manager = AddressManager::new(
Network::Bitcoin,
"addr_cache".to_string(),
20,
None,
None,
LogDiscoveryProgress,
)
.unwrap();
// start the crawlers and time them
println!("Starting new fetch with previous cache");
let start = std::time::Instant::now();
manager.fetch().unwrap();
let duration2 = start.elapsed();
println!("Completed new fetch()");
println!("Time taken for initial crawl: {:#?}", duration1);
println!("Time taken for next crawl {:#?}", duration2);
// Check Buffer Management
println!("Checking buffer management");
// Fetch few new address and ensure buffer goes to zero
let mut addrs_list = Vec::new();
for _ in 0..5 {
let addr_cbf = manager.get_new_cbf_address().unwrap();
let addrs_non_cbf = manager.get_new_non_cbf_address().unwrap();
addrs_list.push(addr_cbf);
addrs_list.push(addrs_non_cbf);
}
assert_eq!(addrs_list.len(), 10);
// This should exhaust the cbf buffer
assert_eq!(manager.directory.get_cbf_buffer(), 0);
// Calling fetch again should start crawlers until buffer
// requirements are matched.
println!("Address buffer exhausted, starting new fetch");
manager.fetch().unwrap();
println!("Fetch Complete");
// It should again have a cbf buffer of 5
assert_eq!(manager.directory.get_cbf_buffer(), 5);
println!("Buffer management passed");
}
}

View File

@@ -63,7 +63,9 @@ use bitcoin::{Network, OutPoint, Transaction, Txid};
use rocksdb::{Options, SliceTransform, DB};
mod address_manager;
mod peer;
mod peermngr;
mod store;
mod sync;
@@ -77,8 +79,11 @@ use peer::*;
use store::*;
use sync::*;
// Only added to avoid unused warnings in addrsmngr module
pub use address_manager::{
AddressManager, DiscoveryProgress, LogDiscoveryProgress, NoDiscoveryProgress,
};
pub use peer::{Mempool, Peer};
const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
@@ -254,7 +259,7 @@ impl Blockchain for CompactFiltersBlockchain {
let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;
if let Some(snapshot) = sync::sync_headers(
Arc::clone(first_peer),
Arc::clone(&first_peer),
Arc::clone(&self.headers),
|new_height| {
let local_headers_cost =
@@ -275,7 +280,7 @@ impl Blockchain for CompactFiltersBlockchain {
let buried_height = synced_height.saturating_sub(sync::BURIED_CONFIRMATIONS);
info!("Synced headers to height: {}", synced_height);
cf_sync.prepare_sync(Arc::clone(first_peer))?;
cf_sync.prepare_sync(Arc::clone(&first_peer))?;
let all_scripts = Arc::new(
database
@@ -294,7 +299,7 @@ impl Blockchain for CompactFiltersBlockchain {
let mut threads = Vec::with_capacity(self.peers.len());
for peer in &self.peers {
let cf_sync = Arc::clone(&cf_sync);
let peer = Arc::clone(peer);
let peer = Arc::clone(&peer);
let headers = Arc::clone(&self.headers);
let all_scripts = Arc::clone(&all_scripts);
let last_synced_block = Arc::clone(&last_synced_block);
@@ -371,10 +376,10 @@ impl Blockchain for CompactFiltersBlockchain {
database.commit_batch(updates)?;
match first_peer.ask_for_mempool() {
Err(CompactFiltersError::PeerBloomDisabled) => {
Err(PeerError::PeerBloomDisabled(_)) => {
log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
}
e => e?,
e => e.map_err(CompactFiltersError::from)?,
};
let mut internal_max_deriv = None;
@@ -392,7 +397,12 @@ impl Blockchain for CompactFiltersBlockchain {
)?;
}
}
for tx in first_peer.get_mempool().iter_txs().iter() {
for tx in first_peer
.get_mempool()
.iter_txs()
.map_err(CompactFiltersError::from)?
.iter()
{
self.process_tx(
database,
tx,
@@ -435,11 +445,14 @@ impl Blockchain for CompactFiltersBlockchain {
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
Ok(self.peers[0]
.get_mempool()
.get_tx(&Inventory::Transaction(*txid)))
.get_tx(&Inventory::Transaction(*txid))
.map_err(CompactFiltersError::from)?)
}
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
self.peers[0].broadcast_tx(tx.clone())?;
self.peers[0]
.broadcast_tx(tx.clone())
.map_err(CompactFiltersError::from)?;
Ok(())
}
@@ -487,7 +500,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
.peers
.iter()
.map(|peer_conf| match &peer_conf.socks5 {
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network)
.map_err(CompactFiltersError::from),
Some(proxy) => Peer::connect_proxy(
peer_conf.address.as_str(),
proxy,
@@ -497,7 +511,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
.map(|(a, b)| (a.as_str(), b.as_str())),
Arc::clone(&mempool),
config.network,
),
)
.map_err(CompactFiltersError::from),
})
.collect::<Result<_, _>>()?;
@@ -546,6 +561,9 @@ pub enum CompactFiltersError {
/// Wrapper for [`crate::error::Error`]
Global(Box<crate::error::Error>),
/// Internal Peer Error
Peer(PeerError),
}
impl fmt::Display for CompactFiltersError {
@@ -560,6 +578,7 @@ impl_error!(rocksdb::Error, Db, CompactFiltersError);
impl_error!(std::io::Error, Io, CompactFiltersError);
impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError);
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);
impl_error!(PeerError, Peer, CompactFiltersError);
impl From<crate::error::Error> for CompactFiltersError {
fn from(err: crate::error::Error) -> Self {

View File

@@ -10,11 +10,15 @@
// licenses.
use std::collections::HashMap;
use std::net::{TcpStream, ToSocketAddrs};
use std::fmt;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::PoisonError;
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
use socks::{Socks5Stream, ToTargetAddr};
use rand::{thread_rng, Rng};
@@ -30,8 +34,6 @@ use bitcoin::network::stream_reader::StreamReader;
use bitcoin::network::Address;
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
use super::CompactFiltersError;
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
pub(crate) const TIMEOUT_SECS: u64 = 30;
@@ -65,17 +67,18 @@ impl Mempool {
///
/// Note that this doesn't propagate the transaction to other
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
pub fn add_tx(&self, tx: Transaction) {
let mut guard = self.0.write().unwrap();
pub fn add_tx(&self, tx: Transaction) -> Result<(), PeerError> {
let mut guard = self.0.write()?;
guard.wtxids.insert(tx.wtxid(), tx.txid());
guard.txs.insert(tx.txid(), tx);
Ok(())
}
/// Look-up a transaction in the mempool given an [`Inventory`] request
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
pub fn get_tx(&self, inventory: &Inventory) -> Result<Option<Transaction>, PeerError> {
let identifer = match inventory {
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return Ok(None),
Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
@@ -85,27 +88,34 @@ impl Mempool {
inv_type,
hash
);
return None;
return Ok(None);
}
};
let txid = match identifer {
TxIdentifier::Txid(txid) => Some(txid),
TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
TxIdentifier::Wtxid(wtxid) => self.0.read()?.wtxids.get(&wtxid).cloned(),
};
txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
.flatten()
let result = match txid {
Some(txid) => {
let read_lock = self.0.read()?;
read_lock.txs.get(&txid).cloned()
}
None => None,
};
Ok(result)
}
/// Return whether or not the mempool contains a transaction with a given txid
pub fn has_tx(&self, txid: &Txid) -> bool {
self.0.read().unwrap().txs.contains_key(txid)
pub fn has_tx(&self, txid: &Txid) -> Result<bool, PeerError> {
Ok(self.0.read()?.txs.contains_key(txid))
}
/// Return the list of transactions contained in the mempool
pub fn iter_txs(&self) -> Vec<Transaction> {
self.0.read().unwrap().txs.values().cloned().collect()
pub fn iter_txs(&self) -> Result<Vec<Transaction>, PeerError> {
Ok(self.0.read()?.txs.values().cloned().collect())
}
}
@@ -133,12 +143,31 @@ impl Peer {
address: A,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let stream = TcpStream::connect(address)?;
Peer::from_stream(stream, mempool, network)
}
/// Connect to a peer over a plaintext TCP connection with a timeout
///
/// This function behaves exactly the same as `connect` except for two differences
/// 1) It assumes your ToSocketAddrs will resolve to a single address
/// 2) It lets you specify a connection timeout
pub fn connect_with_timeout<A: ToSocketAddrs>(
address: A,
timeout: Duration,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, PeerError> {
let socket_addr = address
.to_socket_addrs()?
.next()
.ok_or(PeerError::AddresseResolution)?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
Peer::from_stream(stream, mempool, network)
}
/// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
/// as a tuple of `(username, password)`
///
@@ -150,7 +179,7 @@ impl Peer {
credentials: Option<(&str, &str)>,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let socks_stream = if let Some((username, password)) = credentials {
Socks5Stream::connect_with_password(proxy, target, username, password)?
} else {
@@ -165,12 +194,12 @@ impl Peer {
stream: TcpStream,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let writer = Arc::new(Mutex::new(stream.try_clone()?));
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
let connected = Arc::new(RwLock::new(true));
let mut locked_writer = writer.lock().unwrap();
let mut locked_writer = writer.lock()?;
let reader_thread_responses = Arc::clone(&responses);
let reader_thread_writer = Arc::clone(&writer);
@@ -185,6 +214,7 @@ impl Peer {
reader_thread_mempool,
reader_thread_connected,
)
.unwrap()
});
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
@@ -209,18 +239,20 @@ impl Peer {
0,
)),
)?;
let version = if let NetworkMessage::Version(version) =
Self::_recv(&responses, "version", None).unwrap()
{
version
} else {
return Err(CompactFiltersError::InvalidResponse);
let version = match Self::_recv(&responses, "version", Some(Duration::from_secs(1)))? {
Some(NetworkMessage::Version(version)) => version,
_ => {
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
}
};
if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None).unwrap() {
if let Some(NetworkMessage::Verack) =
Self::_recv(&responses, "verack", Some(Duration::from_secs(1)))?
{
Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
} else {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
}
std::mem::drop(locked_writer);
@@ -236,19 +268,26 @@ impl Peer {
})
}
/// Close the peer connection
// Consume Self
pub fn close(self) -> Result<(), PeerError> {
let locked_writer = self.writer.lock()?;
Ok((*locked_writer).shutdown(std::net::Shutdown::Both)?)
}
/// Get the socket address of the remote peer
pub fn get_address(&self) -> Result<SocketAddr, PeerError> {
let locked_writer = self.writer.lock()?;
Ok(locked_writer.peer_addr()?)
}
/// Send a Bitcoin network message
fn _send(
writer: &mut TcpStream,
magic: u32,
payload: NetworkMessage,
) -> Result<(), CompactFiltersError> {
fn _send(writer: &mut TcpStream, magic: u32, payload: NetworkMessage) -> Result<(), PeerError> {
log::trace!("==> {:?}", payload);
let raw_message = RawNetworkMessage { magic, payload };
raw_message
.consensus_encode(writer)
.map_err(|_| CompactFiltersError::DataCorruption)?;
raw_message.consensus_encode(writer)?;
Ok(())
}
@@ -258,30 +297,30 @@ impl Peer {
responses: &Arc<RwLock<ResponsesMap>>,
wait_for: &'static str,
timeout: Option<Duration>,
) -> Option<NetworkMessage> {
) -> Result<Option<NetworkMessage>, PeerError> {
let message_resp = {
let mut lock = responses.write().unwrap();
let mut lock = responses.write()?;
let message_resp = lock.entry(wait_for).or_default();
Arc::clone(message_resp)
Arc::clone(&message_resp)
};
let (lock, cvar) = &*message_resp;
let mut messages = lock.lock().unwrap();
let mut messages = lock.lock()?;
while messages.is_empty() {
match timeout {
None => messages = cvar.wait(messages).unwrap(),
None => messages = cvar.wait(messages)?,
Some(t) => {
let result = cvar.wait_timeout(messages, t).unwrap();
let result = cvar.wait_timeout(messages, t)?;
if result.1.timed_out() {
return None;
return Ok(None);
}
messages = result.0;
}
}
}
messages.pop()
Ok(messages.pop())
}
/// Return the [`VersionMessage`] sent by the peer
@@ -300,8 +339,8 @@ impl Peer {
}
/// Return whether or not the peer is still connected
pub fn is_connected(&self) -> bool {
*self.connected.read().unwrap()
pub fn is_connected(&self) -> Result<bool, PeerError> {
Ok(*self.connected.read()?)
}
/// Internal function called once the `reader_thread` is spawned
@@ -312,14 +351,14 @@ impl Peer {
reader_thread_writer: Arc<Mutex<TcpStream>>,
reader_thread_mempool: Arc<Mempool>,
reader_thread_connected: Arc<RwLock<bool>>,
) {
) -> Result<(), PeerError> {
macro_rules! check_disconnect {
($call:expr) => {
match $call {
Ok(good) => good,
Err(e) => {
log::debug!("Error {:?}", e);
*reader_thread_connected.write().unwrap() = false;
*reader_thread_connected.write()? = false;
break;
}
@@ -328,7 +367,7 @@ impl Peer {
}
let mut reader = StreamReader::new(connection, None);
loop {
while *reader_thread_connected.read()? {
let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
let in_message = if raw_message.magic != network.magic() {
@@ -342,7 +381,7 @@ impl Peer {
match in_message {
NetworkMessage::Ping(nonce) => {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::Pong(nonce),
));
@@ -353,19 +392,21 @@ impl Peer {
NetworkMessage::GetData(ref inv) => {
let (found, not_found): (Vec<_>, Vec<_>) = inv
.iter()
.map(|item| (*item, reader_thread_mempool.get_tx(item)))
.map(|item| (*item, reader_thread_mempool.get_tx(item).unwrap()))
.partition(|(_, d)| d.is_some());
for (_, found_tx) in found {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::Tx(found_tx.unwrap()),
NetworkMessage::Tx(found_tx.ok_or_else(|| PeerError::Generic(
"Got None while expecting Transaction".to_string()
))?),
));
}
if !not_found.is_empty() {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::NotFound(
not_found.into_iter().map(|(i, _)| i).collect(),
@@ -377,21 +418,23 @@ impl Peer {
}
let message_resp = {
let mut lock = reader_thread_responses.write().unwrap();
let mut lock = reader_thread_responses.write()?;
let message_resp = lock.entry(in_message.cmd()).or_default();
Arc::clone(message_resp)
Arc::clone(&message_resp)
};
let (lock, cvar) = &*message_resp;
let mut messages = lock.lock().unwrap();
let mut messages = lock.lock()?;
messages.push(in_message);
cvar.notify_all();
}
Ok(())
}
/// Send a raw Bitcoin message to the peer
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
let mut writer = self.writer.lock().unwrap();
pub fn send(&self, payload: NetworkMessage) -> Result<(), PeerError> {
let mut writer = self.writer.lock()?;
Self::_send(&mut writer, self.network.magic(), payload)
}
@@ -400,30 +443,27 @@ impl Peer {
&self,
wait_for: &'static str,
timeout: Option<Duration>,
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
Ok(Self::_recv(&self.responses, wait_for, timeout))
) -> Result<Option<NetworkMessage>, PeerError> {
Self::_recv(&self.responses, wait_for, timeout)
}
}
pub trait CompactFiltersPeer {
fn get_cf_checkpt(
&self,
filter_type: u8,
stop_hash: BlockHash,
) -> Result<CFCheckpt, CompactFiltersError>;
fn get_cf_checkpt(&self, filter_type: u8, stop_hash: BlockHash)
-> Result<CFCheckpt, PeerError>;
fn get_cf_headers(
&self,
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<CFHeaders, CompactFiltersError>;
) -> Result<CFHeaders, PeerError>;
fn get_cf_filters(
&self,
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<(), CompactFiltersError>;
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
) -> Result<(), PeerError>;
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError>;
}
impl CompactFiltersPeer for Peer {
@@ -431,22 +471,20 @@ impl CompactFiltersPeer for Peer {
&self,
filter_type: u8,
stop_hash: BlockHash,
) -> Result<CFCheckpt, CompactFiltersError> {
) -> Result<CFCheckpt, PeerError> {
self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
filter_type,
stop_hash,
}))?;
let response = self
.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = self.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFCheckpt(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFCheckpt(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
if response.filter_type != filter_type {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(self.get_address()?));
}
Ok(response)
@@ -457,35 +495,31 @@ impl CompactFiltersPeer for Peer {
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<CFHeaders, CompactFiltersError> {
) -> Result<CFHeaders, PeerError> {
self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
filter_type,
start_height,
stop_hash,
}))?;
let response = self
.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = self.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFHeaders(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFHeaders(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
if response.filter_type != filter_type {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(self.get_address()?));
}
Ok(response)
}
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
let response = self
.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError> {
let response = self.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFilter(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFilter(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
Ok(response)
@@ -496,7 +530,7 @@ impl CompactFiltersPeer for Peer {
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<(), CompactFiltersError> {
) -> Result<(), PeerError> {
self.send(NetworkMessage::GetCFilters(GetCFilters {
filter_type,
start_height,
@@ -508,13 +542,13 @@ impl CompactFiltersPeer for Peer {
}
pub trait InvPeer {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError>;
fn ask_for_mempool(&self) -> Result<(), PeerError>;
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError>;
}
impl InvPeer for Peer {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError> {
self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
block_hash,
)]))?;
@@ -522,51 +556,126 @@ impl InvPeer for Peer {
match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
None => Ok(None),
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
_ => Err(CompactFiltersError::InvalidResponse),
_ => Err(PeerError::InvalidResponse(self.get_address()?)),
}
}
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
fn ask_for_mempool(&self) -> Result<(), PeerError> {
if !self.version.services.has(ServiceFlags::BLOOM) {
return Err(CompactFiltersError::PeerBloomDisabled);
return Err(PeerError::PeerBloomDisabled(self.get_address()?));
}
self.send(NetworkMessage::MemPool)?;
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
None => return Ok(()), // empty mempool
Some(NetworkMessage::Inv(inv)) => inv,
_ => return Err(CompactFiltersError::InvalidResponse),
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
let getdata = inv
.iter()
.cloned()
.filter(
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid)),
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid).unwrap()),
)
.collect::<Vec<_>>();
let num_txs = getdata.len();
self.send(NetworkMessage::GetData(getdata))?;
for _ in 0..num_txs {
let tx = self
.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let tx = self.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let tx = match tx {
NetworkMessage::Tx(tx) => tx,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::Tx(tx)) => tx,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
self.mempool.add_tx(tx);
self.mempool.add_tx(tx)?;
}
Ok(())
}
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
self.mempool.add_tx(tx.clone());
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError> {
self.mempool.add_tx(tx.clone())?;
self.send(NetworkMessage::Tx(tx))?;
Ok(())
}
}
/// Peer Errors
#[derive(Debug)]
pub enum PeerError {
/// Internal I/O error
Io(std::io::Error),
/// Internal system time error
Time(std::time::SystemTimeError),
/// A peer sent an invalid or unexpected response
InvalidResponse(SocketAddr),
/// Peer had bloom filter disabled
PeerBloomDisabled(SocketAddr),
/// Internal Mutex poisoning error
MutexPoisoned,
/// Internal Mutex wait timed out
MutexTimedout,
/// Internal RW read lock poisoned
RwReadLockPoisined,
/// Internal RW write lock poisoned
RwWriteLockPoisoned,
/// Mempool Mutex poisoned
MempoolPoisoned,
/// Network address resolution Error
AddresseResolution,
/// Generic Errors
Generic(String),
}
impl std::fmt::Display for PeerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for PeerError {}
impl_error!(std::io::Error, Io, PeerError);
impl_error!(std::time::SystemTimeError, Time, PeerError);
impl<T> From<PoisonError<MutexGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
PeerError::MutexPoisoned
}
}
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
PeerError::RwWriteLockPoisoned
}
}
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
PeerError::RwReadLockPoisined
}
}
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for PeerError {
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
let (_, wait_result) = err.into_inner();
if wait_result.timed_out() {
PeerError::MutexTimedout
} else {
PeerError::MutexPoisoned
}
}
}

View File

@@ -0,0 +1,578 @@
use super::address_manager::{AddressManager, AddressManagerError, DiscoveryProgress};
use super::peer::{Mempool, Peer, PeerError, TIMEOUT_SECS};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time;
use bitcoin::network::constants::{Network, ServiceFlags};
use bitcoin::network::message::NetworkMessage;
use std::path::PathBuf;
use std::collections::BTreeMap;
// Peer Manager Configuration constants
const MIN_CBF_PEERS: usize = 2;
const MIN_TOTAL_PEERS: usize = 5;
const MIN_CRAWLER_THREADS: usize = 20;
const BAN_SCORE_THRESHOLD: usize = 100;
const RECEIVE_TIMEOUT: time::Duration = time::Duration::from_secs(TIMEOUT_SECS);
#[allow(dead_code)]
/// An Error structure describing Peer Management errors
#[derive(Debug)]
pub enum PeerManagerError {
// Internal Peer Error
Peer(PeerError),
// Internal AddressManager Error
AddrsManager(AddressManagerError),
// Os String Error
OsString(std::ffi::OsString),
// Peer not found in directory
PeerNotFound,
// Generic Internal Error
Generic(String),
}
impl_error!(PeerError, Peer, PeerManagerError);
impl_error!(AddressManagerError, AddrsManager, PeerManagerError);
impl_error!(std::ffi::OsString, OsString, PeerManagerError);
/// Peer Data stored in the manager's directory
#[derive(Debug)]
struct PeerData {
peer: Peer,
is_cbf: bool,
ban_score: usize,
}
#[allow(dead_code)]
/// A Directory structure to hold live Peers
/// All peers in the directory have live ongoing connection
/// Banning a peer removes it from the directory
#[derive(Default, Debug)]
struct PeerDirectory {
peers: BTreeMap<SocketAddr, PeerData>,
}
#[allow(dead_code)]
impl PeerDirectory {
fn new() -> Self {
Self::default()
}
fn get_cbf_peers(&self) -> Option<Vec<&PeerData>> {
let cbf_peers = self
.peers
.iter()
.filter(|(_, peer)| peer.is_cbf)
.map(|(_, peer)| peer)
.collect::<Vec<&PeerData>>();
match cbf_peers.len() {
0 => None,
_ => Some(cbf_peers),
}
}
fn get_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
let cbf_addrseses = self
.peers
.iter()
.filter_map(
|(addrs, peerdata)| {
if peerdata.is_cbf {
Some(addrs)
} else {
None
}
},
)
.copied()
.collect::<Vec<SocketAddr>>();
match cbf_addrseses.len() {
0 => None,
_ => Some(cbf_addrseses),
}
}
fn get_non_cbf_peers(&self) -> Option<Vec<&PeerData>> {
let non_cbf_peers = self
.peers
.iter()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&PeerData>>();
match non_cbf_peers.len() {
0 => None,
_ => Some(non_cbf_peers),
}
}
fn get_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.peers
.iter()
.filter_map(
|(addrs, peerdata)| {
if !peerdata.is_cbf {
Some(addrs)
} else {
None
}
},
)
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
fn get_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
let peers = self
.peers
.iter_mut()
.filter(|(_, peerdata)| peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&mut PeerData>>();
match peers.len() {
0 => None,
_ => Some(peers),
}
}
fn get_non_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
let peers = self
.peers
.iter_mut()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&mut PeerData>>();
match peers.len() {
0 => None,
_ => Some(peers),
}
}
fn get_cbf_count(&self) -> usize {
self.peers
.iter()
.filter(|(_, peerdata)| peerdata.is_cbf)
.count()
}
fn get_non_cbf_count(&self) -> usize {
self.peers
.iter()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.count()
}
fn insert_peer(&mut self, peerdata: PeerData) -> Result<(), PeerManagerError> {
let addrs = peerdata.peer.get_address()?;
self.peers.entry(addrs).or_insert(peerdata);
Ok(())
}
fn remove_peer(&mut self, addrs: &SocketAddr) -> Option<PeerData> {
self.peers.remove(addrs)
}
fn get_peer_banscore(&self, addrs: &SocketAddr) -> Option<usize> {
self.peers.get(addrs).map(|peerdata| peerdata.ban_score)
}
fn get_peerdata_mut(&mut self, address: &SocketAddr) -> Option<&mut PeerData> {
self.peers.get_mut(address)
}
fn get_peerdata(&self, address: &SocketAddr) -> Option<&PeerData> {
self.peers.get(address)
}
fn is_cbf(&self, addrs: &SocketAddr) -> Option<bool> {
if let Some(peer) = self.peers.get(addrs) {
match peer.is_cbf {
true => Some(true),
false => Some(false),
}
} else {
None
}
}
}
#[allow(dead_code)]
pub struct PeerManager<P: DiscoveryProgress> {
addrs_mngr: AddressManager<P>,
directory: PeerDirectory,
mempool: Arc<Mempool>,
min_cbf: usize,
min_total: usize,
network: Network,
}
#[allow(dead_code)]
impl<P: DiscoveryProgress> PeerManager<P> {
pub fn init(
network: Network,
cache_dir: &str,
crawler_threads: Option<usize>,
progress: P,
cbf_peers: Option<usize>,
total_peers: Option<usize>,
) -> Result<Self, PeerManagerError> {
let mut cache_filename = PathBuf::from(cache_dir);
cache_filename.push("addr_cache");
// Fetch minimum peer requirements, either by user input, or via default
let min_cbf = cbf_peers.unwrap_or(MIN_CBF_PEERS);
let min_total = total_peers.unwrap_or(MIN_TOTAL_PEERS);
let cbf_buff = min_cbf * 2;
let non_cbf_buff = (min_total - min_cbf) * 2;
// Create internal items
let addrs_mngr = AddressManager::new(
network,
cache_filename.into_os_string().into_string()?,
crawler_threads.unwrap_or(MIN_CRAWLER_THREADS),
Some(cbf_buff),
Some(non_cbf_buff),
progress,
)?;
let mempool = Arc::new(Mempool::new());
let peer_dir = PeerDirectory::new();
// Create self and update
let mut manager = Self {
addrs_mngr,
directory: peer_dir,
mempool,
min_cbf,
min_total,
network,
};
manager.update_directory()?;
Ok(manager)
}
fn update_directory(&mut self) -> Result<(), PeerManagerError> {
while self.directory.get_cbf_count() < self.min_cbf
|| self.directory.get_non_cbf_count() < (self.min_total - self.min_cbf)
{
// First connect with cbf peers, then with non_cbf
let cbf_fetch = self.directory.get_cbf_count() < self.min_cbf;
// Try to get an address
// if not present start crawlers
let target_addrs = match cbf_fetch {
true => {
if let Some(addrs) = self.addrs_mngr.get_new_cbf_address() {
addrs
} else {
self.addrs_mngr.fetch()?;
continue;
}
}
false => {
if let Some(addrs) = self.addrs_mngr.get_new_non_cbf_address() {
addrs
} else {
self.addrs_mngr.fetch()?;
continue;
}
}
};
if let Ok(peer) = Peer::connect(target_addrs, Arc::clone(&self.mempool), self.network) {
let address = peer.get_address()?;
assert_eq!(address, target_addrs);
let is_cbf = peer
.get_version()
.services
.has(ServiceFlags::COMPACT_FILTERS);
let peerdata = PeerData {
peer,
is_cbf,
ban_score: 0,
};
self.directory.insert_peer(peerdata)?;
} else {
continue;
}
}
Ok(())
}
pub fn set_banscore(
&mut self,
increase_by: usize,
address: &SocketAddr,
) -> Result<(), PeerManagerError> {
let mut current_score = if let Some(peer) = self.directory.get_peerdata_mut(address) {
peer.ban_score
} else {
return Err(PeerManagerError::PeerNotFound);
};
current_score += increase_by;
let mut banned = false;
if current_score >= BAN_SCORE_THRESHOLD {
match (
self.directory.is_cbf(address),
self.directory.remove_peer(address),
) {
(Some(true), Some(_)) => {
self.addrs_mngr.ban_peer(address, true)?;
banned = true;
}
(Some(false), Some(_)) => {
self.addrs_mngr.ban_peer(address, false)?;
banned = true;
}
_ => {
return Err(PeerManagerError::Generic(
"data inconsistency in directory, should not happen".to_string(),
))
}
}
}
if banned {
self.update_directory()?;
}
Ok(())
}
pub fn send_to(
&self,
address: &SocketAddr,
message: NetworkMessage,
) -> Result<(), PeerManagerError> {
if let Some(peerdata) = self.directory.get_peerdata(address) {
peerdata.peer.send(message)?;
Ok(())
} else {
Err(PeerManagerError::PeerNotFound)
}
}
pub fn receive_from(
&self,
address: &SocketAddr,
wait_for: &'static str,
) -> Result<Option<NetworkMessage>, PeerManagerError> {
if let Some(peerdata) = self.directory.get_peerdata(address) {
if let Some(response) = peerdata.peer.recv(wait_for, Some(RECEIVE_TIMEOUT))? {
Ok(Some(response))
} else {
Ok(None)
}
} else {
Err(PeerManagerError::PeerNotFound)
}
}
pub fn connected_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.directory.get_cbf_addresses()
}
pub fn connected_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.directory.get_non_cbf_addresses()
}
pub fn known_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_known_cbfs()
}
pub fn known_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_known_non_cbfs()
}
pub fn previously_tried_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_previously_tried()
}
}
#[cfg(test)]
mod test {
use super::super::LogDiscoveryProgress;
use super::*;
#[test]
#[ignore]
fn test_ban() {
let mut manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let connected_cbfs = manager.connected_cbf_addresses().unwrap();
let connected_non_cbfs = manager.connected_non_cbf_addresses().unwrap();
println!("Currently Connected CBFs: {:#?}", connected_cbfs);
assert_eq!(connected_cbfs.len(), 2);
assert_eq!(connected_non_cbfs.len(), 3);
let to_banned = &connected_cbfs[0];
println!("Banning address : {}", to_banned);
manager.set_banscore(100, to_banned).unwrap();
let newly_connected = manager.connected_cbf_addresses().unwrap();
println!("Newly Connected CBFs: {:#?}", newly_connected);
assert_eq!(newly_connected.len(), 2);
assert_ne!(newly_connected, connected_cbfs);
}
#[test]
#[ignore]
fn test_send_recv() {
let manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let target_address = manager.connected_cbf_addresses().unwrap()[0];
let ping = NetworkMessage::Ping(30);
println!("Asking peer {}", target_address);
manager.send_to(&target_address, ping).unwrap();
let response = manager
.receive_from(&target_address, "pong")
.unwrap()
.unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
let value = value.unwrap();
println!("Got value {:#?}", value);
}
#[test]
#[ignore]
fn test_connect_all() {
let manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let cbf_pings = vec![100u64; manager.min_cbf];
let non_cbf_pings = vec![200u64; manager.min_total - manager.min_cbf];
let cbf_peers = manager.connected_cbf_addresses().unwrap();
let non_cbf_peers = manager.connected_non_cbf_addresses().unwrap();
let sent_cbf: Vec<bool> = cbf_pings
.iter()
.zip(cbf_peers.iter())
.map(|(ping, address)| {
let message = NetworkMessage::Ping(*ping);
manager.send_to(address, message).unwrap();
true
})
.collect();
assert_eq!(sent_cbf, vec![true; manager.min_cbf]);
println!("Sent pings to cbf peers");
let sent_noncbf: Vec<bool> = non_cbf_pings
.iter()
.zip(non_cbf_peers.iter())
.map(|(ping, address)| {
let message = NetworkMessage::Ping(*ping);
manager.send_to(address, message).unwrap();
true
})
.collect();
assert_eq!(sent_noncbf, vec![true; manager.min_total - manager.min_cbf]);
println!("Sent pings to non cbf peers");
let cbf_received: Vec<u64> = cbf_peers
.iter()
.map(|address| {
let response = manager.receive_from(address, "pong").unwrap().unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
value.unwrap()
})
.collect();
let non_cbf_received: Vec<u64> = non_cbf_peers
.iter()
.map(|address| {
let response = manager.receive_from(address, "pong").unwrap().unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
value.unwrap()
})
.collect();
assert_eq!(cbf_pings, cbf_received);
assert_eq!(non_cbf_pings, non_cbf_received);
}
}

View File

@@ -760,7 +760,7 @@ impl CfStore {
let cf_headers: Vec<FilterHeader> = filter_hashes
.into_iter()
.scan(checkpoint, |prev_header, filter_hash| {
let filter_header = filter_hash.filter_header(prev_header);
let filter_header = filter_hash.filter_header(&prev_header);
*prev_header = filter_header;
Some(filter_header)
@@ -801,7 +801,7 @@ impl CfStore {
.zip(headers.into_iter())
.scan(checkpoint, |prev_header, ((_, filter_content), header)| {
let filter = BlockFilter::new(&filter_content);
if header != filter.filter_header(prev_header) {
if header != filter.filter_header(&prev_header) {
return Some(Err(CompactFiltersError::InvalidFilter));
}
*prev_header = header;

View File

@@ -205,7 +205,7 @@ impl CfSync {
let block_hash = self.headers_store.get_block_hash(height)?.unwrap();
// TODO: also download random blocks?
if process(&block_hash, &BlockFilter::new(filter))? {
if process(&block_hash, &BlockFilter::new(&filter))? {
log::debug!("Downloading block {}", block_hash);
let block = peer

View File

@@ -9,30 +9,41 @@
// You may not use this file except in accordance with one or both of these
// licenses.
//! Esplora by way of `reqwest` HTTP client.
//! Esplora
//!
//! This module defines a [`Blockchain`] struct that can query an Esplora backend
//! populate the wallet's [database](crate::database::Database) by
//!
//! ## Example
//!
//! ```no_run
//! # use bdk::blockchain::esplora::EsploraBlockchain;
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/api", None, 20);
//! # Ok::<(), bdk::Error>(())
//! ```
use std::collections::{HashMap, HashSet};
use std::fmt;
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::consensus::{self, deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::{sha256, Hash};
use bitcoin::{BlockHeader, Script, Transaction, Txid};
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use reqwest::{Client, StatusCode};
use serde::Deserialize;
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
use ::reqwest::{Client, StatusCode};
use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
use crate::blockchain::*;
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::wallet::utils::ChunksIterator;
use crate::FeeRate;
use super::*;
use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
#[derive(Debug)]
@@ -64,23 +75,17 @@ impl std::convert::From<UrlClient> for EsploraBlockchain {
}
impl EsploraBlockchain {
/// Create a new instance of the client from a base URL and `stop_gap`.
pub fn new(base_url: &str, stop_gap: usize) -> Self {
/// Create a new instance of the client from a base URL
pub fn new(base_url: &str, concurrency: Option<u8>, stop_gap: usize) -> Self {
EsploraBlockchain {
url_client: UrlClient {
url: base_url.to_string(),
client: Client::new(),
concurrency: DEFAULT_CONCURRENT_REQUESTS,
concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENT_REQUESTS),
},
stop_gap,
}
}
/// Set the concurrency to use when doing batch queries against the Esplora instance.
pub fn with_concurrency(mut self, concurrency: u8) -> Self {
self.url_client.concurrency = concurrency;
self
}
}
#[maybe_async]
@@ -119,7 +124,19 @@ impl Blockchain for EsploraBlockchain {
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
let estimates = await_or_block!(self.url_client._get_fee_estimates())?;
super::into_fee_rate(target, estimates)
let fee_val = estimates
.into_iter()
.map(|(k, v)| Ok::<_, std::num::ParseIntError>((k.parse::<usize>()?, v)))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| Error::Generic(e.to_string()))?
.into_iter()
.take_while(|(k, _)| k <= &target)
.map(|(_, v)| v)
.last()
.unwrap_or(1.0);
Ok(FeeRate::from_sat_per_vb(fee_val as f32))
}
}
@@ -281,51 +298,74 @@ impl ElectrumLikeSync for UrlClient {
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for script in chunk {
futs.push(self._script_get_history(script));
let future = async {
let mut results = vec![];
for chunk in ChunksIterator::new(scripts.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for script in chunk {
futs.push(self._script_get_history(&script));
}
let partial_results: Vec<Vec<ElsGetHistoryRes>> = futs.try_collect().await?;
results.extend(partial_results);
}
let partial_results: Vec<Vec<ElsGetHistoryRes>> = await_or_block!(futs.try_collect())?;
results.extend(partial_results);
}
Ok(await_or_block!(stream::iter(results).collect()))
Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
}
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for txid in chunk {
futs.push(self._get_tx_no_opt(txid));
let future = async {
let mut results = vec![];
for chunk in ChunksIterator::new(txids.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for txid in chunk {
futs.push(self._get_tx_no_opt(&txid));
}
let partial_results: Vec<Transaction> = futs.try_collect().await?;
results.extend(partial_results);
}
let partial_results: Vec<Transaction> = await_or_block!(futs.try_collect())?;
results.extend(partial_results);
}
Ok(await_or_block!(stream::iter(results).collect()))
Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
}
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let mut results = vec![];
for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for height in chunk {
futs.push(self._get_header(height));
let future = async {
let mut results = vec![];
for chunk in ChunksIterator::new(heights.into_iter(), self.concurrency as usize) {
let mut futs = FuturesOrdered::new();
for height in chunk {
futs.push(self._get_header(height));
}
let partial_results: Vec<BlockHeader> = futs.try_collect().await?;
results.extend(partial_results);
}
let partial_results: Vec<BlockHeader> = await_or_block!(futs.try_collect())?;
results.extend(partial_results);
}
Ok(await_or_block!(stream::iter(results).collect()))
Ok(stream::iter(results).collect().await)
};
await_or_block!(future)
}
}
#[derive(Deserialize)]
struct EsploraGetHistoryStatus {
block_height: Option<usize>,
}
#[derive(Deserialize)]
struct EsploraGetHistory {
txid: Txid,
status: EsploraGetHistoryStatus,
}
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
@@ -333,20 +373,9 @@ pub struct EsploraBlockchainConfig {
///
/// eg. `https://blockstream.info/api/`
pub base_url: String,
/// Optional URL of the proxy to use to make requests to the Esplora server
///
/// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
///
/// Note that the format of this value and the supported protocols change slightly between the
/// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
/// details check with the documentation of the two crates. Both of them are compiled with
/// the `socks` feature enabled.
///
/// The proxy is ignored when targeting `wasm32`.
pub proxy: Option<String>,
/// Number of parallel requests sent to the esplora service (default: 4)
pub concurrency: Option<u8>,
/// Stop searching addresses for transactions after finding an unused gap of this length.
/// Stop searching addresses for transactions after finding an unused gap of this length
pub stop_gap: usize,
}
@@ -354,19 +383,51 @@ impl ConfigurableBlockchain for EsploraBlockchain {
type Config = EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let map_e = |e: reqwest::Error| Error::Esplora(Box::new(e.into()));
let mut blockchain = EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap);
if let Some(concurrency) = config.concurrency {
blockchain.url_client.concurrency = concurrency;
}
#[cfg(not(target_arch = "wasm32"))]
if let Some(proxy) = &config.proxy {
blockchain.url_client.client = Client::builder()
.proxy(reqwest::Proxy::all(proxy).map_err(map_e)?)
.build()
.map_err(map_e)?;
}
Ok(blockchain)
Ok(EsploraBlockchain::new(
config.base_url.as_str(),
config.concurrency,
config.stop_gap,
))
}
}
/// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)]
pub enum EsploraError {
/// Error with the HTTP call
Reqwest(reqwest::Error),
/// Invalid number returned
Parsing(std::num::ParseIntError),
/// Invalid Bitcoin data returned
BitcoinEncoding(bitcoin::consensus::encode::Error),
/// Invalid Hex data returned
Hex(bitcoin::hashes::hex::Error),
/// Transaction not found
TransactionNotFound(Txid),
/// Header height not found
HeaderHeightNotFound(u32),
/// Header hash not found
HeaderHashNotFound(BlockHash),
}
impl fmt::Display for EsploraError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for EsploraError {}
impl_error!(reqwest::Error, Reqwest, EsploraError);
impl_error!(std::num::ParseIntError, Parsing, EsploraError);
impl_error!(consensus::encode::Error, BitcoinEncoding, EsploraError);
impl_error!(bitcoin::hashes::hex::Error, Hex, EsploraError);
#[cfg(test)]
#[cfg(feature = "test-esplora")]
crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> EsploraBlockchain {
EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), None, 20)
}
}

View File

@@ -1,129 +0,0 @@
//! Esplora
//!
//! This module defines a [`EsploraBlockchain`] struct that can query an Esplora
//! backend populate the wallet's [database](crate::database::Database) by:
//!
//! ## Example
//!
//! ```no_run
//! # use bdk::blockchain::esplora::EsploraBlockchain;
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/api", 20);
//! # Ok::<(), bdk::Error>(())
//! ```
//!
//! Esplora blockchain can use either `ureq` or `reqwest` for the HTTP client
//! depending on your needs (blocking or async respectively).
//!
//! Please note, to configure the Esplora HTTP client correctly use one of:
//! Blocking: --features='esplora,ureq'
//! Async: --features='async-interface,esplora,reqwest' --no-default-features
use std::collections::HashMap;
use std::fmt;
use std::io;
use serde::Deserialize;
use bitcoin::consensus;
use bitcoin::{BlockHash, Txid};
use crate::error::Error;
use crate::FeeRate;
#[cfg(feature = "reqwest")]
mod reqwest;
#[cfg(feature = "reqwest")]
pub use self::reqwest::*;
#[cfg(feature = "ureq")]
mod ureq;
#[cfg(feature = "ureq")]
pub use self::ureq::*;
fn into_fee_rate(target: usize, estimates: HashMap<String, f64>) -> Result<FeeRate, Error> {
let fee_val = estimates
.into_iter()
.map(|(k, v)| Ok::<_, std::num::ParseIntError>((k.parse::<usize>()?, v)))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| Error::Generic(e.to_string()))?
.into_iter()
.take_while(|(k, _)| k <= &target)
.map(|(_, v)| v)
.last()
.unwrap_or(1.0);
Ok(FeeRate::from_sat_per_vb(fee_val as f32))
}
/// Data type used when fetching transaction history from Esplora.
#[derive(Deserialize)]
pub struct EsploraGetHistory {
txid: Txid,
status: EsploraGetHistoryStatus,
}
#[derive(Deserialize)]
struct EsploraGetHistoryStatus {
block_height: Option<usize>,
}
/// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)]
pub enum EsploraError {
/// Error during ureq HTTP request
#[cfg(feature = "ureq")]
Ureq(::ureq::Error),
/// Transport error during the ureq HTTP call
#[cfg(feature = "ureq")]
UreqTransport(::ureq::Transport),
/// Error during reqwest HTTP request
#[cfg(feature = "reqwest")]
Reqwest(::reqwest::Error),
/// HTTP response error
HttpResponse(u16),
/// IO error during ureq response read
Io(io::Error),
/// No header found in ureq response
NoHeader,
/// Invalid number returned
Parsing(std::num::ParseIntError),
/// Invalid Bitcoin data returned
BitcoinEncoding(bitcoin::consensus::encode::Error),
/// Invalid Hex data returned
Hex(bitcoin::hashes::hex::Error),
/// Transaction not found
TransactionNotFound(Txid),
/// Header height not found
HeaderHeightNotFound(u32),
/// Header hash not found
HeaderHashNotFound(BlockHash),
}
impl fmt::Display for EsploraError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for EsploraError {}
#[cfg(feature = "ureq")]
impl_error!(::ureq::Error, Ureq, EsploraError);
#[cfg(feature = "ureq")]
impl_error!(::ureq::Transport, UreqTransport, EsploraError);
#[cfg(feature = "reqwest")]
impl_error!(::reqwest::Error, Reqwest, EsploraError);
impl_error!(io::Error, Io, EsploraError);
impl_error!(std::num::ParseIntError, Parsing, EsploraError);
impl_error!(consensus::encode::Error, BitcoinEncoding, EsploraError);
impl_error!(bitcoin::hashes::hex::Error, Hex, EsploraError);
#[cfg(test)]
#[cfg(feature = "test-esplora")]
crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> EsploraBlockchain {
EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), 20)
}
}

View File

@@ -1,398 +0,0 @@
// Bitcoin Dev Kit
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
//! Esplora by way of `ureq` HTTP client.
use std::collections::{HashMap, HashSet};
use std::io;
use std::io::Read;
use std::time::Duration;
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use ureq::{Agent, Proxy, Response};
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::{sha256, Hash};
use bitcoin::{BlockHeader, Script, Transaction, Txid};
use crate::blockchain::esplora::{EsploraError, EsploraGetHistory};
use crate::blockchain::utils::{ElectrumLikeSync, ElsGetHistoryRes};
use crate::blockchain::*;
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
#[derive(Debug)]
struct UrlClient {
url: String,
agent: Agent,
}
/// Structure that implements the logic to sync with Esplora
///
/// ## Example
/// See the [`blockchain::esplora`](crate::blockchain::esplora) module for a usage example.
#[derive(Debug)]
pub struct EsploraBlockchain {
url_client: UrlClient,
stop_gap: usize,
}
impl std::convert::From<UrlClient> for EsploraBlockchain {
fn from(url_client: UrlClient) -> Self {
EsploraBlockchain {
url_client,
stop_gap: 20,
}
}
}
impl EsploraBlockchain {
/// Create a new instance of the client from a base URL and the `stop_gap`.
pub fn new(base_url: &str, stop_gap: usize) -> Self {
EsploraBlockchain {
url_client: UrlClient {
url: base_url.to_string(),
agent: Agent::new(),
},
stop_gap,
}
}
/// Set the inner `ureq` agent.
pub fn with_agent(mut self, agent: Agent) -> Self {
self.url_client.agent = agent;
self
}
}
impl Blockchain for EsploraBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
vec![
Capability::FullHistory,
Capability::GetAnyTx,
Capability::AccurateFees,
]
.into_iter()
.collect()
}
fn setup<D: BatchDatabase, P: Progress>(
&self,
database: &mut D,
progress_update: P,
) -> Result<(), Error> {
self.url_client
.electrum_like_setup(self.stop_gap, database, progress_update)
}
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
Ok(self.url_client._get_tx(txid)?)
}
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
let _txid = self.url_client._broadcast(tx)?;
Ok(())
}
fn get_height(&self) -> Result<u32, Error> {
Ok(self.url_client._get_height()?)
}
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
let estimates = self.url_client._get_fee_estimates()?;
super::into_fee_rate(target, estimates)
}
}
impl UrlClient {
fn script_to_scripthash(script: &Script) -> String {
sha256::Hash::hash(script.as_bytes()).into_inner().to_hex()
}
fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> {
let resp = self
.agent
.get(&format!("{}/tx/{}/raw", self.url, txid))
.call();
match resp {
Ok(resp) => Ok(Some(deserialize(&into_bytes(resp)?)?)),
Err(ureq::Error::Status(code, _)) => {
if is_status_not_found(code) {
return Ok(None);
}
Err(EsploraError::HttpResponse(code))
}
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _get_tx_no_opt(&self, txid: &Txid) -> Result<Transaction, EsploraError> {
match self._get_tx(txid) {
Ok(Some(tx)) => Ok(tx),
Ok(None) => Err(EsploraError::TransactionNotFound(*txid)),
Err(e) => Err(e),
}
}
fn _get_header(&self, block_height: u32) -> Result<BlockHeader, EsploraError> {
let resp = self
.agent
.get(&format!("{}/block-height/{}", self.url, block_height))
.call();
let bytes = match resp {
Ok(resp) => Ok(into_bytes(resp)?),
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
let hash = std::str::from_utf8(&bytes)
.map_err(|_| EsploraError::HeaderHeightNotFound(block_height))?;
let resp = self
.agent
.get(&format!("{}/block/{}/header", self.url, hash))
.call();
match resp {
Ok(resp) => Ok(deserialize(&Vec::from_hex(&resp.into_string()?)?)?),
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _broadcast(&self, transaction: &Transaction) -> Result<(), EsploraError> {
let resp = self
.agent
.post(&format!("{}/tx", self.url))
.send_string(&serialize(transaction).to_hex());
match resp {
Ok(_) => Ok(()), // We do not return the txid?
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _get_height(&self) -> Result<u32, EsploraError> {
let resp = self
.agent
.get(&format!("{}/blocks/tip/height", self.url))
.call();
match resp {
Ok(resp) => Ok(resp.into_string()?.parse()?),
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}
}
fn _script_get_history(&self, script: &Script) -> Result<Vec<ElsGetHistoryRes>, EsploraError> {
let mut result = Vec::new();
let scripthash = Self::script_to_scripthash(script);
// Add the unconfirmed transactions first
let resp = self
.agent
.get(&format!(
"{}/scripthash/{}/txs/mempool",
self.url, scripthash
))
.call();
let v = match resp {
Ok(resp) => {
let v: Vec<EsploraGetHistory> = resp.into_json()?;
Ok(v)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
debug!(
"Found {} mempool txs for {} - {:?}",
result.len(),
scripthash,
script
);
// Then go through all the pages of confirmed transactions
let mut last_txid = String::new();
loop {
let resp = self
.agent
.get(&format!(
"{}/scripthash/{}/txs/chain/{}",
self.url, scripthash, last_txid
))
.call();
let v = match resp {
Ok(resp) => {
let v: Vec<EsploraGetHistory> = resp.into_json()?;
Ok(v)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
let len = v.len();
if let Some(elem) = v.last() {
last_txid = elem.txid.to_hex();
}
debug!("... adding {} confirmed transactions", len);
result.extend(v.into_iter().map(|x| ElsGetHistoryRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0) as i32,
}));
if len < 25 {
break;
}
}
Ok(result)
}
fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
let resp = self
.agent
.get(&format!("{}/fee-estimates", self.url,))
.call();
let map = match resp {
Ok(resp) => {
let map: HashMap<String, f64> = resp.into_json()?;
Ok(map)
}
Err(ureq::Error::Status(code, _)) => Err(EsploraError::HttpResponse(code)),
Err(e) => Err(EsploraError::Ureq(e)),
}?;
Ok(map)
}
}
fn is_status_not_found(status: u16) -> bool {
status == 404
}
fn into_bytes(resp: Response) -> Result<Vec<u8>, io::Error> {
const BYTES_LIMIT: usize = 10 * 1_024 * 1_024;
let mut buf: Vec<u8> = vec![];
resp.into_reader()
.take((BYTES_LIMIT + 1) as u64)
.read_to_end(&mut buf)?;
if buf.len() > BYTES_LIMIT {
return Err(io::Error::new(
io::ErrorKind::Other,
"response too big for into_bytes",
));
}
Ok(buf)
}
impl ElectrumLikeSync for UrlClient {
fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ElsGetHistoryRes>>, Error> {
let mut results = vec![];
for script in scripts.into_iter() {
let v = self._script_get_history(script)?;
results.push(v);
}
Ok(results)
}
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
txids: I,
) -> Result<Vec<Transaction>, Error> {
let mut results = vec![];
for txid in txids.into_iter() {
let tx = self._get_tx_no_opt(txid)?;
results.push(tx);
}
Ok(results)
}
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let mut results = vec![];
for height in heights.into_iter() {
let header = self._get_header(height)?;
results.push(header);
}
Ok(results)
}
}
/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service eg. `https://blockstream.info/api/`
pub base_url: String,
/// Optional URL of the proxy to use to make requests to the Esplora server
///
/// The string should be formatted as: `<protocol>://<user>:<password>@host:<port>`.
///
/// Note that the format of this value and the supported protocols change slightly between the
/// sync version of esplora (using `ureq`) and the async version (using `reqwest`). For more
/// details check with the documentation of the two crates. Both of them are compiled with
/// the `socks` feature enabled.
///
/// The proxy is ignored when targeting `wasm32`.
pub proxy: Option<String>,
/// Socket read timeout.
pub timeout_read: u64,
/// Socket write timeout.
pub timeout_write: u64,
/// Stop searching addresses for transactions after finding an unused gap of this length.
pub stop_gap: usize,
}
impl ConfigurableBlockchain for EsploraBlockchain {
type Config = EsploraBlockchainConfig;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
let mut agent_builder = ureq::AgentBuilder::new()
.timeout_read(Duration::from_secs(config.timeout_read))
.timeout_write(Duration::from_secs(config.timeout_write));
if let Some(proxy) = &config.proxy {
agent_builder = agent_builder
.proxy(Proxy::new(proxy).map_err(|e| Error::Esplora(Box::new(e.into())))?);
}
Ok(
EsploraBlockchain::new(config.base_url.as_str(), config.stop_gap)
.with_agent(agent_builder.build()),
)
}
}

View File

@@ -30,19 +30,9 @@ use crate::FeeRate;
#[cfg(any(feature = "electrum", feature = "esplora"))]
pub(crate) mod utils;
#[cfg(any(
feature = "electrum",
feature = "esplora",
feature = "compact_filters",
feature = "rpc"
))]
#[cfg(any(feature = "electrum", feature = "esplora", feature = "compact_filters"))]
pub mod any;
#[cfg(any(
feature = "electrum",
feature = "esplora",
feature = "compact_filters",
feature = "rpc"
))]
#[cfg(any(feature = "electrum", feature = "esplora", feature = "compact_filters"))]
pub use any::{AnyBlockchain, AnyBlockchainConfig};
#[cfg(feature = "electrum")]
@@ -201,7 +191,7 @@ impl Progress for NoopProgress {
#[derive(Clone, Copy)]
pub struct LogProgress;
/// Create a new instance of [`LogProgress`]
/// Create a nwe instance of [`LogProgress`]
pub fn log_progress() -> LogProgress {
LogProgress
}

View File

@@ -18,12 +18,10 @@
//! ## Example
//!
//! ```no_run
//! # use bdk::blockchain::{RpcConfig, RpcBlockchain, ConfigurableBlockchain, rpc::Auth};
//! # use bdk::blockchain::{RpcConfig, RpcBlockchain, ConfigurableBlockchain};
//! let config = RpcConfig {
//! url: "127.0.0.1:18332".to_string(),
//! auth: Auth::Cookie {
//! file: "/home/user/.bitcoin/.cookie".into(),
//! },
//! auth: bitcoincore_rpc::Auth::CookieFile("/home/user/.bitcoin/.cookie".into()),
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! skip_blocks: None,
@@ -43,12 +41,10 @@ use bitcoincore_rpc::json::{
ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
};
use bitcoincore_rpc::jsonrpc::serde_json::Value;
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use bitcoincore_rpc::{Auth, Client, RpcApi};
use log::debug;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::str::FromStr;
/// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
@@ -68,7 +64,7 @@ pub struct RpcBlockchain {
}
/// RpcBlockchain configuration options
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[derive(Debug)]
pub struct RpcConfig {
/// The bitcoin node url
pub url: String,
@@ -82,39 +78,6 @@ pub struct RpcConfig {
pub skip_blocks: Option<u32>,
}
/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
/// To be removed once upstream equivalent is implementing Serialize (json serialization format
/// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181)
#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(untagged)]
pub enum Auth {
/// None authentication
None,
/// Authentication with username and password, usually [Auth::Cookie] should be preferred
UserPass {
/// Username
username: String,
/// Password
password: String,
},
/// Authentication with a cookie file
Cookie {
/// Cookie file
file: PathBuf,
},
}
impl From<Auth> for RpcAuth {
fn from(auth: Auth) -> Self {
match auth {
Auth::None => RpcAuth::None,
Auth::UserPass { username, password } => RpcAuth::UserPass(username, password),
Auth::Cookie { file } => RpcAuth::CookieFile(file),
}
}
}
impl RpcBlockchain {
fn get_node_synced_height(&self) -> Result<u32, Error> {
let info = self.client.get_address_info(&self._storage_address)?;
@@ -156,7 +119,7 @@ impl Blockchain for RpcBlockchain {
.iter()
.map(|s| ImportMultiRequest {
timestamp: ImportMultiRescanSince::Timestamp(0),
script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(s)),
script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(&s)),
watchonly: Some(true),
..Default::default()
})
@@ -169,25 +132,22 @@ impl Blockchain for RpcBlockchain {
//TODO maybe convenient using import_descriptor for compatible descriptor and import_multi as fallback
self.client.import_multi(&requests, Some(&options))?;
loop {
let current_height = self.get_height()?;
let current_height = self.get_height()?;
// min because block invalidate may cause height to go down
let node_synced = self.get_node_synced_height()?.min(current_height);
// min because block invalidate may cause height to go down
let node_synced = self.get_node_synced_height()?.min(current_height);
let sync_up_to = node_synced.saturating_add(10_000).min(current_height);
//TODO call rescan in chunks (updating node_synced_height) so that in case of
// interruption work can be partially recovered
debug!(
"rescan_blockchain from:{} to:{}",
node_synced, current_height
);
self.client
.rescan_blockchain(Some(node_synced as usize), Some(current_height as usize))?;
progress_update.update(1.0, None)?;
debug!("rescan_blockchain from:{} to:{}", node_synced, sync_up_to);
self.client
.rescan_blockchain(Some(node_synced as usize), Some(sync_up_to as usize))?;
progress_update.update((sync_up_to as f32) / (current_height as f32), None)?;
self.set_node_synced_height(sync_up_to)?;
if sync_up_to == current_height {
break;
}
}
self.set_node_synced_height(current_height)?;
self.sync(database, progress_update)
}
@@ -238,7 +198,7 @@ impl Blockchain for RpcBlockchain {
txid, confirmation_time
);
known_tx.confirmation_time = confirmation_time;
db.set_tx(known_tx)?;
db.set_tx(&known_tx)?;
}
} else {
//TODO check there is already the raw tx in db?
@@ -360,7 +320,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
let client = Client::new(wallet_url, config.auth.clone())?;
let loaded_wallets = client.list_wallets()?;
if loaded_wallets.contains(&wallet_name) {
debug!("wallet already loaded {:?}", wallet_name);
@@ -427,13 +387,13 @@ where
{
//TODO check descriptors contains only public keys
let descriptor = descriptor
.into_wallet_descriptor(secp, network)?
.into_wallet_descriptor(&secp, network)?
.0
.to_string();
let mut wallet_name = get_checksum(&descriptor[..descriptor.find('#').unwrap()])?;
if let Some(change_descriptor) = change_descriptor {
let change_descriptor = change_descriptor
.into_wallet_descriptor(secp, network)?
.into_wallet_descriptor(&secp, network)?
.0
.to_string();
wallet_name.push_str(
@@ -467,7 +427,7 @@ crate::bdk_blockchain_tests! {
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
let config = RpcConfig {
url: test_client.bitcoind.rpc_url(),
auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
auth: Auth::CookieFile(test_client.bitcoind.params.cookie_file.clone()),
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{:?}", std::time::SystemTime::now() ),
skip_blocks: None,

View File

@@ -369,7 +369,7 @@ fn save_transaction_details_and_utxos<D: BatchDatabase>(
}
/// returns utxo dependency as the inputs needed for the utxo to exist
/// `tx_raw_in_db` must contains utxo's generating txs or errors with [crate::Error::TransactionNotFound]
/// `tx_raw_in_db` must contains utxo's generating txs or errors witt [crate::Error::TransactionNotFound]
fn utxos_deps<D: BatchDatabase>(
db: &mut D,
tx_raw_in_db: &HashMap<Txid, Transaction>,

View File

@@ -65,8 +65,6 @@ macro_rules! impl_inner_method {
$enum_name::Memory(inner) => inner.$name( $($args, )* ),
#[cfg(feature = "key-value-db")]
$enum_name::Sled(inner) => inner.$name( $($args, )* ),
#[cfg(feature = "sqlite")]
$enum_name::Sqlite(inner) => inner.$name( $($args, )* ),
}
}
}
@@ -84,15 +82,10 @@ pub enum AnyDatabase {
#[cfg_attr(docsrs, doc(cfg(feature = "key-value-db")))]
/// Simple key-value embedded database based on [`sled`]
Sled(sled::Tree),
#[cfg(feature = "sqlite")]
#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
/// Sqlite embedded database using [`rusqlite`]
Sqlite(sqlite::SqliteDatabase),
}
impl_from!(memory::MemoryDatabase, AnyDatabase, Memory,);
impl_from!(sled::Tree, AnyDatabase, Sled, #[cfg(feature = "key-value-db")]);
impl_from!(sqlite::SqliteDatabase, AnyDatabase, Sqlite, #[cfg(feature = "sqlite")]);
/// Type that contains any of the [`BatchDatabase::Batch`] types defined by the library
pub enum AnyBatch {
@@ -102,10 +95,6 @@ pub enum AnyBatch {
#[cfg_attr(docsrs, doc(cfg(feature = "key-value-db")))]
/// Simple key-value embedded database based on [`sled`]
Sled(<sled::Tree as BatchDatabase>::Batch),
#[cfg(feature = "sqlite")]
#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
/// Sqlite embedded database using [`rusqlite`]
Sqlite(<sqlite::SqliteDatabase as BatchDatabase>::Batch),
}
impl_from!(
@@ -114,7 +103,6 @@ impl_from!(
Memory,
);
impl_from!(<sled::Tree as BatchDatabase>::Batch, AnyBatch, Sled, #[cfg(feature = "key-value-db")]);
impl_from!(<sqlite::SqliteDatabase as BatchDatabase>::Batch, AnyBatch, Sqlite, #[cfg(feature = "sqlite")]);
impl BatchOperations for AnyDatabase {
fn set_script_pubkey(
@@ -245,10 +233,6 @@ impl Database for AnyDatabase {
fn increment_last_index(&mut self, keychain: KeychainKind) -> Result<u32, Error> {
impl_inner_method!(AnyDatabase, self, increment_last_index, keychain)
}
fn flush(&mut self) -> Result<(), Error> {
impl_inner_method!(AnyDatabase, self, flush)
}
}
impl BatchOperations for AnyBatch {
@@ -312,26 +296,19 @@ impl BatchDatabase for AnyDatabase {
AnyDatabase::Memory(inner) => inner.begin_batch().into(),
#[cfg(feature = "key-value-db")]
AnyDatabase::Sled(inner) => inner.begin_batch().into(),
#[cfg(feature = "sqlite")]
AnyDatabase::Sqlite(inner) => inner.begin_batch().into(),
}
}
fn commit_batch(&mut self, batch: Self::Batch) -> Result<(), Error> {
match self {
AnyDatabase::Memory(db) => match batch {
AnyBatch::Memory(batch) => db.commit_batch(batch),
#[cfg(any(feature = "key-value-db", feature = "sqlite"))]
_ => unimplemented!("Other batch shouldn't be used with Memory db."),
#[cfg(feature = "key-value-db")]
_ => unimplemented!("Sled batch shouldn't be used with Memory db."),
},
#[cfg(feature = "key-value-db")]
AnyDatabase::Sled(db) => match batch {
AnyBatch::Sled(batch) => db.commit_batch(batch),
_ => unimplemented!("Other batch shouldn't be used with Sled db."),
},
#[cfg(feature = "sqlite")]
AnyDatabase::Sqlite(db) => match batch {
AnyBatch::Sqlite(batch) => db.commit_batch(batch),
_ => unimplemented!("Other batch shouldn't be used with Sqlite db."),
_ => unimplemented!("Memory batch shouldn't be used with Sled db."),
},
}
}
@@ -356,23 +333,6 @@ impl ConfigurableDatabase for sled::Tree {
}
}
/// Configuration type for a [`sqlite::SqliteDatabase`] database
#[cfg(feature = "sqlite")]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SqliteDbConfiguration {
/// Main directory of the db
pub path: String,
}
#[cfg(feature = "sqlite")]
impl ConfigurableDatabase for sqlite::SqliteDatabase {
type Config = SqliteDbConfiguration;
fn from_config(config: &Self::Config) -> Result<Self, Error> {
Ok(sqlite::SqliteDatabase::new(config.path.clone()))
}
}
/// Type that can contain any of the database configurations defined by the library
///
/// This allows storing a single configuration that can be loaded into an [`AnyDatabase`]
@@ -386,10 +346,6 @@ pub enum AnyDatabaseConfig {
#[cfg_attr(docsrs, doc(cfg(feature = "key-value-db")))]
/// Simple key-value embedded database based on [`sled`]
Sled(SledDbConfiguration),
#[cfg(feature = "sqlite")]
#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
/// Sqlite embedded database using [`rusqlite`]
Sqlite(SqliteDbConfiguration),
}
impl ConfigurableDatabase for AnyDatabase {
@@ -402,14 +358,9 @@ impl ConfigurableDatabase for AnyDatabase {
}
#[cfg(feature = "key-value-db")]
AnyDatabaseConfig::Sled(inner) => AnyDatabase::Sled(sled::Tree::from_config(inner)?),
#[cfg(feature = "sqlite")]
AnyDatabaseConfig::Sqlite(inner) => {
AnyDatabase::Sqlite(sqlite::SqliteDatabase::from_config(inner)?)
}
})
}
}
impl_from!((), AnyDatabaseConfig, Memory,);
impl_from!(SledDbConfiguration, AnyDatabaseConfig, Sled, #[cfg(feature = "key-value-db")]);
impl_from!(SqliteDbConfiguration, AnyDatabaseConfig, Sqlite, #[cfg(feature = "sqlite")]);

View File

@@ -367,10 +367,6 @@ impl Database for Tree {
Ok(val)
})
}
fn flush(&mut self) -> Result<(), Error> {
Ok(Tree::flush(self).map(|_| ())?)
}
}
impl BatchDatabase for Tree {

View File

@@ -14,7 +14,6 @@
//! This module defines an in-memory database type called [`MemoryDatabase`] that is based on a
//! [`BTreeMap`].
use std::any::Any;
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Included};
@@ -111,7 +110,7 @@ fn after(key: &[u8]) -> Vec<u8> {
/// [`database`]: crate::database
#[derive(Debug, Default)]
pub struct MemoryDatabase {
map: BTreeMap<Vec<u8>, Box<dyn Any + Send + Sync>>,
map: BTreeMap<Vec<u8>, Box<dyn std::any::Any>>,
deleted_keys: Vec<Vec<u8>>,
}
@@ -420,10 +419,6 @@ impl Database for MemoryDatabase {
Ok(*value)
}
fn flush(&mut self) -> Result<(), Error> {
Ok(())
}
}
impl BatchDatabase for MemoryDatabase {
@@ -457,21 +452,20 @@ impl ConfigurableDatabase for MemoryDatabase {
/// don't have `test` set.
macro_rules! populate_test_db {
($db:expr, $tx_meta:expr, $current_height:expr$(,)?) => {{
use std::str::FromStr;
use $crate::database::BatchOperations;
let mut db = $db;
let tx_meta = $tx_meta;
let current_height: Option<u32> = $current_height;
let tx = $crate::bitcoin::Transaction {
let tx = Transaction {
version: 1,
lock_time: 0,
input: vec![],
output: tx_meta
.output
.iter()
.map(|out_meta| $crate::bitcoin::TxOut {
.map(|out_meta| bitcoin::TxOut {
value: out_meta.value,
script_pubkey: $crate::bitcoin::Address::from_str(&out_meta.to_address)
script_pubkey: bitcoin::Address::from_str(&out_meta.to_address)
.unwrap()
.script_pubkey(),
})
@@ -479,14 +473,12 @@ macro_rules! populate_test_db {
};
let txid = tx.txid();
let confirmation_time = tx_meta
.min_confirmations
.map(|conf| $crate::ConfirmationTime {
height: current_height.unwrap().checked_sub(conf as u32).unwrap(),
timestamp: 0,
});
let confirmation_time = tx_meta.min_confirmations.map(|conf| ConfirmationTime {
height: current_height.unwrap().checked_sub(conf as u32).unwrap(),
timestamp: 0,
});
let tx_details = $crate::TransactionDetails {
let tx_details = TransactionDetails {
transaction: Some(tx.clone()),
txid,
fee: Some(0),
@@ -498,13 +490,13 @@ macro_rules! populate_test_db {
db.set_tx(&tx_details).unwrap();
for (vout, out) in tx.output.iter().enumerate() {
db.set_utxo(&$crate::LocalUtxo {
db.set_utxo(&LocalUtxo {
txout: out.clone(),
outpoint: $crate::bitcoin::OutPoint {
outpoint: OutPoint {
txid,
vout: vout as u32,
},
keychain: $crate::KeychainKind::External,
keychain: KeychainKind::External,
})
.unwrap();
}

View File

@@ -36,11 +36,6 @@ pub use any::{AnyDatabase, AnyDatabaseConfig};
#[cfg(feature = "key-value-db")]
pub(crate) mod keyvalue;
#[cfg(feature = "sqlite")]
pub(crate) mod sqlite;
#[cfg(feature = "sqlite")]
pub use sqlite::SqliteDatabase;
pub mod memory;
pub use memory::MemoryDatabase;
@@ -139,9 +134,6 @@ pub trait Database: BatchOperations {
///
/// It should insert and return `0` if not present in the database
fn increment_last_index(&mut self, keychain: KeychainKind) -> Result<u32, Error>;
/// Force changes to be written to disk
fn flush(&mut self) -> Result<(), Error>;
}
/// Trait for a database that supports batch operations

View File

@@ -1,968 +0,0 @@
// Bitcoin Dev Kit
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
use bitcoin::consensus::encode::{deserialize, serialize};
use bitcoin::hash_types::Txid;
use bitcoin::{OutPoint, Script, Transaction, TxOut};
use crate::database::{BatchDatabase, BatchOperations, Database};
use crate::error::Error;
use crate::types::*;
use rusqlite::{named_params, Connection};
static MIGRATIONS: &[&str] = &[
"CREATE TABLE version (version INTEGER)",
"INSERT INTO version VALUES (1)",
"CREATE TABLE script_pubkeys (keychain TEXT, child INTEGER, script BLOB);",
"CREATE INDEX idx_keychain_child ON script_pubkeys(keychain, child);",
"CREATE INDEX idx_script ON script_pubkeys(script);",
"CREATE TABLE utxos (value INTEGER, keychain TEXT, vout INTEGER, txid BLOB, script BLOB);",
"CREATE INDEX idx_txid_vout ON utxos(txid, vout);",
"CREATE TABLE transactions (txid BLOB, raw_tx BLOB);",
"CREATE INDEX idx_txid ON transactions(txid);",
"CREATE TABLE transaction_details (txid BLOB, timestamp INTEGER, received INTEGER, sent INTEGER, fee INTEGER, height INTEGER, verified INTEGER DEFAULT 0);",
"CREATE INDEX idx_txdetails_txid ON transaction_details(txid);",
"CREATE TABLE last_derivation_indices (keychain TEXT, value INTEGER);",
"CREATE UNIQUE INDEX idx_indices_keychain ON last_derivation_indices(keychain);",
"CREATE TABLE checksums (keychain TEXT, checksum BLOB);",
"CREATE INDEX idx_checksums_keychain ON checksums(keychain);",
];
/// Sqlite database stored on filesystem
///
/// This is a permanent storage solution for devices and platforms that provide a filesystem.
/// [`crate::database`]
#[derive(Debug)]
pub struct SqliteDatabase {
/// Path on the local filesystem to store the sqlite file
pub path: String,
/// A rusqlite connection object to the sqlite database
pub connection: Connection,
}
impl SqliteDatabase {
/// Instantiate a new SqliteDatabase instance by creating a connection
/// to the database stored at path
pub fn new(path: String) -> Self {
let connection = get_connection(&path).unwrap();
SqliteDatabase { path, connection }
}
fn insert_script_pubkey(
&self,
keychain: String,
child: u32,
script: &[u8],
) -> Result<i64, Error> {
let mut statement = self.connection.prepare_cached("INSERT INTO script_pubkeys (keychain, child, script) VALUES (:keychain, :child, :script)")?;
statement.execute(named_params! {
":keychain": keychain,
":child": child,
":script": script
})?;
Ok(self.connection.last_insert_rowid())
}
fn insert_utxo(
&self,
value: u64,
keychain: String,
vout: u32,
txid: &[u8],
script: &[u8],
) -> Result<i64, Error> {
let mut statement = self.connection.prepare_cached("INSERT INTO utxos (value, keychain, vout, txid, script) VALUES (:value, :keychain, :vout, :txid, :script)")?;
statement.execute(named_params! {
":value": value,
":keychain": keychain,
":vout": vout,
":txid": txid,
":script": script
})?;
Ok(self.connection.last_insert_rowid())
}
fn insert_transaction(&self, txid: &[u8], raw_tx: &[u8]) -> Result<i64, Error> {
let mut statement = self
.connection
.prepare_cached("INSERT INTO transactions (txid, raw_tx) VALUES (:txid, :raw_tx)")?;
statement.execute(named_params! {
":txid": txid,
":raw_tx": raw_tx,
})?;
Ok(self.connection.last_insert_rowid())
}
fn update_transaction(&self, txid: &[u8], raw_tx: &[u8]) -> Result<(), Error> {
let mut statement = self
.connection
.prepare_cached("UPDATE transactions SET raw_tx=:raw_tx WHERE txid=:txid")?;
statement.execute(named_params! {
":txid": txid,
":raw_tx": raw_tx,
})?;
Ok(())
}
fn insert_transaction_details(&self, transaction: &TransactionDetails) -> Result<i64, Error> {
let (timestamp, height) = match &transaction.confirmation_time {
Some(confirmation_time) => (
Some(confirmation_time.timestamp),
Some(confirmation_time.height),
),
None => (None, None),
};
let txid: &[u8] = &transaction.txid;
let mut statement = self.connection.prepare_cached("INSERT INTO transaction_details (txid, timestamp, received, sent, fee, height, verified) VALUES (:txid, :timestamp, :received, :sent, :fee, :height, :verified)")?;
statement.execute(named_params! {
":txid": txid,
":timestamp": timestamp,
":received": transaction.received,
":sent": transaction.sent,
":fee": transaction.fee,
":height": height,
":verified": transaction.verified
})?;
Ok(self.connection.last_insert_rowid())
}
fn update_transaction_details(&self, transaction: &TransactionDetails) -> Result<(), Error> {
let (timestamp, height) = match &transaction.confirmation_time {
Some(confirmation_time) => (
Some(confirmation_time.timestamp),
Some(confirmation_time.height),
),
None => (None, None),
};
let txid: &[u8] = &transaction.txid;
let mut statement = self.connection.prepare_cached("UPDATE transaction_details SET timestamp=:timestamp, received=:received, sent=:sent, fee=:fee, height=:height, verified=:verified WHERE txid=:txid")?;
statement.execute(named_params! {
":txid": txid,
":timestamp": timestamp,
":received": transaction.received,
":sent": transaction.sent,
":fee": transaction.fee,
":height": height,
":verified": transaction.verified,
})?;
Ok(())
}
fn insert_last_derivation_index(&self, keychain: String, value: u32) -> Result<i64, Error> {
let mut statement = self.connection.prepare_cached(
"INSERT INTO last_derivation_indices (keychain, value) VALUES (:keychain, :value)",
)?;
statement.execute(named_params! {
":keychain": keychain,
":value": value,
})?;
Ok(self.connection.last_insert_rowid())
}
fn insert_checksum(&self, keychain: String, checksum: &[u8]) -> Result<i64, Error> {
let mut statement = self.connection.prepare_cached(
"INSERT INTO checksums (keychain, checksum) VALUES (:keychain, :checksum)",
)?;
statement.execute(named_params! {
":keychain": keychain,
":checksum": checksum,
})?;
Ok(self.connection.last_insert_rowid())
}
fn update_last_derivation_index(&self, keychain: String, value: u32) -> Result<(), Error> {
let mut statement = self.connection.prepare_cached(
"INSERT INTO last_derivation_indices (keychain, value) VALUES (:keychain, :value) ON CONFLICT(keychain) DO UPDATE SET value=:value WHERE keychain=:keychain",
)?;
statement.execute(named_params! {
":keychain": keychain,
":value": value,
})?;
Ok(())
}
fn select_script_pubkeys(&self) -> Result<Vec<Script>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT script FROM script_pubkeys")?;
let mut scripts: Vec<Script> = vec![];
let mut rows = statement.query([])?;
while let Some(row) = rows.next()? {
let raw_script: Vec<u8> = row.get(0)?;
scripts.push(raw_script.into());
}
Ok(scripts)
}
fn select_script_pubkeys_by_keychain(&self, keychain: String) -> Result<Vec<Script>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT script FROM script_pubkeys WHERE keychain=:keychain")?;
let mut scripts: Vec<Script> = vec![];
let mut rows = statement.query(named_params! {":keychain": keychain})?;
while let Some(row) = rows.next()? {
let raw_script: Vec<u8> = row.get(0)?;
scripts.push(raw_script.into());
}
Ok(scripts)
}
fn select_script_pubkey_by_path(
&self,
keychain: String,
child: u32,
) -> Result<Option<Script>, Error> {
let mut statement = self.connection.prepare_cached(
"SELECT script FROM script_pubkeys WHERE keychain=:keychain AND child=:child",
)?;
let mut rows = statement.query(named_params! {":keychain": keychain,":child": child})?;
match rows.next()? {
Some(row) => {
let script: Vec<u8> = row.get(0)?;
let script: Script = script.into();
Ok(Some(script))
}
None => Ok(None),
}
}
fn select_script_pubkey_by_script(
&self,
script: &[u8],
) -> Result<Option<(KeychainKind, u32)>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT keychain, child FROM script_pubkeys WHERE script=:script")?;
let mut rows = statement.query(named_params! {":script": script})?;
match rows.next()? {
Some(row) => {
let keychain: String = row.get(0)?;
let keychain: KeychainKind = serde_json::from_str(&keychain)?;
let child: u32 = row.get(1)?;
Ok(Some((keychain, child)))
}
None => Ok(None),
}
}
fn select_utxos(&self) -> Result<Vec<LocalUtxo>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT value, keychain, vout, txid, script FROM utxos")?;
let mut utxos: Vec<LocalUtxo> = vec![];
let mut rows = statement.query([])?;
while let Some(row) = rows.next()? {
let value = row.get(0)?;
let keychain: String = row.get(1)?;
let vout = row.get(2)?;
let txid: Vec<u8> = row.get(3)?;
let script: Vec<u8> = row.get(4)?;
let keychain: KeychainKind = serde_json::from_str(&keychain)?;
utxos.push(LocalUtxo {
outpoint: OutPoint::new(deserialize(&txid)?, vout),
txout: TxOut {
value,
script_pubkey: script.into(),
},
keychain,
})
}
Ok(utxos)
}
fn select_utxo_by_outpoint(
&self,
txid: &[u8],
vout: u32,
) -> Result<Option<(u64, KeychainKind, Script)>, Error> {
let mut statement = self.connection.prepare_cached(
"SELECT value, keychain, script FROM utxos WHERE txid=:txid AND vout=:vout",
)?;
let mut rows = statement.query(named_params! {":txid": txid,":vout": vout})?;
match rows.next()? {
Some(row) => {
let value: u64 = row.get(0)?;
let keychain: String = row.get(1)?;
let keychain: KeychainKind = serde_json::from_str(&keychain)?;
let script: Vec<u8> = row.get(2)?;
let script: Script = script.into();
Ok(Some((value, keychain, script)))
}
None => Ok(None),
}
}
fn select_transactions(&self) -> Result<Vec<Transaction>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT raw_tx FROM transactions")?;
let mut txs: Vec<Transaction> = vec![];
let mut rows = statement.query([])?;
while let Some(row) = rows.next()? {
let raw_tx: Vec<u8> = row.get(0)?;
let tx: Transaction = deserialize(&raw_tx)?;
txs.push(tx);
}
Ok(txs)
}
fn select_transaction_by_txid(&self, txid: &[u8]) -> Result<Option<Transaction>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT raw_tx FROM transactions WHERE txid=:txid")?;
let mut rows = statement.query(named_params! {":txid": txid})?;
match rows.next()? {
Some(row) => {
let raw_tx: Vec<u8> = row.get(0)?;
let tx: Transaction = deserialize(&raw_tx)?;
Ok(Some(tx))
}
None => Ok(None),
}
}
fn select_transaction_details_with_raw(&self) -> Result<Vec<TransactionDetails>, Error> {
let mut statement = self.connection.prepare_cached("SELECT transaction_details.txid, transaction_details.timestamp, transaction_details.received, transaction_details.sent, transaction_details.fee, transaction_details.height, transaction_details.verified, transactions.raw_tx FROM transaction_details, transactions WHERE transaction_details.txid = transactions.txid")?;
let mut transaction_details: Vec<TransactionDetails> = vec![];
let mut rows = statement.query([])?;
while let Some(row) = rows.next()? {
let txid: Vec<u8> = row.get(0)?;
let txid: Txid = deserialize(&txid)?;
let timestamp: Option<u64> = row.get(1)?;
let received: u64 = row.get(2)?;
let sent: u64 = row.get(3)?;
let fee: Option<u64> = row.get(4)?;
let height: Option<u32> = row.get(5)?;
let verified: bool = row.get(6)?;
let raw_tx: Option<Vec<u8>> = row.get(7)?;
let tx: Option<Transaction> = match raw_tx {
Some(raw_tx) => {
let tx: Transaction = deserialize(&raw_tx)?;
Some(tx)
}
None => None,
};
let confirmation_time = match (height, timestamp) {
(Some(height), Some(timestamp)) => Some(ConfirmationTime { height, timestamp }),
_ => None,
};
transaction_details.push(TransactionDetails {
transaction: tx,
txid,
received,
sent,
fee,
confirmation_time,
verified,
});
}
Ok(transaction_details)
}
fn select_transaction_details(&self) -> Result<Vec<TransactionDetails>, Error> {
let mut statement = self.connection.prepare_cached(
"SELECT txid, timestamp, received, sent, fee, height, verified FROM transaction_details",
)?;
let mut transaction_details: Vec<TransactionDetails> = vec![];
let mut rows = statement.query([])?;
while let Some(row) = rows.next()? {
let txid: Vec<u8> = row.get(0)?;
let txid: Txid = deserialize(&txid)?;
let timestamp: Option<u64> = row.get(1)?;
let received: u64 = row.get(2)?;
let sent: u64 = row.get(3)?;
let fee: Option<u64> = row.get(4)?;
let height: Option<u32> = row.get(5)?;
let verified: bool = row.get(6)?;
let confirmation_time = match (height, timestamp) {
(Some(height), Some(timestamp)) => Some(ConfirmationTime { height, timestamp }),
_ => None,
};
transaction_details.push(TransactionDetails {
transaction: None,
txid,
received,
sent,
fee,
confirmation_time,
verified,
});
}
Ok(transaction_details)
}
fn select_transaction_details_by_txid(
&self,
txid: &[u8],
) -> Result<Option<TransactionDetails>, Error> {
let mut statement = self.connection.prepare_cached("SELECT transaction_details.timestamp, transaction_details.received, transaction_details.sent, transaction_details.fee, transaction_details.height, transaction_details.verified, transactions.raw_tx FROM transaction_details, transactions WHERE transaction_details.txid=transactions.txid AND transaction_details.txid=:txid")?;
let mut rows = statement.query(named_params! { ":txid": txid })?;
match rows.next()? {
Some(row) => {
let timestamp: Option<u64> = row.get(0)?;
let received: u64 = row.get(1)?;
let sent: u64 = row.get(2)?;
let fee: Option<u64> = row.get(3)?;
let height: Option<u32> = row.get(4)?;
let verified: bool = row.get(5)?;
let raw_tx: Option<Vec<u8>> = row.get(6)?;
let tx: Option<Transaction> = match raw_tx {
Some(raw_tx) => {
let tx: Transaction = deserialize(&raw_tx)?;
Some(tx)
}
None => None,
};
let confirmation_time = match (height, timestamp) {
(Some(height), Some(timestamp)) => Some(ConfirmationTime { height, timestamp }),
_ => None,
};
Ok(Some(TransactionDetails {
transaction: tx,
txid: deserialize(txid)?,
received,
sent,
fee,
confirmation_time,
verified,
}))
}
None => Ok(None),
}
}
fn select_last_derivation_index_by_keychain(
&self,
keychain: String,
) -> Result<Option<u32>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT value FROM last_derivation_indices WHERE keychain=:keychain")?;
let mut rows = statement.query(named_params! {":keychain": keychain})?;
match rows.next()? {
Some(row) => {
let value: u32 = row.get(0)?;
Ok(Some(value))
}
None => Ok(None),
}
}
fn select_checksum_by_keychain(&self, keychain: String) -> Result<Option<Vec<u8>>, Error> {
let mut statement = self
.connection
.prepare_cached("SELECT checksum FROM checksums WHERE keychain=:keychain")?;
let mut rows = statement.query(named_params! {":keychain": keychain})?;
match rows.next()? {
Some(row) => {
let checksum: Vec<u8> = row.get(0)?;
Ok(Some(checksum))
}
None => Ok(None),
}
}
fn delete_script_pubkey_by_path(&self, keychain: String, child: u32) -> Result<(), Error> {
let mut statement = self.connection.prepare_cached(
"DELETE FROM script_pubkeys WHERE keychain=:keychain AND child=:child",
)?;
statement.execute(named_params! {
":keychain": keychain,
":child": child
})?;
Ok(())
}
fn delete_script_pubkey_by_script(&self, script: &[u8]) -> Result<(), Error> {
let mut statement = self
.connection
.prepare_cached("DELETE FROM script_pubkeys WHERE script=:script")?;
statement.execute(named_params! {
":script": script
})?;
Ok(())
}
fn delete_utxo_by_outpoint(&self, txid: &[u8], vout: u32) -> Result<(), Error> {
let mut statement = self
.connection
.prepare_cached("DELETE FROM utxos WHERE txid=:txid AND vout=:vout")?;
statement.execute(named_params! {
":txid": txid,
":vout": vout
})?;
Ok(())
}
fn delete_transaction_by_txid(&self, txid: &[u8]) -> Result<(), Error> {
let mut statement = self
.connection
.prepare_cached("DELETE FROM transactions WHERE txid=:txid")?;
statement.execute(named_params! {":txid": txid})?;
Ok(())
}
fn delete_transaction_details_by_txid(&self, txid: &[u8]) -> Result<(), Error> {
let mut statement = self
.connection
.prepare_cached("DELETE FROM transaction_details WHERE txid=:txid")?;
statement.execute(named_params! {":txid": txid})?;
Ok(())
}
fn delete_last_derivation_index_by_keychain(&self, keychain: String) -> Result<(), Error> {
let mut statement = self
.connection
.prepare_cached("DELETE FROM last_derivation_indices WHERE keychain=:keychain")?;
statement.execute(named_params! {
":keychain": &keychain
})?;
Ok(())
}
}
impl BatchOperations for SqliteDatabase {
fn set_script_pubkey(
&mut self,
script: &Script,
keychain: KeychainKind,
child: u32,
) -> Result<(), Error> {
let keychain = serde_json::to_string(&keychain)?;
self.insert_script_pubkey(keychain, child, script.as_bytes())?;
Ok(())
}
fn set_utxo(&mut self, utxo: &LocalUtxo) -> Result<(), Error> {
self.insert_utxo(
utxo.txout.value,
serde_json::to_string(&utxo.keychain)?,
utxo.outpoint.vout,
&utxo.outpoint.txid,
utxo.txout.script_pubkey.as_bytes(),
)?;
Ok(())
}
fn set_raw_tx(&mut self, transaction: &Transaction) -> Result<(), Error> {
match self.select_transaction_by_txid(&transaction.txid())? {
Some(_) => {
self.update_transaction(&transaction.txid(), &serialize(transaction))?;
}
None => {
self.insert_transaction(&transaction.txid(), &serialize(transaction))?;
}
}
Ok(())
}
fn set_tx(&mut self, transaction: &TransactionDetails) -> Result<(), Error> {
match self.select_transaction_details_by_txid(&transaction.txid)? {
Some(_) => {
self.update_transaction_details(transaction)?;
}
None => {
self.insert_transaction_details(transaction)?;
}
}
if let Some(tx) = &transaction.transaction {
self.set_raw_tx(tx)?;
}
Ok(())
}
fn set_last_index(&mut self, keychain: KeychainKind, value: u32) -> Result<(), Error> {
self.update_last_derivation_index(serde_json::to_string(&keychain)?, value)?;
Ok(())
}
fn del_script_pubkey_from_path(
&mut self,
keychain: KeychainKind,
child: u32,
) -> Result<Option<Script>, Error> {
let keychain = serde_json::to_string(&keychain)?;
let script = self.select_script_pubkey_by_path(keychain.clone(), child)?;
match script {
Some(script) => {
self.delete_script_pubkey_by_path(keychain, child)?;
Ok(Some(script))
}
None => Ok(None),
}
}
fn del_path_from_script_pubkey(
&mut self,
script: &Script,
) -> Result<Option<(KeychainKind, u32)>, Error> {
match self.select_script_pubkey_by_script(script.as_bytes())? {
Some((keychain, child)) => {
self.delete_script_pubkey_by_script(script.as_bytes())?;
Ok(Some((keychain, child)))
}
None => Ok(None),
}
}
fn del_utxo(&mut self, outpoint: &OutPoint) -> Result<Option<LocalUtxo>, Error> {
match self.select_utxo_by_outpoint(&outpoint.txid, outpoint.vout)? {
Some((value, keychain, script_pubkey)) => {
self.delete_utxo_by_outpoint(&outpoint.txid, outpoint.vout)?;
Ok(Some(LocalUtxo {
outpoint: *outpoint,
txout: TxOut {
value,
script_pubkey,
},
keychain,
}))
}
None => Ok(None),
}
}
fn del_raw_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error> {
match self.select_transaction_by_txid(txid)? {
Some(tx) => {
self.delete_transaction_by_txid(txid)?;
Ok(Some(tx))
}
None => Ok(None),
}
}
fn del_tx(
&mut self,
txid: &Txid,
include_raw: bool,
) -> Result<Option<TransactionDetails>, Error> {
match self.select_transaction_details_by_txid(txid)? {
Some(transaction_details) => {
self.delete_transaction_details_by_txid(txid)?;
if include_raw {
self.delete_transaction_by_txid(txid)?;
}
Ok(Some(transaction_details))
}
None => Ok(None),
}
}
fn del_last_index(&mut self, keychain: KeychainKind) -> Result<Option<u32>, Error> {
let keychain = serde_json::to_string(&keychain)?;
match self.select_last_derivation_index_by_keychain(keychain.clone())? {
Some(value) => {
self.delete_last_derivation_index_by_keychain(keychain)?;
Ok(Some(value))
}
None => Ok(None),
}
}
}
impl Database for SqliteDatabase {
fn check_descriptor_checksum<B: AsRef<[u8]>>(
&mut self,
keychain: KeychainKind,
bytes: B,
) -> Result<(), Error> {
let keychain = serde_json::to_string(&keychain)?;
match self.select_checksum_by_keychain(keychain.clone())? {
Some(checksum) => {
if checksum == bytes.as_ref().to_vec() {
Ok(())
} else {
Err(Error::ChecksumMismatch)
}
}
None => {
self.insert_checksum(keychain, bytes.as_ref())?;
Ok(())
}
}
}
fn iter_script_pubkeys(&self, keychain: Option<KeychainKind>) -> Result<Vec<Script>, Error> {
match keychain {
Some(keychain) => {
let keychain = serde_json::to_string(&keychain)?;
self.select_script_pubkeys_by_keychain(keychain)
}
None => self.select_script_pubkeys(),
}
}
fn iter_utxos(&self) -> Result<Vec<LocalUtxo>, Error> {
self.select_utxos()
}
fn iter_raw_txs(&self) -> Result<Vec<Transaction>, Error> {
self.select_transactions()
}
fn iter_txs(&self, include_raw: bool) -> Result<Vec<TransactionDetails>, Error> {
match include_raw {
true => self.select_transaction_details_with_raw(),
false => self.select_transaction_details(),
}
}
fn get_script_pubkey_from_path(
&self,
keychain: KeychainKind,
child: u32,
) -> Result<Option<Script>, Error> {
let keychain = serde_json::to_string(&keychain)?;
match self.select_script_pubkey_by_path(keychain, child)? {
Some(script) => Ok(Some(script)),
None => Ok(None),
}
}
fn get_path_from_script_pubkey(
&self,
script: &Script,
) -> Result<Option<(KeychainKind, u32)>, Error> {
match self.select_script_pubkey_by_script(script.as_bytes())? {
Some((keychain, child)) => Ok(Some((keychain, child))),
None => Ok(None),
}
}
fn get_utxo(&self, outpoint: &OutPoint) -> Result<Option<LocalUtxo>, Error> {
match self.select_utxo_by_outpoint(&outpoint.txid, outpoint.vout)? {
Some((value, keychain, script_pubkey)) => Ok(Some(LocalUtxo {
outpoint: *outpoint,
txout: TxOut {
value,
script_pubkey,
},
keychain,
})),
None => Ok(None),
}
}
fn get_raw_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
match self.select_transaction_by_txid(txid)? {
Some(tx) => Ok(Some(tx)),
None => Ok(None),
}
}
fn get_tx(&self, txid: &Txid, include_raw: bool) -> Result<Option<TransactionDetails>, Error> {
match self.select_transaction_details_by_txid(txid)? {
Some(mut transaction_details) => {
if !include_raw {
transaction_details.transaction = None;
}
Ok(Some(transaction_details))
}
None => Ok(None),
}
}
fn get_last_index(&self, keychain: KeychainKind) -> Result<Option<u32>, Error> {
let keychain = serde_json::to_string(&keychain)?;
let value = self.select_last_derivation_index_by_keychain(keychain)?;
Ok(value)
}
fn increment_last_index(&mut self, keychain: KeychainKind) -> Result<u32, Error> {
let keychain_string = serde_json::to_string(&keychain)?;
match self.get_last_index(keychain)? {
Some(value) => {
self.update_last_derivation_index(keychain_string, value + 1)?;
Ok(value + 1)
}
None => {
self.insert_last_derivation_index(keychain_string, 0)?;
Ok(0)
}
}
}
fn flush(&mut self) -> Result<(), Error> {
Ok(())
}
}
impl BatchDatabase for SqliteDatabase {
type Batch = SqliteDatabase;
fn begin_batch(&self) -> Self::Batch {
let db = SqliteDatabase::new(self.path.clone());
db.connection.execute("BEGIN TRANSACTION", []).unwrap();
db
}
fn commit_batch(&mut self, batch: Self::Batch) -> Result<(), Error> {
batch.connection.execute("COMMIT TRANSACTION", [])?;
Ok(())
}
}
pub fn get_connection(path: &str) -> Result<Connection, Error> {
let connection = Connection::open(path)?;
migrate(&connection)?;
Ok(connection)
}
pub fn get_schema_version(conn: &Connection) -> rusqlite::Result<i32> {
let statement = conn.prepare_cached("SELECT version FROM version");
match statement {
Err(rusqlite::Error::SqliteFailure(e, Some(msg))) => {
if msg == "no such table: version" {
Ok(0)
} else {
Err(rusqlite::Error::SqliteFailure(e, Some(msg)))
}
}
Ok(mut stmt) => {
let mut rows = stmt.query([])?;
match rows.next()? {
Some(row) => {
let version: i32 = row.get(0)?;
Ok(version)
}
None => Ok(0),
}
}
_ => Ok(0),
}
}
pub fn set_schema_version(conn: &Connection, version: i32) -> rusqlite::Result<usize> {
conn.execute(
"UPDATE version SET version=:version",
named_params! {":version": version},
)
}
pub fn migrate(conn: &Connection) -> rusqlite::Result<()> {
let version = get_schema_version(conn)?;
let stmts = &MIGRATIONS[(version as usize)..];
let mut i: i32 = version;
if version == MIGRATIONS.len() as i32 {
log::info!("db up to date, no migration needed");
return Ok(());
}
for stmt in stmts {
let res = conn.execute(stmt, []);
if res.is_err() {
println!("migration failed on:\n{}\n{:?}", stmt, res);
break;
}
i += 1;
}
set_schema_version(conn, i)?;
Ok(())
}
#[cfg(test)]
pub mod test {
use crate::database::SqliteDatabase;
use std::time::{SystemTime, UNIX_EPOCH};
fn get_database() -> SqliteDatabase {
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut dir = std::env::temp_dir();
dir.push(format!("bdk_{}", time.as_nanos()));
SqliteDatabase::new(String::from(dir.to_str().unwrap()))
}
#[test]
fn test_script_pubkey() {
crate::database::test::test_script_pubkey(get_database());
}
#[test]
fn test_batch_script_pubkey() {
crate::database::test::test_batch_script_pubkey(get_database());
}
#[test]
fn test_iter_script_pubkey() {
crate::database::test::test_iter_script_pubkey(get_database());
}
#[test]
fn test_del_script_pubkey() {
crate::database::test::test_del_script_pubkey(get_database());
}
#[test]
fn test_utxo() {
crate::database::test::test_utxo(get_database());
}
#[test]
fn test_raw_tx() {
crate::database::test::test_raw_tx(get_database());
}
#[test]
fn test_tx() {
crate::database::test::test_tx(get_database());
}
#[test]
fn test_last_index() {
crate::database::test::test_last_index(get_database());
}
}

View File

@@ -571,9 +571,8 @@ macro_rules! fragment {
( pk ( $key:expr ) ) => ({
$crate::fragment!(c:pk_k ( $key ))
});
( pk_h ( $key:expr ) ) => ({
let secp = $crate::bitcoin::secp256k1::Secp256k1::new();
$crate::keys::make_pkh($key, &secp)
( pk_h ( $key_hash:expr ) ) => ({
$crate::impl_leaf_opcode_value!(PkH, $key_hash)
});
( after ( $value:expr ) ) => ({
$crate::impl_leaf_opcode_value!(After, $value)
@@ -602,9 +601,6 @@ macro_rules! fragment {
( and_or ( $( $inner:tt )* ) ) => ({
$crate::impl_node_opcode_three!(AndOr, $( $inner )*)
});
( andor ( $( $inner:tt )* ) ) => ({
$crate::impl_node_opcode_three!(AndOr, $( $inner )*)
});
( or_b ( $( $inner:tt )* ) ) => ({
$crate::impl_node_opcode_two!(OrB, $( $inner )*)
});

View File

@@ -47,12 +47,14 @@ use bitcoin::util::bip32::Fingerprint;
use bitcoin::PublicKey;
use miniscript::descriptor::{DescriptorPublicKey, ShInner, SortedMultiVec, WshInner};
use miniscript::{Descriptor, Miniscript, MiniscriptKey, Satisfier, ScriptContext, Terminal};
use miniscript::{
Descriptor, Miniscript, MiniscriptKey, Satisfier, ScriptContext, Terminal, ToPublicKey,
};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use crate::descriptor::ExtractPolicy;
use crate::descriptor::{DerivedDescriptorKey, ExtractPolicy};
use crate::wallet::signer::{SignerId, SignersContainer};
use crate::wallet::utils::{self, After, Older, SecpCtx};
@@ -86,6 +88,13 @@ impl PkOrF {
},
}
}
fn from_key_hash(k: hash160::Hash) -> Self {
PkOrF {
pubkey_hash: Some(k),
..Default::default()
}
}
}
/// An item that needs to be satisfied
@@ -770,6 +779,25 @@ fn signature_in_psbt(psbt: &Psbt, key: &DescriptorPublicKey, secp: &SecpCtx) ->
})
}
fn signature_key(
key: &<DescriptorPublicKey as MiniscriptKey>::Hash,
signers: &SignersContainer,
secp: &SecpCtx,
) -> Policy {
let key_hash = DerivedDescriptorKey::new(key.clone(), secp)
.to_public_key()
.to_pubkeyhash();
let mut policy: Policy = SatisfiableItem::Signature(PkOrF::from_key_hash(key_hash)).into();
if signers.find(SignerId::PkHash(key_hash)).is_some() {
policy.contribution = Satisfaction::Complete {
condition: Default::default(),
}
}
policy
}
impl<Ctx: ScriptContext> ExtractPolicy for Miniscript<DescriptorPublicKey, Ctx> {
fn extract_policy(
&self,
@@ -781,7 +809,7 @@ impl<Ctx: ScriptContext> ExtractPolicy for Miniscript<DescriptorPublicKey, Ctx>
// Leaves
Terminal::True | Terminal::False => None,
Terminal::PkK(pubkey) => Some(signature(pubkey, signers, build_sat, secp)),
Terminal::PkH(pubkey_hash) => Some(signature(pubkey_hash, signers, build_sat, secp)),
Terminal::PkH(pubkey_hash) => Some(signature_key(pubkey_hash, signers, secp)),
Terminal::After(value) => {
let mut policy: Policy = SatisfiableItem::AbsoluteTimelock { value: *value }.into();
policy.contribution = Satisfaction::Complete {
@@ -1416,7 +1444,6 @@ mod test {
const ALICE_TPRV_STR:&str = "tprv8ZgxMBicQKsPf6T5X327efHnvJDr45Xnb8W4JifNWtEoqXu9MRYS4v1oYe6DFcMVETxy5w3bqpubYRqvcVTqovG1LifFcVUuJcbwJwrhYzP";
const BOB_TPRV_STR:&str = "tprv8ZgxMBicQKsPeinZ155cJAn117KYhbaN6MV3WeG6sWhxWzcvX1eg1awd4C9GpUN1ncLEM2rzEvunAg3GizdZD4QPPCkisTz99tXXB4wZArp";
const CAROL_TPRV_STR:&str = "tprv8ZgxMBicQKsPdC3CicFifuLCEyVVdXVUNYorxUWj3iGZ6nimnLAYAY9SYB7ib8rKzRxrCKFcEytCt6szwd2GHnGPRCBLAEAoSVDefSNk4Bt";
const ALICE_BOB_PATH: &str = "m/0'";
#[test]
@@ -1575,28 +1602,4 @@ mod test {
);
//println!("{}", serde_json::to_string(&policy_expired_signed).unwrap());
}
#[test]
fn test_extract_pkh() {
let secp = Secp256k1::new();
let (prvkey_alice, _, _) = setup_keys(ALICE_TPRV_STR, ALICE_BOB_PATH, &secp);
let (prvkey_bob, _, _) = setup_keys(BOB_TPRV_STR, ALICE_BOB_PATH, &secp);
let (prvkey_carol, _, _) = setup_keys(CAROL_TPRV_STR, ALICE_BOB_PATH, &secp);
let desc = descriptor!(wsh(c: andor(
pk(prvkey_alice),
pk_k(prvkey_bob),
pk_h(prvkey_carol),
)))
.unwrap();
let (wallet_desc, keymap) = desc
.into_wallet_descriptor(&secp, Network::Testnet)
.unwrap();
let signers_container = Arc::new(SignersContainer::from(keymap));
let policy = wallet_desc.extract_policy(&signers_container, BuildSatisfaction::None, &secp);
assert!(policy.is_ok());
}
}

View File

@@ -130,7 +130,7 @@ pub enum Error {
Electrum(electrum_client::Error),
#[cfg(feature = "esplora")]
/// Esplora client error
Esplora(Box<crate::blockchain::esplora::EsploraError>),
Esplora(crate::blockchain::esplora::EsploraError),
#[cfg(feature = "compact_filters")]
/// Compact filters client error)
CompactFilters(crate::blockchain::compact_filters::CompactFiltersError),
@@ -140,9 +140,6 @@ pub enum Error {
#[cfg(feature = "rpc")]
/// Rpc client error
Rpc(bitcoincore_rpc::Error),
#[cfg(feature = "sqlite")]
/// Rusqlite client error
Rusqlite(rusqlite::Error),
}
impl fmt::Display for Error {
@@ -193,12 +190,12 @@ impl_error!(bitcoin::util::psbt::PsbtParseError, PsbtParse);
#[cfg(feature = "electrum")]
impl_error!(electrum_client::Error, Electrum);
#[cfg(feature = "esplora")]
impl_error!(crate::blockchain::esplora::EsploraError, Esplora);
#[cfg(feature = "key-value-db")]
impl_error!(sled::Error, Sled);
#[cfg(feature = "rpc")]
impl_error!(bitcoincore_rpc::Error, Rpc);
#[cfg(feature = "sqlite")]
impl_error!(rusqlite::Error, Rusqlite);
#[cfg(feature = "compact_filters")]
impl From<crate::blockchain::compact_filters::CompactFiltersError> for Error {
@@ -219,10 +216,3 @@ impl From<crate::wallet::verify::VerifyError> for Error {
}
}
}
#[cfg(feature = "esplora")]
impl From<crate::blockchain::esplora::EsploraError> for Error {
fn from(other: crate::blockchain::esplora::EsploraError) -> Self {
Error::Esplora(Box::new(other))
}
}

View File

@@ -40,7 +40,7 @@ pub type MnemonicWithPassphrase = (Mnemonic, Option<String>);
#[cfg_attr(docsrs, doc(cfg(feature = "keys-bip39")))]
impl<Ctx: ScriptContext> DerivableKey<Ctx> for Seed {
fn into_extended_key(self) -> Result<ExtendedKey<Ctx>, KeyError> {
Ok(bip32::ExtendedPrivKey::new_master(Network::Bitcoin, self.as_bytes())?.into())
Ok(bip32::ExtendedPrivKey::new_master(Network::Bitcoin, &self.as_bytes())?.into())
}
fn into_descriptor_key(

View File

@@ -753,20 +753,6 @@ pub fn make_pk<Pk: IntoDescriptorKey<Ctx>, Ctx: ScriptContext>(
Ok((minisc, key_map, valid_networks))
}
// Used internally by `bdk::fragment!` to build `pk_h()` fragments
#[doc(hidden)]
pub fn make_pkh<Pk: IntoDescriptorKey<Ctx>, Ctx: ScriptContext>(
descriptor_key: Pk,
secp: &SecpCtx,
) -> Result<(Miniscript<DescriptorPublicKey, Ctx>, KeyMap, ValidNetworks), DescriptorError> {
let (key, key_map, valid_networks) = descriptor_key.into_descriptor_key()?.extract(secp)?;
let minisc = Miniscript::from_ast(Terminal::PkH(key))?;
minisc.check_minsicript()?;
Ok((minisc, key_map, valid_networks))
}
// Used internally by `bdk::fragment!` to build `multi()` fragments
#[doc(hidden)]
pub fn make_multi<Pk: IntoDescriptorKey<Ctx>, Ctx: ScriptContext>(

View File

@@ -40,7 +40,7 @@
//! interact with the bitcoin P2P network.
//!
//! ```toml
//! bdk = "0.13.0"
//! bdk = "0.9.0"
//! ```
#![cfg_attr(
feature = "electrum",
@@ -205,24 +205,11 @@ extern crate serde;
#[macro_use]
extern crate serde_json;
#[cfg(all(feature = "reqwest", feature = "ureq"))]
compile_error!("Features reqwest and ureq are mutually exclusive and cannot be enabled together");
#[cfg(all(feature = "async-interface", feature = "electrum"))]
compile_error!(
"Features async-interface and electrum are mutually exclusive and cannot be enabled together"
);
#[cfg(all(feature = "async-interface", feature = "ureq"))]
compile_error!(
"Features async-interface and ureq are mutually exclusive and cannot be enabled together"
);
#[cfg(all(feature = "async-interface", feature = "compact_filters"))]
compile_error!(
"Features async-interface and compact_filters are mutually exclusive and cannot be enabled together"
);
#[cfg(feature = "keys-bip39")]
extern crate bip39;
@@ -241,12 +228,12 @@ pub extern crate bitcoincore_rpc;
#[cfg(feature = "electrum")]
pub extern crate electrum_client;
#[cfg(feature = "esplora")]
pub extern crate reqwest;
#[cfg(feature = "key-value-db")]
pub extern crate sled;
#[cfg(feature = "sqlite")]
pub extern crate rusqlite;
#[allow(unused_imports)]
#[macro_use]
pub(crate) mod error;

View File

@@ -43,8 +43,8 @@ impl PsbtUtils for Psbt {
mod test {
use crate::bitcoin::TxIn;
use crate::psbt::Psbt;
use crate::wallet::test::{get_funded_wallet, get_test_wpkh};
use crate::wallet::AddressIndex;
use crate::wallet::{get_funded_wallet, test::get_test_wpkh};
use crate::SignOptions;
use std::str::FromStr;

View File

@@ -10,7 +10,7 @@ use electrsd::bitcoind::BitcoinD;
use electrsd::{bitcoind, ElectrsD};
pub use electrum_client::{Client as ElectrumClient, ElectrumApi};
#[allow(unused_imports)]
use log::{debug, error, info, log_enabled, trace, Level};
use log::{debug, error, info, trace};
use std::collections::HashMap;
use std::env;
use std::ops::Deref;
@@ -24,16 +24,11 @@ pub struct TestClient {
impl TestClient {
pub fn new(bitcoind_exe: String, electrs_exe: String) -> Self {
debug!("launching {} and {}", &bitcoind_exe, &electrs_exe);
let bitcoind = BitcoinD::new(bitcoind_exe).unwrap();
let mut conf = bitcoind::Conf::default();
conf.view_stdout = log_enabled!(Level::Debug);
let bitcoind = BitcoinD::with_conf(bitcoind_exe, &conf).unwrap();
let http_enabled = cfg!(feature = "test-esplora");
let mut conf = electrsd::Conf::default();
conf.view_stderr = log_enabled!(Level::Debug);
conf.http_enabled = cfg!(feature = "test-esplora");
let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &conf).unwrap();
let electrsd = ElectrsD::new(electrs_exe, &bitcoind, false, http_enabled).unwrap();
let node_address = bitcoind.client.get_new_address(None, None).unwrap();
bitcoind

View File

@@ -100,8 +100,8 @@ impl TranslateDescriptor for Descriptor<DescriptorPublicKey> {
#[macro_export]
macro_rules! testutils {
( @external $descriptors:expr, $child:expr ) => ({
use $crate::bitcoin::secp256k1::Secp256k1;
use $crate::miniscript::descriptor::{Descriptor, DescriptorPublicKey, DescriptorTrait};
use bitcoin::secp256k1::Secp256k1;
use miniscript::descriptor::{Descriptor, DescriptorPublicKey, DescriptorTrait};
use $crate::testutils::TranslateDescriptor;
@@ -111,15 +111,15 @@ macro_rules! testutils {
parsed.derive_translated(&secp, $child).address(bitcoin::Network::Regtest).expect("No address form")
});
( @internal $descriptors:expr, $child:expr ) => ({
use $crate::bitcoin::secp256k1::Secp256k1;
use $crate::miniscript::descriptor::{Descriptor, DescriptorPublicKey, DescriptorTrait};
use bitcoin::secp256k1::Secp256k1;
use miniscript::descriptor::{Descriptor, DescriptorPublicKey, DescriptorTrait};
use $crate::testutils::TranslateDescriptor;
let secp = Secp256k1::new();
let parsed = Descriptor::<DescriptorPublicKey>::parse_descriptor(&secp, &$descriptors.1.expect("Missing internal descriptor")).expect("Failed to parse descriptor in `testutils!(@internal)`").0;
parsed.derive_translated(&secp, $child).address($crate::bitcoin::Network::Regtest).expect("No address form")
parsed.derive_translated(&secp, $child).address(bitcoin::Network::Regtest).expect("No address form")
});
( @e $descriptors:expr, $child:expr ) => ({ testutils!(@external $descriptors, $child) });
( @i $descriptors:expr, $child:expr ) => ({ testutils!(@internal $descriptors, $child) });
@@ -145,8 +145,8 @@ macro_rules! testutils {
let mut seed = [0u8; 32];
rand::thread_rng().fill(&mut seed[..]);
let key = $crate::bitcoin::util::bip32::ExtendedPrivKey::new_master(
$crate::bitcoin::Network::Testnet,
let key = bitcoin::util::bip32::ExtendedPrivKey::new_master(
bitcoin::Network::Testnet,
&seed,
);
@@ -158,13 +158,13 @@ macro_rules! testutils {
( @generate_wif ) => ({
use rand::Rng;
let mut key = [0u8; $crate::bitcoin::secp256k1::constants::SECRET_KEY_SIZE];
let mut key = [0u8; bitcoin::secp256k1::constants::SECRET_KEY_SIZE];
rand::thread_rng().fill(&mut key[..]);
($crate::bitcoin::PrivateKey {
(bitcoin::PrivateKey {
compressed: true,
network: $crate::bitcoin::Network::Testnet,
key: $crate::bitcoin::secp256k1::SecretKey::from_slice(&key).unwrap(),
network: bitcoin::Network::Testnet,
key: bitcoin::secp256k1::SecretKey::from_slice(&key).unwrap(),
}.to_string(), None::<String>, None::<String>)
});
@@ -181,8 +181,8 @@ macro_rules! testutils {
( @descriptors ( $external_descriptor:expr ) $( ( $internal_descriptor:expr ) )? $( ( @keys $( $keys:tt )* ) )* ) => ({
use std::str::FromStr;
use std::collections::HashMap;
use $crate::miniscript::descriptor::Descriptor;
use $crate::miniscript::TranslatePk;
use miniscript::descriptor::Descriptor;
use miniscript::TranslatePk;
#[allow(unused_assignments, unused_mut)]
let mut keys: HashMap<&'static str, (String, Option<String>, Option<String>)> = HashMap::new();

View File

@@ -10,7 +10,6 @@
// licenses.
use std::convert::AsRef;
use std::ops::Sub;
use bitcoin::blockdata::transaction::{OutPoint, Transaction, TxOut};
use bitcoin::{hash_types::Txid, util::psbt};
@@ -66,31 +65,10 @@ impl FeeRate {
FeeRate(1.0)
}
/// Calculate fee rate from `fee` and weight units (`wu`).
pub fn from_wu(fee: u64, wu: usize) -> FeeRate {
Self::from_vb(fee, wu.vbytes())
}
/// Calculate fee rate from `fee` and `vbytes`.
pub fn from_vb(fee: u64, vbytes: usize) -> FeeRate {
let rate = fee as f32 / vbytes as f32;
Self::from_sat_per_vb(rate)
}
/// Return the value as satoshi/vbyte
pub fn as_sat_vb(&self) -> f32 {
self.0
}
/// Calculate absolute fee in Satoshis using size in weight units.
pub fn fee_wu(&self, wu: usize) -> u64 {
self.fee_vb(wu.vbytes())
}
/// Calculate absolute fee in Satoshis using size in virtual bytes.
pub fn fee_vb(&self, vbytes: usize) -> u64 {
(self.as_sat_vb() * vbytes as f32).ceil() as u64
}
}
impl std::default::Default for FeeRate {
@@ -99,27 +77,6 @@ impl std::default::Default for FeeRate {
}
}
impl Sub for FeeRate {
type Output = Self;
fn sub(self, other: FeeRate) -> Self::Output {
FeeRate(self.0 - other.0)
}
}
/// Trait implemented by types that can be used to measure weight units.
pub trait Vbytes {
/// Convert weight units to virtual bytes.
fn vbytes(self) -> usize;
}
impl Vbytes for usize {
fn vbytes(self) -> usize {
// ref: https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki#transaction-size-calculations
(self as f32 / 4.0).ceil() as usize
}
}
/// An unspent output owned by a [`Wallet`].
///
/// [`Wallet`]: crate::Wallet

View File

@@ -115,8 +115,8 @@ mod test {
use std::sync::Arc;
use super::*;
use crate::wallet::test::{get_funded_wallet, get_test_wpkh};
use crate::wallet::AddressIndex::New;
use crate::wallet::{get_funded_wallet, test::get_test_wpkh};
#[derive(Debug)]
struct TestValidator;

View File

@@ -26,7 +26,7 @@
//! ```
//! # use std::str::FromStr;
//! # use bitcoin::*;
//! # use bdk::wallet::{self, coin_selection::*};
//! # use bdk::wallet::coin_selection::*;
//! # use bdk::database::Database;
//! # use bdk::*;
//! # const TXIN_BASE_WEIGHT: usize = (32 + 4 + 4 + 1) * 4;
@@ -41,7 +41,7 @@
//! optional_utxos: Vec<WeightedUtxo>,
//! fee_rate: FeeRate,
//! amount_needed: u64,
//! fee_amount: u64,
//! fee_amount: f32,
//! ) -> Result<CoinSelectionResult, bdk::Error> {
//! let mut selected_amount = 0;
//! let mut additional_weight = 0;
@@ -57,8 +57,9 @@
//! },
//! )
//! .collect::<Vec<_>>();
//! let additional_fees = fee_rate.fee_wu(additional_weight);
//! let amount_needed_with_fees = (fee_amount + additional_fees) + amount_needed;
//! let additional_fees = additional_weight as f32 * fee_rate.as_sat_vb() / 4.0;
//! let amount_needed_with_fees =
//! (fee_amount + additional_fees).ceil() as u64 + amount_needed;
//! if amount_needed_with_fees > selected_amount {
//! return Err(bdk::Error::InsufficientFunds {
//! needed: amount_needed_with_fees,
@@ -89,6 +90,7 @@
//! ```
use crate::types::FeeRate;
use crate::wallet::Vbytes;
use crate::{database::Database, WeightedUtxo};
use crate::{error::Error, Utxo};
@@ -116,7 +118,7 @@ pub struct CoinSelectionResult {
/// List of outputs selected for use as inputs
pub selected: Vec<Utxo>,
/// Total fee amount in satoshi
pub fee_amount: u64,
pub fee_amount: f32,
}
impl CoinSelectionResult {
@@ -163,7 +165,7 @@ pub trait CoinSelectionAlgorithm<D: Database>: std::fmt::Debug {
optional_utxos: Vec<WeightedUtxo>,
fee_rate: FeeRate,
amount_needed: u64,
fee_amount: u64,
fee_amount: f32,
) -> Result<CoinSelectionResult, Error>;
}
@@ -182,8 +184,10 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
mut optional_utxos: Vec<WeightedUtxo>,
fee_rate: FeeRate,
amount_needed: u64,
mut fee_amount: u64,
mut fee_amount: f32,
) -> Result<CoinSelectionResult, Error> {
let calc_fee_bytes = |wu| (wu as f32) * fee_rate.as_sat_vb() / 4.0;
log::debug!(
"amount_needed = `{}`, fee_amount = `{}`, fee_rate = `{:?}`",
amount_needed,
@@ -208,9 +212,9 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
.scan(
(&mut selected_amount, &mut fee_amount),
|(selected_amount, fee_amount), (must_use, weighted_utxo)| {
if must_use || **selected_amount < amount_needed + **fee_amount {
if must_use || **selected_amount < amount_needed + (fee_amount.ceil() as u64) {
**fee_amount +=
fee_rate.fee_wu(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight);
calc_fee_bytes(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight);
**selected_amount += weighted_utxo.utxo.txout().value;
log::debug!(
@@ -227,7 +231,7 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
)
.collect::<Vec<_>>();
let amount_needed_with_fees = amount_needed + fee_amount;
let amount_needed_with_fees = amount_needed + (fee_amount.ceil() as u64);
if selected_amount < amount_needed_with_fees {
return Err(Error::InsufficientFunds {
needed: amount_needed_with_fees,
@@ -247,15 +251,16 @@ impl<D: Database> CoinSelectionAlgorithm<D> for LargestFirstCoinSelection {
struct OutputGroup {
weighted_utxo: WeightedUtxo,
// Amount of fees for spending a certain utxo, calculated using a certain FeeRate
fee: u64,
fee: f32,
// The effective value of the UTXO, i.e., the utxo value minus the fee for spending it
effective_value: i64,
}
impl OutputGroup {
fn new(weighted_utxo: WeightedUtxo, fee_rate: FeeRate) -> Self {
let fee = fee_rate.fee_wu(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight);
let effective_value = weighted_utxo.utxo.txout().value as i64 - fee as i64;
let fee =
(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight).vbytes() * fee_rate.as_sat_vb();
let effective_value = weighted_utxo.utxo.txout().value as i64 - fee.ceil() as i64;
OutputGroup {
weighted_utxo,
fee,
@@ -298,7 +303,7 @@ impl<D: Database> CoinSelectionAlgorithm<D> for BranchAndBoundCoinSelection {
optional_utxos: Vec<WeightedUtxo>,
fee_rate: FeeRate,
amount_needed: u64,
fee_amount: u64,
fee_amount: f32,
) -> Result<CoinSelectionResult, Error> {
// Mapping every (UTXO, usize) to an output group
let required_utxos: Vec<OutputGroup> = required_utxos
@@ -320,7 +325,7 @@ impl<D: Database> CoinSelectionAlgorithm<D> for BranchAndBoundCoinSelection {
.iter()
.fold(0, |acc, x| acc + x.effective_value);
let actual_target = fee_amount + amount_needed;
let actual_target = fee_amount.ceil() as u64 + amount_needed;
let cost_of_change = self.size_of_change as f32 * fee_rate.as_sat_vb();
let expected = (curr_available_value + curr_value)
@@ -340,14 +345,6 @@ impl<D: Database> CoinSelectionAlgorithm<D> for BranchAndBoundCoinSelection {
.try_into()
.expect("Bitcoin amount to fit into i64");
if curr_value > actual_target {
return Ok(BranchAndBoundCoinSelection::calculate_cs_result(
vec![],
required_utxos,
fee_amount,
));
}
Ok(self
.bnb(
required_utxos.clone(),
@@ -381,7 +378,7 @@ impl BranchAndBoundCoinSelection {
mut curr_value: i64,
mut curr_available_value: i64,
actual_target: i64,
fee_amount: u64,
fee_amount: f32,
cost_of_change: f32,
) -> Result<CoinSelectionResult, Error> {
// current_selection[i] will contain true if we are using optional_utxos[i],
@@ -489,7 +486,7 @@ impl BranchAndBoundCoinSelection {
mut optional_utxos: Vec<OutputGroup>,
curr_value: i64,
actual_target: i64,
fee_amount: u64,
fee_amount: f32,
) -> CoinSelectionResult {
#[cfg(not(test))]
optional_utxos.shuffle(&mut thread_rng());
@@ -518,10 +515,10 @@ impl BranchAndBoundCoinSelection {
fn calculate_cs_result(
mut selected_utxos: Vec<OutputGroup>,
mut required_utxos: Vec<OutputGroup>,
mut fee_amount: u64,
mut fee_amount: f32,
) -> CoinSelectionResult {
selected_utxos.append(&mut required_utxos);
fee_amount += selected_utxos.iter().map(|u| u.fee).sum::<u64>();
fee_amount += selected_utxos.iter().map(|u| u.fee).sum::<f32>();
let selected = selected_utxos
.into_iter()
.map(|u| u.weighted_utxo.utxo)
@@ -543,7 +540,6 @@ mod test {
use super::*;
use crate::database::MemoryDatabase;
use crate::types::*;
use crate::wallet::Vbytes;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
@@ -551,33 +547,52 @@ mod test {
const P2WPKH_WITNESS_SIZE: usize = 73 + 33 + 2;
const FEE_AMOUNT: u64 = 50;
fn utxo(value: u64, index: u32) -> WeightedUtxo {
assert!(index < 10);
let outpoint = OutPoint::from_str(&format!(
"000000000000000000000000000000000000000000000000000000000000000{}:0",
index
))
.unwrap();
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint,
txout: TxOut {
value,
script_pubkey: Script::new(),
},
keychain: KeychainKind::External,
}),
}
}
const FEE_AMOUNT: f32 = 50.0;
fn get_test_utxos() -> Vec<WeightedUtxo> {
vec![
utxo(100_000, 0),
utxo(FEE_AMOUNT as u64 - 40, 1),
utxo(200_000, 2),
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint: OutPoint::from_str(
"0000000000000000000000000000000000000000000000000000000000000000:0",
)
.unwrap(),
txout: TxOut {
value: 100_000,
script_pubkey: Script::new(),
},
keychain: KeychainKind::External,
}),
},
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint: OutPoint::from_str(
"0000000000000000000000000000000000000000000000000000000000000001:0",
)
.unwrap(),
txout: TxOut {
value: FEE_AMOUNT as u64 - 40,
script_pubkey: Script::new(),
},
keychain: KeychainKind::External,
}),
},
WeightedUtxo {
satisfaction_weight: P2WPKH_WITNESS_SIZE,
utxo: Utxo::Local(LocalUtxo {
outpoint: OutPoint::from_str(
"0000000000000000000000000000000000000000000000000000000000000002:0",
)
.unwrap(),
txout: TxOut {
value: 200_000,
script_pubkey: Script::new(),
},
keychain: KeychainKind::Internal,
}),
},
]
}
@@ -641,13 +656,13 @@ mod test {
vec![],
FeeRate::from_sat_per_vb(1.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert_eq!(result.fee_amount, 254)
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -662,13 +677,13 @@ mod test {
vec![],
FeeRate::from_sat_per_vb(1.0),
20_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert_eq!(result.fee_amount, 254);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -683,13 +698,13 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
20_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 1);
assert_eq!(result.selected_amount(), 200_000);
assert_eq!(result.fee_amount, 118);
assert!((result.fee_amount - 118.0).abs() < f32::EPSILON);
}
#[test]
@@ -705,7 +720,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
500_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -723,7 +738,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1000.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -743,13 +758,13 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_000);
assert_eq!(result.fee_amount, 254);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -770,7 +785,7 @@ mod test {
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert_eq!(result.fee_amount, 254);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -791,38 +806,7 @@ mod test {
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300010);
assert_eq!(result.fee_amount, 254);
}
#[test]
fn test_bnb_coin_selection_required_not_enough() {
let utxos = get_test_utxos();
let database = MemoryDatabase::default();
let required = vec![utxos[0].clone()];
let mut optional = utxos[1..].to_vec();
optional.push(utxo(500_000, 3));
// Defensive assertions, for sanity and in case someone changes the test utxos vector.
let amount: u64 = required.iter().map(|u| u.utxo.txout().value).sum();
assert_eq!(amount, 100_000);
let amount: u64 = optional.iter().map(|u| u.utxo.txout().value).sum();
assert!(amount > 150_000);
let result = BranchAndBoundCoinSelection::default()
.coin_select(
&database,
required,
optional,
FeeRate::from_sat_per_vb(1.0),
150_000,
FEE_AMOUNT,
)
.unwrap();
assert_eq!(result.selected.len(), 3);
assert_eq!(result.selected_amount(), 300_010);
assert!((result.fee_amount as f32 - 254.0).abs() < f32::EPSILON);
assert!((result.fee_amount - 254.0).abs() < f32::EPSILON);
}
#[test]
@@ -838,7 +822,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
500_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -856,7 +840,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1000.0),
250_000,
FEE_AMOUNT,
50.0,
)
.unwrap();
}
@@ -873,7 +857,7 @@ mod test {
utxos,
FeeRate::from_sat_per_vb(1.0),
99932, // first utxo's effective value
0,
0.0,
)
.unwrap();
@@ -881,7 +865,7 @@ mod test {
assert_eq!(result.selected_amount(), 100_000);
let input_size = (TXIN_BASE_WEIGHT + P2WPKH_WITNESS_SIZE).vbytes();
let epsilon = 0.5;
assert!((1.0 - (result.fee_amount as f32 / input_size as f32)).abs() < epsilon);
assert!((1.0 - (result.fee_amount / input_size)).abs() < epsilon);
}
#[test]
@@ -900,7 +884,7 @@ mod test {
optional_utxos,
FeeRate::from_sat_per_vb(0.0),
target_amount,
0,
0.0,
)
.unwrap();
assert_eq!(result.selected_amount(), target_amount);
@@ -927,7 +911,7 @@ mod test {
0,
curr_available_value,
20_000,
FEE_AMOUNT,
50.0,
cost_of_change,
)
.unwrap();
@@ -954,7 +938,7 @@ mod test {
0,
curr_available_value,
20_000,
FEE_AMOUNT,
50.0,
cost_of_change,
)
.unwrap();
@@ -966,6 +950,7 @@ mod test {
let fee_rate = FeeRate::from_sat_per_vb(1.0);
let size_of_change = 31;
let cost_of_change = size_of_change as f32 * fee_rate.as_sat_vb();
let fee_amount = 50.0;
let utxos: Vec<_> = generate_same_value_utxos(50_000, 10)
.into_iter()
@@ -987,12 +972,12 @@ mod test {
curr_value,
curr_available_value,
target_amount,
FEE_AMOUNT,
fee_amount,
cost_of_change,
)
.unwrap();
assert!((result.fee_amount - 186.0).abs() < f32::EPSILON);
assert_eq!(result.selected_amount(), 100_000);
assert_eq!(result.fee_amount, 186);
}
// TODO: bnb() function should be optimized, and this test should be done with more utxos
@@ -1024,7 +1009,7 @@ mod test {
curr_value,
curr_available_value,
target_amount,
0,
0.0,
0.0,
)
.unwrap();
@@ -1050,10 +1035,12 @@ mod test {
utxos,
0,
target_amount as i64,
FEE_AMOUNT,
50.0,
);
assert!(result.selected_amount() > target_amount);
assert_eq!(result.fee_amount, (50 + result.selected.len() * 68) as u64);
assert!(
(result.fee_amount - (50.0 + result.selected.len() as f32 * 68.0)).abs() < f32::EPSILON
);
}
}

View File

@@ -18,7 +18,6 @@ use std::collections::HashMap;
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use bitcoin::secp256k1::Secp256k1;
@@ -56,7 +55,6 @@ use tx_builder::{BumpFee, CreateTx, FeePolicy, TxBuilder, TxParams};
use utils::{check_nlocktime, check_nsequence_rbf, After, Older, SecpCtx, DUST_LIMIT_SATOSHI};
use crate::blockchain::{Blockchain, Progress};
use crate::database::memory::MemoryDatabase;
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::descriptor::derived::AsDerived;
use crate::descriptor::policy::BuildSatisfaction;
@@ -68,7 +66,6 @@ use crate::descriptor::{
use crate::error::Error;
use crate::psbt::PsbtUtils;
use crate::signer::SignerError;
use crate::testutils;
use crate::types::*;
const CACHE_ADDR_BATCH_SIZE: u32 = 100;
@@ -170,11 +167,6 @@ where
secp,
})
}
/// Get the Bitcoin network the wallet is using.
pub fn network(&self) -> Network {
self.network
}
}
/// The address index selection strategy to use to derived an address from the wallet's external
@@ -323,7 +315,7 @@ where
/// Return the list of unspent outputs of this wallet
///
/// Note that this method only operates on the internal database, which first needs to be
/// Note that this methods only operate on the internal database, which first needs to be
/// [`Wallet::sync`] manually.
pub fn list_unspent(&self) -> Result<Vec<LocalUtxo>, Error> {
self.database.borrow().iter_utxos()
@@ -335,21 +327,6 @@ where
self.database.borrow().get_utxo(&outpoint)
}
/// Return a single transactions made and received by the wallet
///
/// Optionally fill the [`TransactionDetails::transaction`] field with the raw transaction if
/// `include_raw` is `true`.
///
/// Note that this method only operates on the internal database, which first needs to be
/// [`Wallet::sync`] manually.
pub fn get_tx(
&self,
txid: &Txid,
include_raw: bool,
) -> Result<Option<TransactionDetails>, Error> {
self.database.borrow().get_tx(txid, include_raw)
}
/// Return the list of transactions made and received by the wallet
///
/// Optionally fill the [`TransactionDetails::transaction`] field with the raw transaction if
@@ -566,7 +543,7 @@ where
});
}
}
(FeeRate::from_sat_per_vb(0.0), *fee)
(FeeRate::from_sat_per_vb(0.0), *fee as f32)
}
FeePolicy::FeeRate(rate) => {
if let Some(previous_fee) = params.bumping_fee {
@@ -577,7 +554,7 @@ where
});
}
}
(*rate, 0)
(*rate, 0.0)
}
};
@@ -596,7 +573,8 @@ where
let mut outgoing: u64 = 0;
let mut received: u64 = 0;
fee_amount += fee_rate.fee_wu(tx.get_weight());
let calc_fee_bytes = |wu| (wu as f32) * fee_rate.as_sat_vb() / 4.0;
fee_amount += calc_fee_bytes(tx.get_weight());
let recipients = params.recipients.iter().map(|(r, v)| (r, *v));
@@ -613,7 +591,7 @@ where
script_pubkey: script_pubkey.clone(),
value,
};
fee_amount += fee_rate.fee_vb(serialize(&new_out).len());
fee_amount += calc_fee_bytes(serialize(&new_out).len() * 4);
tx.output.push(new_out);
@@ -671,8 +649,9 @@ where
}
};
fee_amount += fee_rate.fee_vb(serialize(&drain_output).len());
fee_amount += calc_fee_bytes(serialize(&drain_output).len() * 4);
let mut fee_amount = fee_amount.ceil() as u64;
let drain_val = (coin_selection.selected_amount() - outgoing).saturating_sub(fee_amount);
if tx.output.is_empty() {
@@ -775,10 +754,8 @@ where
return Err(Error::IrreplaceableTransaction);
}
let feerate = FeeRate::from_wu(
details.fee.ok_or(Error::FeeRateUnavailable)?,
tx.get_weight(),
);
let vbytes = tx.get_weight().vbytes();
let feerate = details.fee.ok_or(Error::FeeRateUnavailable)? as f32 / vbytes;
// remove the inputs from the tx and process them
let original_txin = tx.input.drain(..).collect::<Vec<_>>();
@@ -855,7 +832,7 @@ where
utxos: original_utxos,
bumping_fee: Some(tx_builder::PreviousFee {
absolute: details.fee.ok_or(Error::FeeRateUnavailable)?,
rate: feerate.as_sat_vb(),
rate: feerate,
}),
..Default::default()
};
@@ -1557,6 +1534,11 @@ where
&self.client
}
/// Get the Bitcoin network the wallet is using.
pub fn network(&self) -> Network {
self.network
}
/// Broadcast a transaction to the network
#[maybe_async]
pub fn broadcast(&self, tx: Transaction) -> Result<Txid, Error> {
@@ -1566,60 +1548,31 @@ where
}
}
/// Return a fake wallet that appears to be funded for testing.
pub fn get_funded_wallet(
descriptor: &str,
) -> (
Wallet<(), MemoryDatabase>,
(String, Option<String>),
bitcoin::Txid,
) {
let descriptors = testutils!(@descriptors (descriptor));
let wallet = Wallet::new_offline(
&descriptors.0,
None,
Network::Regtest,
MemoryDatabase::new(),
)
.unwrap();
/// Trait implemented by types that can be used to measure weight units.
pub trait Vbytes {
/// Convert weight units to virtual bytes.
fn vbytes(self) -> f32;
}
let funding_address_kix = 0;
let tx_meta = testutils! {
@tx ( (@external descriptors, funding_address_kix) => 50_000 ) (@confirmations 1)
};
wallet
.database
.borrow_mut()
.set_script_pubkey(
&bitcoin::Address::from_str(&tx_meta.output.get(0).unwrap().to_address)
.unwrap()
.script_pubkey(),
KeychainKind::External,
funding_address_kix,
)
.unwrap();
wallet
.database
.borrow_mut()
.set_last_index(KeychainKind::External, funding_address_kix)
.unwrap();
let txid = crate::populate_test_db!(wallet.database.borrow_mut(), tx_meta, Some(100));
(wallet, descriptors, txid)
impl Vbytes for usize {
fn vbytes(self) -> f32 {
self as f32 / 4.0
}
}
#[cfg(test)]
pub(crate) mod test {
use std::str::FromStr;
use bitcoin::{util::psbt, Network};
use crate::database::memory::MemoryDatabase;
use crate::database::Database;
use crate::types::KeychainKind;
use super::*;
use crate::signer::{SignOptions, SignerError};
use crate::testutils;
use crate::wallet::AddressIndex::{LastUnused, New, Peek, Reset};
#[test]
@@ -1731,6 +1684,50 @@ pub(crate) mod test {
"wsh(and_v(v:pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW),after(100000)))"
}
pub(crate) fn get_funded_wallet(
descriptor: &str,
) -> (
Wallet<(), MemoryDatabase>,
(String, Option<String>),
bitcoin::Txid,
) {
let descriptors = testutils!(@descriptors (descriptor));
let wallet = Wallet::new_offline(
&descriptors.0,
None,
Network::Regtest,
MemoryDatabase::new(),
)
.unwrap();
let funding_address_kix = 0;
let tx_meta = testutils! {
@tx ( (@external descriptors, funding_address_kix) => 50_000 ) (@confirmations 1)
};
wallet
.database
.borrow_mut()
.set_script_pubkey(
&bitcoin::Address::from_str(&tx_meta.output.get(0).unwrap().to_address)
.unwrap()
.script_pubkey(),
KeychainKind::External,
funding_address_kix,
)
.unwrap();
wallet
.database
.borrow_mut()
.set_last_index(KeychainKind::External, funding_address_kix)
.unwrap();
let txid = crate::populate_test_db!(wallet.database.borrow_mut(), tx_meta, Some(100));
(wallet, descriptors, txid)
}
macro_rules! assert_fee_rate {
($tx:expr, $fees:expr, $fee_rate:expr $( ,@dust_change $( $dust_change:expr )* )* $( ,@add_signature $( $add_signature:expr )* )* ) => ({
let mut tx = $tx.clone();
@@ -1749,13 +1746,13 @@ pub(crate) mod test {
dust_change = true;
)*
let tx_fee_rate = FeeRate::from_wu($fees, tx.get_weight());
let fee_rate = $fee_rate;
let tx_fee_rate = $fees as f32 / (tx.get_weight().vbytes());
let fee_rate = $fee_rate.as_sat_vb();
if !dust_change {
assert!((tx_fee_rate - fee_rate).as_sat_vb().abs() < 0.5, "Expected fee rate of {:?}, the tx has {:?}", fee_rate, tx_fee_rate);
assert!((tx_fee_rate - fee_rate).abs() < 0.5, "Expected fee rate of {}, the tx has {}", fee_rate, tx_fee_rate);
} else {
assert!(tx_fee_rate >= fee_rate, "Expected fee rate of at least {:?}, the tx has {:?}", fee_rate, tx_fee_rate);
assert!(tx_fee_rate >= fee_rate, "Expected fee rate of at least {}, the tx has {}", fee_rate, tx_fee_rate);
}
});
}