Compare commits

...

7 Commits

Author SHA1 Message Date
codeShark149
d2da3755f4
Add Peer Manager
The Peer Manager structure is responsible for maintaining a live
directory of connected CBF/Non-CBF peers. It can send and receive
messages from one or multiple peers.

It manages an underlying address manager to fetch network addresses,
and it maintains a ban score for each peer. A threshold banscore (100)
is used to signal banning of a peer. Once a peer is banned it will not
be connected to in subsequent runs.

Peer manager will be used to handle parallel fetching of filter data
from multiple peers.
2021-07-22 00:00:55 +05:30
Steve Myers
6acb4d9796
Ignore wip test 2021-07-20 09:45:34 -07:00
codeShark149
377e5cdd49
Update address manager test
The two tests have been combined into a single one. Added extra
log points for better debugging in CI if it gets stuck.
2021-07-20 19:55:16 +05:30
rajarshimaitra
70d2a0ee6b
Return Option when getting addresses
When getting addresses return Option with Some vectors, or None in
case of empty vectors.

This makes handling the case of getting empty result easier.
2021-07-20 19:55:16 +05:30
codeShark149
de1fc2a677
Update compact_filters module
Compact filters module is updated to include the new Address Manager.

The new PeerError structure is added into CompactFiltersError to have
compatibility with existing APIs.

Minor cargo fmt nit fixes
2021-07-20 19:55:16 +05:30
codeShark149
671d90e57c
Add Address Manager
Add the final address manager structures. Address Manager is responsible
for finding new peers from the network. It maintains a directory with
cbf/non_cbf catagories of peer. A Peer Manager will query this
Address Manager to provide it with fresh addresses.

Currently Address Manager is single threaded. When a user calls it to
fetch the network, it will block the call until fetch completes.

It knows when to stop fetching via config variable MIN_CBF_BUFFER and
MIN_NONCBF_BUFFER. And it stops fetching when buffer requirement is met.

Co-authored-by: John Cantrell <johncantrell97@protonmail.com>
2021-07-20 19:55:15 +05:30
codeShark149
9480faa5d3
Add PeerError structure in peer module
This adds a new PeerError structure in the peer module. To handle all
the peer related errors. PeerErrors contains all the mempool errors too,
for now. Later if we have a more complex mempool, we might decide to
have its own dedicated error.

PeerError is to be included in the global CompactFiltersError type.
2021-07-20 19:55:15 +05:30
4 changed files with 1582 additions and 112 deletions

View File

