Compare commits
33 Commits
release/0.
...
compact_fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2da3755f4 | ||
|
|
6acb4d9796 | ||
|
|
377e5cdd49 | ||
|
|
70d2a0ee6b | ||
|
|
de1fc2a677 | ||
|
|
671d90e57c | ||
|
|
9480faa5d3 | ||
|
|
474620e6a5 | ||
|
|
a5919f4ab0 | ||
|
|
7e986fd904 | ||
|
|
77379e9262 | ||
|
|
ea699a6ec1 | ||
|
|
81c1ccb185 | ||
|
|
4f4802b0f3 | ||
|
|
bab9d99a00 | ||
|
|
22f4db0de1 | ||
|
|
a6ce75fa2d | ||
|
|
7597645ed6 | ||
|
|
618e0d3700 | ||
|
|
44d0e8d07c | ||
|
|
c1077b95cf | ||
|
|
e5d4994329 | ||
|
|
7109f7d9b4 | ||
|
|
f52fda4b4b | ||
|
|
a6be470fe4 | ||
|
|
8e41c4587d | ||
|
|
2ecae348ea | ||
|
|
f4ecfa0d49 | ||
|
|
696647b893 | ||
|
|
18dcda844f | ||
|
|
6394c3e209 | ||
|
|
42adad7dbd | ||
|
|
4498e0f7f8 |
41
.github/workflows/cont_integration.yml
vendored
41
.github/workflows/cont_integration.yml
vendored
@@ -77,28 +77,14 @@ jobs:
|
||||
|
||||
test-blockchains:
|
||||
name: Test ${{ matrix.blockchain.name }}
|
||||
runs-on: ubuntu-16.04
|
||||
runs-on: ubuntu-20.04
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
blockchain:
|
||||
- name: electrum
|
||||
container: bitcoindevkit/electrs:0.4.0
|
||||
start: /root/electrs --network regtest --cookie-file $GITHUB_WORKSPACE/.bitcoin/regtest/.cookie --jsonrpc-import
|
||||
- name: esplora
|
||||
container: bitcoindevkit/esplora:0.4.0
|
||||
start: /root/electrs --network regtest -vvv --daemon-dir $GITHUB_WORKSPACE/.bitcoin --jsonrpc-import --electrum-rpc-addr=0.0.0.0:60401 --http-addr 0.0.0.0:3002
|
||||
- name: rpc
|
||||
container: bitcoindevkit/electrs:0.4.0
|
||||
start: /root/electrs --network regtest --cookie-file $GITHUB_WORKSPACE/.bitcoin/regtest/.cookie --jsonrpc-import
|
||||
container: ${{ matrix.blockchain.container }}
|
||||
env:
|
||||
BDK_RPC_AUTH: COOKIEFILE
|
||||
BDK_RPC_COOKIEFILE: ${{ github.workspace }}/.bitcoin/regtest/.cookie
|
||||
BDK_RPC_URL: 127.0.0.1:18443
|
||||
BDK_RPC_WALLET: bdk-test
|
||||
BDK_ELECTRUM_URL: tcp://127.0.0.1:60401
|
||||
BDK_ESPLORA_URL: http://127.0.0.1:3002
|
||||
- name: esplora
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
@@ -107,25 +93,18 @@ jobs:
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
~/.cargo/bitcoin
|
||||
~/.cargo/electrs
|
||||
~/.cargo/git
|
||||
target
|
||||
key: ${{ runner.os }}-cargo-${{ github.job }}-${{ hashFiles('**/Cargo.toml','**/Cargo.lock') }}
|
||||
- name: get pkg-config # running esplora tests seems to need this
|
||||
run: apt update && apt install -y --fix-missing pkg-config libssl-dev
|
||||
- name: Install rustup
|
||||
run: curl https://sh.rustup.rs -sSf | sh -s -- -y
|
||||
- name: Set default toolchain
|
||||
run: $HOME/.cargo/bin/rustup default 1.53.0 # STABLE
|
||||
- name: Set profile
|
||||
run: $HOME/.cargo/bin/rustup set profile minimal
|
||||
- name: Update toolchain
|
||||
run: $HOME/.cargo/bin/rustup update
|
||||
- name: Start core
|
||||
run: ./ci/start-core.sh
|
||||
- name: start ${{ matrix.blockchain.name }}
|
||||
run: nohup ${{ matrix.blockchain.start }} & sleep 5
|
||||
- name: Setup rust toolchain
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
override: true
|
||||
- name: Test
|
||||
run: $HOME/.cargo/bin/cargo test --features test-${{ matrix.blockchain.name }},test-blockchains --no-default-features ${{ matrix.blockchain.name }}::bdk_blockchain_tests
|
||||
run: cargo test --features test-${{ matrix.blockchain.name }} ${{ matrix.blockchain.name }}::bdk_blockchain_tests
|
||||
|
||||
check-wasm:
|
||||
name: Check WASM
|
||||
|
||||
14
CHANGELOG.md
14
CHANGELOG.md
@@ -6,12 +6,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Wallet
|
||||
|
||||
- Removed and replaced `set_single_recipient` with more general `drain_to` and replaced `maintain_single_recipient` with `allow_shrinking`.
|
||||
|
||||
### Blockchain
|
||||
|
||||
- Removed `stop_gap` from `Blockchain` trait and added it to only `ElectrumBlockchain` and `EsploraBlockchain` structs
|
||||
|
||||
## [v0.9.0] - [v0.8.0]
|
||||
|
||||
### Wallet
|
||||
#### Added
|
||||
- Bitcoin core RPC added as blockchain backend
|
||||
- Add a `verify` feature that can be enable to verify the unconfirmed txs we download against the consensus rules
|
||||
|
||||
- Added Bitcoin core RPC added as blockchain backend
|
||||
- Added a `verify` feature that can be enable to verify the unconfirmed txs we download against the consensus rules
|
||||
|
||||
## [v0.8.0] - [v0.7.0]
|
||||
|
||||
|
||||
13
Cargo.toml
13
Cargo.toml
@@ -31,11 +31,11 @@ cc = { version = ">=1.0.64", optional = true }
|
||||
socks = { version = "0.3", optional = true }
|
||||
lazy_static = { version = "1.4", optional = true }
|
||||
tiny-bip39 = { version = "^0.8", optional = true }
|
||||
zeroize = { version = "<1.4.0", optional = true }
|
||||
bitcoinconsensus = { version = "0.19.0-3", optional = true }
|
||||
|
||||
# Needed by bdk_blockchain_tests macro
|
||||
bitcoincore-rpc = { version = "0.13", optional = true }
|
||||
serial_test = { version = "0.4", optional = true }
|
||||
|
||||
# Platform-specific dependencies
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
@@ -57,23 +57,22 @@ compact_filters = ["rocksdb", "socks", "lazy_static", "cc"]
|
||||
key-value-db = ["sled"]
|
||||
async-interface = ["async-trait"]
|
||||
all-keys = ["keys-bip39"]
|
||||
keys-bip39 = ["tiny-bip39"]
|
||||
keys-bip39 = ["tiny-bip39", "zeroize"]
|
||||
rpc = ["bitcoincore-rpc"]
|
||||
|
||||
|
||||
# Debug/Test features
|
||||
test-blockchains = ["bitcoincore-rpc", "electrum-client"]
|
||||
test-electrum = ["electrum"]
|
||||
test-rpc = ["rpc"]
|
||||
test-esplora = ["esplora"]
|
||||
test-electrum = ["electrum", "electrsd/electrs_0_8_10", "test-blockchains"]
|
||||
test-rpc = ["rpc", "electrsd/electrs_0_8_10", "test-blockchains"]
|
||||
test-esplora = ["esplora", "electrsd/legacy", "electrsd/esplora_a33e97e1", "test-blockchains"]
|
||||
test-md-docs = ["electrum"]
|
||||
|
||||
[dev-dependencies]
|
||||
lazy_static = "1.4"
|
||||
env_logger = "0.7"
|
||||
clap = "2.33"
|
||||
serial_test = "0.4"
|
||||
bitcoind = "0.10.0"
|
||||
electrsd = { version= "0.6", features = ["trigger", "bitcoind_0_21_1"] }
|
||||
|
||||
[[example]]
|
||||
name = "address_validator"
|
||||
|
||||
19
README.md
19
README.md
@@ -151,6 +151,25 @@ fn main() -> Result<(), bdk::Error> {
|
||||
}
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
### Unit testing
|
||||
|
||||
```
|
||||
cargo test
|
||||
```
|
||||
|
||||
### Integration testing
|
||||
|
||||
Integration testing require testing features, for example:
|
||||
|
||||
```
|
||||
cargo test --features test-electrum
|
||||
```
|
||||
|
||||
The other options are `test-esplora` or `test-rpc`.
|
||||
Note that `electrs` and `bitcoind` binaries are automatically downloaded (on mac and linux), to specify you already have installed binaries you must use `--no-default-features` and provide `BITCOIND_EXE` and `ELECTRS_EXE` as environment variables.
|
||||
|
||||
## License
|
||||
|
||||
Licensed under either of
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
usage() {
|
||||
cat <<'EOF'
|
||||
Script for running the bdk blockchain tests for a specific blockchain by starting up the backend in docker.
|
||||
|
||||
Usage: ./run_blockchain_tests.sh [esplora|electrum|rpc] [test name].
|
||||
|
||||
EOF
|
||||
}
|
||||
|
||||
eprintln(){
|
||||
echo "$@" >&2
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
if test "$id"; then
|
||||
eprintln "cleaning up $blockchain docker container $id";
|
||||
docker rm -fv "$id" > /dev/null;
|
||||
rm /tmp/regtest-"$id".cookie;
|
||||
fi
|
||||
trap - EXIT INT
|
||||
}
|
||||
|
||||
# Makes sure we clean up the container at the end or if ^C
|
||||
trap 'rc=$?; cleanup; exit $rc' EXIT INT
|
||||
|
||||
blockchain="$1"
|
||||
test_name="$2"
|
||||
|
||||
case "$blockchain" in
|
||||
electrum)
|
||||
eprintln "starting electrs docker container"
|
||||
id="$(docker run --detach -p 127.0.0.1:18443-18444:18443-18444/tcp -p 127.0.0.1:60401:60401/tcp bitcoindevkit/electrs:0.4.0)"
|
||||
;;
|
||||
esplora)
|
||||
eprintln "starting esplora docker container"
|
||||
id="$(docker run --detach -p 127.0.0.1:18443-18444:18443-18444/tcp -p 127.0.0.1:60401:60401/tcp -p 127.0.0.1:3002:3002/tcp bitcoindevkit/esplora:0.4.0)"
|
||||
export BDK_ESPLORA_URL=http://127.0.0.1:3002
|
||||
;;
|
||||
rpc)
|
||||
eprintln "starting bitcoind docker container (via electrs container)"
|
||||
id="$(docker run --detach -p 127.0.0.1:18443-18444:18443-18444/tcp -p 127.0.0.1:60401:60401/tcp bitcoindevkit/electrs:0.4.0)"
|
||||
;;
|
||||
*)
|
||||
usage;
|
||||
exit 1;
|
||||
;;
|
||||
esac
|
||||
|
||||
# taken from https://github.com/bitcoindevkit/bitcoin-regtest-box
|
||||
export BDK_RPC_AUTH=COOKIEFILE
|
||||
export BDK_RPC_COOKIEFILE=/tmp/regtest-"$id".cookie
|
||||
export BDK_RPC_URL=127.0.0.1:18443
|
||||
export BDK_RPC_WALLET=bdk-test
|
||||
export BDK_ELECTRUM_URL=tcp://127.0.0.1:60401
|
||||
|
||||
cli(){
|
||||
docker exec -it "$id" /root/bitcoin-cli -regtest -datadir=/root/.bitcoin $@
|
||||
}
|
||||
|
||||
#eprintln "running getwalletinfo until bitcoind seems to be alive"
|
||||
while ! cli getwalletinfo >/dev/null; do sleep 1; done
|
||||
|
||||
# sleep again for good measure!
|
||||
sleep 1;
|
||||
|
||||
# copy bitcoind cookie file to /tmp
|
||||
docker cp "$id":/root/.bitcoin/regtest/.cookie /tmp/regtest-"$id".cookie
|
||||
|
||||
cargo test --features "test-blockchains,test-$blockchain" --no-default-features "$blockchain::bdk_blockchain_tests::$test_name"
|
||||
@@ -39,7 +39,7 @@
|
||||
//!
|
||||
//! # #[cfg(feature = "esplora")]
|
||||
//! # {
|
||||
//! let esplora_blockchain = EsploraBlockchain::new("...", None);
|
||||
//! let esplora_blockchain = EsploraBlockchain::new("...", None, 20);
|
||||
//! let wallet_esplora: Wallet<AnyBlockchain, _> = Wallet::new(
|
||||
//! "...",
|
||||
//! None,
|
||||
@@ -126,31 +126,17 @@ impl Blockchain for AnyBlockchain {
|
||||
|
||||
fn setup<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
maybe_await!(impl_inner_method!(
|
||||
self,
|
||||
setup,
|
||||
stop_gap,
|
||||
database,
|
||||
progress_update
|
||||
))
|
||||
maybe_await!(impl_inner_method!(self, setup, database, progress_update))
|
||||
}
|
||||
fn sync<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
maybe_await!(impl_inner_method!(
|
||||
self,
|
||||
sync,
|
||||
stop_gap,
|
||||
database,
|
||||
progress_update
|
||||
))
|
||||
maybe_await!(impl_inner_method!(self, sync, database, progress_update))
|
||||
}
|
||||
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
@@ -188,7 +174,8 @@ impl_from!(compact_filters::CompactFiltersBlockchain, AnyBlockchain, CompactFilt
|
||||
/// r#"{
|
||||
/// "type" : "electrum",
|
||||
/// "url" : "ssl://electrum.blockstream.info:50002",
|
||||
/// "retry": 2
|
||||
/// "retry": 2,
|
||||
/// "stop_gap": 20
|
||||
/// }"#,
|
||||
/// )
|
||||
/// .unwrap();
|
||||
@@ -198,7 +185,8 @@ impl_from!(compact_filters::CompactFiltersBlockchain, AnyBlockchain, CompactFilt
|
||||
/// url: "ssl://electrum.blockstream.info:50002".into(),
|
||||
/// retry: 2,
|
||||
/// socks5: None,
|
||||
/// timeout: None
|
||||
/// timeout: None,
|
||||
/// stop_gap: 20,
|
||||
/// })
|
||||
/// );
|
||||
/// # }
|
||||
|
||||
764
src/blockchain/compact_filters/address_manager.rs
Normal file
764
src/blockchain/compact_filters/address_manager.rs
Normal file
@@ -0,0 +1,764 @@
|
||||
// Bitcoin Dev Kit
|
||||
// Written in 2021 by Rajarshi Maitra <rajarshi149@gmail.com>
|
||||
// John Cantrell <johncantrell97@protonmail.com>
|
||||
//
|
||||
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
|
||||
//
|
||||
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
|
||||
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
|
||||
// You may not use this file except in accordance with one or both of these
|
||||
// licenses.
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::prelude::*;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{
|
||||
mpsc::{channel, Receiver, SendError, Sender},
|
||||
Arc, RwLock,
|
||||
};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
|
||||
use std::sync::PoisonError;
|
||||
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Error as SerdeError;
|
||||
|
||||
use super::{Mempool, Peer, PeerError};
|
||||
|
||||
use bitcoin::network::{
|
||||
constants::{Network, ServiceFlags},
|
||||
message::NetworkMessage,
|
||||
Address,
|
||||
};
|
||||
|
||||
/// Default address pool minimums
|
||||
const MIN_CBF_BUFFER: usize = 5;
|
||||
const MIN_NONCBF_BUFFER: usize = 5;
|
||||
|
||||
/// A Discovery structure used by workers
|
||||
///
|
||||
/// Discovery can be initiated via a cache,
|
||||
/// Or it will start with default hardcoded seeds
|
||||
pub struct AddressDiscovery {
|
||||
pending: VecDeque<SocketAddr>,
|
||||
visited: HashSet<SocketAddr>,
|
||||
}
|
||||
|
||||
impl AddressDiscovery {
|
||||
fn new(network: Network, seeds: VecDeque<SocketAddr>) -> AddressDiscovery {
|
||||
let mut network_seeds = AddressDiscovery::seeds(network);
|
||||
let mut total_seeds = seeds;
|
||||
total_seeds.append(&mut network_seeds);
|
||||
AddressDiscovery {
|
||||
pending: total_seeds,
|
||||
visited: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_pendings(&mut self, addresses: Vec<SocketAddr>) {
|
||||
for addr in addresses {
|
||||
if !self.pending.contains(&addr) && !self.visited.contains(&addr) {
|
||||
self.pending.push_back(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_next(&mut self) -> Option<SocketAddr> {
|
||||
match self.pending.pop_front() {
|
||||
None => None,
|
||||
Some(next) => {
|
||||
self.visited.insert(next);
|
||||
Some(next)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn seeds(network: Network) -> VecDeque<SocketAddr> {
|
||||
let mut seeds = VecDeque::new();
|
||||
|
||||
let port: u16 = match network {
|
||||
Network::Bitcoin => 8333,
|
||||
Network::Testnet => 18333,
|
||||
Network::Regtest => 18444,
|
||||
Network::Signet => 38333,
|
||||
};
|
||||
|
||||
let seedhosts: &[&str] = match network {
|
||||
Network::Bitcoin => &[
|
||||
"seed.bitcoin.sipa.be",
|
||||
"dnsseed.bluematt.me",
|
||||
"dnsseed.bitcoin.dashjr.org",
|
||||
"seed.bitcoinstats.com",
|
||||
"seed.bitcoin.jonasschnelli.ch",
|
||||
"seed.btc.petertodd.org",
|
||||
"seed.bitcoin.sprovoost.nl",
|
||||
"dnsseed.emzy.de",
|
||||
"seed.bitcoin.wiz.biz",
|
||||
],
|
||||
Network::Testnet => &[
|
||||
"testnet-seed.bitcoin.jonasschnelli.ch",
|
||||
"seed.tbtc.petertodd.org",
|
||||
"seed.testnet.bitcoin.sprovoost.nl",
|
||||
"testnet-seed.bluematt.me",
|
||||
],
|
||||
Network::Regtest => &[],
|
||||
Network::Signet => &[],
|
||||
};
|
||||
|
||||
for seedhost in seedhosts.iter() {
|
||||
if let Ok(lookup) = (*seedhost, port).to_socket_addrs() {
|
||||
for host in lookup {
|
||||
if host.is_ipv4() {
|
||||
seeds.push_back(host);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
seeds
|
||||
}
|
||||
}
|
||||
|
||||
/// Crawler structure that will interface with Discovery and public bitcoin network
|
||||
///
|
||||
/// Address manager will spawn multiple crawlers in separate threads to discover new addresses.
|
||||
struct AddressWorker {
|
||||
discovery: Arc<RwLock<AddressDiscovery>>,
|
||||
sender: Sender<(SocketAddr, ServiceFlags)>,
|
||||
network: Network,
|
||||
}
|
||||
|
||||
impl AddressWorker {
|
||||
fn new(
|
||||
discovery: Arc<RwLock<AddressDiscovery>>,
|
||||
sender: Sender<(SocketAddr, ServiceFlags)>,
|
||||
network: Network,
|
||||
) -> AddressWorker {
|
||||
AddressWorker {
|
||||
discovery,
|
||||
sender,
|
||||
network,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_receive_addr(&mut self, peer: &Peer) -> Result<(), AddressManagerError> {
|
||||
if let Some(NetworkMessage::Addr(new_addresses)) =
|
||||
peer.recv("addr", Some(Duration::from_secs(1)))?
|
||||
{
|
||||
self.consume_addr(new_addresses)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn consume_addr(&mut self, addrs: Vec<(u32, Address)>) -> Result<(), AddressManagerError> {
|
||||
let mut discovery_lock = self.discovery.write().map_err(PeerError::from)?;
|
||||
let mut addresses = Vec::new();
|
||||
for network_addrs in addrs {
|
||||
if let Ok(socket_addrs) = network_addrs.1.socket_addr() {
|
||||
addresses.push(socket_addrs);
|
||||
}
|
||||
}
|
||||
discovery_lock.add_pendings(addresses);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn work(&mut self) -> Result<(), AddressManagerError> {
|
||||
loop {
|
||||
let next_address = {
|
||||
let mut address_discovery = self.discovery.write()?;
|
||||
address_discovery.get_next()
|
||||
};
|
||||
|
||||
match next_address {
|
||||
Some(address) => {
|
||||
let potential_peer = Peer::connect_with_timeout(
|
||||
address,
|
||||
Duration::from_secs(1),
|
||||
Arc::new(Mempool::default()),
|
||||
self.network,
|
||||
);
|
||||
|
||||
if let Ok(peer) = potential_peer {
|
||||
peer.send(NetworkMessage::GetAddr)?;
|
||||
self.try_receive_addr(&peer)?;
|
||||
self.try_receive_addr(&peer)?;
|
||||
self.sender.send((address, peer.get_version().services))?;
|
||||
// TODO: Investigate why close is being called on non existent connections
|
||||
// currently the errors are ignored
|
||||
peer.close().unwrap_or(());
|
||||
}
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A dedicated cache structure, with cbf/non_cbf separation
|
||||
///
|
||||
/// [AddressCache] will interface with file i/o
|
||||
/// And can te turned into seeds. Generation of seed will put previously cached
|
||||
/// cbf addresses at front of the vec, to boost up cbf node findings
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AddressCache {
|
||||
banned_peers: HashSet<SocketAddr>,
|
||||
cbf: HashSet<SocketAddr>,
|
||||
non_cbf: HashSet<SocketAddr>,
|
||||
}
|
||||
|
||||
impl AddressCache {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
banned_peers: HashSet::new(),
|
||||
cbf: HashSet::new(),
|
||||
non_cbf: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_file(path: &str) -> Result<Option<Self>, AddressManagerError> {
|
||||
let serialized: Result<String, _> = std::fs::read_to_string(path);
|
||||
let serialized = match serialized {
|
||||
Ok(contents) => contents,
|
||||
Err(_) => return Ok(None),
|
||||
};
|
||||
|
||||
let address_cache = serde_json::from_str(&serialized)?;
|
||||
Ok(Some(address_cache))
|
||||
}
|
||||
|
||||
fn write_to_file(&self, path: &str) -> Result<(), AddressManagerError> {
|
||||
let serialized = serde_json::to_string_pretty(&self)?;
|
||||
|
||||
let mut cache_file = File::create(path)?;
|
||||
|
||||
cache_file.write_all(serialized.as_bytes())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_seeds(&self) -> VecDeque<SocketAddr> {
|
||||
self.cbf
|
||||
.iter()
|
||||
.chain(self.non_cbf.iter())
|
||||
.copied()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) -> bool {
|
||||
if cbf {
|
||||
self.cbf.remove(addrs)
|
||||
} else {
|
||||
self.non_cbf.remove(addrs)
|
||||
}
|
||||
}
|
||||
|
||||
fn add_address(&mut self, addrs: SocketAddr, cbf: bool) -> bool {
|
||||
if cbf {
|
||||
self.cbf.insert(addrs)
|
||||
} else {
|
||||
self.non_cbf.insert(addrs)
|
||||
}
|
||||
}
|
||||
|
||||
fn add_to_banlist(&mut self, addrs: SocketAddr, cbf: bool) {
|
||||
if self.banned_peers.insert(addrs) {
|
||||
self.remove_address(&addrs, cbf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A Live directory maintained by [AddressManager] of freshly found cbf and non_cbf nodes by workers
|
||||
///
|
||||
/// Each instance of new [AddressManager] with have fresh [AddressDirectory]
|
||||
/// This is independent from the cache and will be an in-memory database to
|
||||
/// fetch addresses to the user.
|
||||
struct AddressDirectory {
|
||||
cbf_nodes: HashSet<SocketAddr>,
|
||||
non_cbf_nodes: HashSet<SocketAddr>,
|
||||
|
||||
// List of addresses it has previously provided to the caller (PeerManager)
|
||||
previously_sent: HashSet<SocketAddr>,
|
||||
}
|
||||
|
||||
impl AddressDirectory {
|
||||
fn new() -> AddressDirectory {
|
||||
AddressDirectory {
|
||||
cbf_nodes: HashSet::new(),
|
||||
non_cbf_nodes: HashSet::new(),
|
||||
previously_sent: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_address(&mut self, addr: SocketAddr, cbf: bool) {
|
||||
if cbf {
|
||||
self.cbf_nodes.insert(addr);
|
||||
} else {
|
||||
self.non_cbf_nodes.insert(addr);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_new_address(&mut self, cbf: bool) -> Option<SocketAddr> {
|
||||
if cbf {
|
||||
if let Some(new_addresses) = self
|
||||
.cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.collect::<Vec<&SocketAddr>>()
|
||||
.pop()
|
||||
{
|
||||
self.previously_sent.insert(*new_addresses);
|
||||
Some(*new_addresses)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else if let Some(new_addresses) = self
|
||||
.non_cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.collect::<Vec<&SocketAddr>>()
|
||||
.pop()
|
||||
{
|
||||
self.previously_sent.insert(*new_addresses);
|
||||
Some(*new_addresses)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_address_count(&self) -> usize {
|
||||
self.cbf_nodes.len()
|
||||
}
|
||||
|
||||
fn get_non_cbf_address_count(&self) -> usize {
|
||||
self.non_cbf_nodes.len()
|
||||
}
|
||||
|
||||
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) {
|
||||
if cbf {
|
||||
self.cbf_nodes.remove(addrs);
|
||||
} else {
|
||||
self.non_cbf_nodes.remove(addrs);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_buffer(&self) -> usize {
|
||||
self.cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.count()
|
||||
}
|
||||
|
||||
fn get_non_cbf_buffer(&self) -> usize {
|
||||
self.non_cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.count()
|
||||
}
|
||||
}
|
||||
|
||||
/// Discovery statistics, useful for logging
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct DiscoveryData {
|
||||
queued: usize,
|
||||
visited: usize,
|
||||
non_cbf_count: usize,
|
||||
cbf_count: usize,
|
||||
}
|
||||
|
||||
/// Progress trait for discovery statistics logging
|
||||
pub trait DiscoveryProgress {
|
||||
/// Update progress
|
||||
fn update(&self, data: DiscoveryData);
|
||||
}
|
||||
|
||||
/// Used when progress updates are not desired
|
||||
#[derive(Clone)]
|
||||
pub struct NoDiscoveryProgress;
|
||||
|
||||
impl DiscoveryProgress for NoDiscoveryProgress {
|
||||
fn update(&self, _data: DiscoveryData) {}
|
||||
}
|
||||
|
||||
/// Used to log progress update
|
||||
#[derive(Clone)]
|
||||
pub struct LogDiscoveryProgress;
|
||||
|
||||
impl DiscoveryProgress for LogDiscoveryProgress {
|
||||
fn update(&self, data: DiscoveryData) {
|
||||
log::trace!(
|
||||
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
|
||||
data.queued,
|
||||
data.visited,
|
||||
data.non_cbf_count,
|
||||
data.cbf_count
|
||||
);
|
||||
|
||||
#[cfg(test)]
|
||||
println!(
|
||||
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
|
||||
data.queued, data.visited, data.non_cbf_count, data.cbf_count
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// A manager structure managing address discovery
|
||||
///
|
||||
/// Manager will try to maintain a given address buffer in its directory
|
||||
/// buffer = len(exiting addresses) - len(previously provided addresses)
|
||||
/// Manager will crawl the network until buffer criteria is satisfied
|
||||
/// Manager will bootstrap workers from a cache, to speed up discovery progress in
|
||||
/// subsequent call after the first crawl.
|
||||
/// Manager will keep track of the cache and only update it if previously
|
||||
/// unknown addresses are found.
|
||||
pub struct AddressManager<P: DiscoveryProgress> {
|
||||
directory: AddressDirectory,
|
||||
cache_filename: String,
|
||||
discovery: Arc<RwLock<AddressDiscovery>>,
|
||||
threads: usize,
|
||||
receiver: Receiver<(SocketAddr, ServiceFlags)>,
|
||||
sender: Sender<(SocketAddr, ServiceFlags)>,
|
||||
network: Network,
|
||||
cbf_buffer: usize,
|
||||
non_cbf_buffer: usize,
|
||||
progress: P,
|
||||
}
|
||||
|
||||
impl<P: DiscoveryProgress> AddressManager<P> {
|
||||
/// Create a new manager. Initiate Discovery seeds from the cache
|
||||
/// if it exists, else start with hardcoded seeds
|
||||
pub fn new(
|
||||
network: Network,
|
||||
cache_filename: String,
|
||||
threads: usize,
|
||||
cbf_buffer: Option<usize>,
|
||||
non_cbf_buffer: Option<usize>,
|
||||
progress: P,
|
||||
) -> Result<AddressManager<P>, AddressManagerError> {
|
||||
let (sender, receiver) = channel();
|
||||
|
||||
let seeds = match AddressCache::from_file(&cache_filename)? {
|
||||
Some(cache) => cache.make_seeds(),
|
||||
None => VecDeque::new(),
|
||||
};
|
||||
|
||||
let min_cbf = cbf_buffer.unwrap_or(MIN_CBF_BUFFER);
|
||||
|
||||
let min_non_cbf = non_cbf_buffer.unwrap_or(MIN_NONCBF_BUFFER);
|
||||
|
||||
let discovery = AddressDiscovery::new(network, seeds);
|
||||
|
||||
Ok(AddressManager {
|
||||
cache_filename,
|
||||
directory: AddressDirectory::new(),
|
||||
discovery: Arc::new(RwLock::new(discovery)),
|
||||
sender,
|
||||
receiver,
|
||||
network,
|
||||
threads,
|
||||
cbf_buffer: min_cbf,
|
||||
non_cbf_buffer: min_non_cbf,
|
||||
progress,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get running address discovery progress
|
||||
fn get_progress(&self) -> Result<DiscoveryData, AddressManagerError> {
|
||||
let (queued_count, visited_count) = {
|
||||
let address_discovery = self.discovery.read()?;
|
||||
(
|
||||
address_discovery.pending.len(),
|
||||
address_discovery.visited.len(),
|
||||
)
|
||||
};
|
||||
|
||||
let cbf_node_count = self.directory.get_cbf_address_count();
|
||||
let other_node_count = self.directory.get_non_cbf_address_count();
|
||||
|
||||
Ok(DiscoveryData {
|
||||
queued: queued_count,
|
||||
visited: visited_count,
|
||||
non_cbf_count: cbf_node_count + other_node_count,
|
||||
cbf_count: cbf_node_count,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn [self.thread] no. of worker threads
|
||||
fn spawn_workers(&mut self) -> Vec<JoinHandle<()>> {
|
||||
let mut worker_handles: Vec<JoinHandle<()>> = vec![];
|
||||
for _ in 0..self.threads {
|
||||
let sender = self.sender.clone();
|
||||
let discovery = self.discovery.clone();
|
||||
let network = self.network;
|
||||
let worker_handle = thread::spawn(move || {
|
||||
let mut worker = AddressWorker::new(discovery, sender, network);
|
||||
worker.work().unwrap();
|
||||
});
|
||||
worker_handles.push(worker_handle);
|
||||
}
|
||||
worker_handles
|
||||
}
|
||||
|
||||
/// Crawl the Bitcoin network until required number of cbf/non_cbf nodes are found
|
||||
///
|
||||
/// - This will start a bunch of crawlers.
|
||||
/// - load up the existing cache.
|
||||
/// - Update the cache with new found peers.
|
||||
/// - check if address is in banlist
|
||||
/// - run crawlers until buffer requirement is matched
|
||||
/// - flush the current cache into disk
|
||||
pub fn fetch(&mut self) -> Result<(), AddressManagerError> {
|
||||
self.spawn_workers();
|
||||
|
||||
// Get already existing cache
|
||||
let mut cache = match AddressCache::from_file(&self.cache_filename)? {
|
||||
Some(cache) => cache,
|
||||
None => AddressCache::empty(),
|
||||
};
|
||||
|
||||
while self.directory.get_cbf_buffer() < self.cbf_buffer
|
||||
|| self.directory.get_non_cbf_buffer() < self.non_cbf_buffer
|
||||
{
|
||||
if let Ok(message) = self.receiver.recv() {
|
||||
let (addr, flag) = message;
|
||||
if !cache.banned_peers.contains(&addr) {
|
||||
let cbf = flag.has(ServiceFlags::COMPACT_FILTERS);
|
||||
self.directory.add_address(addr, cbf);
|
||||
cache.add_address(addr, cbf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.progress.update(self.get_progress()?);
|
||||
|
||||
// When completed, flush the cache
|
||||
cache.write_to_file(&self.cache_filename)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get a new addresses not previously provided
|
||||
pub fn get_new_cbf_address(&mut self) -> Option<SocketAddr> {
|
||||
self.directory.get_new_address(true)
|
||||
}
|
||||
|
||||
/// Get a new non_cbf address
|
||||
pub fn get_new_non_cbf_address(&mut self) -> Option<SocketAddr> {
|
||||
self.directory.get_new_address(false)
|
||||
}
|
||||
|
||||
/// Ban an address
|
||||
pub fn ban_peer(&mut self, addrs: &SocketAddr, cbf: bool) -> Result<(), AddressManagerError> {
|
||||
let mut cache = AddressCache::from_file(&self.cache_filename)?.ok_or_else(|| {
|
||||
AddressManagerError::Generic("Address Cache file not found".to_string())
|
||||
})?;
|
||||
|
||||
cache.add_to_banlist(*addrs, cbf);
|
||||
|
||||
// When completed, flush the cache
|
||||
cache.write_to_file(&self.cache_filename).unwrap();
|
||||
|
||||
self.directory.remove_address(addrs, cbf);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all the known CBF addresses
|
||||
pub fn get_known_cbfs(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.directory
|
||||
.cbf_nodes
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all the known regular addresses
|
||||
pub fn get_known_non_cbfs(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.directory
|
||||
.non_cbf_nodes
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get previously tried addresses
|
||||
pub fn get_previously_tried(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.directory
|
||||
.previously_sent
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AddressManagerError {
|
||||
/// Std I/O Error
|
||||
Io(std::io::Error),
|
||||
|
||||
/// Internal Peer error
|
||||
Peer(PeerError),
|
||||
|
||||
/// Internal Mutex poisoning error
|
||||
MutexPoisoned,
|
||||
|
||||
/// Internal Mutex wait timed out
|
||||
MutexTimedOut,
|
||||
|
||||
/// Internal RW read lock poisoned
|
||||
RwReadLockPoisined,
|
||||
|
||||
/// Internal RW write lock poisoned
|
||||
RwWriteLockPoisoned,
|
||||
|
||||
/// Internal MPSC sending error
|
||||
MpscSendError,
|
||||
|
||||
/// Serde Json Error
|
||||
SerdeJson(SerdeError),
|
||||
|
||||
/// Generic Errors
|
||||
Generic(String),
|
||||
}
|
||||
|
||||
impl_error!(PeerError, Peer, AddressManagerError);
|
||||
impl_error!(std::io::Error, Io, AddressManagerError);
|
||||
impl_error!(SerdeError, SerdeJson, AddressManagerError);
|
||||
|
||||
impl<T> From<PoisonError<MutexGuard<'_, T>>> for AddressManagerError {
|
||||
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
|
||||
AddressManagerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for AddressManagerError {
|
||||
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
|
||||
AddressManagerError::RwWriteLockPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for AddressManagerError {
|
||||
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
|
||||
AddressManagerError::RwReadLockPoisined
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for AddressManagerError {
|
||||
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
|
||||
let (_, wait_result) = err.into_inner();
|
||||
if wait_result.timed_out() {
|
||||
AddressManagerError::MutexTimedOut
|
||||
} else {
|
||||
AddressManagerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<SendError<T>> for AddressManagerError {
|
||||
fn from(_: SendError<T>) -> Self {
|
||||
AddressManagerError::MpscSendError
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_address_manager() {
|
||||
// Initiate a manager with an non existent cache file name.
|
||||
// It will create a new cache file
|
||||
let mut manager = AddressManager::new(
|
||||
Network::Bitcoin,
|
||||
"addr_cache".to_string(),
|
||||
20,
|
||||
None,
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// start the crawlers and time them
|
||||
println!("Starting manager and initial fetch");
|
||||
let start = std::time::Instant::now();
|
||||
manager.fetch().unwrap();
|
||||
let duration1 = start.elapsed();
|
||||
println!("Completed Initial fetch");
|
||||
|
||||
// Create a new manager from existing cache and fetch again
|
||||
let mut manager = AddressManager::new(
|
||||
Network::Bitcoin,
|
||||
"addr_cache".to_string(),
|
||||
20,
|
||||
None,
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// start the crawlers and time them
|
||||
println!("Starting new fetch with previous cache");
|
||||
let start = std::time::Instant::now();
|
||||
manager.fetch().unwrap();
|
||||
let duration2 = start.elapsed();
|
||||
println!("Completed new fetch()");
|
||||
|
||||
println!("Time taken for initial crawl: {:#?}", duration1);
|
||||
println!("Time taken for next crawl {:#?}", duration2);
|
||||
|
||||
// Check Buffer Management
|
||||
|
||||
println!("Checking buffer management");
|
||||
// Fetch few new address and ensure buffer goes to zero
|
||||
let mut addrs_list = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let addr_cbf = manager.get_new_cbf_address().unwrap();
|
||||
let addrs_non_cbf = manager.get_new_non_cbf_address().unwrap();
|
||||
|
||||
addrs_list.push(addr_cbf);
|
||||
|
||||
addrs_list.push(addrs_non_cbf);
|
||||
}
|
||||
|
||||
assert_eq!(addrs_list.len(), 10);
|
||||
|
||||
// This should exhaust the cbf buffer
|
||||
assert_eq!(manager.directory.get_cbf_buffer(), 0);
|
||||
|
||||
// Calling fetch again should start crawlers until buffer
|
||||
// requirements are matched.
|
||||
println!("Address buffer exhausted, starting new fetch");
|
||||
manager.fetch().unwrap();
|
||||
println!("Fetch Complete");
|
||||
// It should again have a cbf buffer of 5
|
||||
assert_eq!(manager.directory.get_cbf_buffer(), 5);
|
||||
|
||||
println!("Buffer management passed");
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,9 @@ use bitcoin::{Network, OutPoint, Transaction, Txid};
|
||||
|
||||
use rocksdb::{Options, SliceTransform, DB};
|
||||
|
||||
mod address_manager;
|
||||
mod peer;
|
||||
mod peermngr;
|
||||
mod store;
|
||||
mod sync;
|
||||
|
||||
@@ -77,8 +79,11 @@ use peer::*;
|
||||
use store::*;
|
||||
use sync::*;
|
||||
|
||||
// Only added to avoid unused warnings in addrsmngr module
|
||||
pub use address_manager::{
|
||||
AddressManager, DiscoveryProgress, LogDiscoveryProgress, NoDiscoveryProgress,
|
||||
};
|
||||
pub use peer::{Mempool, Peer};
|
||||
|
||||
const SYNC_HEADERS_COST: f32 = 1.0;
|
||||
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
|
||||
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
|
||||
@@ -229,7 +234,6 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
#[allow(clippy::mutex_atomic)] // Mutex is easier to understand than a CAS loop.
|
||||
fn setup<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
_stop_gap: Option<usize>, // TODO: move to electrum and esplora only
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
@@ -372,10 +376,10 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
database.commit_batch(updates)?;
|
||||
|
||||
match first_peer.ask_for_mempool() {
|
||||
Err(CompactFiltersError::PeerBloomDisabled) => {
|
||||
Err(PeerError::PeerBloomDisabled(_)) => {
|
||||
log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
|
||||
}
|
||||
e => e?,
|
||||
e => e.map_err(CompactFiltersError::from)?,
|
||||
};
|
||||
|
||||
let mut internal_max_deriv = None;
|
||||
@@ -393,7 +397,12 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
)?;
|
||||
}
|
||||
}
|
||||
for tx in first_peer.get_mempool().iter_txs().iter() {
|
||||
for tx in first_peer
|
||||
.get_mempool()
|
||||
.iter_txs()
|
||||
.map_err(CompactFiltersError::from)?
|
||||
.iter()
|
||||
{
|
||||
self.process_tx(
|
||||
database,
|
||||
tx,
|
||||
@@ -436,11 +445,14 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
Ok(self.peers[0]
|
||||
.get_mempool()
|
||||
.get_tx(&Inventory::Transaction(*txid)))
|
||||
.get_tx(&Inventory::Transaction(*txid))
|
||||
.map_err(CompactFiltersError::from)?)
|
||||
}
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
|
||||
self.peers[0].broadcast_tx(tx.clone())?;
|
||||
self.peers[0]
|
||||
.broadcast_tx(tx.clone())
|
||||
.map_err(CompactFiltersError::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -488,7 +500,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
|
||||
.peers
|
||||
.iter()
|
||||
.map(|peer_conf| match &peer_conf.socks5 {
|
||||
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
|
||||
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network)
|
||||
.map_err(CompactFiltersError::from),
|
||||
Some(proxy) => Peer::connect_proxy(
|
||||
peer_conf.address.as_str(),
|
||||
proxy,
|
||||
@@ -498,7 +511,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
|
||||
.map(|(a, b)| (a.as_str(), b.as_str())),
|
||||
Arc::clone(&mempool),
|
||||
config.network,
|
||||
),
|
||||
)
|
||||
.map_err(CompactFiltersError::from),
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
@@ -547,6 +561,9 @@ pub enum CompactFiltersError {
|
||||
|
||||
/// Wrapper for [`crate::error::Error`]
|
||||
Global(Box<crate::error::Error>),
|
||||
|
||||
/// Internal Peer Error
|
||||
Peer(PeerError),
|
||||
}
|
||||
|
||||
impl fmt::Display for CompactFiltersError {
|
||||
@@ -561,6 +578,7 @@ impl_error!(rocksdb::Error, Db, CompactFiltersError);
|
||||
impl_error!(std::io::Error, Io, CompactFiltersError);
|
||||
impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError);
|
||||
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);
|
||||
impl_error!(PeerError, Peer, CompactFiltersError);
|
||||
|
||||
impl From<crate::error::Error> for CompactFiltersError {
|
||||
fn from(err: crate::error::Error) -> Self {
|
||||
|
||||
@@ -10,11 +10,15 @@
|
||||
// licenses.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::fmt;
|
||||
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::thread;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use std::sync::PoisonError;
|
||||
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
|
||||
|
||||
use socks::{Socks5Stream, ToTargetAddr};
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
@@ -30,8 +34,6 @@ use bitcoin::network::stream_reader::StreamReader;
|
||||
use bitcoin::network::Address;
|
||||
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
|
||||
|
||||
use super::CompactFiltersError;
|
||||
|
||||
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
|
||||
|
||||
pub(crate) const TIMEOUT_SECS: u64 = 30;
|
||||
@@ -65,17 +67,18 @@ impl Mempool {
|
||||
///
|
||||
/// Note that this doesn't propagate the transaction to other
|
||||
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
|
||||
pub fn add_tx(&self, tx: Transaction) {
|
||||
let mut guard = self.0.write().unwrap();
|
||||
pub fn add_tx(&self, tx: Transaction) -> Result<(), PeerError> {
|
||||
let mut guard = self.0.write()?;
|
||||
|
||||
guard.wtxids.insert(tx.wtxid(), tx.txid());
|
||||
guard.txs.insert(tx.txid(), tx);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look-up a transaction in the mempool given an [`Inventory`] request
|
||||
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
|
||||
pub fn get_tx(&self, inventory: &Inventory) -> Result<Option<Transaction>, PeerError> {
|
||||
let identifer = match inventory {
|
||||
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
|
||||
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return Ok(None),
|
||||
Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
|
||||
Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
|
||||
Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
|
||||
@@ -85,27 +88,34 @@ impl Mempool {
|
||||
inv_type,
|
||||
hash
|
||||
);
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let txid = match identifer {
|
||||
TxIdentifier::Txid(txid) => Some(txid),
|
||||
TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
|
||||
TxIdentifier::Wtxid(wtxid) => self.0.read()?.wtxids.get(&wtxid).cloned(),
|
||||
};
|
||||
|
||||
txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
|
||||
.flatten()
|
||||
let result = match txid {
|
||||
Some(txid) => {
|
||||
let read_lock = self.0.read()?;
|
||||
read_lock.txs.get(&txid).cloned()
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Return whether or not the mempool contains a transaction with a given txid
|
||||
pub fn has_tx(&self, txid: &Txid) -> bool {
|
||||
self.0.read().unwrap().txs.contains_key(txid)
|
||||
pub fn has_tx(&self, txid: &Txid) -> Result<bool, PeerError> {
|
||||
Ok(self.0.read()?.txs.contains_key(txid))
|
||||
}
|
||||
|
||||
/// Return the list of transactions contained in the mempool
|
||||
pub fn iter_txs(&self) -> Vec<Transaction> {
|
||||
self.0.read().unwrap().txs.values().cloned().collect()
|
||||
pub fn iter_txs(&self) -> Result<Vec<Transaction>, PeerError> {
|
||||
Ok(self.0.read()?.txs.values().cloned().collect())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,12 +143,31 @@ impl Peer {
|
||||
address: A,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
) -> Result<Self, PeerError> {
|
||||
let stream = TcpStream::connect(address)?;
|
||||
|
||||
Peer::from_stream(stream, mempool, network)
|
||||
}
|
||||
|
||||
/// Connect to a peer over a plaintext TCP connection with a timeout
|
||||
///
|
||||
/// This function behaves exactly the same as `connect` except for two differences
|
||||
/// 1) It assumes your ToSocketAddrs will resolve to a single address
|
||||
/// 2) It lets you specify a connection timeout
|
||||
pub fn connect_with_timeout<A: ToSocketAddrs>(
|
||||
address: A,
|
||||
timeout: Duration,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, PeerError> {
|
||||
let socket_addr = address
|
||||
.to_socket_addrs()?
|
||||
.next()
|
||||
.ok_or(PeerError::AddresseResolution)?;
|
||||
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
|
||||
Peer::from_stream(stream, mempool, network)
|
||||
}
|
||||
|
||||
/// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
|
||||
/// as a tuple of `(username, password)`
|
||||
///
|
||||
@@ -150,7 +179,7 @@ impl Peer {
|
||||
credentials: Option<(&str, &str)>,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
) -> Result<Self, PeerError> {
|
||||
let socks_stream = if let Some((username, password)) = credentials {
|
||||
Socks5Stream::connect_with_password(proxy, target, username, password)?
|
||||
} else {
|
||||
@@ -165,12 +194,12 @@ impl Peer {
|
||||
stream: TcpStream,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
) -> Result<Self, PeerError> {
|
||||
let writer = Arc::new(Mutex::new(stream.try_clone()?));
|
||||
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
|
||||
let connected = Arc::new(RwLock::new(true));
|
||||
|
||||
let mut locked_writer = writer.lock().unwrap();
|
||||
let mut locked_writer = writer.lock()?;
|
||||
|
||||
let reader_thread_responses = Arc::clone(&responses);
|
||||
let reader_thread_writer = Arc::clone(&writer);
|
||||
@@ -185,6 +214,7 @@ impl Peer {
|
||||
reader_thread_mempool,
|
||||
reader_thread_connected,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
|
||||
@@ -209,18 +239,20 @@ impl Peer {
|
||||
0,
|
||||
)),
|
||||
)?;
|
||||
let version = if let NetworkMessage::Version(version) =
|
||||
Self::_recv(&responses, "version", None).unwrap()
|
||||
{
|
||||
version
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
|
||||
let version = match Self::_recv(&responses, "version", Some(Duration::from_secs(1)))? {
|
||||
Some(NetworkMessage::Version(version)) => version,
|
||||
_ => {
|
||||
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
|
||||
}
|
||||
};
|
||||
|
||||
if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None).unwrap() {
|
||||
if let Some(NetworkMessage::Verack) =
|
||||
Self::_recv(&responses, "verack", Some(Duration::from_secs(1)))?
|
||||
{
|
||||
Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
|
||||
}
|
||||
|
||||
std::mem::drop(locked_writer);
|
||||
@@ -236,19 +268,26 @@ impl Peer {
|
||||
})
|
||||
}
|
||||
|
||||
/// Close the peer connection
|
||||
// Consume Self
|
||||
pub fn close(self) -> Result<(), PeerError> {
|
||||
let locked_writer = self.writer.lock()?;
|
||||
Ok((*locked_writer).shutdown(std::net::Shutdown::Both)?)
|
||||
}
|
||||
|
||||
/// Get the socket address of the remote peer
|
||||
pub fn get_address(&self) -> Result<SocketAddr, PeerError> {
|
||||
let locked_writer = self.writer.lock()?;
|
||||
Ok(locked_writer.peer_addr()?)
|
||||
}
|
||||
|
||||
/// Send a Bitcoin network message
|
||||
fn _send(
|
||||
writer: &mut TcpStream,
|
||||
magic: u32,
|
||||
payload: NetworkMessage,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
fn _send(writer: &mut TcpStream, magic: u32, payload: NetworkMessage) -> Result<(), PeerError> {
|
||||
log::trace!("==> {:?}", payload);
|
||||
|
||||
let raw_message = RawNetworkMessage { magic, payload };
|
||||
|
||||
raw_message
|
||||
.consensus_encode(writer)
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?;
|
||||
raw_message.consensus_encode(writer)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -258,30 +297,30 @@ impl Peer {
|
||||
responses: &Arc<RwLock<ResponsesMap>>,
|
||||
wait_for: &'static str,
|
||||
timeout: Option<Duration>,
|
||||
) -> Option<NetworkMessage> {
|
||||
) -> Result<Option<NetworkMessage>, PeerError> {
|
||||
let message_resp = {
|
||||
let mut lock = responses.write().unwrap();
|
||||
let mut lock = responses.write()?;
|
||||
let message_resp = lock.entry(wait_for).or_default();
|
||||
Arc::clone(&message_resp)
|
||||
};
|
||||
|
||||
let (lock, cvar) = &*message_resp;
|
||||
|
||||
let mut messages = lock.lock().unwrap();
|
||||
let mut messages = lock.lock()?;
|
||||
while messages.is_empty() {
|
||||
match timeout {
|
||||
None => messages = cvar.wait(messages).unwrap(),
|
||||
None => messages = cvar.wait(messages)?,
|
||||
Some(t) => {
|
||||
let result = cvar.wait_timeout(messages, t).unwrap();
|
||||
let result = cvar.wait_timeout(messages, t)?;
|
||||
if result.1.timed_out() {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
messages = result.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
messages.pop()
|
||||
Ok(messages.pop())
|
||||
}
|
||||
|
||||
/// Return the [`VersionMessage`] sent by the peer
|
||||
@@ -300,8 +339,8 @@ impl Peer {
|
||||
}
|
||||
|
||||
/// Return whether or not the peer is still connected
|
||||
pub fn is_connected(&self) -> bool {
|
||||
*self.connected.read().unwrap()
|
||||
pub fn is_connected(&self) -> Result<bool, PeerError> {
|
||||
Ok(*self.connected.read()?)
|
||||
}
|
||||
|
||||
/// Internal function called once the `reader_thread` is spawned
|
||||
@@ -312,14 +351,14 @@ impl Peer {
|
||||
reader_thread_writer: Arc<Mutex<TcpStream>>,
|
||||
reader_thread_mempool: Arc<Mempool>,
|
||||
reader_thread_connected: Arc<RwLock<bool>>,
|
||||
) {
|
||||
) -> Result<(), PeerError> {
|
||||
macro_rules! check_disconnect {
|
||||
($call:expr) => {
|
||||
match $call {
|
||||
Ok(good) => good,
|
||||
Err(e) => {
|
||||
log::debug!("Error {:?}", e);
|
||||
*reader_thread_connected.write().unwrap() = false;
|
||||
*reader_thread_connected.write()? = false;
|
||||
|
||||
break;
|
||||
}
|
||||
@@ -328,7 +367,7 @@ impl Peer {
|
||||
}
|
||||
|
||||
let mut reader = StreamReader::new(connection, None);
|
||||
loop {
|
||||
while *reader_thread_connected.read()? {
|
||||
let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
|
||||
|
||||
let in_message = if raw_message.magic != network.magic() {
|
||||
@@ -342,7 +381,7 @@ impl Peer {
|
||||
match in_message {
|
||||
NetworkMessage::Ping(nonce) => {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
&mut *reader_thread_writer.lock()?,
|
||||
network.magic(),
|
||||
NetworkMessage::Pong(nonce),
|
||||
));
|
||||
@@ -353,19 +392,21 @@ impl Peer {
|
||||
NetworkMessage::GetData(ref inv) => {
|
||||
let (found, not_found): (Vec<_>, Vec<_>) = inv
|
||||
.iter()
|
||||
.map(|item| (*item, reader_thread_mempool.get_tx(item)))
|
||||
.map(|item| (*item, reader_thread_mempool.get_tx(item).unwrap()))
|
||||
.partition(|(_, d)| d.is_some());
|
||||
for (_, found_tx) in found {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
&mut *reader_thread_writer.lock()?,
|
||||
network.magic(),
|
||||
NetworkMessage::Tx(found_tx.unwrap()),
|
||||
NetworkMessage::Tx(found_tx.ok_or_else(|| PeerError::Generic(
|
||||
"Got None while expecting Transaction".to_string()
|
||||
))?),
|
||||
));
|
||||
}
|
||||
|
||||
if !not_found.is_empty() {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
&mut *reader_thread_writer.lock()?,
|
||||
network.magic(),
|
||||
NetworkMessage::NotFound(
|
||||
not_found.into_iter().map(|(i, _)| i).collect(),
|
||||
@@ -377,21 +418,23 @@ impl Peer {
|
||||
}
|
||||
|
||||
let message_resp = {
|
||||
let mut lock = reader_thread_responses.write().unwrap();
|
||||
let mut lock = reader_thread_responses.write()?;
|
||||
let message_resp = lock.entry(in_message.cmd()).or_default();
|
||||
Arc::clone(&message_resp)
|
||||
};
|
||||
|
||||
let (lock, cvar) = &*message_resp;
|
||||
let mut messages = lock.lock().unwrap();
|
||||
let mut messages = lock.lock()?;
|
||||
messages.push(in_message);
|
||||
cvar.notify_all();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a raw Bitcoin message to the peer
|
||||
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
|
||||
let mut writer = self.writer.lock().unwrap();
|
||||
pub fn send(&self, payload: NetworkMessage) -> Result<(), PeerError> {
|
||||
let mut writer = self.writer.lock()?;
|
||||
Self::_send(&mut writer, self.network.magic(), payload)
|
||||
}
|
||||
|
||||
@@ -400,30 +443,27 @@ impl Peer {
|
||||
&self,
|
||||
wait_for: &'static str,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
|
||||
Ok(Self::_recv(&self.responses, wait_for, timeout))
|
||||
) -> Result<Option<NetworkMessage>, PeerError> {
|
||||
Self::_recv(&self.responses, wait_for, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait CompactFiltersPeer {
|
||||
fn get_cf_checkpt(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFCheckpt, CompactFiltersError>;
|
||||
fn get_cf_checkpt(&self, filter_type: u8, stop_hash: BlockHash)
|
||||
-> Result<CFCheckpt, PeerError>;
|
||||
fn get_cf_headers(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFHeaders, CompactFiltersError>;
|
||||
) -> Result<CFHeaders, PeerError>;
|
||||
fn get_cf_filters(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<(), CompactFiltersError>;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
|
||||
) -> Result<(), PeerError>;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError>;
|
||||
}
|
||||
|
||||
impl CompactFiltersPeer for Peer {
|
||||
@@ -431,22 +471,20 @@ impl CompactFiltersPeer for Peer {
|
||||
&self,
|
||||
filter_type: u8,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFCheckpt, CompactFiltersError> {
|
||||
) -> Result<CFCheckpt, PeerError> {
|
||||
self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
|
||||
filter_type,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
let response = self
|
||||
.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = self.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFCheckpt(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::CFCheckpt(response)) => response,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
if response.filter_type != filter_type {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
return Err(PeerError::InvalidResponse(self.get_address()?));
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
@@ -457,35 +495,31 @@ impl CompactFiltersPeer for Peer {
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFHeaders, CompactFiltersError> {
|
||||
) -> Result<CFHeaders, PeerError> {
|
||||
self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
|
||||
filter_type,
|
||||
start_height,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
let response = self
|
||||
.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = self.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFHeaders(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::CFHeaders(response)) => response,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
if response.filter_type != filter_type {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
return Err(PeerError::InvalidResponse(self.get_address()?));
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
|
||||
let response = self
|
||||
.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError> {
|
||||
let response = self.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFilter(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::CFilter(response)) => response,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
@@ -496,7 +530,7 @@ impl CompactFiltersPeer for Peer {
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
) -> Result<(), PeerError> {
|
||||
self.send(NetworkMessage::GetCFilters(GetCFilters {
|
||||
filter_type,
|
||||
start_height,
|
||||
@@ -508,13 +542,13 @@ impl CompactFiltersPeer for Peer {
|
||||
}
|
||||
|
||||
pub trait InvPeer {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
|
||||
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError>;
|
||||
fn ask_for_mempool(&self) -> Result<(), PeerError>;
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError>;
|
||||
}
|
||||
|
||||
impl InvPeer for Peer {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError> {
|
||||
self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
|
||||
block_hash,
|
||||
)]))?;
|
||||
@@ -522,51 +556,126 @@ impl InvPeer for Peer {
|
||||
match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
|
||||
None => Ok(None),
|
||||
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
|
||||
_ => Err(CompactFiltersError::InvalidResponse),
|
||||
_ => Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
}
|
||||
}
|
||||
|
||||
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
|
||||
fn ask_for_mempool(&self) -> Result<(), PeerError> {
|
||||
if !self.version.services.has(ServiceFlags::BLOOM) {
|
||||
return Err(CompactFiltersError::PeerBloomDisabled);
|
||||
return Err(PeerError::PeerBloomDisabled(self.get_address()?));
|
||||
}
|
||||
|
||||
self.send(NetworkMessage::MemPool)?;
|
||||
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
|
||||
None => return Ok(()), // empty mempool
|
||||
Some(NetworkMessage::Inv(inv)) => inv,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
let getdata = inv
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(
|
||||
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid)),
|
||||
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid).unwrap()),
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
let num_txs = getdata.len();
|
||||
self.send(NetworkMessage::GetData(getdata))?;
|
||||
|
||||
for _ in 0..num_txs {
|
||||
let tx = self
|
||||
.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let tx = self.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let tx = match tx {
|
||||
NetworkMessage::Tx(tx) => tx,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::Tx(tx)) => tx,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
self.mempool.add_tx(tx);
|
||||
self.mempool.add_tx(tx)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
|
||||
self.mempool.add_tx(tx.clone());
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError> {
|
||||
self.mempool.add_tx(tx.clone())?;
|
||||
self.send(NetworkMessage::Tx(tx))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Peer Errors
|
||||
#[derive(Debug)]
|
||||
pub enum PeerError {
|
||||
/// Internal I/O error
|
||||
Io(std::io::Error),
|
||||
|
||||
/// Internal system time error
|
||||
Time(std::time::SystemTimeError),
|
||||
|
||||
/// A peer sent an invalid or unexpected response
|
||||
InvalidResponse(SocketAddr),
|
||||
|
||||
/// Peer had bloom filter disabled
|
||||
PeerBloomDisabled(SocketAddr),
|
||||
|
||||
/// Internal Mutex poisoning error
|
||||
MutexPoisoned,
|
||||
|
||||
/// Internal Mutex wait timed out
|
||||
MutexTimedout,
|
||||
|
||||
/// Internal RW read lock poisoned
|
||||
RwReadLockPoisined,
|
||||
|
||||
/// Internal RW write lock poisoned
|
||||
RwWriteLockPoisoned,
|
||||
|
||||
/// Mempool Mutex poisoned
|
||||
MempoolPoisoned,
|
||||
|
||||
/// Network address resolution Error
|
||||
AddresseResolution,
|
||||
|
||||
/// Generic Errors
|
||||
Generic(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PeerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for PeerError {}
|
||||
|
||||
impl_error!(std::io::Error, Io, PeerError);
|
||||
impl_error!(std::time::SystemTimeError, Time, PeerError);
|
||||
|
||||
impl<T> From<PoisonError<MutexGuard<'_, T>>> for PeerError {
|
||||
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
|
||||
PeerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for PeerError {
|
||||
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
|
||||
PeerError::RwWriteLockPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for PeerError {
|
||||
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
|
||||
PeerError::RwReadLockPoisined
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for PeerError {
|
||||
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
|
||||
let (_, wait_result) = err.into_inner();
|
||||
if wait_result.timed_out() {
|
||||
PeerError::MutexTimedout
|
||||
} else {
|
||||
PeerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
578
src/blockchain/compact_filters/peermngr.rs
Normal file
578
src/blockchain/compact_filters/peermngr.rs
Normal file
@@ -0,0 +1,578 @@
|
||||
use super::address_manager::{AddressManager, AddressManagerError, DiscoveryProgress};
|
||||
use super::peer::{Mempool, Peer, PeerError, TIMEOUT_SECS};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time;
|
||||
|
||||
use bitcoin::network::constants::{Network, ServiceFlags};
|
||||
use bitcoin::network::message::NetworkMessage;
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
// Peer Manager Configuration constants
|
||||
const MIN_CBF_PEERS: usize = 2;
|
||||
const MIN_TOTAL_PEERS: usize = 5;
|
||||
const MIN_CRAWLER_THREADS: usize = 20;
|
||||
const BAN_SCORE_THRESHOLD: usize = 100;
|
||||
const RECEIVE_TIMEOUT: time::Duration = time::Duration::from_secs(TIMEOUT_SECS);
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// An Error structure describing Peer Management errors
|
||||
#[derive(Debug)]
|
||||
pub enum PeerManagerError {
|
||||
// Internal Peer Error
|
||||
Peer(PeerError),
|
||||
|
||||
// Internal AddressManager Error
|
||||
AddrsManager(AddressManagerError),
|
||||
|
||||
// Os String Error
|
||||
OsString(std::ffi::OsString),
|
||||
|
||||
// Peer not found in directory
|
||||
PeerNotFound,
|
||||
|
||||
// Generic Internal Error
|
||||
Generic(String),
|
||||
}
|
||||
|
||||
impl_error!(PeerError, Peer, PeerManagerError);
|
||||
impl_error!(AddressManagerError, AddrsManager, PeerManagerError);
|
||||
impl_error!(std::ffi::OsString, OsString, PeerManagerError);
|
||||
|
||||
/// Peer Data stored in the manager's directory
|
||||
#[derive(Debug)]
|
||||
struct PeerData {
|
||||
peer: Peer,
|
||||
is_cbf: bool,
|
||||
ban_score: usize,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// A Directory structure to hold live Peers
|
||||
/// All peers in the directory have live ongoing connection
|
||||
/// Banning a peer removes it from the directory
|
||||
#[derive(Default, Debug)]
|
||||
struct PeerDirectory {
|
||||
peers: BTreeMap<SocketAddr, PeerData>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl PeerDirectory {
|
||||
fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn get_cbf_peers(&self) -> Option<Vec<&PeerData>> {
|
||||
let cbf_peers = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter(|(_, peer)| peer.is_cbf)
|
||||
.map(|(_, peer)| peer)
|
||||
.collect::<Vec<&PeerData>>();
|
||||
|
||||
match cbf_peers.len() {
|
||||
0 => None,
|
||||
_ => Some(cbf_peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
let cbf_addrseses = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter_map(
|
||||
|(addrs, peerdata)| {
|
||||
if peerdata.is_cbf {
|
||||
Some(addrs)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match cbf_addrseses.len() {
|
||||
0 => None,
|
||||
_ => Some(cbf_addrseses),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_non_cbf_peers(&self) -> Option<Vec<&PeerData>> {
|
||||
let non_cbf_peers = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter(|(_, peerdata)| !peerdata.is_cbf)
|
||||
.map(|(_, peerdata)| peerdata)
|
||||
.collect::<Vec<&PeerData>>();
|
||||
|
||||
match non_cbf_peers.len() {
|
||||
0 => None,
|
||||
_ => Some(non_cbf_peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter_map(
|
||||
|(addrs, peerdata)| {
|
||||
if !peerdata.is_cbf {
|
||||
Some(addrs)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
|
||||
let peers = self
|
||||
.peers
|
||||
.iter_mut()
|
||||
.filter(|(_, peerdata)| peerdata.is_cbf)
|
||||
.map(|(_, peerdata)| peerdata)
|
||||
.collect::<Vec<&mut PeerData>>();
|
||||
|
||||
match peers.len() {
|
||||
0 => None,
|
||||
_ => Some(peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_non_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
|
||||
let peers = self
|
||||
.peers
|
||||
.iter_mut()
|
||||
.filter(|(_, peerdata)| !peerdata.is_cbf)
|
||||
.map(|(_, peerdata)| peerdata)
|
||||
.collect::<Vec<&mut PeerData>>();
|
||||
|
||||
match peers.len() {
|
||||
0 => None,
|
||||
_ => Some(peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_count(&self) -> usize {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(|(_, peerdata)| peerdata.is_cbf)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn get_non_cbf_count(&self) -> usize {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(|(_, peerdata)| !peerdata.is_cbf)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn insert_peer(&mut self, peerdata: PeerData) -> Result<(), PeerManagerError> {
|
||||
let addrs = peerdata.peer.get_address()?;
|
||||
self.peers.entry(addrs).or_insert(peerdata);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_peer(&mut self, addrs: &SocketAddr) -> Option<PeerData> {
|
||||
self.peers.remove(addrs)
|
||||
}
|
||||
|
||||
fn get_peer_banscore(&self, addrs: &SocketAddr) -> Option<usize> {
|
||||
self.peers.get(addrs).map(|peerdata| peerdata.ban_score)
|
||||
}
|
||||
|
||||
fn get_peerdata_mut(&mut self, address: &SocketAddr) -> Option<&mut PeerData> {
|
||||
self.peers.get_mut(address)
|
||||
}
|
||||
|
||||
fn get_peerdata(&self, address: &SocketAddr) -> Option<&PeerData> {
|
||||
self.peers.get(address)
|
||||
}
|
||||
|
||||
fn is_cbf(&self, addrs: &SocketAddr) -> Option<bool> {
|
||||
if let Some(peer) = self.peers.get(addrs) {
|
||||
match peer.is_cbf {
|
||||
true => Some(true),
|
||||
false => Some(false),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct PeerManager<P: DiscoveryProgress> {
|
||||
addrs_mngr: AddressManager<P>,
|
||||
directory: PeerDirectory,
|
||||
mempool: Arc<Mempool>,
|
||||
min_cbf: usize,
|
||||
min_total: usize,
|
||||
network: Network,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl<P: DiscoveryProgress> PeerManager<P> {
|
||||
pub fn init(
|
||||
network: Network,
|
||||
cache_dir: &str,
|
||||
crawler_threads: Option<usize>,
|
||||
progress: P,
|
||||
cbf_peers: Option<usize>,
|
||||
total_peers: Option<usize>,
|
||||
) -> Result<Self, PeerManagerError> {
|
||||
let mut cache_filename = PathBuf::from(cache_dir);
|
||||
cache_filename.push("addr_cache");
|
||||
|
||||
// Fetch minimum peer requirements, either by user input, or via default
|
||||
let min_cbf = cbf_peers.unwrap_or(MIN_CBF_PEERS);
|
||||
|
||||
let min_total = total_peers.unwrap_or(MIN_TOTAL_PEERS);
|
||||
|
||||
let cbf_buff = min_cbf * 2;
|
||||
let non_cbf_buff = (min_total - min_cbf) * 2;
|
||||
|
||||
// Create internal items
|
||||
let addrs_mngr = AddressManager::new(
|
||||
network,
|
||||
cache_filename.into_os_string().into_string()?,
|
||||
crawler_threads.unwrap_or(MIN_CRAWLER_THREADS),
|
||||
Some(cbf_buff),
|
||||
Some(non_cbf_buff),
|
||||
progress,
|
||||
)?;
|
||||
|
||||
let mempool = Arc::new(Mempool::new());
|
||||
|
||||
let peer_dir = PeerDirectory::new();
|
||||
|
||||
// Create self and update
|
||||
let mut manager = Self {
|
||||
addrs_mngr,
|
||||
directory: peer_dir,
|
||||
mempool,
|
||||
min_cbf,
|
||||
min_total,
|
||||
network,
|
||||
};
|
||||
|
||||
manager.update_directory()?;
|
||||
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
fn update_directory(&mut self) -> Result<(), PeerManagerError> {
|
||||
while self.directory.get_cbf_count() < self.min_cbf
|
||||
|| self.directory.get_non_cbf_count() < (self.min_total - self.min_cbf)
|
||||
{
|
||||
// First connect with cbf peers, then with non_cbf
|
||||
let cbf_fetch = self.directory.get_cbf_count() < self.min_cbf;
|
||||
|
||||
// Try to get an address
|
||||
// if not present start crawlers
|
||||
let target_addrs = match cbf_fetch {
|
||||
true => {
|
||||
if let Some(addrs) = self.addrs_mngr.get_new_cbf_address() {
|
||||
addrs
|
||||
} else {
|
||||
self.addrs_mngr.fetch()?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
false => {
|
||||
if let Some(addrs) = self.addrs_mngr.get_new_non_cbf_address() {
|
||||
addrs
|
||||
} else {
|
||||
self.addrs_mngr.fetch()?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Ok(peer) = Peer::connect(target_addrs, Arc::clone(&self.mempool), self.network) {
|
||||
let address = peer.get_address()?;
|
||||
|
||||
assert_eq!(address, target_addrs);
|
||||
|
||||
let is_cbf = peer
|
||||
.get_version()
|
||||
.services
|
||||
.has(ServiceFlags::COMPACT_FILTERS);
|
||||
|
||||
let peerdata = PeerData {
|
||||
peer,
|
||||
is_cbf,
|
||||
ban_score: 0,
|
||||
};
|
||||
|
||||
self.directory.insert_peer(peerdata)?;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn set_banscore(
|
||||
&mut self,
|
||||
increase_by: usize,
|
||||
address: &SocketAddr,
|
||||
) -> Result<(), PeerManagerError> {
|
||||
let mut current_score = if let Some(peer) = self.directory.get_peerdata_mut(address) {
|
||||
peer.ban_score
|
||||
} else {
|
||||
return Err(PeerManagerError::PeerNotFound);
|
||||
};
|
||||
|
||||
current_score += increase_by;
|
||||
|
||||
let mut banned = false;
|
||||
|
||||
if current_score >= BAN_SCORE_THRESHOLD {
|
||||
match (
|
||||
self.directory.is_cbf(address),
|
||||
self.directory.remove_peer(address),
|
||||
) {
|
||||
(Some(true), Some(_)) => {
|
||||
self.addrs_mngr.ban_peer(address, true)?;
|
||||
banned = true;
|
||||
}
|
||||
(Some(false), Some(_)) => {
|
||||
self.addrs_mngr.ban_peer(address, false)?;
|
||||
banned = true;
|
||||
}
|
||||
_ => {
|
||||
return Err(PeerManagerError::Generic(
|
||||
"data inconsistency in directory, should not happen".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if banned {
|
||||
self.update_directory()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send_to(
|
||||
&self,
|
||||
address: &SocketAddr,
|
||||
message: NetworkMessage,
|
||||
) -> Result<(), PeerManagerError> {
|
||||
if let Some(peerdata) = self.directory.get_peerdata(address) {
|
||||
peerdata.peer.send(message)?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(PeerManagerError::PeerNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn receive_from(
|
||||
&self,
|
||||
address: &SocketAddr,
|
||||
wait_for: &'static str,
|
||||
) -> Result<Option<NetworkMessage>, PeerManagerError> {
|
||||
if let Some(peerdata) = self.directory.get_peerdata(address) {
|
||||
if let Some(response) = peerdata.peer.recv(wait_for, Some(RECEIVE_TIMEOUT))? {
|
||||
Ok(Some(response))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
Err(PeerManagerError::PeerNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connected_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
self.directory.get_cbf_addresses()
|
||||
}
|
||||
|
||||
pub fn connected_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
self.directory.get_non_cbf_addresses()
|
||||
}
|
||||
|
||||
pub fn known_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
self.addrs_mngr.get_known_cbfs()
|
||||
}
|
||||
|
||||
pub fn known_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
self.addrs_mngr.get_known_non_cbfs()
|
||||
}
|
||||
|
||||
pub fn previously_tried_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
self.addrs_mngr.get_previously_tried()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::super::LogDiscoveryProgress;
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_ban() {
|
||||
let mut manager = PeerManager::init(
|
||||
Network::Bitcoin,
|
||||
".",
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let connected_cbfs = manager.connected_cbf_addresses().unwrap();
|
||||
let connected_non_cbfs = manager.connected_non_cbf_addresses().unwrap();
|
||||
|
||||
println!("Currently Connected CBFs: {:#?}", connected_cbfs);
|
||||
assert_eq!(connected_cbfs.len(), 2);
|
||||
assert_eq!(connected_non_cbfs.len(), 3);
|
||||
|
||||
let to_banned = &connected_cbfs[0];
|
||||
|
||||
println!("Banning address : {}", to_banned);
|
||||
|
||||
manager.set_banscore(100, to_banned).unwrap();
|
||||
|
||||
let newly_connected = manager.connected_cbf_addresses().unwrap();
|
||||
|
||||
println!("Newly Connected CBFs: {:#?}", newly_connected);
|
||||
|
||||
assert_eq!(newly_connected.len(), 2);
|
||||
|
||||
assert_ne!(newly_connected, connected_cbfs);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_send_recv() {
|
||||
let manager = PeerManager::init(
|
||||
Network::Bitcoin,
|
||||
".",
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let target_address = manager.connected_cbf_addresses().unwrap()[0];
|
||||
|
||||
let ping = NetworkMessage::Ping(30);
|
||||
|
||||
println!("Asking peer {}", target_address);
|
||||
|
||||
manager.send_to(&target_address, ping).unwrap();
|
||||
|
||||
let response = manager
|
||||
.receive_from(&target_address, "pong")
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let value = match response {
|
||||
NetworkMessage::Pong(v) => Some(v),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let value = value.unwrap();
|
||||
|
||||
println!("Got value {:#?}", value);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_connect_all() {
|
||||
let manager = PeerManager::init(
|
||||
Network::Bitcoin,
|
||||
".",
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let cbf_pings = vec![100u64; manager.min_cbf];
|
||||
let non_cbf_pings = vec![200u64; manager.min_total - manager.min_cbf];
|
||||
|
||||
let cbf_peers = manager.connected_cbf_addresses().unwrap();
|
||||
let non_cbf_peers = manager.connected_non_cbf_addresses().unwrap();
|
||||
|
||||
let sent_cbf: Vec<bool> = cbf_pings
|
||||
.iter()
|
||||
.zip(cbf_peers.iter())
|
||||
.map(|(ping, address)| {
|
||||
let message = NetworkMessage::Ping(*ping);
|
||||
manager.send_to(address, message).unwrap();
|
||||
true
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(sent_cbf, vec![true; manager.min_cbf]);
|
||||
|
||||
println!("Sent pings to cbf peers");
|
||||
|
||||
let sent_noncbf: Vec<bool> = non_cbf_pings
|
||||
.iter()
|
||||
.zip(non_cbf_peers.iter())
|
||||
.map(|(ping, address)| {
|
||||
let message = NetworkMessage::Ping(*ping);
|
||||
manager.send_to(address, message).unwrap();
|
||||
true
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(sent_noncbf, vec![true; manager.min_total - manager.min_cbf]);
|
||||
|
||||
println!("Sent pings to non cbf peers");
|
||||
|
||||
let cbf_received: Vec<u64> = cbf_peers
|
||||
.iter()
|
||||
.map(|address| {
|
||||
let response = manager.receive_from(address, "pong").unwrap().unwrap();
|
||||
|
||||
let value = match response {
|
||||
NetworkMessage::Pong(v) => Some(v),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
value.unwrap()
|
||||
})
|
||||
.collect();
|
||||
|
||||
let non_cbf_received: Vec<u64> = non_cbf_peers
|
||||
.iter()
|
||||
.map(|address| {
|
||||
let response = manager.receive_from(address, "pong").unwrap().unwrap();
|
||||
|
||||
let value = match response {
|
||||
NetworkMessage::Pong(v) => Some(v),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
value.unwrap()
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(cbf_pings, cbf_received);
|
||||
|
||||
assert_eq!(non_cbf_pings, non_cbf_received);
|
||||
}
|
||||
}
|
||||
@@ -43,11 +43,17 @@ use crate::FeeRate;
|
||||
///
|
||||
/// ## Example
|
||||
/// See the [`blockchain::electrum`](crate::blockchain::electrum) module for a usage example.
|
||||
pub struct ElectrumBlockchain(Client);
|
||||
pub struct ElectrumBlockchain {
|
||||
client: Client,
|
||||
stop_gap: usize,
|
||||
}
|
||||
|
||||
impl std::convert::From<Client> for ElectrumBlockchain {
|
||||
fn from(client: Client) -> Self {
|
||||
ElectrumBlockchain(client)
|
||||
ElectrumBlockchain {
|
||||
client,
|
||||
stop_gap: 20,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,34 +70,33 @@ impl Blockchain for ElectrumBlockchain {
|
||||
|
||||
fn setup<D: BatchDatabase, P: Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
self.0
|
||||
.electrum_like_setup(stop_gap, database, progress_update)
|
||||
self.client
|
||||
.electrum_like_setup(self.stop_gap, database, progress_update)
|
||||
}
|
||||
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
Ok(self.0.transaction_get(txid).map(Option::Some)?)
|
||||
Ok(self.client.transaction_get(txid).map(Option::Some)?)
|
||||
}
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
|
||||
Ok(self.0.transaction_broadcast(tx).map(|_| ())?)
|
||||
Ok(self.client.transaction_broadcast(tx).map(|_| ())?)
|
||||
}
|
||||
|
||||
fn get_height(&self) -> Result<u32, Error> {
|
||||
// TODO: unsubscribe when added to the client, or is there a better call to use here?
|
||||
|
||||
Ok(self
|
||||
.0
|
||||
.client
|
||||
.block_headers_subscribe()
|
||||
.map(|data| data.height as u32)?)
|
||||
}
|
||||
|
||||
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
|
||||
Ok(FeeRate::from_btc_per_kvb(
|
||||
self.0.estimate_fee(target)? as f32
|
||||
self.client.estimate_fee(target)? as f32
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -149,6 +154,8 @@ pub struct ElectrumBlockchainConfig {
|
||||
pub retry: u8,
|
||||
/// Request timeout (seconds)
|
||||
pub timeout: Option<u8>,
|
||||
/// Stop searching addresses for transactions after finding an unused gap of this length
|
||||
pub stop_gap: usize,
|
||||
}
|
||||
|
||||
impl ConfigurableBlockchain for ElectrumBlockchain {
|
||||
@@ -162,16 +169,17 @@ impl ConfigurableBlockchain for ElectrumBlockchain {
|
||||
.socks5(socks5)?
|
||||
.build();
|
||||
|
||||
Ok(ElectrumBlockchain(Client::from_config(
|
||||
config.url.as_str(),
|
||||
electrum_config,
|
||||
)?))
|
||||
Ok(ElectrumBlockchain {
|
||||
client: Client::from_config(config.url.as_str(), electrum_config)?,
|
||||
stop_gap: config.stop_gap,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "test-blockchains")]
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "test-electrum")]
|
||||
crate::bdk_blockchain_tests! {
|
||||
fn test_instance() -> ElectrumBlockchain {
|
||||
ElectrumBlockchain::from(Client::new(&testutils::blockchain_tests::get_electrum_url()).unwrap())
|
||||
fn test_instance(test_client: &TestClient) -> ElectrumBlockchain {
|
||||
ElectrumBlockchain::from(Client::new(&test_client.electrsd.electrum_url).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,34 +18,32 @@
|
||||
//!
|
||||
//! ```no_run
|
||||
//! # use bdk::blockchain::esplora::EsploraBlockchain;
|
||||
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/api", None);
|
||||
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/api", None, 20);
|
||||
//! # Ok::<(), bdk::Error>(())
|
||||
//! ```
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
|
||||
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use log::{debug, error, info, trace};
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
use reqwest::{Client, StatusCode};
|
||||
|
||||
use bitcoin::consensus::{self, deserialize, serialize};
|
||||
use bitcoin::hashes::hex::{FromHex, ToHex};
|
||||
use bitcoin::hashes::{sha256, Hash};
|
||||
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, Txid};
|
||||
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
|
||||
#[allow(unused_imports)]
|
||||
use log::{debug, error, info, trace};
|
||||
use reqwest::{Client, StatusCode};
|
||||
use serde::Deserialize;
|
||||
|
||||
use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
|
||||
use super::*;
|
||||
use crate::database::BatchDatabase;
|
||||
use crate::error::Error;
|
||||
use crate::wallet::utils::ChunksIterator;
|
||||
use crate::FeeRate;
|
||||
|
||||
use super::*;
|
||||
|
||||
use self::utils::{ElectrumLikeSync, ElsGetHistoryRes};
|
||||
|
||||
const DEFAULT_CONCURRENT_REQUESTS: u8 = 4;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -62,22 +60,31 @@ struct UrlClient {
|
||||
/// ## Example
|
||||
/// See the [`blockchain::esplora`](crate::blockchain::esplora) module for a usage example.
|
||||
#[derive(Debug)]
|
||||
pub struct EsploraBlockchain(UrlClient);
|
||||
pub struct EsploraBlockchain {
|
||||
url_client: UrlClient,
|
||||
stop_gap: usize,
|
||||
}
|
||||
|
||||
impl std::convert::From<UrlClient> for EsploraBlockchain {
|
||||
fn from(url_client: UrlClient) -> Self {
|
||||
EsploraBlockchain(url_client)
|
||||
EsploraBlockchain {
|
||||
url_client,
|
||||
stop_gap: 20,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EsploraBlockchain {
|
||||
/// Create a new instance of the client from a base URL
|
||||
pub fn new(base_url: &str, concurrency: Option<u8>) -> Self {
|
||||
EsploraBlockchain(UrlClient {
|
||||
url: base_url.to_string(),
|
||||
client: Client::new(),
|
||||
concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENT_REQUESTS),
|
||||
})
|
||||
pub fn new(base_url: &str, concurrency: Option<u8>, stop_gap: usize) -> Self {
|
||||
EsploraBlockchain {
|
||||
url_client: UrlClient {
|
||||
url: base_url.to_string(),
|
||||
client: Client::new(),
|
||||
concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENT_REQUESTS),
|
||||
},
|
||||
stop_gap,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,29 +102,28 @@ impl Blockchain for EsploraBlockchain {
|
||||
|
||||
fn setup<D: BatchDatabase, P: Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
maybe_await!(self
|
||||
.0
|
||||
.electrum_like_setup(stop_gap, database, progress_update))
|
||||
.url_client
|
||||
.electrum_like_setup(self.stop_gap, database, progress_update))
|
||||
}
|
||||
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
Ok(await_or_block!(self.0._get_tx(txid))?)
|
||||
Ok(await_or_block!(self.url_client._get_tx(txid))?)
|
||||
}
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
|
||||
Ok(await_or_block!(self.0._broadcast(tx))?)
|
||||
Ok(await_or_block!(self.url_client._broadcast(tx))?)
|
||||
}
|
||||
|
||||
fn get_height(&self) -> Result<u32, Error> {
|
||||
Ok(await_or_block!(self.0._get_height())?)
|
||||
Ok(await_or_block!(self.url_client._get_height())?)
|
||||
}
|
||||
|
||||
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error> {
|
||||
let estimates = await_or_block!(self.0._get_fee_estimates())?;
|
||||
let estimates = await_or_block!(self.url_client._get_fee_estimates())?;
|
||||
|
||||
let fee_val = estimates
|
||||
.into_iter()
|
||||
@@ -369,6 +375,8 @@ pub struct EsploraBlockchainConfig {
|
||||
pub base_url: String,
|
||||
/// Number of parallel requests sent to the esplora service (default: 4)
|
||||
pub concurrency: Option<u8>,
|
||||
/// Stop searching addresses for transactions after finding an unused gap of this length
|
||||
pub stop_gap: usize,
|
||||
}
|
||||
|
||||
impl ConfigurableBlockchain for EsploraBlockchain {
|
||||
@@ -378,6 +386,7 @@ impl ConfigurableBlockchain for EsploraBlockchain {
|
||||
Ok(EsploraBlockchain::new(
|
||||
config.base_url.as_str(),
|
||||
config.concurrency,
|
||||
config.stop_gap,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -415,9 +424,10 @@ impl_error!(std::num::ParseIntError, Parsing, EsploraError);
|
||||
impl_error!(consensus::encode::Error, BitcoinEncoding, EsploraError);
|
||||
impl_error!(bitcoin::hashes::hex::Error, Hex, EsploraError);
|
||||
|
||||
#[cfg(feature = "test-blockchains")]
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "test-esplora")]
|
||||
crate::bdk_blockchain_tests! {
|
||||
fn test_instance() -> EsploraBlockchain {
|
||||
EsploraBlockchain::new(std::env::var("BDK_ESPLORA_URL").unwrap_or("127.0.0.1:3002".into()).as_str(), None)
|
||||
fn test_instance(test_client: &TestClient) -> EsploraBlockchain {
|
||||
EsploraBlockchain::new(&format!("http://{}",test_client.electrsd.esplora_url.as_ref().unwrap()), None, 20)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ pub use self::electrum::ElectrumBlockchain;
|
||||
pub use self::electrum::ElectrumBlockchainConfig;
|
||||
|
||||
#[cfg(feature = "rpc")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "rpc")))]
|
||||
pub mod rpc;
|
||||
#[cfg(feature = "rpc")]
|
||||
pub use self::rpc::RpcBlockchain;
|
||||
@@ -92,7 +93,6 @@ pub trait Blockchain {
|
||||
/// [`Blockchain::sync`] defaults to calling this internally if not overridden.
|
||||
fn setup<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error>;
|
||||
@@ -117,11 +117,10 @@ pub trait Blockchain {
|
||||
/// [`BatchOperations::del_utxo`]: crate::database::BatchOperations::del_utxo
|
||||
fn sync<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
maybe_await!(self.setup(stop_gap, database, progress_update))
|
||||
maybe_await!(self.setup(database, progress_update))
|
||||
}
|
||||
|
||||
/// Fetch a transaction from the blockchain given its txid
|
||||
@@ -217,20 +216,18 @@ impl<T: Blockchain> Blockchain for Arc<T> {
|
||||
|
||||
fn setup<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
maybe_await!(self.deref().setup(stop_gap, database, progress_update))
|
||||
maybe_await!(self.deref().setup(database, progress_update))
|
||||
}
|
||||
|
||||
fn sync<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
maybe_await!(self.deref().sync(stop_gap, database, progress_update))
|
||||
maybe_await!(self.deref().sync(database, progress_update))
|
||||
}
|
||||
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
|
||||
@@ -106,7 +106,6 @@ impl Blockchain for RpcBlockchain {
|
||||
|
||||
fn setup<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
@@ -150,12 +149,11 @@ impl Blockchain for RpcBlockchain {
|
||||
|
||||
self.set_node_synced_height(current_height)?;
|
||||
|
||||
self.sync(stop_gap, database, progress_update)
|
||||
self.sync(database, progress_update)
|
||||
}
|
||||
|
||||
fn sync<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
_stop_gap: Option<usize>,
|
||||
db: &mut D,
|
||||
_progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
@@ -422,27 +420,14 @@ fn list_wallet_dir(client: &Client) -> Result<Vec<String>, Error> {
|
||||
Ok(result.wallets.into_iter().map(|n| n.name).collect())
|
||||
}
|
||||
|
||||
#[cfg(feature = "test-blockchains")]
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "test-rpc")]
|
||||
crate::bdk_blockchain_tests! {
|
||||
|
||||
fn test_instance() -> RpcBlockchain {
|
||||
let url = std::env::var("BDK_RPC_URL").unwrap_or_else(|_| "127.0.0.1:18443".to_string());
|
||||
let url = format!("http://{}", url);
|
||||
|
||||
// TODO same code in `fn get_auth` in testutils, make it public there
|
||||
let auth = match std::env::var("BDK_RPC_AUTH").as_ref().map(String::as_ref) {
|
||||
Ok("USER_PASS") => Auth::UserPass(
|
||||
std::env::var("BDK_RPC_USER").unwrap(),
|
||||
std::env::var("BDK_RPC_PASS").unwrap(),
|
||||
),
|
||||
_ => Auth::CookieFile(std::path::PathBuf::from(
|
||||
std::env::var("BDK_RPC_COOKIEFILE")
|
||||
.unwrap_or_else(|_| "/home/user/.bitcoin/regtest/.cookie".to_string()),
|
||||
)),
|
||||
};
|
||||
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
|
||||
let config = RpcConfig {
|
||||
url,
|
||||
auth,
|
||||
url: test_client.bitcoind.rpc_url(),
|
||||
auth: Auth::CookieFile(test_client.bitcoind.params.cookie_file.clone()),
|
||||
network: Network::Regtest,
|
||||
wallet_name: format!("client-wallet-test-{:?}", std::time::SystemTime::now() ),
|
||||
skip_blocks: None,
|
||||
@@ -450,227 +435,3 @@ crate::bdk_blockchain_tests! {
|
||||
RpcBlockchain::from_config(&config).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "test-rpc")]
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::{RpcBlockchain, RpcConfig};
|
||||
use crate::bitcoin::consensus::deserialize;
|
||||
use crate::bitcoin::{Address, Amount, Network, Transaction};
|
||||
use crate::blockchain::rpc::wallet_name_from_descriptor;
|
||||
use crate::blockchain::{noop_progress, Blockchain, Capability, ConfigurableBlockchain};
|
||||
use crate::database::MemoryDatabase;
|
||||
use crate::wallet::AddressIndex;
|
||||
use crate::Wallet;
|
||||
use bitcoin::secp256k1::Secp256k1;
|
||||
use bitcoin::Txid;
|
||||
use bitcoincore_rpc::json::CreateRawTransactionInput;
|
||||
use bitcoincore_rpc::RawTx;
|
||||
use bitcoincore_rpc::{Auth, RpcApi};
|
||||
use bitcoind::BitcoinD;
|
||||
use std::collections::HashMap;
|
||||
|
||||
fn create_rpc(
|
||||
bitcoind: &BitcoinD,
|
||||
desc: &str,
|
||||
network: Network,
|
||||
) -> Result<RpcBlockchain, crate::Error> {
|
||||
let secp = Secp256k1::new();
|
||||
let wallet_name = wallet_name_from_descriptor(desc, None, network, &secp).unwrap();
|
||||
|
||||
let config = RpcConfig {
|
||||
url: bitcoind.rpc_url(),
|
||||
auth: Auth::CookieFile(bitcoind.config.cookie_file.clone()),
|
||||
network,
|
||||
wallet_name,
|
||||
skip_blocks: None,
|
||||
};
|
||||
RpcBlockchain::from_config(&config)
|
||||
}
|
||||
fn create_bitcoind(args: Vec<String>) -> BitcoinD {
|
||||
let exe = std::env::var("BITCOIND_EXE").unwrap();
|
||||
bitcoind::BitcoinD::with_args(exe, args, false, bitcoind::P2P::No).unwrap()
|
||||
}
|
||||
|
||||
const DESCRIPTOR_PUB: &'static str = "wpkh(tpubD6NzVbkrYhZ4X2yy78HWrr1M9NT8dKeWfzNiQqDdMqqa9UmmGztGGz6TaLFGsLfdft5iu32gxq1T4eMNxExNNWzVCpf9Y6JZi5TnqoC9wJq/*)";
|
||||
const DESCRIPTOR_PRIV: &'static str = "wpkh(tprv8ZgxMBicQKsPdZxBDUcvTSMEaLwCTzTc6gmw8KBKwa3BJzWzec4g6VUbQBHJcutDH6mMEmBeVyN27H1NF3Nu8isZ1Sts4SufWyfLE6Mf1MB/*)";
|
||||
|
||||
#[test]
|
||||
fn test_rpc_wallet_setup() {
|
||||
env_logger::try_init().unwrap();
|
||||
let bitcoind = create_bitcoind(vec![]);
|
||||
let node_address = bitcoind.client.get_new_address(None, None).unwrap();
|
||||
let blockchain = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Regtest).unwrap();
|
||||
let db = MemoryDatabase::new();
|
||||
let wallet = Wallet::new(DESCRIPTOR_PRIV, None, Network::Regtest, db, blockchain).unwrap();
|
||||
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
generate(&bitcoind, 101);
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
let address = wallet.get_address(AddressIndex::New).unwrap();
|
||||
let expected_address = "bcrt1q8dyvgt4vhr8ald4xuwewcxhdjha9a5k78wxm5t";
|
||||
assert_eq!(expected_address, address.to_string());
|
||||
send_to_address(&bitcoind, &address, 100_000);
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 100_000);
|
||||
|
||||
let mut builder = wallet.build_tx();
|
||||
builder.add_recipient(node_address.script_pubkey(), 50_000);
|
||||
let (mut psbt, details) = builder.finish().unwrap();
|
||||
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
let tx = psbt.extract_tx();
|
||||
wallet.broadcast(tx).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(
|
||||
wallet.get_balance().unwrap(),
|
||||
100_000 - 50_000 - details.fee.unwrap_or(0)
|
||||
);
|
||||
drop(wallet);
|
||||
|
||||
// test skip_blocks
|
||||
generate(&bitcoind, 5);
|
||||
let config = RpcConfig {
|
||||
url: bitcoind.rpc_url(),
|
||||
auth: Auth::CookieFile(bitcoind.config.cookie_file.clone()),
|
||||
network: Network::Regtest,
|
||||
wallet_name: "another-name".to_string(),
|
||||
skip_blocks: Some(103),
|
||||
};
|
||||
let blockchain_skip = RpcBlockchain::from_config(&config).unwrap();
|
||||
let db = MemoryDatabase::new();
|
||||
let wallet_skip =
|
||||
Wallet::new(DESCRIPTOR_PRIV, None, Network::Regtest, db, blockchain_skip).unwrap();
|
||||
wallet_skip.sync(noop_progress(), None).unwrap();
|
||||
send_to_address(&bitcoind, &address, 100_000);
|
||||
wallet_skip.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet_skip.get_balance().unwrap(), 100_000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_from_config() {
|
||||
let bitcoind = create_bitcoind(vec![]);
|
||||
let blockchain = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Regtest);
|
||||
assert!(blockchain.is_ok());
|
||||
let blockchain = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Testnet);
|
||||
assert!(blockchain.is_err(), "wrong network doesn't error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_capabilities_get_tx() {
|
||||
let bitcoind = create_bitcoind(vec![]);
|
||||
let rpc = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Regtest).unwrap();
|
||||
let capabilities = rpc.get_capabilities();
|
||||
assert!(capabilities.contains(&Capability::FullHistory) && capabilities.len() == 1);
|
||||
let bitcoind_indexed = create_bitcoind(vec!["-txindex".to_string()]);
|
||||
let rpc_indexed = create_rpc(&bitcoind_indexed, DESCRIPTOR_PUB, Network::Regtest).unwrap();
|
||||
assert_eq!(rpc_indexed.get_capabilities().len(), 3);
|
||||
let address = generate(&bitcoind_indexed, 101);
|
||||
let txid = send_to_address(&bitcoind_indexed, &address, 100_000);
|
||||
assert!(rpc_indexed.get_tx(&txid).unwrap().is_some());
|
||||
assert!(rpc.get_tx(&txid).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_estimate_fee_get_height() {
|
||||
let bitcoind = create_bitcoind(vec![]);
|
||||
let rpc = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Regtest).unwrap();
|
||||
let result = rpc.estimate_fee(2);
|
||||
assert!(result.is_err());
|
||||
let address = generate(&bitcoind, 100);
|
||||
// create enough tx so that core give some fee estimation
|
||||
for _ in 0..15 {
|
||||
let _ = bitcoind.client.generate_to_address(1, &address).unwrap();
|
||||
for _ in 0..2 {
|
||||
send_to_address(&bitcoind, &address, 100_000);
|
||||
}
|
||||
}
|
||||
let result = rpc.estimate_fee(2);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(rpc.get_height().unwrap(), 115);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_node_synced_height() {
|
||||
let bitcoind = create_bitcoind(vec![]);
|
||||
let rpc = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Regtest).unwrap();
|
||||
let synced_height = rpc.get_node_synced_height().unwrap();
|
||||
|
||||
assert_eq!(synced_height, 0);
|
||||
rpc.set_node_synced_height(1).unwrap();
|
||||
|
||||
let synced_height = rpc.get_node_synced_height().unwrap();
|
||||
assert_eq!(synced_height, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_broadcast() {
|
||||
let bitcoind = create_bitcoind(vec![]);
|
||||
let rpc = create_rpc(&bitcoind, DESCRIPTOR_PUB, Network::Regtest).unwrap();
|
||||
let address = generate(&bitcoind, 101);
|
||||
let utxo = bitcoind
|
||||
.client
|
||||
.list_unspent(None, None, None, None, None)
|
||||
.unwrap();
|
||||
let input = CreateRawTransactionInput {
|
||||
txid: utxo[0].txid,
|
||||
vout: utxo[0].vout,
|
||||
sequence: None,
|
||||
};
|
||||
|
||||
let out: HashMap<_, _> = vec![(
|
||||
address.to_string(),
|
||||
utxo[0].amount - Amount::from_sat(100_000),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
let tx = bitcoind
|
||||
.client
|
||||
.create_raw_transaction(&[input], &out, None, None)
|
||||
.unwrap();
|
||||
let signed_tx = bitcoind
|
||||
.client
|
||||
.sign_raw_transaction_with_wallet(tx.raw_hex(), None, None)
|
||||
.unwrap();
|
||||
let parsed_tx: Transaction = deserialize(&signed_tx.hex).unwrap();
|
||||
rpc.broadcast(&parsed_tx).unwrap();
|
||||
assert!(bitcoind
|
||||
.client
|
||||
.get_raw_mempool()
|
||||
.unwrap()
|
||||
.contains(&tx.txid()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rpc_wallet_name() {
|
||||
let secp = Secp256k1::new();
|
||||
let name =
|
||||
wallet_name_from_descriptor(DESCRIPTOR_PUB, None, Network::Regtest, &secp).unwrap();
|
||||
assert_eq!("tmg7aqay", name);
|
||||
}
|
||||
|
||||
fn generate(bitcoind: &BitcoinD, blocks: u64) -> Address {
|
||||
let address = bitcoind.client.get_new_address(None, None).unwrap();
|
||||
bitcoind
|
||||
.client
|
||||
.generate_to_address(blocks, &address)
|
||||
.unwrap();
|
||||
address
|
||||
}
|
||||
|
||||
fn send_to_address(bitcoind: &BitcoinD, address: &Address, amount: u64) -> Txid {
|
||||
bitcoind
|
||||
.client
|
||||
.send_to_address(
|
||||
&address,
|
||||
Amount::from_sat(amount),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ pub trait ElectrumLikeSync {
|
||||
|
||||
fn electrum_like_setup<D: BatchDatabase, P: Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
stop_gap: usize,
|
||||
db: &mut D,
|
||||
_progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
@@ -61,7 +61,6 @@ pub trait ElectrumLikeSync {
|
||||
let start = Instant::new();
|
||||
debug!("start setup");
|
||||
|
||||
let stop_gap = stop_gap.unwrap_or(20);
|
||||
let chunk_size = stop_gap;
|
||||
|
||||
let mut history_txs_id = HashSet::new();
|
||||
|
||||
@@ -383,6 +383,7 @@ impl BatchDatabase for Tree {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use lazy_static::lazy_static;
|
||||
use std::sync::{Arc, Condvar, Mutex, Once};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
|
||||
@@ -24,10 +24,6 @@ pub enum Error {
|
||||
Generic(String),
|
||||
/// This error is thrown when trying to convert Bare and Public key script to address
|
||||
ScriptDoesntHaveAddressForm,
|
||||
/// Found multiple outputs when `single_recipient` option has been specified
|
||||
SingleRecipientMultipleOutputs,
|
||||
/// `single_recipient` option is selected but neither `drain_wallet` nor `manually_selected_only` are
|
||||
SingleRecipientNoInputs,
|
||||
/// Cannot build a tx without recipients
|
||||
NoRecipients,
|
||||
/// `manually_selected_only` option is selected but no utxo has been passed
|
||||
|
||||
@@ -235,12 +235,6 @@ pub extern crate reqwest;
|
||||
pub extern crate sled;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
#[cfg(test)]
|
||||
#[allow(unused_imports)]
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
pub extern crate serial_test;
|
||||
|
||||
#[macro_use]
|
||||
pub(crate) mod error;
|
||||
pub mod blockchain;
|
||||
|
||||
@@ -6,38 +6,49 @@ use bitcoin::{Address, Amount, Script, Transaction, Txid};
|
||||
pub use bitcoincore_rpc::bitcoincore_rpc_json::AddressType;
|
||||
pub use bitcoincore_rpc::{Auth, Client as RpcClient, RpcApi};
|
||||
use core::str::FromStr;
|
||||
use electrsd::bitcoind::BitcoinD;
|
||||
use electrsd::{bitcoind, ElectrsD};
|
||||
pub use electrum_client::{Client as ElectrumClient, ElectrumApi};
|
||||
#[allow(unused_imports)]
|
||||
use log::{debug, error, info, trace};
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::ops::Deref;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct TestClient {
|
||||
client: RpcClient,
|
||||
electrum: ElectrumClient,
|
||||
pub bitcoind: BitcoinD,
|
||||
pub electrsd: ElectrsD,
|
||||
}
|
||||
|
||||
impl TestClient {
|
||||
pub fn new(rpc_host_and_wallet: String, rpc_wallet_name: String) -> Self {
|
||||
let client = RpcClient::new(
|
||||
format!("http://{}/wallet/{}", rpc_host_and_wallet, rpc_wallet_name),
|
||||
get_auth(),
|
||||
)
|
||||
.unwrap();
|
||||
let electrum = ElectrumClient::new(&get_electrum_url()).unwrap();
|
||||
pub fn new(bitcoind_exe: String, electrs_exe: String) -> Self {
|
||||
debug!("launching {} and {}", &bitcoind_exe, &electrs_exe);
|
||||
let bitcoind = BitcoinD::new(bitcoind_exe).unwrap();
|
||||
|
||||
TestClient { client, electrum }
|
||||
let http_enabled = cfg!(feature = "test-esplora");
|
||||
|
||||
let electrsd = ElectrsD::new(electrs_exe, &bitcoind, false, http_enabled).unwrap();
|
||||
|
||||
let node_address = bitcoind.client.get_new_address(None, None).unwrap();
|
||||
bitcoind
|
||||
.client
|
||||
.generate_to_address(101, &node_address)
|
||||
.unwrap();
|
||||
|
||||
let mut test_client = TestClient { bitcoind, electrsd };
|
||||
TestClient::wait_for_block(&mut test_client, 101);
|
||||
test_client
|
||||
}
|
||||
|
||||
fn wait_for_tx(&mut self, txid: Txid, monitor_script: &Script) {
|
||||
// wait for electrs to index the tx
|
||||
exponential_backoff_poll(|| {
|
||||
self.electrsd.trigger().unwrap();
|
||||
trace!("wait_for_tx {}", txid);
|
||||
|
||||
self.electrum
|
||||
self.electrsd
|
||||
.client
|
||||
.script_get_history(monitor_script)
|
||||
.unwrap()
|
||||
.iter()
|
||||
@@ -46,12 +57,13 @@ impl TestClient {
|
||||
}
|
||||
|
||||
fn wait_for_block(&mut self, min_height: usize) {
|
||||
self.electrum.block_headers_subscribe().unwrap();
|
||||
self.electrsd.client.block_headers_subscribe().unwrap();
|
||||
|
||||
loop {
|
||||
let header = exponential_backoff_poll(|| {
|
||||
self.electrum.ping().unwrap();
|
||||
self.electrum.block_headers_pop().unwrap()
|
||||
self.electrsd.trigger().unwrap();
|
||||
self.electrsd.client.ping().unwrap();
|
||||
self.electrsd.client.block_headers_pop().unwrap()
|
||||
});
|
||||
if header.height >= min_height {
|
||||
break;
|
||||
@@ -96,10 +108,13 @@ impl TestClient {
|
||||
.unwrap();
|
||||
|
||||
// broadcast through electrum so that it caches the tx immediately
|
||||
|
||||
let txid = self
|
||||
.electrum
|
||||
.electrsd
|
||||
.client
|
||||
.transaction_broadcast(&deserialize(&tx.hex).unwrap())
|
||||
.unwrap();
|
||||
debug!("broadcasted to electrum {}", txid);
|
||||
|
||||
if let Some(num) = meta_tx.min_confirmations {
|
||||
self.generate(num, None);
|
||||
@@ -209,7 +224,7 @@ impl TestClient {
|
||||
let block_hex: String = serialize(&block).to_hex();
|
||||
debug!("generated block hex: {}", block_hex);
|
||||
|
||||
self.electrum.block_headers_subscribe().unwrap();
|
||||
self.electrsd.client.block_headers_subscribe().unwrap();
|
||||
|
||||
let submit_result: serde_json::Value =
|
||||
self.call("submitblock", &[block_hex.into()]).unwrap();
|
||||
@@ -237,7 +252,7 @@ impl TestClient {
|
||||
}
|
||||
|
||||
pub fn invalidate(&mut self, num_blocks: u64) {
|
||||
self.electrum.block_headers_subscribe().unwrap();
|
||||
self.electrsd.client.block_headers_subscribe().unwrap();
|
||||
|
||||
let best_hash = self.get_best_block_hash().unwrap();
|
||||
let initial_height = self.get_block_info(&best_hash).unwrap().height;
|
||||
@@ -288,16 +303,25 @@ impl Deref for TestClient {
|
||||
type Target = RpcClient;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.client
|
||||
&self.bitcoind.client
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TestClient {
|
||||
fn default() -> Self {
|
||||
let rpc_host_and_port =
|
||||
env::var("BDK_RPC_URL").unwrap_or_else(|_| "127.0.0.1:18443".to_string());
|
||||
let wallet = env::var("BDK_RPC_WALLET").unwrap_or_else(|_| "bdk-test".to_string());
|
||||
Self::new(rpc_host_and_port, wallet)
|
||||
let bitcoind_exe = env::var("BITCOIND_EXE")
|
||||
.ok()
|
||||
.or(bitcoind::downloaded_exe_path())
|
||||
.expect(
|
||||
"you should provide env var BITCOIND_EXE or specifiy a bitcoind version feature",
|
||||
);
|
||||
let electrs_exe = env::var("ELECTRS_EXE")
|
||||
.ok()
|
||||
.or(electrsd::downloaded_exe_path())
|
||||
.expect(
|
||||
"you should provide env var ELECTRS_EXE or specifiy a electrsd version feature",
|
||||
);
|
||||
Self::new(bitcoind_exe, electrs_exe)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,27 +341,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: we currently only support env vars, we could also parse a toml file
|
||||
fn get_auth() -> Auth {
|
||||
match env::var("BDK_RPC_AUTH").as_ref().map(String::as_ref) {
|
||||
Ok("USER_PASS") => Auth::UserPass(
|
||||
env::var("BDK_RPC_USER").unwrap(),
|
||||
env::var("BDK_RPC_PASS").unwrap(),
|
||||
),
|
||||
_ => Auth::CookieFile(PathBuf::from(
|
||||
env::var("BDK_RPC_COOKIEFILE")
|
||||
.unwrap_or_else(|_| "/home/user/.bitcoin/regtest/.cookie".to_string()),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// This macro runs blockchain tests against a `Blockchain` implementation. It requires access to a
|
||||
/// Bitcoin core wallet via RPC. At the moment you have to dig into the code yourself and look at
|
||||
/// the setup required to run the tests yourself.
|
||||
#[macro_export]
|
||||
macro_rules! bdk_blockchain_tests {
|
||||
(
|
||||
fn test_instance() -> $blockchain:ty $block:block) => {
|
||||
fn $_fn_name:ident ( $( $test_client:ident : &TestClient )? $(,)? ) -> $blockchain:ty $block:block) => {
|
||||
#[cfg(test)]
|
||||
mod bdk_blockchain_tests {
|
||||
use $crate::bitcoin::Network;
|
||||
@@ -347,16 +357,17 @@ macro_rules! bdk_blockchain_tests {
|
||||
use $crate::types::KeychainKind;
|
||||
use $crate::{Wallet, FeeRate};
|
||||
use $crate::testutils;
|
||||
use $crate::serial_test::serial;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn get_blockchain() -> $blockchain {
|
||||
#[allow(unused_variables)]
|
||||
fn get_blockchain(test_client: &TestClient) -> $blockchain {
|
||||
$( let $test_client = test_client; )?
|
||||
$block
|
||||
}
|
||||
|
||||
fn get_wallet_from_descriptors(descriptors: &(String, Option<String>)) -> Wallet<$blockchain, MemoryDatabase> {
|
||||
Wallet::new(&descriptors.0.to_string(), descriptors.1.as_ref(), Network::Regtest, MemoryDatabase::new(), get_blockchain()).unwrap()
|
||||
fn get_wallet_from_descriptors(descriptors: &(String, Option<String>), test_client: &TestClient) -> Wallet<$blockchain, MemoryDatabase> {
|
||||
Wallet::new(&descriptors.0.to_string(), descriptors.1.as_ref(), Network::Regtest, MemoryDatabase::new(), get_blockchain(test_client)).unwrap()
|
||||
}
|
||||
|
||||
fn init_single_sig() -> (Wallet<$blockchain, MemoryDatabase>, (String, Option<String>), TestClient) {
|
||||
@@ -367,7 +378,7 @@ macro_rules! bdk_blockchain_tests {
|
||||
};
|
||||
|
||||
let test_client = TestClient::default();
|
||||
let wallet = get_wallet_from_descriptors(&descriptors);
|
||||
let wallet = get_wallet_from_descriptors(&descriptors, &test_client);
|
||||
|
||||
// rpc need to call import_multi before receiving any tx, otherwise will not see tx in the mempool
|
||||
#[cfg(feature = "test-rpc")]
|
||||
@@ -377,7 +388,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_simple() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -400,7 +410,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_stop_gap_20() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -418,7 +427,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_before_and_after_receive() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -436,7 +444,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_multiple_outputs_same_tx() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -458,7 +465,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_receive_multi() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -477,7 +483,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_address_reuse() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -497,7 +502,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_receive_rbf_replaced() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -536,7 +540,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
// doesn't work for some reason.
|
||||
#[cfg(not(feature = "esplora"))]
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_reorg_block() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -567,7 +570,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_after_send() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
println!("{}", descriptors.0);
|
||||
@@ -588,7 +590,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
let tx = psbt.extract_tx();
|
||||
println!("{}", bitcoin::consensus::encode::serialize_hex(&tx));
|
||||
wallet.broadcast(tx).unwrap();
|
||||
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), details.received, "incorrect balance after send");
|
||||
|
||||
@@ -597,7 +598,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_update_confirmation_time_after_generate() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
println!("{}", descriptors.0);
|
||||
@@ -623,9 +623,7 @@ macro_rules! bdk_blockchain_tests {
|
||||
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_outgoing_from_scratch() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
let node_addr = test_client.get_node_address(None);
|
||||
@@ -648,7 +646,7 @@ macro_rules! bdk_blockchain_tests {
|
||||
assert_eq!(wallet.get_balance().unwrap(), details.received, "incorrect balance after receive");
|
||||
|
||||
// empty wallet
|
||||
let wallet = get_wallet_from_descriptors(&descriptors);
|
||||
let wallet = get_wallet_from_descriptors(&descriptors, &test_client);
|
||||
|
||||
#[cfg(feature = "rpc")] // rpc cannot see mempool tx before importmulti
|
||||
test_client.generate(1, Some(node_addr));
|
||||
@@ -667,7 +665,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_long_change_chain() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
let node_addr = test_client.get_node_address(None);
|
||||
@@ -698,7 +695,7 @@ macro_rules! bdk_blockchain_tests {
|
||||
|
||||
// empty wallet
|
||||
|
||||
let wallet = get_wallet_from_descriptors(&descriptors);
|
||||
let wallet = get_wallet_from_descriptors(&descriptors, &test_client);
|
||||
|
||||
#[cfg(feature = "rpc")] // rpc cannot see mempool tx before importmulti
|
||||
test_client.generate(1, Some(node_addr));
|
||||
@@ -709,7 +706,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_bump_fee_basic() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
let node_addr = test_client.get_node_address(None);
|
||||
@@ -745,7 +741,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_bump_fee_remove_change() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
let node_addr = test_client.get_node_address(None);
|
||||
@@ -781,7 +776,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_bump_fee_add_input_simple() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
let node_addr = test_client.get_node_address(None);
|
||||
@@ -815,7 +809,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_bump_fee_add_input_no_change() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
let node_addr = test_client.get_node_address(None);
|
||||
@@ -852,7 +845,6 @@ macro_rules! bdk_blockchain_tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_sync_receive_coinbase() {
|
||||
let (wallet, _, mut test_client) = init_single_sig();
|
||||
|
||||
@@ -875,5 +867,10 @@ macro_rules! bdk_blockchain_tests {
|
||||
assert!(wallet.get_balance().unwrap() > 0, "incorrect balance after receiving coinbase");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
( fn $fn_name:ident ($( $tt:tt )+) -> $blockchain:ty $block:block) => {
|
||||
compile_error!(concat!("Invalid arguments `", stringify!($($tt)*), "` in the blockchain tests fn."));
|
||||
compile_error!("Only the exact `&TestClient` type is supported, **without** any leading path items.");
|
||||
};
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
// licenses.
|
||||
#![allow(missing_docs)]
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "test-blockchains")]
|
||||
pub mod blockchain_tests;
|
||||
|
||||
|
||||
@@ -160,7 +160,10 @@ pub struct TransactionDetails {
|
||||
pub received: u64,
|
||||
/// Sent value (sats)
|
||||
pub sent: u64,
|
||||
/// Fee value (sats) if available
|
||||
/// Fee value (sats) if available.
|
||||
/// The availability of the fee depends on the backend. It's never `None` with an Electrum
|
||||
/// Server backend, but it could be `None` with a Bitcoin RPC node without txindex that receive
|
||||
/// funds while offline.
|
||||
pub fee: Option<u64>,
|
||||
/// If the transaction is confirmed, contains height and timestamp of the block containing the
|
||||
/// transaction, unconfirmed transaction contains `None`.
|
||||
|
||||
@@ -90,6 +90,7 @@
|
||||
//! ```
|
||||
|
||||
use crate::types::FeeRate;
|
||||
use crate::wallet::Vbytes;
|
||||
use crate::{database::Database, WeightedUtxo};
|
||||
use crate::{error::Error, Utxo};
|
||||
|
||||
@@ -257,8 +258,8 @@ struct OutputGroup {
|
||||
|
||||
impl OutputGroup {
|
||||
fn new(weighted_utxo: WeightedUtxo, fee_rate: FeeRate) -> Self {
|
||||
let fee = (TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight) as f32 / 4.0
|
||||
* fee_rate.as_sat_vb();
|
||||
let fee =
|
||||
(TXIN_BASE_WEIGHT + weighted_utxo.satisfaction_weight).vbytes() * fee_rate.as_sat_vb();
|
||||
let effective_value = weighted_utxo.utxo.txout().value as i64 - fee.ceil() as i64;
|
||||
OutputGroup {
|
||||
weighted_utxo,
|
||||
@@ -862,7 +863,7 @@ mod test {
|
||||
|
||||
assert_eq!(result.selected.len(), 1);
|
||||
assert_eq!(result.selected_amount(), 100_000);
|
||||
let input_size = (TXIN_BASE_WEIGHT as f32) / 4.0 + P2WPKH_WITNESS_SIZE as f32 / 4.0;
|
||||
let input_size = (TXIN_BASE_WEIGHT + P2WPKH_WITNESS_SIZE).vbytes();
|
||||
let epsilon = 0.5;
|
||||
assert!((1.0 - (result.fee_amount / input_size)).abs() < epsilon);
|
||||
}
|
||||
|
||||
@@ -565,21 +565,6 @@ where
|
||||
output: vec![],
|
||||
};
|
||||
|
||||
let recipients = match ¶ms.single_recipient {
|
||||
Some(recipient) => vec![(recipient, 0)],
|
||||
None => params.recipients.iter().map(|(r, v)| (r, *v)).collect(),
|
||||
};
|
||||
if params.single_recipient.is_some()
|
||||
&& !params.manually_selected_only
|
||||
&& !params.drain_wallet
|
||||
&& params.bumping_fee.is_none()
|
||||
{
|
||||
return Err(Error::SingleRecipientNoInputs);
|
||||
}
|
||||
if recipients.is_empty() {
|
||||
return Err(Error::NoRecipients);
|
||||
}
|
||||
|
||||
if params.manually_selected_only && params.utxos.is_empty() {
|
||||
return Err(Error::NoUtxosSelected);
|
||||
}
|
||||
@@ -591,12 +576,12 @@ where
|
||||
let calc_fee_bytes = |wu| (wu as f32) * fee_rate.as_sat_vb() / 4.0;
|
||||
fee_amount += calc_fee_bytes(tx.get_weight());
|
||||
|
||||
for (index, (script_pubkey, satoshi)) in recipients.into_iter().enumerate() {
|
||||
let value = match params.single_recipient {
|
||||
Some(_) => 0,
|
||||
None if satoshi.is_dust() => return Err(Error::OutputBelowDustLimit(index)),
|
||||
None => satoshi,
|
||||
};
|
||||
let recipients = params.recipients.iter().map(|(r, v)| (r, *v));
|
||||
|
||||
for (index, (script_pubkey, value)) in recipients.enumerate() {
|
||||
if value.is_dust() {
|
||||
return Err(Error::OutputBelowDustLimit(index));
|
||||
}
|
||||
|
||||
if self.is_mine(script_pubkey)? {
|
||||
received += value;
|
||||
@@ -651,54 +636,47 @@ where
|
||||
})
|
||||
.collect();
|
||||
|
||||
// prepare the change output
|
||||
let change_output = match params.single_recipient {
|
||||
Some(_) => None,
|
||||
None => {
|
||||
let change_script = self.get_change_address()?;
|
||||
let change_output = TxOut {
|
||||
script_pubkey: change_script,
|
||||
value: 0,
|
||||
};
|
||||
// prepare the drain output
|
||||
let mut drain_output = {
|
||||
let script_pubkey = match params.drain_to {
|
||||
Some(ref drain_recipient) => drain_recipient.clone(),
|
||||
None => self.get_change_address()?,
|
||||
};
|
||||
|
||||
// take the change into account for fees
|
||||
fee_amount += calc_fee_bytes(serialize(&change_output).len() * 4);
|
||||
Some(change_output)
|
||||
TxOut {
|
||||
script_pubkey,
|
||||
value: 0,
|
||||
}
|
||||
};
|
||||
|
||||
fee_amount += calc_fee_bytes(serialize(&drain_output).len() * 4);
|
||||
|
||||
let mut fee_amount = fee_amount.ceil() as u64;
|
||||
let change_val = (coin_selection.selected_amount() - outgoing).saturating_sub(fee_amount);
|
||||
let drain_val = (coin_selection.selected_amount() - outgoing).saturating_sub(fee_amount);
|
||||
|
||||
match change_output {
|
||||
None if change_val.is_dust() => {
|
||||
// single recipient, but the only output would be below dust limit
|
||||
// TODO: or OutputBelowDustLimit?
|
||||
return Err(Error::InsufficientFunds {
|
||||
needed: DUST_LIMIT_SATOSHI,
|
||||
available: change_val,
|
||||
});
|
||||
}
|
||||
Some(_) if change_val.is_dust() => {
|
||||
// skip the change output because it's dust -- just include it in the fee.
|
||||
fee_amount += change_val;
|
||||
}
|
||||
Some(mut change_output) => {
|
||||
change_output.value = change_val;
|
||||
received += change_val;
|
||||
|
||||
tx.output.push(change_output);
|
||||
}
|
||||
None => {
|
||||
// there's only one output, send everything to it
|
||||
tx.output[0].value = change_val;
|
||||
|
||||
// the single recipient is our address
|
||||
if self.is_mine(&tx.output[0].script_pubkey)? {
|
||||
received = change_val;
|
||||
if tx.output.is_empty() {
|
||||
if params.drain_to.is_some() {
|
||||
if drain_val.is_dust() {
|
||||
return Err(Error::InsufficientFunds {
|
||||
needed: DUST_LIMIT_SATOSHI,
|
||||
available: drain_val,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
return Err(Error::NoRecipients);
|
||||
}
|
||||
}
|
||||
|
||||
if drain_val.is_dust() {
|
||||
fee_amount += drain_val;
|
||||
} else {
|
||||
drain_output.value = drain_val;
|
||||
if self.is_mine(&drain_output.script_pubkey)? {
|
||||
received += drain_val;
|
||||
}
|
||||
tx.output.push(drain_output);
|
||||
}
|
||||
|
||||
// sort input/outputs according to the chosen algorithm
|
||||
params.ordering.sort_tx(&mut tx);
|
||||
|
||||
@@ -725,10 +703,6 @@ where
|
||||
/// *repalce by fee* (RBF). If the transaction can be fee bumped then it returns a [`TxBuilder`]
|
||||
/// pre-populated with the inputs and outputs of the original transaction.
|
||||
///
|
||||
/// **NOTE**: if the original transaction was made with [`TxBuilder::set_single_recipient`],
|
||||
/// the [`TxBuilder::maintain_single_recipient`] flag should be enabled to correctly reduce the
|
||||
/// only output's value in order to increase the fees.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// ```no_run
|
||||
@@ -780,7 +754,7 @@ where
|
||||
return Err(Error::IrreplaceableTransaction);
|
||||
}
|
||||
|
||||
let vbytes = tx.get_weight() as f32 / 4.0;
|
||||
let vbytes = tx.get_weight().vbytes();
|
||||
let feerate = details.fee.ok_or(Error::FeeRateUnavailable)? as f32 / vbytes;
|
||||
|
||||
// remove the inputs from the tx and process them
|
||||
@@ -1526,17 +1500,13 @@ where
|
||||
// TODO: what if i generate an address first and cache some addresses?
|
||||
// TODO: we should sync if generating an address triggers a new batch to be stored
|
||||
if run_setup {
|
||||
maybe_await!(self.client.setup(
|
||||
None,
|
||||
self.database.borrow_mut().deref_mut(),
|
||||
progress_update,
|
||||
))?;
|
||||
maybe_await!(self
|
||||
.client
|
||||
.setup(self.database.borrow_mut().deref_mut(), progress_update,))?;
|
||||
} else {
|
||||
maybe_await!(self.client.sync(
|
||||
None,
|
||||
self.database.borrow_mut().deref_mut(),
|
||||
progress_update,
|
||||
))?;
|
||||
maybe_await!(self
|
||||
.client
|
||||
.sync(self.database.borrow_mut().deref_mut(), progress_update,))?;
|
||||
}
|
||||
|
||||
#[cfg(feature = "verify")]
|
||||
@@ -1578,6 +1548,18 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait implemented by types that can be used to measure weight units.
|
||||
pub trait Vbytes {
|
||||
/// Convert weight units to virtual bytes.
|
||||
fn vbytes(self) -> f32;
|
||||
}
|
||||
|
||||
impl Vbytes for usize {
|
||||
fn vbytes(self) -> f32 {
|
||||
self as f32 / 4.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test {
|
||||
use std::str::FromStr;
|
||||
@@ -1764,7 +1746,7 @@ pub(crate) mod test {
|
||||
dust_change = true;
|
||||
)*
|
||||
|
||||
let tx_fee_rate = $fees as f32 / (tx.get_weight() as f32 / 4.0);
|
||||
let tx_fee_rate = $fees as f32 / (tx.get_weight().vbytes());
|
||||
let fee_rate = $fee_rate.as_sat_vb();
|
||||
|
||||
if !dust_change {
|
||||
@@ -1996,13 +1978,11 @@ pub(crate) mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_tx_single_recipient_drain_wallet() {
|
||||
fn test_create_tx_drain_wallet_and_drain_to() {
|
||||
let (wallet, _, _) = get_funded_wallet(get_test_wpkh());
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, details) = builder.finish().unwrap();
|
||||
|
||||
assert_eq!(psbt.global.unsigned_tx.output.len(), 1);
|
||||
@@ -2012,6 +1992,33 @@ pub(crate) mod test {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_tx_drain_wallet_and_drain_to_and_with_recipient() {
|
||||
let (wallet, _, _) = get_funded_wallet(get_test_wpkh());
|
||||
let addr = Address::from_str("2N4eQYCbKUHCCTUjBJeHcJp9ok6J2GZsTDt").unwrap();
|
||||
let drain_addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.add_recipient(addr.script_pubkey(), 20_000)
|
||||
.drain_to(drain_addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
let (psbt, details) = builder.finish().unwrap();
|
||||
dbg!(&psbt);
|
||||
let outputs = psbt.global.unsigned_tx.output;
|
||||
|
||||
assert_eq!(outputs.len(), 2);
|
||||
let main_output = outputs
|
||||
.iter()
|
||||
.find(|x| x.script_pubkey == addr.script_pubkey())
|
||||
.unwrap();
|
||||
let drain_output = outputs
|
||||
.iter()
|
||||
.find(|x| x.script_pubkey == drain_addr.script_pubkey())
|
||||
.unwrap();
|
||||
assert_eq!(main_output.value, 20_000,);
|
||||
assert_eq!(drain_output.value, 30_000 - details.fee.unwrap_or(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_tx_default_fee_rate() {
|
||||
let (wallet, _, _) = get_funded_wallet(get_test_wpkh());
|
||||
@@ -2042,7 +2049,7 @@ pub(crate) mod test {
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.drain_wallet()
|
||||
.fee_absolute(100);
|
||||
let (psbt, details) = builder.finish().unwrap();
|
||||
@@ -2061,7 +2068,7 @@ pub(crate) mod test {
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.drain_wallet()
|
||||
.fee_absolute(0);
|
||||
let (psbt, details) = builder.finish().unwrap();
|
||||
@@ -2081,7 +2088,7 @@ pub(crate) mod test {
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.drain_wallet()
|
||||
.fee_absolute(60_000);
|
||||
let (_psbt, _details) = builder.finish().unwrap();
|
||||
@@ -2122,13 +2129,13 @@ pub(crate) mod test {
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "InsufficientFunds")]
|
||||
fn test_create_tx_single_recipient_dust_amount() {
|
||||
fn test_create_tx_drain_to_dust_amount() {
|
||||
let (wallet, _, _) = get_funded_wallet(get_test_wpkh());
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
// very high fee rate, so that the only output would be below dust
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.drain_wallet()
|
||||
.fee_rate(FeeRate::from_sat_per_vb(453.0));
|
||||
builder.finish().unwrap();
|
||||
@@ -2189,9 +2196,7 @@ pub(crate) mod test {
|
||||
let (wallet, _, _) = get_funded_wallet("wpkh([d34db33f/44'/0'/0']tpubDEnoLuPdBep9bzw5LoGYpsxUQYheRQ9gcgrJhJEcdKFB9cWQRyYmkCyRoTqeD4tJYiVVgt6A3rN6rWn9RYhR9sBsGxji29LYWHuKKbdb1ev/0/*)");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert_eq!(psbt.inputs[0].bip32_derivation.len(), 1);
|
||||
@@ -2215,9 +2220,7 @@ pub(crate) mod test {
|
||||
|
||||
let addr = testutils!(@external descriptors, 5);
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert_eq!(psbt.outputs[0].bip32_derivation.len(), 1);
|
||||
@@ -2238,9 +2241,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("sh(pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
@@ -2263,9 +2264,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("wsh(pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert_eq!(psbt.inputs[0].redeem_script, None);
|
||||
@@ -2288,9 +2287,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("sh(wsh(pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW)))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
let script = Script::from(
|
||||
@@ -2310,9 +2307,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("sh(pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert!(psbt.inputs[0].non_witness_utxo.is_some());
|
||||
@@ -2326,7 +2321,7 @@ pub(crate) mod test {
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.only_witness_utxo()
|
||||
.drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
@@ -2341,9 +2336,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("sh(wpkh(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert!(psbt.inputs[0].witness_utxo.is_some());
|
||||
@@ -2355,9 +2348,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("wsh(pk(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (psbt, _) = builder.finish().unwrap();
|
||||
|
||||
assert!(psbt.inputs[0].non_witness_utxo.is_some());
|
||||
@@ -2991,7 +2982,7 @@ pub(crate) mod test {
|
||||
let addr = Address::from_str("2N1Ffz3WaNzbeLFBb51xyFMHYSEUXcbiSoX").unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.drain_wallet()
|
||||
.enable_rbf();
|
||||
let (psbt, mut original_details) = builder.finish().unwrap();
|
||||
@@ -3015,7 +3006,7 @@ pub(crate) mod test {
|
||||
let mut builder = wallet.build_fee_bump(txid).unwrap();
|
||||
builder
|
||||
.fee_rate(FeeRate::from_sat_per_vb(2.5))
|
||||
.maintain_single_recipient()
|
||||
.allow_shrinking(addr.script_pubkey())
|
||||
.unwrap();
|
||||
let (psbt, details) = builder.finish().unwrap();
|
||||
|
||||
@@ -3035,7 +3026,7 @@ pub(crate) mod test {
|
||||
let addr = Address::from_str("2N1Ffz3WaNzbeLFBb51xyFMHYSEUXcbiSoX").unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.drain_wallet()
|
||||
.enable_rbf();
|
||||
let (psbt, mut original_details) = builder.finish().unwrap();
|
||||
@@ -3058,7 +3049,7 @@ pub(crate) mod test {
|
||||
|
||||
let mut builder = wallet.build_fee_bump(txid).unwrap();
|
||||
builder
|
||||
.maintain_single_recipient()
|
||||
.allow_shrinking(addr.script_pubkey())
|
||||
.unwrap()
|
||||
.fee_absolute(300);
|
||||
let (psbt, details) = builder.finish().unwrap();
|
||||
@@ -3089,7 +3080,7 @@ pub(crate) mod test {
|
||||
let addr = Address::from_str("2N1Ffz3WaNzbeLFBb51xyFMHYSEUXcbiSoX").unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.add_utxo(outpoint)
|
||||
.unwrap()
|
||||
.manually_selected_only()
|
||||
@@ -3118,7 +3109,7 @@ pub(crate) mod test {
|
||||
let mut builder = wallet.build_fee_bump(txid).unwrap();
|
||||
builder
|
||||
.drain_wallet()
|
||||
.maintain_single_recipient()
|
||||
.allow_shrinking(addr.script_pubkey())
|
||||
.unwrap()
|
||||
.fee_rate(FeeRate::from_sat_per_vb(5.0));
|
||||
let (_, details) = builder.finish().unwrap();
|
||||
@@ -3133,7 +3124,7 @@ pub(crate) mod test {
|
||||
// them, and make sure that `bump_fee` doesn't try to add more. This fails because we've
|
||||
// told the wallet it's not allowed to add more inputs AND it can't reduce the value of the
|
||||
// existing output. In other words, bump_fee + manually_selected_only is always an error
|
||||
// unless you've also set "maintain_single_recipient".
|
||||
// unless you've also set "allow_shrinking" OR there is a change output.
|
||||
let incoming_txid = crate::populate_test_db!(
|
||||
wallet.database.borrow_mut(),
|
||||
testutils! (@tx ( (@external descriptors, 0) => 25_000 ) (@confirmations 1)),
|
||||
@@ -3146,7 +3137,7 @@ pub(crate) mod test {
|
||||
let addr = Address::from_str("2N1Ffz3WaNzbeLFBb51xyFMHYSEUXcbiSoX").unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.add_utxo(outpoint)
|
||||
.unwrap()
|
||||
.manually_selected_only()
|
||||
@@ -3312,11 +3303,11 @@ pub(crate) mod test {
|
||||
Some(100),
|
||||
);
|
||||
|
||||
// initially make a tx without change by using `set_single_recipient`
|
||||
// initially make a tx without change by using `drain_to`
|
||||
let addr = Address::from_str("2N1Ffz3WaNzbeLFBb51xyFMHYSEUXcbiSoX").unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.add_utxo(OutPoint {
|
||||
txid: incoming_txid,
|
||||
vout: 0,
|
||||
@@ -3344,7 +3335,7 @@ pub(crate) mod test {
|
||||
.set_tx(&original_details)
|
||||
.unwrap();
|
||||
|
||||
// now bump the fees without using `maintain_single_recipient`. the wallet should add an
|
||||
// now bump the fees without using `allow_shrinking`. the wallet should add an
|
||||
// extra input and a change output, and leave the original output untouched
|
||||
let mut builder = wallet.build_fee_bump(txid).unwrap();
|
||||
builder.fee_rate(FeeRate::from_sat_per_vb(50.0));
|
||||
@@ -3590,9 +3581,7 @@ pub(crate) mod test {
|
||||
let (wallet, _, _) = get_funded_wallet("wpkh(tprv8ZgxMBicQKsPd3EupYiPRhaMooHKUHJxNsTfYuScep13go8QFfHdtkG9nRkFGb7busX4isf6X9dURGCoKgitaApQ6MupRhZMcELAxTBRJgS/*)");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
|
||||
@@ -3607,9 +3596,7 @@ pub(crate) mod test {
|
||||
let (wallet, _, _) = get_funded_wallet("wpkh([d34db33f/84h/1h/0h]tprv8ZgxMBicQKsPd3EupYiPRhaMooHKUHJxNsTfYuScep13go8QFfHdtkG9nRkFGb7busX4isf6X9dURGCoKgitaApQ6MupRhZMcELAxTBRJgS/*)");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
|
||||
@@ -3624,9 +3611,7 @@ pub(crate) mod test {
|
||||
let (wallet, _, _) = get_funded_wallet("wpkh(tprv8ZgxMBicQKsPd3EupYiPRhaMooHKUHJxNsTfYuScep13go8QFfHdtkG9nRkFGb7busX4isf6X9dURGCoKgitaApQ6MupRhZMcELAxTBRJgS/44'/0'/0'/0/*)");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
|
||||
@@ -3641,9 +3626,7 @@ pub(crate) mod test {
|
||||
let (wallet, _, _) = get_funded_wallet("sh(wpkh(tprv8ZgxMBicQKsPd3EupYiPRhaMooHKUHJxNsTfYuScep13go8QFfHdtkG9nRkFGb7busX4isf6X9dURGCoKgitaApQ6MupRhZMcELAxTBRJgS/*))");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
|
||||
@@ -3659,9 +3642,7 @@ pub(crate) mod test {
|
||||
get_funded_wallet("wpkh(cVpPVruEDdmutPzisEsYvtST1usBR3ntr8pXSyt6D2YYqXRyPcFW)");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
let finalized = wallet.sign(&mut psbt, Default::default()).unwrap();
|
||||
@@ -3676,9 +3657,7 @@ pub(crate) mod test {
|
||||
let (wallet, _, _) = get_funded_wallet("wpkh(tprv8ZgxMBicQKsPd3EupYiPRhaMooHKUHJxNsTfYuScep13go8QFfHdtkG9nRkFGb7busX4isf6X9dURGCoKgitaApQ6MupRhZMcELAxTBRJgS/*)");
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_wallet();
|
||||
builder.drain_to(addr.script_pubkey()).drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
psbt.inputs[0].bip32_derivation.clear();
|
||||
@@ -3760,7 +3739,7 @@ pub(crate) mod test {
|
||||
let addr = wallet.get_address(New).unwrap();
|
||||
let mut builder = wallet.build_tx();
|
||||
builder
|
||||
.set_single_recipient(addr.script_pubkey())
|
||||
.drain_to(addr.script_pubkey())
|
||||
.sighash(sighash)
|
||||
.drain_wallet();
|
||||
let (mut psbt, _) = builder.finish().unwrap();
|
||||
|
||||
@@ -133,7 +133,7 @@ pub struct TxBuilder<'a, B, D, Cs, Ctx> {
|
||||
pub(crate) struct TxParams {
|
||||
pub(crate) recipients: Vec<(Script, u64)>,
|
||||
pub(crate) drain_wallet: bool,
|
||||
pub(crate) single_recipient: Option<Script>,
|
||||
pub(crate) drain_to: Option<Script>,
|
||||
pub(crate) fee_policy: Option<FeePolicy>,
|
||||
pub(crate) internal_policy_path: Option<BTreeMap<String, Vec<usize>>>,
|
||||
pub(crate) external_policy_path: Option<BTreeMap<String, Vec<usize>>>,
|
||||
@@ -560,49 +560,81 @@ impl<'a, B, D: BatchDatabase, Cs: CoinSelectionAlgorithm<D>> TxBuilder<'a, B, D,
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a single recipient that will get all the selected funds minus the fee. No change will
|
||||
/// be created
|
||||
/// Sets the address to *drain* excess coins to.
|
||||
///
|
||||
/// This method overrides any recipient set with [`set_recipients`](Self::set_recipients) or
|
||||
/// [`add_recipient`](Self::add_recipient).
|
||||
/// Usually, when there are excess coins they are sent to a change address generated by the
|
||||
/// wallet. This option replaces the usual change address with an arbitrary `script_pubkey` of
|
||||
/// your choosing. Just as with a change output, if the drain output is not needed (the excess
|
||||
/// coins are too small) it will not be included in the resulting transaction. The only
|
||||
/// difference is that it is valid to use `drain_to` without setting any ordinary recipients
|
||||
/// with [`add_recipient`] (but it is perfectly fine to add recipients as well).
|
||||
///
|
||||
/// It can only be used in conjunction with [`drain_wallet`](Self::drain_wallet) to send the
|
||||
/// entire content of the wallet (minus filters) to a single recipient or with a
|
||||
/// list of manually selected UTXOs by enabling [`manually_selected_only`](Self::manually_selected_only)
|
||||
/// and selecting them with or [`add_utxo`](Self::add_utxo).
|
||||
/// When bumping the fees of a transaction made with this option, you probably want to
|
||||
/// use [`allow_shrinking`] to allow this output to be reduced to pay for the extra fees.
|
||||
///
|
||||
/// When bumping the fees of a transaction made with this option, the user should remeber to
|
||||
/// add [`maintain_single_recipient`](Self::maintain_single_recipient) to correctly update the
|
||||
/// single output instead of adding one more for the change.
|
||||
pub fn set_single_recipient(&mut self, recipient: Script) -> &mut Self {
|
||||
self.params.single_recipient = Some(recipient);
|
||||
self.params.recipients.clear();
|
||||
|
||||
/// # Example
|
||||
///
|
||||
/// `drain_to` is very useful for draining all the coins in a wallet with [`drain_wallet`] to a
|
||||
/// single address.
|
||||
///
|
||||
/// ```
|
||||
/// # use std::str::FromStr;
|
||||
/// # use bitcoin::*;
|
||||
/// # use bdk::*;
|
||||
/// # use bdk::wallet::tx_builder::CreateTx;
|
||||
/// # let to_address = Address::from_str("2N4eQYCbKUHCCTUjBJeHcJp9ok6J2GZsTDt").unwrap();
|
||||
/// # let wallet = doctest_wallet!();
|
||||
/// let mut tx_builder = wallet.build_tx();
|
||||
///
|
||||
/// tx_builder
|
||||
/// // Spend all outputs in this wallet.
|
||||
/// .drain_wallet()
|
||||
/// // Send the excess (which is all the coins minus the fee) to this address.
|
||||
/// .drain_to(to_address.script_pubkey())
|
||||
/// .fee_rate(FeeRate::from_sat_per_vb(5.0))
|
||||
/// .enable_rbf();
|
||||
/// let (psbt, tx_details) = tx_builder.finish()?;
|
||||
/// # Ok::<(), bdk::Error>(())
|
||||
/// ```
|
||||
///
|
||||
/// [`allow_shrinking`]: Self::allow_shrinking
|
||||
/// [`add_recipient`]: Self::add_recipient
|
||||
/// [`drain_wallet`]: Self::drain_wallet
|
||||
pub fn drain_to(&mut self, script_pubkey: Script) -> &mut Self {
|
||||
self.params.drain_to = Some(script_pubkey);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
// methods supported only by bump_fee
|
||||
impl<'a, B, D: BatchDatabase> TxBuilder<'a, B, D, DefaultCoinSelectionAlgorithm, BumpFee> {
|
||||
/// Bump the fees of a transaction made with [`set_single_recipient`](Self::set_single_recipient)
|
||||
/// Explicitly tells the wallet that it is allowed to reduce the fee of the output matching this
|
||||
/// `script_pubkey` in order to bump the transaction fee. Without specifying this the wallet
|
||||
/// will attempt to find a change output to shrink instead.
|
||||
///
|
||||
/// Unless extra inputs are specified with [`add_utxo`], this flag will make
|
||||
/// `bump_fee` reduce the value of the existing output, or fail if it would be consumed
|
||||
/// entirely given the higher new fee rate.
|
||||
/// **Note** that the output may shrink to below the dust limit and therefore be removed. If it is
|
||||
/// preserved then it is currently not guaranteed to be in the same position as it was
|
||||
/// originally.
|
||||
///
|
||||
/// If extra inputs are added and they are not entirely consumed in fees, a change output will not
|
||||
/// be added; the existing output will simply grow in value.
|
||||
///
|
||||
/// Fails if the transaction has more than one outputs.
|
||||
///
|
||||
/// [`add_utxo`]: Self::add_utxo
|
||||
pub fn maintain_single_recipient(&mut self) -> Result<&mut Self, Error> {
|
||||
let mut recipients = self.params.recipients.drain(..).collect::<Vec<_>>();
|
||||
if recipients.len() != 1 {
|
||||
return Err(Error::SingleRecipientMultipleOutputs);
|
||||
/// Returns an `Err` if `script_pubkey` can't be found among the recipients of the
|
||||
/// transaction we are bumping.
|
||||
pub fn allow_shrinking(&mut self, script_pubkey: Script) -> Result<&mut Self, Error> {
|
||||
match self
|
||||
.params
|
||||
.recipients
|
||||
.iter()
|
||||
.position(|(recipient_script, _)| *recipient_script == script_pubkey)
|
||||
{
|
||||
Some(position) => {
|
||||
self.params.recipients.remove(position);
|
||||
self.params.drain_to = Some(script_pubkey);
|
||||
Ok(self)
|
||||
}
|
||||
None => Err(Error::Generic(format!(
|
||||
"{} was not in the original transaction",
|
||||
script_pubkey
|
||||
))),
|
||||
}
|
||||
self.params.single_recipient = Some(recipients.pop().unwrap().0);
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -124,7 +124,6 @@ mod test {
|
||||
}
|
||||
fn setup<D: BatchDatabase, P: 'static + Progress>(
|
||||
&self,
|
||||
_stop_gap: Option<usize>,
|
||||
_database: &mut D,
|
||||
_progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
|
||||
Reference in New Issue
Block a user