@ -0,0 +1,764 @@
// Bitcoin Dev Kit
// Written in 2021 by Rajarshi Maitra <rajarshi149@gmail.com>
// John Cantrell <johncantrell97@protonmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.
use std::fs::File;
use std::io::prelude::*;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::{
mpsc::{channel, Receiver, SendError, Sender},
Arc, RwLock,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::PoisonError;
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
use serde::{Deserialize, Serialize};
use serde_json::Error as SerdeError;
use super::{Mempool, Peer, PeerError};
use bitcoin::network::{
constants::{Network, ServiceFlags},
message::NetworkMessage,
Address,
};
/// Default address pool minimums
const MIN_CBF_BUFFER: usize = 5;
const MIN_NONCBF_BUFFER: usize = 5;
/// A Discovery structure used by workers
///
/// Discovery can be initiated via a cache,
/// Or it will start with default hardcoded seeds
pub struct AddressDiscovery {
pending: VecDeque<SocketAddr>,
visited: HashSet<SocketAddr>,
}
impl AddressDiscovery {
fn new(network: Network, seeds: VecDeque<SocketAddr>) -> AddressDiscovery {
let mut network_seeds = AddressDiscovery::seeds(network);
let mut total_seeds = seeds;
total_seeds.append(&mut network_seeds);
AddressDiscovery {
pending: total_seeds,
visited: HashSet::new(),
}
}
fn add_pendings(&mut self, addresses: Vec<SocketAddr>) {
for addr in addresses {
if !self.pending.contains(&addr) && !self.visited.contains(&addr) {
self.pending.push_back(addr);
}
}
}
fn get_next(&mut self) -> Option<SocketAddr> {
match self.pending.pop_front() {
None => None,
Some(next) => {
self.visited.insert(next);
Some(next)
}
}
}
fn seeds(network: Network) -> VecDeque<SocketAddr> {
let mut seeds = VecDeque::new();
let port: u16 = match network {
Network::Bitcoin => 8333,
Network::Testnet => 18333,
Network::Regtest => 18444,
Network::Signet => 38333,
};
let seedhosts: &[&str] = match network {
Network::Bitcoin => &[
"seed.bitcoin.sipa.be",
"dnsseed.bluematt.me",
"dnsseed.bitcoin.dashjr.org",
"seed.bitcoinstats.com",
"seed.bitcoin.jonasschnelli.ch",
"seed.btc.petertodd.org",
"seed.bitcoin.sprovoost.nl",
"dnsseed.emzy.de",
"seed.bitcoin.wiz.biz",
],
Network::Testnet => &[
"testnet-seed.bitcoin.jonasschnelli.ch",
"seed.tbtc.petertodd.org",
"seed.testnet.bitcoin.sprovoost.nl",
"testnet-seed.bluematt.me",
],
Network::Regtest => &[],
Network::Signet => &[],
};
for seedhost in seedhosts.iter() {
if let Ok(lookup) = (*seedhost, port).to_socket_addrs() {
for host in lookup {
if host.is_ipv4() {
seeds.push_back(host);
}
}
}
}
seeds
}
}
/// Crawler structure that will interface with Discovery and public bitcoin network
///
/// Address manager will spawn multiple crawlers in separate threads to discover new addresses.
struct AddressWorker {
discovery: Arc<RwLock<AddressDiscovery>>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
}
impl AddressWorker {
fn new(
discovery: Arc<RwLock<AddressDiscovery>>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
) -> AddressWorker {
AddressWorker {
discovery,
sender,
network,
}
}
fn try_receive_addr(&mut self, peer: &Peer) -> Result<(), AddressManagerError> {
if let Some(NetworkMessage::Addr(new_addresses)) =
peer.recv("addr", Some(Duration::from_secs(1)))?
{
self.consume_addr(new_addresses)?;
}
Ok(())
}
fn consume_addr(&mut self, addrs: Vec<(u32, Address)>) -> Result<(), AddressManagerError> {
let mut discovery_lock = self.discovery.write().map_err(PeerError::from)?;
let mut addresses = Vec::new();
for network_addrs in addrs {
if let Ok(socket_addrs) = network_addrs.1.socket_addr() {
addresses.push(socket_addrs);
}
}
discovery_lock.add_pendings(addresses);
Ok(())
}
fn work(&mut self) -> Result<(), AddressManagerError> {
loop {
let next_address = {
let mut address_discovery = self.discovery.write()?;
address_discovery.get_next()
};
match next_address {
Some(address) => {
let potential_peer = Peer::connect_with_timeout(
address,
Duration::from_secs(1),
Arc::new(Mempool::default()),
self.network,
);
if let Ok(peer) = potential_peer {
peer.send(NetworkMessage::GetAddr)?;
self.try_receive_addr(&peer)?;
self.try_receive_addr(&peer)?;
self.sender.send((address, peer.get_version().services))?;
// TODO: Investigate why close is being called on non existent connections
// currently the errors are ignored
peer.close().unwrap_or(());
}
}
None => continue,
}
}
}
}
/// A dedicated cache structure, with cbf/non_cbf separation
///
/// [AddressCache] will interface with file i/o
/// And can te turned into seeds. Generation of seed will put previously cached
/// cbf addresses at front of the vec, to boost up cbf node findings
#[derive(Serialize, Deserialize)]
struct AddressCache {
banned_peers: HashSet<SocketAddr>,
cbf: HashSet<SocketAddr>,
non_cbf: HashSet<SocketAddr>,
}
impl AddressCache {
fn empty() -> Self {
Self {
banned_peers: HashSet::new(),
cbf: HashSet::new(),
non_cbf: HashSet::new(),
}
}
fn from_file(path: &str) -> Result<Option<Self>, AddressManagerError> {
let serialized: Result<String, _> = std::fs::read_to_string(path);
let serialized = match serialized {
Ok(contents) => contents,
Err(_) => return Ok(None),
};
let address_cache = serde_json::from_str(&serialized)?;
Ok(Some(address_cache))
}
fn write_to_file(&self, path: &str) -> Result<(), AddressManagerError> {
let serialized = serde_json::to_string_pretty(&self)?;
let mut cache_file = File::create(path)?;
cache_file.write_all(serialized.as_bytes())?;
Ok(())
}
fn make_seeds(&self) -> VecDeque<SocketAddr> {
self.cbf
.iter()
.chain(self.non_cbf.iter())
.copied()
.collect()
}
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) -> bool {
if cbf {
self.cbf.remove(addrs)
} else {
self.non_cbf.remove(addrs)
}
}
fn add_address(&mut self, addrs: SocketAddr, cbf: bool) -> bool {
if cbf {
self.cbf.insert(addrs)
} else {
self.non_cbf.insert(addrs)
}
}
fn add_to_banlist(&mut self, addrs: SocketAddr, cbf: bool) {
if self.banned_peers.insert(addrs) {
self.remove_address(&addrs, cbf);
}
}
}
/// A Live directory maintained by [AddressManager] of freshly found cbf and non_cbf nodes by workers
///
/// Each instance of new [AddressManager] with have fresh [AddressDirectory]
/// This is independent from the cache and will be an in-memory database to
/// fetch addresses to the user.
struct AddressDirectory {
cbf_nodes: HashSet<SocketAddr>,
non_cbf_nodes: HashSet<SocketAddr>,
// List of addresses it has previously provided to the caller (PeerManager)
previously_sent: HashSet<SocketAddr>,
}
impl AddressDirectory {
fn new() -> AddressDirectory {
AddressDirectory {
cbf_nodes: HashSet::new(),
non_cbf_nodes: HashSet::new(),
previously_sent: HashSet::new(),
}
}
fn add_address(&mut self, addr: SocketAddr, cbf: bool) {
if cbf {
self.cbf_nodes.insert(addr);
} else {
self.non_cbf_nodes.insert(addr);
}
}
fn get_new_address(&mut self, cbf: bool) -> Option<SocketAddr> {
if cbf {
if let Some(new_addresses) = self
.cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.collect::<Vec<&SocketAddr>>()
.pop()
{
self.previously_sent.insert(*new_addresses);
Some(*new_addresses)
} else {
None
}
} else if let Some(new_addresses) = self
.non_cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.collect::<Vec<&SocketAddr>>()
.pop()
{
self.previously_sent.insert(*new_addresses);
Some(*new_addresses)
} else {
None
}
}
fn get_cbf_address_count(&self) -> usize {
self.cbf_nodes.len()
}
fn get_non_cbf_address_count(&self) -> usize {
self.non_cbf_nodes.len()
}
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) {
if cbf {
self.cbf_nodes.remove(addrs);
} else {
self.non_cbf_nodes.remove(addrs);
}
}
fn get_cbf_buffer(&self) -> usize {
self.cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.count()
}
fn get_non_cbf_buffer(&self) -> usize {
self.non_cbf_nodes
.iter()
.filter(|item| !self.previously_sent.contains(item))
.count()
}
}
/// Discovery statistics, useful for logging
#[derive(Clone, Copy)]
pub struct DiscoveryData {
queued: usize,
visited: usize,
non_cbf_count: usize,
cbf_count: usize,
}
/// Progress trait for discovery statistics logging
pub trait DiscoveryProgress {
/// Update progress
fn update(&self, data: DiscoveryData);
}
/// Used when progress updates are not desired
#[derive(Clone)]
pub struct NoDiscoveryProgress;
impl DiscoveryProgress for NoDiscoveryProgress {
fn update(&self, _data: DiscoveryData) {}
}
/// Used to log progress update
#[derive(Clone)]
pub struct LogDiscoveryProgress;
impl DiscoveryProgress for LogDiscoveryProgress {
fn update(&self, data: DiscoveryData) {
log::trace!(
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
data.queued,
data.visited,
data.non_cbf_count,
data.cbf_count
);
#[cfg(test)]
println!(
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
data.queued, data.visited, data.non_cbf_count, data.cbf_count
);
}
}
/// A manager structure managing address discovery
///
/// Manager will try to maintain a given address buffer in its directory
/// buffer = len(exiting addresses) - len(previously provided addresses)
/// Manager will crawl the network until buffer criteria is satisfied
/// Manager will bootstrap workers from a cache, to speed up discovery progress in
/// subsequent call after the first crawl.
/// Manager will keep track of the cache and only update it if previously
/// unknown addresses are found.
pub struct AddressManager<P: DiscoveryProgress> {
directory: AddressDirectory,
cache_filename: String,
discovery: Arc<RwLock<AddressDiscovery>>,
threads: usize,
receiver: Receiver<(SocketAddr, ServiceFlags)>,
sender: Sender<(SocketAddr, ServiceFlags)>,
network: Network,
cbf_buffer: usize,
non_cbf_buffer: usize,
progress: P,
}
impl<P: DiscoveryProgress> AddressManager<P> {
/// Create a new manager. Initiate Discovery seeds from the cache
/// if it exists, else start with hardcoded seeds
pub fn new(
network: Network,
cache_filename: String,
threads: usize,
cbf_buffer: Option<usize>,
non_cbf_buffer: Option<usize>,
progress: P,
) -> Result<AddressManager<P>, AddressManagerError> {
let (sender, receiver) = channel();
let seeds = match AddressCache::from_file(&cache_filename)? {
Some(cache) => cache.make_seeds(),
None => VecDeque::new(),
};
let min_cbf = cbf_buffer.unwrap_or(MIN_CBF_BUFFER);
let min_non_cbf = non_cbf_buffer.unwrap_or(MIN_NONCBF_BUFFER);
let discovery = AddressDiscovery::new(network, seeds);
Ok(AddressManager {
cache_filename,
directory: AddressDirectory::new(),
discovery: Arc::new(RwLock::new(discovery)),
sender,
receiver,
network,
threads,
cbf_buffer: min_cbf,
non_cbf_buffer: min_non_cbf,
progress,
})
}
/// Get running address discovery progress
fn get_progress(&self) -> Result<DiscoveryData, AddressManagerError> {
let (queued_count, visited_count) = {
let address_discovery = self.discovery.read()?;
(
address_discovery.pending.len(),
address_discovery.visited.len(),
)
};
let cbf_node_count = self.directory.get_cbf_address_count();
let other_node_count = self.directory.get_non_cbf_address_count();
Ok(DiscoveryData {
queued: queued_count,
visited: visited_count,
non_cbf_count: cbf_node_count + other_node_count,
cbf_count: cbf_node_count,
})
}
/// Spawn [self.thread] no. of worker threads
fn spawn_workers(&mut self) -> Vec<JoinHandle<()>> {
let mut worker_handles: Vec<JoinHandle<()>> = vec![];
for _ in 0..self.threads {
let sender = self.sender.clone();
let discovery = self.discovery.clone();
let network = self.network;
let worker_handle = thread::spawn(move || {
let mut worker = AddressWorker::new(discovery, sender, network);
worker.work().unwrap();
});
worker_handles.push(worker_handle);
}
worker_handles
}
/// Crawl the Bitcoin network until required number of cbf/non_cbf nodes are found
///
/// - This will start a bunch of crawlers.
/// - load up the existing cache.
/// - Update the cache with new found peers.
/// - check if address is in banlist
/// - run crawlers until buffer requirement is matched
/// - flush the current cache into disk
pub fn fetch(&mut self) -> Result<(), AddressManagerError> {
self.spawn_workers();
// Get already existing cache
let mut cache = match AddressCache::from_file(&self.cache_filename)? {
Some(cache) => cache,
None => AddressCache::empty(),
};
while self.directory.get_cbf_buffer() < self.cbf_buffer
|| self.directory.get_non_cbf_buffer() < self.non_cbf_buffer
{
if let Ok(message) = self.receiver.recv() {
let (addr, flag) = message;
if !cache.banned_peers.contains(&addr) {
let cbf = flag.has(ServiceFlags::COMPACT_FILTERS);
self.directory.add_address(addr, cbf);
cache.add_address(addr, cbf);
}
}
}
self.progress.update(self.get_progress()?);
// When completed, flush the cache
cache.write_to_file(&self.cache_filename)?;
Ok(())
}
/// Get a new addresses not previously provided
pub fn get_new_cbf_address(&mut self) -> Option<SocketAddr> {
self.directory.get_new_address(true)
}
/// Get a new non_cbf address
pub fn get_new_non_cbf_address(&mut self) -> Option<SocketAddr> {
self.directory.get_new_address(false)
}
/// Ban an address
pub fn ban_peer(&mut self, addrs: &SocketAddr, cbf: bool) -> Result<(), AddressManagerError> {
let mut cache = AddressCache::from_file(&self.cache_filename)?.ok_or_else(|| {
AddressManagerError::Generic("Address Cache file not found".to_string())
})?;
cache.add_to_banlist(*addrs, cbf);
// When completed, flush the cache
cache.write_to_file(&self.cache_filename).unwrap();
self.directory.remove_address(addrs, cbf);
Ok(())
}
/// Get all the known CBF addresses
pub fn get_known_cbfs(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.cbf_nodes
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
/// Get all the known regular addresses
pub fn get_known_non_cbfs(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.non_cbf_nodes
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
/// Get previously tried addresses
pub fn get_previously_tried(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.directory
.previously_sent
.iter()
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
}
#[derive(Debug)]
pub enum AddressManagerError {
/// Std I/O Error
Io(std::io::Error),
/// Internal Peer error
Peer(PeerError),
/// Internal Mutex poisoning error
MutexPoisoned,
/// Internal Mutex wait timed out
MutexTimedOut,
/// Internal RW read lock poisoned
RwReadLockPoisined,
/// Internal RW write lock poisoned
RwWriteLockPoisoned,
/// Internal MPSC sending error
MpscSendError,
/// Serde Json Error
SerdeJson(SerdeError),
/// Generic Errors
Generic(String),
}
impl_error!(PeerError, Peer, AddressManagerError);
impl_error!(std::io::Error, Io, AddressManagerError);
impl_error!(SerdeError, SerdeJson, AddressManagerError);
impl<T> From<PoisonError<MutexGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
AddressManagerError::MutexPoisoned
}
}
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
AddressManagerError::RwWriteLockPoisoned
}
}
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for AddressManagerError {
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
AddressManagerError::RwReadLockPoisined
}
}
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for AddressManagerError {
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
let (_, wait_result) = err.into_inner();
if wait_result.timed_out() {
AddressManagerError::MutexTimedOut
} else {
AddressManagerError::MutexPoisoned
}
}
}
impl<T> From<SendError<T>> for AddressManagerError {
fn from(_: SendError<T>) -> Self {
AddressManagerError::MpscSendError
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
#[ignore]
fn test_address_manager() {
// Initiate a manager with an non existent cache file name.
// It will create a new cache file
let mut manager = AddressManager::new(
Network::Bitcoin,
"addr_cache".to_string(),
20,
None,
None,
LogDiscoveryProgress,
)
.unwrap();
// start the crawlers and time them
println!("Starting manager and initial fetch");
let start = std::time::Instant::now();
manager.fetch().unwrap();
let duration1 = start.elapsed();
println!("Completed Initial fetch");
// Create a new manager from existing cache and fetch again
let mut manager = AddressManager::new(
Network::Bitcoin,
"addr_cache".to_string(),
20,
None,
None,
LogDiscoveryProgress,
)
.unwrap();
// start the crawlers and time them
println!("Starting new fetch with previous cache");
let start = std::time::Instant::now();
manager.fetch().unwrap();
let duration2 = start.elapsed();
println!("Completed new fetch()");
println!("Time taken for initial crawl: {:#?}", duration1);
println!("Time taken for next crawl {:#?}", duration2);
// Check Buffer Management
println!("Checking buffer management");
// Fetch few new address and ensure buffer goes to zero
let mut addrs_list = Vec::new();
for _ in 0..5 {
let addr_cbf = manager.get_new_cbf_address().unwrap();
let addrs_non_cbf = manager.get_new_non_cbf_address().unwrap();
addrs_list.push(addr_cbf);
addrs_list.push(addrs_non_cbf);
}
assert_eq!(addrs_list.len(), 10);
// This should exhaust the cbf buffer
assert_eq!(manager.directory.get_cbf_buffer(), 0);
// Calling fetch again should start crawlers until buffer
// requirements are matched.
println!("Address buffer exhausted, starting new fetch");
manager.fetch().unwrap();
println!("Fetch Complete");
// It should again have a cbf buffer of 5
assert_eq!(manager.directory.get_cbf_buffer(), 5);
println!("Buffer management passed");
}
}

View File

@ -63,7 +63,9 @@ use bitcoin::{Network, OutPoint, Transaction, Txid};
use rocksdb::{Options, SliceTransform, DB};
mod address_manager;
mod peer;
mod peermngr;
mod store;
mod sync;
@ -77,8 +79,11 @@ use peer::*;
use store::*;
use sync::*;
// Only added to avoid unused warnings in addrsmngr module
pub use address_manager::{
AddressManager, DiscoveryProgress, LogDiscoveryProgress, NoDiscoveryProgress,
};
pub use peer::{Mempool, Peer};
const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
@ -371,10 +376,10 @@ impl Blockchain for CompactFiltersBlockchain {
database.commit_batch(updates)?;
match first_peer.ask_for_mempool() {
Err(CompactFiltersError::PeerBloomDisabled) => {
Err(PeerError::PeerBloomDisabled(_)) => {
log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
}
e => e?,
e => e.map_err(CompactFiltersError::from)?,
};
let mut internal_max_deriv = None;
@ -392,7 +397,12 @@ impl Blockchain for CompactFiltersBlockchain {
)?;
}
}
for tx in first_peer.get_mempool().iter_txs().iter() {
for tx in first_peer
.get_mempool()
.iter_txs()
.map_err(CompactFiltersError::from)?
.iter()
{
self.process_tx(
database,
tx,
@ -435,11 +445,14 @@ impl Blockchain for CompactFiltersBlockchain {
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
Ok(self.peers[0]
.get_mempool()
.get_tx(&Inventory::Transaction(*txid)))
.get_tx(&Inventory::Transaction(*txid))
.map_err(CompactFiltersError::from)?)
}
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
self.peers[0].broadcast_tx(tx.clone())?;
self.peers[0]
.broadcast_tx(tx.clone())
.map_err(CompactFiltersError::from)?;
Ok(())
}
@ -487,7 +500,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
.peers
.iter()
.map(|peer_conf| match &peer_conf.socks5 {
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network)
.map_err(CompactFiltersError::from),
Some(proxy) => Peer::connect_proxy(
peer_conf.address.as_str(),
proxy,
@ -497,7 +511,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
.map(|(a, b)| (a.as_str(), b.as_str())),
Arc::clone(&mempool),
config.network,
),
)
.map_err(CompactFiltersError::from),
})
.collect::<Result<_, _>>()?;
@ -546,6 +561,9 @@ pub enum CompactFiltersError {
/// Wrapper for [`crate::error::Error`]
Global(Box<crate::error::Error>),
/// Internal Peer Error
Peer(PeerError),
}
impl fmt::Display for CompactFiltersError {
@ -560,6 +578,7 @@ impl_error!(rocksdb::Error, Db, CompactFiltersError);
impl_error!(std::io::Error, Io, CompactFiltersError);
impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError);
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);
impl_error!(PeerError, Peer, CompactFiltersError);
impl From<crate::error::Error> for CompactFiltersError {
fn from(err: crate::error::Error) -> Self {

View File

@ -10,11 +10,15 @@
// licenses.
use std::collections::HashMap;
use std::net::{TcpStream, ToSocketAddrs};
use std::fmt;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::PoisonError;
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
use socks::{Socks5Stream, ToTargetAddr};
use rand::{thread_rng, Rng};
@ -30,8 +34,6 @@ use bitcoin::network::stream_reader::StreamReader;
use bitcoin::network::Address;
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
use super::CompactFiltersError;
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
pub(crate) const TIMEOUT_SECS: u64 = 30;
@ -65,17 +67,18 @@ impl Mempool {
///
/// Note that this doesn't propagate the transaction to other
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
pub fn add_tx(&self, tx: Transaction) {
let mut guard = self.0.write().unwrap();
pub fn add_tx(&self, tx: Transaction) -> Result<(), PeerError> {
let mut guard = self.0.write()?;
guard.wtxids.insert(tx.wtxid(), tx.txid());
guard.txs.insert(tx.txid(), tx);
Ok(())
}
/// Look-up a transaction in the mempool given an [`Inventory`] request
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
pub fn get_tx(&self, inventory: &Inventory) -> Result<Option<Transaction>, PeerError> {
let identifer = match inventory {
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return Ok(None),
Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
@ -85,27 +88,34 @@ impl Mempool {
inv_type,
hash
);
return None;
return Ok(None);
}
};
let txid = match identifer {
TxIdentifier::Txid(txid) => Some(txid),
TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
TxIdentifier::Wtxid(wtxid) => self.0.read()?.wtxids.get(&wtxid).cloned(),
};
txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
.flatten()
let result = match txid {
Some(txid) => {
let read_lock = self.0.read()?;
read_lock.txs.get(&txid).cloned()
}
None => None,
};
Ok(result)
}
/// Return whether or not the mempool contains a transaction with a given txid
pub fn has_tx(&self, txid: &Txid) -> bool {
self.0.read().unwrap().txs.contains_key(txid)
pub fn has_tx(&self, txid: &Txid) -> Result<bool, PeerError> {
Ok(self.0.read()?.txs.contains_key(txid))
}
/// Return the list of transactions contained in the mempool
pub fn iter_txs(&self) -> Vec<Transaction> {
self.0.read().unwrap().txs.values().cloned().collect()
pub fn iter_txs(&self) -> Result<Vec<Transaction>, PeerError> {
Ok(self.0.read()?.txs.values().cloned().collect())
}
}
@ -133,12 +143,31 @@ impl Peer {
address: A,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let stream = TcpStream::connect(address)?;
Peer::from_stream(stream, mempool, network)
}
/// Connect to a peer over a plaintext TCP connection with a timeout
///
/// This function behaves exactly the same as `connect` except for two differences
/// 1) It assumes your ToSocketAddrs will resolve to a single address
/// 2) It lets you specify a connection timeout
pub fn connect_with_timeout<A: ToSocketAddrs>(
address: A,
timeout: Duration,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, PeerError> {
let socket_addr = address
.to_socket_addrs()?
.next()
.ok_or(PeerError::AddresseResolution)?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
Peer::from_stream(stream, mempool, network)
}
/// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
/// as a tuple of `(username, password)`
///
@ -150,7 +179,7 @@ impl Peer {
credentials: Option<(&str, &str)>,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let socks_stream = if let Some((username, password)) = credentials {
Socks5Stream::connect_with_password(proxy, target, username, password)?
} else {
@ -165,12 +194,12 @@ impl Peer {
stream: TcpStream,
mempool: Arc<Mempool>,
network: Network,
) -> Result<Self, CompactFiltersError> {
) -> Result<Self, PeerError> {
let writer = Arc::new(Mutex::new(stream.try_clone()?));
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
let connected = Arc::new(RwLock::new(true));
let mut locked_writer = writer.lock().unwrap();
let mut locked_writer = writer.lock()?;
let reader_thread_responses = Arc::clone(&responses);
let reader_thread_writer = Arc::clone(&writer);
@ -185,6 +214,7 @@ impl Peer {
reader_thread_mempool,
reader_thread_connected,
)
.unwrap()
});
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
@ -209,18 +239,20 @@ impl Peer {
0,
)),
)?;
let version = if let NetworkMessage::Version(version) =
Self::_recv(&responses, "version", None).unwrap()
{
version
} else {
return Err(CompactFiltersError::InvalidResponse);
let version = match Self::_recv(&responses, "version", Some(Duration::from_secs(1)))? {
Some(NetworkMessage::Version(version)) => version,
_ => {
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
}
};
if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None).unwrap() {
if let Some(NetworkMessage::Verack) =
Self::_recv(&responses, "verack", Some(Duration::from_secs(1)))?
{
Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
} else {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
}
std::mem::drop(locked_writer);
@ -236,19 +268,26 @@ impl Peer {
})
}
/// Close the peer connection
// Consume Self
pub fn close(self) -> Result<(), PeerError> {
let locked_writer = self.writer.lock()?;
Ok((*locked_writer).shutdown(std::net::Shutdown::Both)?)
}
/// Get the socket address of the remote peer
pub fn get_address(&self) -> Result<SocketAddr, PeerError> {
let locked_writer = self.writer.lock()?;
Ok(locked_writer.peer_addr()?)
}
/// Send a Bitcoin network message
fn _send(
writer: &mut TcpStream,
magic: u32,
payload: NetworkMessage,
) -> Result<(), CompactFiltersError> {
fn _send(writer: &mut TcpStream, magic: u32, payload: NetworkMessage) -> Result<(), PeerError> {
log::trace!("==> {:?}", payload);
let raw_message = RawNetworkMessage { magic, payload };
raw_message
.consensus_encode(writer)
.map_err(|_| CompactFiltersError::DataCorruption)?;
raw_message.consensus_encode(writer)?;
Ok(())
}
@ -258,30 +297,30 @@ impl Peer {
responses: &Arc<RwLock<ResponsesMap>>,
wait_for: &'static str,
timeout: Option<Duration>,
) -> Option<NetworkMessage> {
) -> Result<Option<NetworkMessage>, PeerError> {
let message_resp = {
let mut lock = responses.write().unwrap();
let mut lock = responses.write()?;
let message_resp = lock.entry(wait_for).or_default();
Arc::clone(&message_resp)
};
let (lock, cvar) = &*message_resp;
let mut messages = lock.lock().unwrap();
let mut messages = lock.lock()?;
while messages.is_empty() {
match timeout {
None => messages = cvar.wait(messages).unwrap(),
None => messages = cvar.wait(messages)?,
Some(t) => {
let result = cvar.wait_timeout(messages, t).unwrap();
let result = cvar.wait_timeout(messages, t)?;
if result.1.timed_out() {
return None;
return Ok(None);
}
messages = result.0;
}
}
}
messages.pop()
Ok(messages.pop())
}
/// Return the [`VersionMessage`] sent by the peer
@ -300,8 +339,8 @@ impl Peer {
}
/// Return whether or not the peer is still connected
pub fn is_connected(&self) -> bool {
*self.connected.read().unwrap()
pub fn is_connected(&self) -> Result<bool, PeerError> {
Ok(*self.connected.read()?)
}
/// Internal function called once the `reader_thread` is spawned
@ -312,14 +351,14 @@ impl Peer {
reader_thread_writer: Arc<Mutex<TcpStream>>,
reader_thread_mempool: Arc<Mempool>,
reader_thread_connected: Arc<RwLock<bool>>,
) {
) -> Result<(), PeerError> {
macro_rules! check_disconnect {
($call:expr) => {
match $call {
Ok(good) => good,
Err(e) => {
log::debug!("Error {:?}", e);
*reader_thread_connected.write().unwrap() = false;
*reader_thread_connected.write()? = false;
break;
}
@ -328,7 +367,7 @@ impl Peer {
}
let mut reader = StreamReader::new(connection, None);
loop {
while *reader_thread_connected.read()? {
let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
let in_message = if raw_message.magic != network.magic() {
@ -342,7 +381,7 @@ impl Peer {
match in_message {
NetworkMessage::Ping(nonce) => {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::Pong(nonce),
));
@ -353,19 +392,21 @@ impl Peer {
NetworkMessage::GetData(ref inv) => {
let (found, not_found): (Vec<_>, Vec<_>) = inv
.iter()
.map(|item| (*item, reader_thread_mempool.get_tx(item)))
.map(|item| (*item, reader_thread_mempool.get_tx(item).unwrap()))
.partition(|(_, d)| d.is_some());
for (_, found_tx) in found {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::Tx(found_tx.unwrap()),
NetworkMessage::Tx(found_tx.ok_or_else(|| PeerError::Generic(
"Got None while expecting Transaction".to_string()
))?),
));
}
if !not_found.is_empty() {
check_disconnect!(Self::_send(
&mut reader_thread_writer.lock().unwrap(),
&mut *reader_thread_writer.lock()?,
network.magic(),
NetworkMessage::NotFound(
not_found.into_iter().map(|(i, _)| i).collect(),
@ -377,21 +418,23 @@ impl Peer {
}
let message_resp = {
let mut lock = reader_thread_responses.write().unwrap();
let mut lock = reader_thread_responses.write()?;
let message_resp = lock.entry(in_message.cmd()).or_default();
Arc::clone(&message_resp)
};
let (lock, cvar) = &*message_resp;
let mut messages = lock.lock().unwrap();
let mut messages = lock.lock()?;
messages.push(in_message);
cvar.notify_all();
}
Ok(())
}
/// Send a raw Bitcoin message to the peer
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
let mut writer = self.writer.lock().unwrap();
pub fn send(&self, payload: NetworkMessage) -> Result<(), PeerError> {
let mut writer = self.writer.lock()?;
Self::_send(&mut writer, self.network.magic(), payload)
}
@ -400,30 +443,27 @@ impl Peer {
&self,
wait_for: &'static str,
timeout: Option<Duration>,
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
Ok(Self::_recv(&self.responses, wait_for, timeout))
) -> Result<Option<NetworkMessage>, PeerError> {
Self::_recv(&self.responses, wait_for, timeout)
}
}
pub trait CompactFiltersPeer {
fn get_cf_checkpt(
&self,
filter_type: u8,
stop_hash: BlockHash,
) -> Result<CFCheckpt, CompactFiltersError>;
fn get_cf_checkpt(&self, filter_type: u8, stop_hash: BlockHash)
-> Result<CFCheckpt, PeerError>;
fn get_cf_headers(
&self,
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<CFHeaders, CompactFiltersError>;
) -> Result<CFHeaders, PeerError>;
fn get_cf_filters(
&self,
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<(), CompactFiltersError>;
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
) -> Result<(), PeerError>;
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError>;
}
impl CompactFiltersPeer for Peer {
@ -431,22 +471,20 @@ impl CompactFiltersPeer for Peer {
&self,
filter_type: u8,
stop_hash: BlockHash,
) -> Result<CFCheckpt, CompactFiltersError> {
) -> Result<CFCheckpt, PeerError> {
self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
filter_type,
stop_hash,
}))?;
let response = self
.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = self.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFCheckpt(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFCheckpt(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
if response.filter_type != filter_type {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(self.get_address()?));
}
Ok(response)
@ -457,35 +495,31 @@ impl CompactFiltersPeer for Peer {
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<CFHeaders, CompactFiltersError> {
) -> Result<CFHeaders, PeerError> {
self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
filter_type,
start_height,
stop_hash,
}))?;
let response = self
.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let response = self.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFHeaders(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFHeaders(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
if response.filter_type != filter_type {
return Err(CompactFiltersError::InvalidResponse);
return Err(PeerError::InvalidResponse(self.get_address()?));
}
Ok(response)
}
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
let response = self
.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError> {
let response = self.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let response = match response {
NetworkMessage::CFilter(response) => response,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::CFilter(response)) => response,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
Ok(response)
@ -496,7 +530,7 @@ impl CompactFiltersPeer for Peer {
filter_type: u8,
start_height: u32,
stop_hash: BlockHash,
) -> Result<(), CompactFiltersError> {
) -> Result<(), PeerError> {
self.send(NetworkMessage::GetCFilters(GetCFilters {
filter_type,
start_height,
@ -508,13 +542,13 @@ impl CompactFiltersPeer for Peer {
}
pub trait InvPeer {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError>;
fn ask_for_mempool(&self) -> Result<(), PeerError>;
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError>;
}
impl InvPeer for Peer {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError> {
self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
block_hash,
)]))?;
@ -522,51 +556,126 @@ impl InvPeer for Peer {
match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
None => Ok(None),
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
_ => Err(CompactFiltersError::InvalidResponse),
_ => Err(PeerError::InvalidResponse(self.get_address()?)),
}
}
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
fn ask_for_mempool(&self) -> Result<(), PeerError> {
if !self.version.services.has(ServiceFlags::BLOOM) {
return Err(CompactFiltersError::PeerBloomDisabled);
return Err(PeerError::PeerBloomDisabled(self.get_address()?));
}
self.send(NetworkMessage::MemPool)?;
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
None => return Ok(()), // empty mempool
Some(NetworkMessage::Inv(inv)) => inv,
_ => return Err(CompactFiltersError::InvalidResponse),
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
let getdata = inv
.iter()
.cloned()
.filter(
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid)),
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid).unwrap()),
)
.collect::<Vec<_>>();
let num_txs = getdata.len();
self.send(NetworkMessage::GetData(getdata))?;
for _ in 0..num_txs {
let tx = self
.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
.ok_or(CompactFiltersError::Timeout)?;
let tx = self.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?;
let tx = match tx {
NetworkMessage::Tx(tx) => tx,
_ => return Err(CompactFiltersError::InvalidResponse),
Some(NetworkMessage::Tx(tx)) => tx,
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
};
self.mempool.add_tx(tx);
self.mempool.add_tx(tx)?;
}
Ok(())
}
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
self.mempool.add_tx(tx.clone());
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError> {
self.mempool.add_tx(tx.clone())?;
self.send(NetworkMessage::Tx(tx))?;
Ok(())
}
}
/// Peer Errors
#[derive(Debug)]
pub enum PeerError {
/// Internal I/O error
Io(std::io::Error),
/// Internal system time error
Time(std::time::SystemTimeError),
/// A peer sent an invalid or unexpected response
InvalidResponse(SocketAddr),
/// Peer had bloom filter disabled
PeerBloomDisabled(SocketAddr),
/// Internal Mutex poisoning error
MutexPoisoned,
/// Internal Mutex wait timed out
MutexTimedout,
/// Internal RW read lock poisoned
RwReadLockPoisined,
/// Internal RW write lock poisoned
RwWriteLockPoisoned,
/// Mempool Mutex poisoned
MempoolPoisoned,
/// Network address resolution Error
AddresseResolution,
/// Generic Errors
Generic(String),
}
impl std::fmt::Display for PeerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for PeerError {}
impl_error!(std::io::Error, Io, PeerError);
impl_error!(std::time::SystemTimeError, Time, PeerError);
impl<T> From<PoisonError<MutexGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
PeerError::MutexPoisoned
}
}
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
PeerError::RwWriteLockPoisoned
}
}
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for PeerError {
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
PeerError::RwReadLockPoisined
}
}
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for PeerError {
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
let (_, wait_result) = err.into_inner();
if wait_result.timed_out() {
PeerError::MutexTimedout
} else {
PeerError::MutexPoisoned
}
}
}

View File

@ -0,0 +1,578 @@
use super::address_manager::{AddressManager, AddressManagerError, DiscoveryProgress};
use super::peer::{Mempool, Peer, PeerError, TIMEOUT_SECS};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time;
use bitcoin::network::constants::{Network, ServiceFlags};
use bitcoin::network::message::NetworkMessage;
use std::path::PathBuf;
use std::collections::BTreeMap;
// Peer Manager Configuration constants
const MIN_CBF_PEERS: usize = 2;
const MIN_TOTAL_PEERS: usize = 5;
const MIN_CRAWLER_THREADS: usize = 20;
const BAN_SCORE_THRESHOLD: usize = 100;
const RECEIVE_TIMEOUT: time::Duration = time::Duration::from_secs(TIMEOUT_SECS);
#[allow(dead_code)]
/// An Error structure describing Peer Management errors
#[derive(Debug)]
pub enum PeerManagerError {
// Internal Peer Error
Peer(PeerError),
// Internal AddressManager Error
AddrsManager(AddressManagerError),
// Os String Error
OsString(std::ffi::OsString),
// Peer not found in directory
PeerNotFound,
// Generic Internal Error
Generic(String),
}
impl_error!(PeerError, Peer, PeerManagerError);
impl_error!(AddressManagerError, AddrsManager, PeerManagerError);
impl_error!(std::ffi::OsString, OsString, PeerManagerError);
/// Peer Data stored in the manager's directory
#[derive(Debug)]
struct PeerData {
peer: Peer,
is_cbf: bool,
ban_score: usize,
}
#[allow(dead_code)]
/// A Directory structure to hold live Peers
/// All peers in the directory have live ongoing connection
/// Banning a peer removes it from the directory
#[derive(Default, Debug)]
struct PeerDirectory {
peers: BTreeMap<SocketAddr, PeerData>,
}
#[allow(dead_code)]
impl PeerDirectory {
fn new() -> Self {
Self::default()
}
fn get_cbf_peers(&self) -> Option<Vec<&PeerData>> {
let cbf_peers = self
.peers
.iter()
.filter(|(_, peer)| peer.is_cbf)
.map(|(_, peer)| peer)
.collect::<Vec<&PeerData>>();
match cbf_peers.len() {
0 => None,
_ => Some(cbf_peers),
}
}
fn get_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
let cbf_addrseses = self
.peers
.iter()
.filter_map(
|(addrs, peerdata)| {
if peerdata.is_cbf {
Some(addrs)
} else {
None
}
},
)
.copied()
.collect::<Vec<SocketAddr>>();
match cbf_addrseses.len() {
0 => None,
_ => Some(cbf_addrseses),
}
}
fn get_non_cbf_peers(&self) -> Option<Vec<&PeerData>> {
let non_cbf_peers = self
.peers
.iter()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&PeerData>>();
match non_cbf_peers.len() {
0 => None,
_ => Some(non_cbf_peers),
}
}
fn get_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
let addresses = self
.peers
.iter()
.filter_map(
|(addrs, peerdata)| {
if !peerdata.is_cbf {
Some(addrs)
} else {
None
}
},
)
.copied()
.collect::<Vec<SocketAddr>>();
match addresses.len() {
0 => None,
_ => Some(addresses),
}
}
fn get_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
let peers = self
.peers
.iter_mut()
.filter(|(_, peerdata)| peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&mut PeerData>>();
match peers.len() {
0 => None,
_ => Some(peers),
}
}
fn get_non_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
let peers = self
.peers
.iter_mut()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.map(|(_, peerdata)| peerdata)
.collect::<Vec<&mut PeerData>>();
match peers.len() {
0 => None,
_ => Some(peers),
}
}
fn get_cbf_count(&self) -> usize {
self.peers
.iter()
.filter(|(_, peerdata)| peerdata.is_cbf)
.count()
}
fn get_non_cbf_count(&self) -> usize {
self.peers
.iter()
.filter(|(_, peerdata)| !peerdata.is_cbf)
.count()
}
fn insert_peer(&mut self, peerdata: PeerData) -> Result<(), PeerManagerError> {
let addrs = peerdata.peer.get_address()?;
self.peers.entry(addrs).or_insert(peerdata);
Ok(())
}
fn remove_peer(&mut self, addrs: &SocketAddr) -> Option<PeerData> {
self.peers.remove(addrs)
}
fn get_peer_banscore(&self, addrs: &SocketAddr) -> Option<usize> {
self.peers.get(addrs).map(|peerdata| peerdata.ban_score)
}
fn get_peerdata_mut(&mut self, address: &SocketAddr) -> Option<&mut PeerData> {
self.peers.get_mut(address)
}
fn get_peerdata(&self, address: &SocketAddr) -> Option<&PeerData> {
self.peers.get(address)
}
fn is_cbf(&self, addrs: &SocketAddr) -> Option<bool> {
if let Some(peer) = self.peers.get(addrs) {
match peer.is_cbf {
true => Some(true),
false => Some(false),
}
} else {
None
}
}
}
#[allow(dead_code)]
pub struct PeerManager<P: DiscoveryProgress> {
addrs_mngr: AddressManager<P>,
directory: PeerDirectory,
mempool: Arc<Mempool>,
min_cbf: usize,
min_total: usize,
network: Network,
}
#[allow(dead_code)]
impl<P: DiscoveryProgress> PeerManager<P> {
pub fn init(
network: Network,
cache_dir: &str,
crawler_threads: Option<usize>,
progress: P,
cbf_peers: Option<usize>,
total_peers: Option<usize>,
) -> Result<Self, PeerManagerError> {
let mut cache_filename = PathBuf::from(cache_dir);
cache_filename.push("addr_cache");
// Fetch minimum peer requirements, either by user input, or via default
let min_cbf = cbf_peers.unwrap_or(MIN_CBF_PEERS);
let min_total = total_peers.unwrap_or(MIN_TOTAL_PEERS);
let cbf_buff = min_cbf * 2;
let non_cbf_buff = (min_total - min_cbf) * 2;
// Create internal items
let addrs_mngr = AddressManager::new(
network,
cache_filename.into_os_string().into_string()?,
crawler_threads.unwrap_or(MIN_CRAWLER_THREADS),
Some(cbf_buff),
Some(non_cbf_buff),
progress,
)?;
let mempool = Arc::new(Mempool::new());
let peer_dir = PeerDirectory::new();
// Create self and update
let mut manager = Self {
addrs_mngr,
directory: peer_dir,
mempool,
min_cbf,
min_total,
network,
};
manager.update_directory()?;
Ok(manager)
}
fn update_directory(&mut self) -> Result<(), PeerManagerError> {
while self.directory.get_cbf_count() < self.min_cbf
|| self.directory.get_non_cbf_count() < (self.min_total - self.min_cbf)
{
// First connect with cbf peers, then with non_cbf
let cbf_fetch = self.directory.get_cbf_count() < self.min_cbf;
// Try to get an address
// if not present start crawlers
let target_addrs = match cbf_fetch {
true => {
if let Some(addrs) = self.addrs_mngr.get_new_cbf_address() {
addrs
} else {
self.addrs_mngr.fetch()?;
continue;
}
}
false => {
if let Some(addrs) = self.addrs_mngr.get_new_non_cbf_address() {
addrs
} else {
self.addrs_mngr.fetch()?;
continue;
}
}
};
if let Ok(peer) = Peer::connect(target_addrs, Arc::clone(&self.mempool), self.network) {
let address = peer.get_address()?;
assert_eq!(address, target_addrs);
let is_cbf = peer
.get_version()
.services
.has(ServiceFlags::COMPACT_FILTERS);
let peerdata = PeerData {
peer,
is_cbf,
ban_score: 0,
};
self.directory.insert_peer(peerdata)?;
} else {
continue;
}
}
Ok(())
}
pub fn set_banscore(
&mut self,
increase_by: usize,
address: &SocketAddr,
) -> Result<(), PeerManagerError> {
let mut current_score = if let Some(peer) = self.directory.get_peerdata_mut(address) {
peer.ban_score
} else {
return Err(PeerManagerError::PeerNotFound);
};
current_score += increase_by;
let mut banned = false;
if current_score >= BAN_SCORE_THRESHOLD {
match (
self.directory.is_cbf(address),
self.directory.remove_peer(address),
) {
(Some(true), Some(_)) => {
self.addrs_mngr.ban_peer(address, true)?;
banned = true;
}
(Some(false), Some(_)) => {
self.addrs_mngr.ban_peer(address, false)?;
banned = true;
}
_ => {
return Err(PeerManagerError::Generic(
"data inconsistency in directory, should not happen".to_string(),
))
}
}
}
if banned {
self.update_directory()?;
}
Ok(())
}
pub fn send_to(
&self,
address: &SocketAddr,
message: NetworkMessage,
) -> Result<(), PeerManagerError> {
if let Some(peerdata) = self.directory.get_peerdata(address) {
peerdata.peer.send(message)?;
Ok(())
} else {
Err(PeerManagerError::PeerNotFound)
}
}
pub fn receive_from(
&self,
address: &SocketAddr,
wait_for: &'static str,
) -> Result<Option<NetworkMessage>, PeerManagerError> {
if let Some(peerdata) = self.directory.get_peerdata(address) {
if let Some(response) = peerdata.peer.recv(wait_for, Some(RECEIVE_TIMEOUT))? {
Ok(Some(response))
} else {
Ok(None)
}
} else {
Err(PeerManagerError::PeerNotFound)
}
}
pub fn connected_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.directory.get_cbf_addresses()
}
pub fn connected_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.directory.get_non_cbf_addresses()
}
pub fn known_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_known_cbfs()
}
pub fn known_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_known_non_cbfs()
}
pub fn previously_tried_addresses(&self) -> Option<Vec<SocketAddr>> {
self.addrs_mngr.get_previously_tried()
}
}
#[cfg(test)]
mod test {
use super::super::LogDiscoveryProgress;
use super::*;
#[test]
#[ignore]
fn test_ban() {
let mut manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let connected_cbfs = manager.connected_cbf_addresses().unwrap();
let connected_non_cbfs = manager.connected_non_cbf_addresses().unwrap();
println!("Currently Connected CBFs: {:#?}", connected_cbfs);
assert_eq!(connected_cbfs.len(), 2);
assert_eq!(connected_non_cbfs.len(), 3);
let to_banned = &connected_cbfs[0];
println!("Banning address : {}", to_banned);
manager.set_banscore(100, to_banned).unwrap();
let newly_connected = manager.connected_cbf_addresses().unwrap();
println!("Newly Connected CBFs: {:#?}", newly_connected);
assert_eq!(newly_connected.len(), 2);
assert_ne!(newly_connected, connected_cbfs);
}
#[test]
#[ignore]
fn test_send_recv() {
let manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let target_address = manager.connected_cbf_addresses().unwrap()[0];
let ping = NetworkMessage::Ping(30);
println!("Asking peer {}", target_address);
manager.send_to(&target_address, ping).unwrap();
let response = manager
.receive_from(&target_address, "pong")
.unwrap()
.unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
let value = value.unwrap();
println!("Got value {:#?}", value);
}
#[test]
#[ignore]
fn test_connect_all() {
let manager = PeerManager::init(
Network::Bitcoin,
".",
None,
LogDiscoveryProgress,
None,
None,
)
.unwrap();
let cbf_pings = vec![100u64; manager.min_cbf];
let non_cbf_pings = vec![200u64; manager.min_total - manager.min_cbf];
let cbf_peers = manager.connected_cbf_addresses().unwrap();
let non_cbf_peers = manager.connected_non_cbf_addresses().unwrap();
let sent_cbf: Vec<bool> = cbf_pings
.iter()
.zip(cbf_peers.iter())
.map(|(ping, address)| {
let message = NetworkMessage::Ping(*ping);
manager.send_to(address, message).unwrap();
true
})
.collect();
assert_eq!(sent_cbf, vec![true; manager.min_cbf]);
println!("Sent pings to cbf peers");
let sent_noncbf: Vec<bool> = non_cbf_pings
.iter()
.zip(non_cbf_peers.iter())
.map(|(ping, address)| {
let message = NetworkMessage::Ping(*ping);
manager.send_to(address, message).unwrap();
true
})
.collect();
assert_eq!(sent_noncbf, vec![true; manager.min_total - manager.min_cbf]);
println!("Sent pings to non cbf peers");
let cbf_received: Vec<u64> = cbf_peers
.iter()
.map(|address| {
let response = manager.receive_from(address, "pong").unwrap().unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
value.unwrap()
})
.collect();
let non_cbf_received: Vec<u64> = non_cbf_peers
.iter()
.map(|address| {
let response = manager.receive_from(address, "pong").unwrap().unwrap();
let value = match response {
NetworkMessage::Pong(v) => Some(v),
_ => None,
};
value.unwrap()
})
.collect();
assert_eq!(cbf_pings, cbf_received);
assert_eq!(non_cbf_pings, non_cbf_received);
}
}