Compare commits: frost...compact_fi (7 commits)

Commits: d2da3755f4, 6acb4d9796, 377e5cdd49, 70d2a0ee6b, de1fc2a677, 671d90e57c, 9480faa5d3
src/blockchain/compact_filters/address_manager.rs (new file, 764 lines)
@ -0,0 +1,764 @@
|
||||
// Bitcoin Dev Kit
|
||||
// Written in 2021 by Rajarshi Maitra <rajarshi149@gmail.com>
|
||||
// John Cantrell <johncantrell97@protonmail.com>
|
||||
//
|
||||
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
|
||||
//
|
||||
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
|
||||
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
|
||||
// You may not use this file except in accordance with one or both of these
|
||||
// licenses.
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::prelude::*;
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{
|
||||
mpsc::{channel, Receiver, SendError, Sender},
|
||||
Arc, RwLock,
|
||||
};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
|
||||
use std::sync::PoisonError;
|
||||
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Error as SerdeError;
|
||||
|
||||
use super::{Mempool, Peer, PeerError};
|
||||
|
||||
use bitcoin::network::{
|
||||
constants::{Network, ServiceFlags},
|
||||
message::NetworkMessage,
|
||||
Address,
|
||||
};
|
||||
|
||||
/// Default address pool minimums
|
||||
const MIN_CBF_BUFFER: usize = 5;
|
||||
const MIN_NONCBF_BUFFER: usize = 5;
|
||||
|
||||
/// A discovery structure used by workers
///
/// Discovery can be initiated from a cache,
/// or it will start with the default hardcoded seeds.
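///
/// A minimal sketch of the intended flow (illustrative only, not a doctest: `new`,
/// `get_next` and `add_pendings` are private to this module, and `cache` stands in
/// for an [AddressCache] loaded from disk):
///
/// ```ignore
/// // Seeds recovered from a previous run are tried first, followed by the
/// // hardcoded DNS seeds for the selected network.
/// let seeds: VecDeque<SocketAddr> = cache.make_seeds();
/// let mut discovery = AddressDiscovery::new(Network::Bitcoin, seeds);
/// while let Some(addr) = discovery.get_next() {
///     // ...connect to `addr`, ask it for more addresses...
///     // discovery.add_pendings(received_addresses);
/// }
/// ```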
|
||||
pub struct AddressDiscovery {
|
||||
pending: VecDeque<SocketAddr>,
|
||||
visited: HashSet<SocketAddr>,
|
||||
}
|
||||
|
||||
impl AddressDiscovery {
|
||||
fn new(network: Network, seeds: VecDeque<SocketAddr>) -> AddressDiscovery {
|
||||
let mut network_seeds = AddressDiscovery::seeds(network);
|
||||
let mut total_seeds = seeds;
|
||||
total_seeds.append(&mut network_seeds);
|
||||
AddressDiscovery {
|
||||
pending: total_seeds,
|
||||
visited: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_pendings(&mut self, addresses: Vec<SocketAddr>) {
|
||||
for addr in addresses {
|
||||
if !self.pending.contains(&addr) && !self.visited.contains(&addr) {
|
||||
self.pending.push_back(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_next(&mut self) -> Option<SocketAddr> {
|
||||
match self.pending.pop_front() {
|
||||
None => None,
|
||||
Some(next) => {
|
||||
self.visited.insert(next);
|
||||
Some(next)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn seeds(network: Network) -> VecDeque<SocketAddr> {
|
||||
let mut seeds = VecDeque::new();
|
||||
|
||||
let port: u16 = match network {
|
||||
Network::Bitcoin => 8333,
|
||||
Network::Testnet => 18333,
|
||||
Network::Regtest => 18444,
|
||||
Network::Signet => 38333,
|
||||
};
|
||||
|
||||
let seedhosts: &[&str] = match network {
|
||||
Network::Bitcoin => &[
|
||||
"seed.bitcoin.sipa.be",
|
||||
"dnsseed.bluematt.me",
|
||||
"dnsseed.bitcoin.dashjr.org",
|
||||
"seed.bitcoinstats.com",
|
||||
"seed.bitcoin.jonasschnelli.ch",
|
||||
"seed.btc.petertodd.org",
|
||||
"seed.bitcoin.sprovoost.nl",
|
||||
"dnsseed.emzy.de",
|
||||
"seed.bitcoin.wiz.biz",
|
||||
],
|
||||
Network::Testnet => &[
|
||||
"testnet-seed.bitcoin.jonasschnelli.ch",
|
||||
"seed.tbtc.petertodd.org",
|
||||
"seed.testnet.bitcoin.sprovoost.nl",
|
||||
"testnet-seed.bluematt.me",
|
||||
],
|
||||
Network::Regtest => &[],
|
||||
Network::Signet => &[],
|
||||
};
|
||||
|
||||
for seedhost in seedhosts.iter() {
|
||||
if let Ok(lookup) = (*seedhost, port).to_socket_addrs() {
|
||||
for host in lookup {
|
||||
if host.is_ipv4() {
|
||||
seeds.push_back(host);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
seeds
|
||||
}
|
||||
}
|
||||
|
||||
/// Crawler structure that interfaces with [AddressDiscovery] and the public Bitcoin network
///
/// The address manager will spawn multiple crawlers in separate threads to discover new addresses.
|
||||
struct AddressWorker {
|
||||
discovery: Arc<RwLock<AddressDiscovery>>,
|
||||
sender: Sender<(SocketAddr, ServiceFlags)>,
|
||||
network: Network,
|
||||
}
|
||||
|
||||
impl AddressWorker {
|
||||
fn new(
|
||||
discovery: Arc<RwLock<AddressDiscovery>>,
|
||||
sender: Sender<(SocketAddr, ServiceFlags)>,
|
||||
network: Network,
|
||||
) -> AddressWorker {
|
||||
AddressWorker {
|
||||
discovery,
|
||||
sender,
|
||||
network,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_receive_addr(&mut self, peer: &Peer) -> Result<(), AddressManagerError> {
|
||||
if let Some(NetworkMessage::Addr(new_addresses)) =
|
||||
peer.recv("addr", Some(Duration::from_secs(1)))?
|
||||
{
|
||||
self.consume_addr(new_addresses)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn consume_addr(&mut self, addrs: Vec<(u32, Address)>) -> Result<(), AddressManagerError> {
|
||||
let mut discovery_lock = self.discovery.write().map_err(PeerError::from)?;
|
||||
let mut addresses = Vec::new();
|
||||
for network_addrs in addrs {
|
||||
if let Ok(socket_addrs) = network_addrs.1.socket_addr() {
|
||||
addresses.push(socket_addrs);
|
||||
}
|
||||
}
|
||||
discovery_lock.add_pendings(addresses);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn work(&mut self) -> Result<(), AddressManagerError> {
|
||||
loop {
|
||||
let next_address = {
|
||||
let mut address_discovery = self.discovery.write()?;
|
||||
address_discovery.get_next()
|
||||
};
|
||||
|
||||
match next_address {
|
||||
Some(address) => {
|
||||
let potential_peer = Peer::connect_with_timeout(
|
||||
address,
|
||||
Duration::from_secs(1),
|
||||
Arc::new(Mempool::default()),
|
||||
self.network,
|
||||
);
|
||||
|
||||
if let Ok(peer) = potential_peer {
|
||||
peer.send(NetworkMessage::GetAddr)?;
|
||||
self.try_receive_addr(&peer)?;
|
||||
self.try_receive_addr(&peer)?;
|
||||
self.sender.send((address, peer.get_version().services))?;
|
||||
// TODO: Investigate why close is being called on non-existent connections
|
||||
// currently the errors are ignored
|
||||
peer.close().unwrap_or(());
|
||||
}
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A dedicated cache structure, with cbf/non_cbf separation
|
||||
///
|
||||
/// [AddressCache] will interface with file I/O
/// and can be turned into seeds. Seed generation puts previously cached
/// cbf addresses at the front, to speed up finding cbf nodes.
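///
/// Illustrative sketch (not a doctest: the type and its methods are private to this
/// module, and `addr` stands in for some discovered `SocketAddr`):
///
/// ```ignore
/// // Load the cache if the file exists, otherwise start empty.
/// let mut cache = AddressCache::from_file("addr_cache")?.unwrap_or_else(AddressCache::empty);
/// cache.add_address(addr, /* cbf: */ true);
/// // `make_seeds` chains the cbf set before the non_cbf set, so compact-filter
/// // capable peers are tried first on the next crawl.
/// let seeds = cache.make_seeds();
/// cache.write_to_file("addr_cache")?;
/// ```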
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AddressCache {
|
||||
banned_peers: HashSet<SocketAddr>,
|
||||
cbf: HashSet<SocketAddr>,
|
||||
non_cbf: HashSet<SocketAddr>,
|
||||
}
|
||||
|
||||
impl AddressCache {
|
||||
fn empty() -> Self {
|
||||
Self {
|
||||
banned_peers: HashSet::new(),
|
||||
cbf: HashSet::new(),
|
||||
non_cbf: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_file(path: &str) -> Result<Option<Self>, AddressManagerError> {
|
||||
let serialized: Result<String, _> = std::fs::read_to_string(path);
|
||||
let serialized = match serialized {
|
||||
Ok(contents) => contents,
|
||||
Err(_) => return Ok(None),
|
||||
};
|
||||
|
||||
let address_cache = serde_json::from_str(&serialized)?;
|
||||
Ok(Some(address_cache))
|
||||
}
|
||||
|
||||
fn write_to_file(&self, path: &str) -> Result<(), AddressManagerError> {
|
||||
let serialized = serde_json::to_string_pretty(&self)?;
|
||||
|
||||
let mut cache_file = File::create(path)?;
|
||||
|
||||
cache_file.write_all(serialized.as_bytes())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_seeds(&self) -> VecDeque<SocketAddr> {
|
||||
self.cbf
|
||||
.iter()
|
||||
.chain(self.non_cbf.iter())
|
||||
.copied()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) -> bool {
|
||||
if cbf {
|
||||
self.cbf.remove(addrs)
|
||||
} else {
|
||||
self.non_cbf.remove(addrs)
|
||||
}
|
||||
}
|
||||
|
||||
fn add_address(&mut self, addrs: SocketAddr, cbf: bool) -> bool {
|
||||
if cbf {
|
||||
self.cbf.insert(addrs)
|
||||
} else {
|
||||
self.non_cbf.insert(addrs)
|
||||
}
|
||||
}
|
||||
|
||||
fn add_to_banlist(&mut self, addrs: SocketAddr, cbf: bool) {
|
||||
if self.banned_peers.insert(addrs) {
|
||||
self.remove_address(&addrs, cbf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A live directory, maintained by [AddressManager], of cbf and non_cbf nodes freshly found by workers
///
/// Each new instance of [AddressManager] will have a fresh [AddressDirectory].
/// This is independent from the cache and acts as an in-memory database from which
/// addresses are fetched for the user.
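///
/// Illustrative sketch of the hand-out semantics (not a doctest: the type is private,
/// and `addr` stands in for some discovered `SocketAddr`):
///
/// ```ignore
/// // Addresses are handed out at most once: anything returned by `get_new_address`
/// // is remembered in `previously_sent` and excluded from the buffer counts.
/// let mut directory = AddressDirectory::new();
/// directory.add_address(addr, /* cbf: */ true);
/// assert_eq!(directory.get_cbf_buffer(), 1);
/// assert_eq!(directory.get_new_address(true), Some(addr));
/// assert_eq!(directory.get_cbf_buffer(), 0);
/// ```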
|
||||
struct AddressDirectory {
|
||||
cbf_nodes: HashSet<SocketAddr>,
|
||||
non_cbf_nodes: HashSet<SocketAddr>,
|
||||
|
||||
// List of addresses it has previously provided to the caller (PeerManager)
|
||||
previously_sent: HashSet<SocketAddr>,
|
||||
}
|
||||
|
||||
impl AddressDirectory {
|
||||
fn new() -> AddressDirectory {
|
||||
AddressDirectory {
|
||||
cbf_nodes: HashSet::new(),
|
||||
non_cbf_nodes: HashSet::new(),
|
||||
previously_sent: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_address(&mut self, addr: SocketAddr, cbf: bool) {
|
||||
if cbf {
|
||||
self.cbf_nodes.insert(addr);
|
||||
} else {
|
||||
self.non_cbf_nodes.insert(addr);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_new_address(&mut self, cbf: bool) -> Option<SocketAddr> {
|
||||
if cbf {
|
||||
if let Some(new_addresses) = self
|
||||
.cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.collect::<Vec<&SocketAddr>>()
|
||||
.pop()
|
||||
{
|
||||
self.previously_sent.insert(*new_addresses);
|
||||
Some(*new_addresses)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else if let Some(new_addresses) = self
|
||||
.non_cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.collect::<Vec<&SocketAddr>>()
|
||||
.pop()
|
||||
{
|
||||
self.previously_sent.insert(*new_addresses);
|
||||
Some(*new_addresses)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_address_count(&self) -> usize {
|
||||
self.cbf_nodes.len()
|
||||
}
|
||||
|
||||
fn get_non_cbf_address_count(&self) -> usize {
|
||||
self.non_cbf_nodes.len()
|
||||
}
|
||||
|
||||
fn remove_address(&mut self, addrs: &SocketAddr, cbf: bool) {
|
||||
if cbf {
|
||||
self.cbf_nodes.remove(addrs);
|
||||
} else {
|
||||
self.non_cbf_nodes.remove(addrs);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_buffer(&self) -> usize {
|
||||
self.cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.count()
|
||||
}
|
||||
|
||||
fn get_non_cbf_buffer(&self) -> usize {
|
||||
self.non_cbf_nodes
|
||||
.iter()
|
||||
.filter(|item| !self.previously_sent.contains(item))
|
||||
.count()
|
||||
}
|
||||
}
|
||||
|
||||
/// Discovery statistics, useful for logging
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct DiscoveryData {
|
||||
queued: usize,
|
||||
visited: usize,
|
||||
non_cbf_count: usize,
|
||||
cbf_count: usize,
|
||||
}
|
||||
|
||||
/// Progress trait for discovery statistics logging
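///
/// A sketch of a custom implementation (illustrative: `StdoutProgress` is a
/// hypothetical name, and the fields of [DiscoveryData] are private, so an external
/// implementor can only forward the snapshot as a whole):
///
/// ```ignore
/// struct StdoutProgress;
///
/// impl DiscoveryProgress for StdoutProgress {
///     fn update(&self, data: DiscoveryData) {
///         // Forward the snapshot to whatever sink the application prefers.
///         let _ = data;
///         println!("p2p discovery progress update received");
///     }
/// }
/// ```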
|
||||
pub trait DiscoveryProgress {
|
||||
/// Update progress
|
||||
fn update(&self, data: DiscoveryData);
|
||||
}
|
||||
|
||||
/// Used when progress updates are not desired
|
||||
#[derive(Clone)]
|
||||
pub struct NoDiscoveryProgress;
|
||||
|
||||
impl DiscoveryProgress for NoDiscoveryProgress {
|
||||
fn update(&self, _data: DiscoveryData) {}
|
||||
}
|
||||
|
||||
/// Used to log progress update
|
||||
#[derive(Clone)]
|
||||
pub struct LogDiscoveryProgress;
|
||||
|
||||
impl DiscoveryProgress for LogDiscoveryProgress {
|
||||
fn update(&self, data: DiscoveryData) {
|
||||
log::trace!(
|
||||
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
|
||||
data.queued,
|
||||
data.visited,
|
||||
data.non_cbf_count,
|
||||
data.cbf_count
|
||||
);
|
||||
|
||||
#[cfg(test)]
|
||||
println!(
|
||||
"P2P Discovery: {} queued, {} visited, {} connected, {} cbf_enabled",
|
||||
data.queued, data.visited, data.non_cbf_count, data.cbf_count
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// A manager structure managing address discovery
///
/// The manager will try to maintain a given address buffer in its directory, where
/// buffer = len(existing addresses) - len(previously provided addresses).
/// The manager will crawl the network until the buffer criteria are satisfied.
/// It will bootstrap workers from a cache, to speed up discovery progress in
/// subsequent calls after the first crawl.
/// It will keep track of the cache and only update it if previously
/// unknown addresses are found.
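///
/// A minimal usage sketch (marked `ignore`: it crawls the live P2P network and the
/// re-export path of this type may differ between crate versions):
///
/// ```ignore
/// let mut manager = AddressManager::new(
///     Network::Bitcoin,
///     "addr_cache".to_string(), // cache file path (illustrative)
///     20,                       // crawler threads
///     None,                     // default cbf buffer
///     None,                     // default non_cbf buffer
///     NoDiscoveryProgress,
/// )?;
///
/// // Crawl until the buffers are filled, then hand out fresh addresses.
/// manager.fetch()?;
/// let cbf_peer = manager.get_new_cbf_address();
/// let other_peer = manager.get_new_non_cbf_address();
/// ```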
|
||||
pub struct AddressManager<P: DiscoveryProgress> {
|
||||
directory: AddressDirectory,
|
||||
cache_filename: String,
|
||||
discovery: Arc<RwLock<AddressDiscovery>>,
|
||||
threads: usize,
|
||||
receiver: Receiver<(SocketAddr, ServiceFlags)>,
|
||||
sender: Sender<(SocketAddr, ServiceFlags)>,
|
||||
network: Network,
|
||||
cbf_buffer: usize,
|
||||
non_cbf_buffer: usize,
|
||||
progress: P,
|
||||
}
|
||||
|
||||
impl<P: DiscoveryProgress> AddressManager<P> {
|
||||
/// Create a new manager. Discovery seeds are taken from the cache
/// if it exists, otherwise the hardcoded seeds are used.
|
||||
pub fn new(
|
||||
network: Network,
|
||||
cache_filename: String,
|
||||
threads: usize,
|
||||
cbf_buffer: Option<usize>,
|
||||
non_cbf_buffer: Option<usize>,
|
||||
progress: P,
|
||||
) -> Result<AddressManager<P>, AddressManagerError> {
|
||||
let (sender, receiver) = channel();
|
||||
|
||||
let seeds = match AddressCache::from_file(&cache_filename)? {
|
||||
Some(cache) => cache.make_seeds(),
|
||||
None => VecDeque::new(),
|
||||
};
|
||||
|
||||
let min_cbf = cbf_buffer.unwrap_or(MIN_CBF_BUFFER);
|
||||
|
||||
let min_non_cbf = non_cbf_buffer.unwrap_or(MIN_NONCBF_BUFFER);
|
||||
|
||||
let discovery = AddressDiscovery::new(network, seeds);
|
||||
|
||||
Ok(AddressManager {
|
||||
cache_filename,
|
||||
directory: AddressDirectory::new(),
|
||||
discovery: Arc::new(RwLock::new(discovery)),
|
||||
sender,
|
||||
receiver,
|
||||
network,
|
||||
threads,
|
||||
cbf_buffer: min_cbf,
|
||||
non_cbf_buffer: min_non_cbf,
|
||||
progress,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get running address discovery progress
|
||||
fn get_progress(&self) -> Result<DiscoveryData, AddressManagerError> {
|
||||
let (queued_count, visited_count) = {
|
||||
let address_discovery = self.discovery.read()?;
|
||||
(
|
||||
address_discovery.pending.len(),
|
||||
address_discovery.visited.len(),
|
||||
)
|
||||
};
|
||||
|
||||
let cbf_node_count = self.directory.get_cbf_address_count();
|
||||
let other_node_count = self.directory.get_non_cbf_address_count();
|
||||
|
||||
Ok(DiscoveryData {
|
||||
queued: queued_count,
|
||||
visited: visited_count,
|
||||
non_cbf_count: cbf_node_count + other_node_count,
|
||||
cbf_count: cbf_node_count,
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn [self.threads] worker threads
|
||||
fn spawn_workers(&mut self) -> Vec<JoinHandle<()>> {
|
||||
let mut worker_handles: Vec<JoinHandle<()>> = vec![];
|
||||
for _ in 0..self.threads {
|
||||
let sender = self.sender.clone();
|
||||
let discovery = self.discovery.clone();
|
||||
let network = self.network;
|
||||
let worker_handle = thread::spawn(move || {
|
||||
let mut worker = AddressWorker::new(discovery, sender, network);
|
||||
worker.work().unwrap();
|
||||
});
|
||||
worker_handles.push(worker_handle);
|
||||
}
|
||||
worker_handles
|
||||
}
|
||||
|
||||
/// Crawl the Bitcoin network until the required number of cbf/non_cbf nodes are found
///
/// - This will start a bunch of crawlers.
/// - Load up the existing cache.
/// - Update the cache with newly found peers.
/// - Check whether an address is in the banlist.
/// - Run crawlers until the buffer requirement is matched.
/// - Flush the current cache to disk.
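///
/// Sketch of the refill pattern (marked `ignore`: it hits the live network):
///
/// ```ignore
/// manager.fetch()?;                                   // fill the address buffers
/// while let Some(addr) = manager.get_new_cbf_address() {
///     // ...hand `addr` over to the peer manager...
/// }
/// manager.fetch()?;                                   // buffers exhausted: crawl again
/// ```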
|
||||
pub fn fetch(&mut self) -> Result<(), AddressManagerError> {
|
||||
self.spawn_workers();
|
||||
|
||||
// Get already existing cache
|
||||
let mut cache = match AddressCache::from_file(&self.cache_filename)? {
|
||||
Some(cache) => cache,
|
||||
None => AddressCache::empty(),
|
||||
};
|
||||
|
||||
while self.directory.get_cbf_buffer() < self.cbf_buffer
|
||||
|| self.directory.get_non_cbf_buffer() < self.non_cbf_buffer
|
||||
{
|
||||
if let Ok(message) = self.receiver.recv() {
|
||||
let (addr, flag) = message;
|
||||
if !cache.banned_peers.contains(&addr) {
|
||||
let cbf = flag.has(ServiceFlags::COMPACT_FILTERS);
|
||||
self.directory.add_address(addr, cbf);
|
||||
cache.add_address(addr, cbf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.progress.update(self.get_progress()?);
|
||||
|
||||
// When completed, flush the cache
|
||||
cache.write_to_file(&self.cache_filename)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get a new cbf address not previously provided
|
||||
pub fn get_new_cbf_address(&mut self) -> Option<SocketAddr> {
|
||||
self.directory.get_new_address(true)
|
||||
}
|
||||
|
||||
/// Get a new non_cbf address
|
||||
pub fn get_new_non_cbf_address(&mut self) -> Option<SocketAddr> {
|
||||
self.directory.get_new_address(false)
|
||||
}
|
||||
|
||||
/// Ban an address
|
||||
pub fn ban_peer(&mut self, addrs: &SocketAddr, cbf: bool) -> Result<(), AddressManagerError> {
|
||||
let mut cache = AddressCache::from_file(&self.cache_filename)?.ok_or_else(|| {
|
||||
AddressManagerError::Generic("Address Cache file not found".to_string())
|
||||
})?;
|
||||
|
||||
cache.add_to_banlist(*addrs, cbf);
|
||||
|
||||
// When completed, flush the cache
|
||||
cache.write_to_file(&self.cache_filename).unwrap();
|
||||
|
||||
self.directory.remove_address(addrs, cbf);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all the known CBF addresses
|
||||
pub fn get_known_cbfs(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.directory
|
||||
.cbf_nodes
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all the known regular addresses
|
||||
pub fn get_known_non_cbfs(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.directory
|
||||
.non_cbf_nodes
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get previously tried addresses
|
||||
pub fn get_previously_tried(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.directory
|
||||
.previously_sent
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AddressManagerError {
|
||||
/// Std I/O Error
|
||||
Io(std::io::Error),
|
||||
|
||||
/// Internal Peer error
|
||||
Peer(PeerError),
|
||||
|
||||
/// Internal Mutex poisoning error
|
||||
MutexPoisoned,
|
||||
|
||||
/// Internal Mutex wait timed out
|
||||
MutexTimedOut,
|
||||
|
||||
/// Internal RW read lock poisoned
|
||||
RwReadLockPoisoned,
|
||||
|
||||
/// Internal RW write lock poisoned
|
||||
RwWriteLockPoisoned,
|
||||
|
||||
/// Internal MPSC sending error
|
||||
MpscSendError,
|
||||
|
||||
/// Serde Json Error
|
||||
SerdeJson(SerdeError),
|
||||
|
||||
/// Generic Errors
|
||||
Generic(String),
|
||||
}
|
||||
|
||||
impl_error!(PeerError, Peer, AddressManagerError);
|
||||
impl_error!(std::io::Error, Io, AddressManagerError);
|
||||
impl_error!(SerdeError, SerdeJson, AddressManagerError);
|
||||
|
||||
impl<T> From<PoisonError<MutexGuard<'_, T>>> for AddressManagerError {
|
||||
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
|
||||
AddressManagerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for AddressManagerError {
|
||||
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
|
||||
AddressManagerError::RwWriteLockPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for AddressManagerError {
|
||||
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
|
||||
AddressManagerError::RwReadLockPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for AddressManagerError {
|
||||
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
|
||||
let (_, wait_result) = err.into_inner();
|
||||
if wait_result.timed_out() {
|
||||
AddressManagerError::MutexTimedOut
|
||||
} else {
|
||||
AddressManagerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<SendError<T>> for AddressManagerError {
|
||||
fn from(_: SendError<T>) -> Self {
|
||||
AddressManagerError::MpscSendError
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_address_manager() {
|
||||
// Initiate a manager with a non-existent cache file name.
|
||||
// It will create a new cache file
|
||||
let mut manager = AddressManager::new(
|
||||
Network::Bitcoin,
|
||||
"addr_cache".to_string(),
|
||||
20,
|
||||
None,
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// start the crawlers and time them
|
||||
println!("Starting manager and initial fetch");
|
||||
let start = std::time::Instant::now();
|
||||
manager.fetch().unwrap();
|
||||
let duration1 = start.elapsed();
|
||||
println!("Completed Initial fetch");
|
||||
|
||||
// Create a new manager from existing cache and fetch again
|
||||
let mut manager = AddressManager::new(
|
||||
Network::Bitcoin,
|
||||
"addr_cache".to_string(),
|
||||
20,
|
||||
None,
|
||||
None,
|
||||
LogDiscoveryProgress,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// start the crawlers and time them
|
||||
println!("Starting new fetch with previous cache");
|
||||
let start = std::time::Instant::now();
|
||||
manager.fetch().unwrap();
|
||||
let duration2 = start.elapsed();
|
||||
println!("Completed new fetch()");
|
||||
|
||||
println!("Time taken for initial crawl: {:#?}", duration1);
|
||||
println!("Time taken for next crawl {:#?}", duration2);
|
||||
|
||||
// Check Buffer Management
|
||||
|
||||
println!("Checking buffer management");
|
||||
// Fetch a few new addresses and ensure the buffer goes to zero
|
||||
let mut addrs_list = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let addr_cbf = manager.get_new_cbf_address().unwrap();
|
||||
let addrs_non_cbf = manager.get_new_non_cbf_address().unwrap();
|
||||
|
||||
addrs_list.push(addr_cbf);
|
||||
|
||||
addrs_list.push(addrs_non_cbf);
|
||||
}
|
||||
|
||||
assert_eq!(addrs_list.len(), 10);
|
||||
|
||||
// This should exhaust the cbf buffer
|
||||
assert_eq!(manager.directory.get_cbf_buffer(), 0);
|
||||
|
||||
// Calling fetch again should start crawlers until buffer
|
||||
// requirements are matched.
|
||||
println!("Address buffer exhausted, starting new fetch");
|
||||
manager.fetch().unwrap();
|
||||
println!("Fetch Complete");
|
||||
// It should again have a cbf buffer of 5
|
||||
assert_eq!(manager.directory.get_cbf_buffer(), 5);
|
||||
|
||||
println!("Buffer management passed");
|
||||
}
|
||||
}
|
@ -63,7 +63,9 @@ use bitcoin::{Network, OutPoint, Transaction, Txid};
|
||||
|
||||
use rocksdb::{Options, SliceTransform, DB};
|
||||
|
||||
mod address_manager;
|
||||
mod peer;
|
||||
mod peermngr;
|
||||
mod store;
|
||||
mod sync;
|
||||
|
||||
@ -77,8 +79,11 @@ use peer::*;
|
||||
use store::*;
|
||||
use sync::*;
|
||||
|
||||
// Only added to avoid unused warnings in the address_manager module
|
||||
pub use address_manager::{
|
||||
AddressManager, DiscoveryProgress, LogDiscoveryProgress, NoDiscoveryProgress,
|
||||
};
|
||||
pub use peer::{Mempool, Peer};
|
||||
|
||||
const SYNC_HEADERS_COST: f32 = 1.0;
|
||||
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
|
||||
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
|
||||
@ -371,10 +376,10 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
database.commit_batch(updates)?;
|
||||
|
||||
match first_peer.ask_for_mempool() {
|
||||
Err(CompactFiltersError::PeerBloomDisabled) => {
|
||||
Err(PeerError::PeerBloomDisabled(_)) => {
|
||||
log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
|
||||
}
|
||||
e => e?,
|
||||
e => e.map_err(CompactFiltersError::from)?,
|
||||
};
|
||||
|
||||
let mut internal_max_deriv = None;
|
||||
@ -392,7 +397,12 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
)?;
|
||||
}
|
||||
}
|
||||
for tx in first_peer.get_mempool().iter_txs().iter() {
|
||||
for tx in first_peer
|
||||
.get_mempool()
|
||||
.iter_txs()
|
||||
.map_err(CompactFiltersError::from)?
|
||||
.iter()
|
||||
{
|
||||
self.process_tx(
|
||||
database,
|
||||
tx,
|
||||
@ -435,11 +445,14 @@ impl Blockchain for CompactFiltersBlockchain {
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
Ok(self.peers[0]
|
||||
.get_mempool()
|
||||
.get_tx(&Inventory::Transaction(*txid)))
|
||||
.get_tx(&Inventory::Transaction(*txid))
|
||||
.map_err(CompactFiltersError::from)?)
|
||||
}
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
|
||||
self.peers[0].broadcast_tx(tx.clone())?;
|
||||
self.peers[0]
|
||||
.broadcast_tx(tx.clone())
|
||||
.map_err(CompactFiltersError::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -487,7 +500,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
|
||||
.peers
|
||||
.iter()
|
||||
.map(|peer_conf| match &peer_conf.socks5 {
|
||||
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
|
||||
None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network)
|
||||
.map_err(CompactFiltersError::from),
|
||||
Some(proxy) => Peer::connect_proxy(
|
||||
peer_conf.address.as_str(),
|
||||
proxy,
|
||||
@ -497,7 +511,8 @@ impl ConfigurableBlockchain for CompactFiltersBlockchain {
|
||||
.map(|(a, b)| (a.as_str(), b.as_str())),
|
||||
Arc::clone(&mempool),
|
||||
config.network,
|
||||
),
|
||||
)
|
||||
.map_err(CompactFiltersError::from),
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
@ -546,6 +561,9 @@ pub enum CompactFiltersError {
|
||||
|
||||
/// Wrapper for [`crate::error::Error`]
|
||||
Global(Box<crate::error::Error>),
|
||||
|
||||
/// Internal Peer Error
|
||||
Peer(PeerError),
|
||||
}
|
||||
|
||||
impl fmt::Display for CompactFiltersError {
|
||||
@ -560,6 +578,7 @@ impl_error!(rocksdb::Error, Db, CompactFiltersError);
|
||||
impl_error!(std::io::Error, Io, CompactFiltersError);
|
||||
impl_error!(bitcoin::util::bip158::Error, Bip158, CompactFiltersError);
|
||||
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);
|
||||
impl_error!(PeerError, Peer, CompactFiltersError);
|
||||
|
||||
impl From<crate::error::Error> for CompactFiltersError {
|
||||
fn from(err: crate::error::Error) -> Self {
|
||||
|
@ -10,11 +10,15 @@
|
||||
// licenses.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::fmt;
|
||||
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::thread;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use std::sync::PoisonError;
|
||||
use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard, WaitTimeoutResult};
|
||||
|
||||
use socks::{Socks5Stream, ToTargetAddr};
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
@ -30,8 +34,6 @@ use bitcoin::network::stream_reader::StreamReader;
|
||||
use bitcoin::network::Address;
|
||||
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
|
||||
|
||||
use super::CompactFiltersError;
|
||||
|
||||
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
|
||||
|
||||
pub(crate) const TIMEOUT_SECS: u64 = 30;
|
||||
@ -65,17 +67,18 @@ impl Mempool {
|
||||
///
|
||||
/// Note that this doesn't propagate the transaction to other
|
||||
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
|
||||
pub fn add_tx(&self, tx: Transaction) {
|
||||
let mut guard = self.0.write().unwrap();
|
||||
pub fn add_tx(&self, tx: Transaction) -> Result<(), PeerError> {
|
||||
let mut guard = self.0.write()?;
|
||||
|
||||
guard.wtxids.insert(tx.wtxid(), tx.txid());
|
||||
guard.txs.insert(tx.txid(), tx);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look-up a transaction in the mempool given an [`Inventory`] request
|
||||
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
|
||||
pub fn get_tx(&self, inventory: &Inventory) -> Result<Option<Transaction>, PeerError> {
|
||||
let identifer = match inventory {
|
||||
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
|
||||
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return Ok(None),
|
||||
Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
|
||||
Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
|
||||
Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
|
||||
@ -85,27 +88,34 @@ impl Mempool {
|
||||
inv_type,
|
||||
hash
|
||||
);
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let txid = match identifer {
|
||||
TxIdentifier::Txid(txid) => Some(txid),
|
||||
TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
|
||||
TxIdentifier::Wtxid(wtxid) => self.0.read()?.wtxids.get(&wtxid).cloned(),
|
||||
};
|
||||
|
||||
txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
|
||||
.flatten()
|
||||
let result = match txid {
|
||||
Some(txid) => {
|
||||
let read_lock = self.0.read()?;
|
||||
read_lock.txs.get(&txid).cloned()
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Return whether or not the mempool contains a transaction with a given txid
|
||||
pub fn has_tx(&self, txid: &Txid) -> bool {
|
||||
self.0.read().unwrap().txs.contains_key(txid)
|
||||
pub fn has_tx(&self, txid: &Txid) -> Result<bool, PeerError> {
|
||||
Ok(self.0.read()?.txs.contains_key(txid))
|
||||
}
|
||||
|
||||
/// Return the list of transactions contained in the mempool
|
||||
pub fn iter_txs(&self) -> Vec<Transaction> {
|
||||
self.0.read().unwrap().txs.values().cloned().collect()
|
||||
pub fn iter_txs(&self) -> Result<Vec<Transaction>, PeerError> {
|
||||
Ok(self.0.read()?.txs.values().cloned().collect())
|
||||
}
|
||||
}
|
||||
|
||||
@ -133,12 +143,31 @@ impl Peer {
|
||||
address: A,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
) -> Result<Self, PeerError> {
|
||||
let stream = TcpStream::connect(address)?;
|
||||
|
||||
Peer::from_stream(stream, mempool, network)
|
||||
}
|
||||
|
||||
/// Connect to a peer over a plaintext TCP connection with a timeout
|
||||
///
|
||||
/// This function behaves exactly the same as `connect`, except for two differences:
|
||||
/// 1) It assumes your ToSocketAddrs will resolve to a single address
|
||||
/// 2) It lets you specify a connection timeout
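///
/// A minimal sketch (marked `ignore`: it opens a real TCP connection, and the peer
/// address shown is illustrative):
///
/// ```ignore
/// let mempool = Arc::new(Mempool::default());
/// let peer = Peer::connect_with_timeout(
///     "127.0.0.1:8333",
///     Duration::from_secs(1),
///     mempool,
///     Network::Bitcoin,
/// )?;
/// peer.send(NetworkMessage::GetAddr)?;
/// ```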
|
||||
pub fn connect_with_timeout<A: ToSocketAddrs>(
|
||||
address: A,
|
||||
timeout: Duration,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, PeerError> {
|
||||
let socket_addr = address
|
||||
.to_socket_addrs()?
|
||||
.next()
|
||||
.ok_or(PeerError::AddressResolution)?;
|
||||
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
|
||||
Peer::from_stream(stream, mempool, network)
|
||||
}
|
||||
|
||||
/// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
|
||||
/// as a tuple of `(username, password)`
|
||||
///
|
||||
@ -150,7 +179,7 @@ impl Peer {
|
||||
credentials: Option<(&str, &str)>,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
) -> Result<Self, PeerError> {
|
||||
let socks_stream = if let Some((username, password)) = credentials {
|
||||
Socks5Stream::connect_with_password(proxy, target, username, password)?
|
||||
} else {
|
||||
@ -165,12 +194,12 @@ impl Peer {
|
||||
stream: TcpStream,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
) -> Result<Self, PeerError> {
|
||||
let writer = Arc::new(Mutex::new(stream.try_clone()?));
|
||||
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
|
||||
let connected = Arc::new(RwLock::new(true));
|
||||
|
||||
let mut locked_writer = writer.lock().unwrap();
|
||||
let mut locked_writer = writer.lock()?;
|
||||
|
||||
let reader_thread_responses = Arc::clone(&responses);
|
||||
let reader_thread_writer = Arc::clone(&writer);
|
||||
@ -185,6 +214,7 @@ impl Peer {
|
||||
reader_thread_mempool,
|
||||
reader_thread_connected,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
|
||||
@ -209,18 +239,20 @@ impl Peer {
|
||||
0,
|
||||
)),
|
||||
)?;
|
||||
let version = if let NetworkMessage::Version(version) =
|
||||
Self::_recv(&responses, "version", None).unwrap()
|
||||
{
|
||||
version
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
|
||||
let version = match Self::_recv(&responses, "version", Some(Duration::from_secs(1)))? {
|
||||
Some(NetworkMessage::Version(version)) => version,
|
||||
_ => {
|
||||
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
|
||||
}
|
||||
};
|
||||
|
||||
if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None).unwrap() {
|
||||
if let Some(NetworkMessage::Verack) =
|
||||
Self::_recv(&responses, "verack", Some(Duration::from_secs(1)))?
|
||||
{
|
||||
Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
return Err(PeerError::InvalidResponse(locked_writer.peer_addr()?));
|
||||
}
|
||||
|
||||
std::mem::drop(locked_writer);
|
||||
@ -236,19 +268,26 @@ impl Peer {
|
||||
})
|
||||
}
|
||||
|
||||
/// Close the peer connection
|
||||
// Consume Self
|
||||
pub fn close(self) -> Result<(), PeerError> {
|
||||
let locked_writer = self.writer.lock()?;
|
||||
Ok((*locked_writer).shutdown(std::net::Shutdown::Both)?)
|
||||
}
|
||||
|
||||
/// Get the socket address of the remote peer
|
||||
pub fn get_address(&self) -> Result<SocketAddr, PeerError> {
|
||||
let locked_writer = self.writer.lock()?;
|
||||
Ok(locked_writer.peer_addr()?)
|
||||
}
|
||||
|
||||
/// Send a Bitcoin network message
|
||||
fn _send(
|
||||
writer: &mut TcpStream,
|
||||
magic: u32,
|
||||
payload: NetworkMessage,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
fn _send(writer: &mut TcpStream, magic: u32, payload: NetworkMessage) -> Result<(), PeerError> {
|
||||
log::trace!("==> {:?}", payload);
|
||||
|
||||
let raw_message = RawNetworkMessage { magic, payload };
|
||||
|
||||
raw_message
|
||||
.consensus_encode(writer)
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?;
|
||||
raw_message.consensus_encode(writer)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -258,30 +297,30 @@ impl Peer {
|
||||
responses: &Arc<RwLock<ResponsesMap>>,
|
||||
wait_for: &'static str,
|
||||
timeout: Option<Duration>,
|
||||
) -> Option<NetworkMessage> {
|
||||
) -> Result<Option<NetworkMessage>, PeerError> {
|
||||
let message_resp = {
|
||||
let mut lock = responses.write().unwrap();
|
||||
let mut lock = responses.write()?;
|
||||
let message_resp = lock.entry(wait_for).or_default();
|
||||
Arc::clone(&message_resp)
|
||||
};
|
||||
|
||||
let (lock, cvar) = &*message_resp;
|
||||
|
||||
let mut messages = lock.lock().unwrap();
|
||||
let mut messages = lock.lock()?;
|
||||
while messages.is_empty() {
|
||||
match timeout {
|
||||
None => messages = cvar.wait(messages).unwrap(),
|
||||
None => messages = cvar.wait(messages)?,
|
||||
Some(t) => {
|
||||
let result = cvar.wait_timeout(messages, t).unwrap();
|
||||
let result = cvar.wait_timeout(messages, t)?;
|
||||
if result.1.timed_out() {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
messages = result.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
messages.pop()
|
||||
Ok(messages.pop())
|
||||
}
|
||||
|
||||
/// Return the [`VersionMessage`] sent by the peer
|
||||
@ -300,8 +339,8 @@ impl Peer {
|
||||
}
|
||||
|
||||
/// Return whether or not the peer is still connected
|
||||
pub fn is_connected(&self) -> bool {
|
||||
*self.connected.read().unwrap()
|
||||
pub fn is_connected(&self) -> Result<bool, PeerError> {
|
||||
Ok(*self.connected.read()?)
|
||||
}
|
||||
|
||||
/// Internal function called once the `reader_thread` is spawned
|
||||
@ -312,14 +351,14 @@ impl Peer {
|
||||
reader_thread_writer: Arc<Mutex<TcpStream>>,
|
||||
reader_thread_mempool: Arc<Mempool>,
|
||||
reader_thread_connected: Arc<RwLock<bool>>,
|
||||
) {
|
||||
) -> Result<(), PeerError> {
|
||||
macro_rules! check_disconnect {
|
||||
($call:expr) => {
|
||||
match $call {
|
||||
Ok(good) => good,
|
||||
Err(e) => {
|
||||
log::debug!("Error {:?}", e);
|
||||
*reader_thread_connected.write().unwrap() = false;
|
||||
*reader_thread_connected.write()? = false;
|
||||
|
||||
break;
|
||||
}
|
||||
@ -328,7 +367,7 @@ impl Peer {
|
||||
}
|
||||
|
||||
let mut reader = StreamReader::new(connection, None);
|
||||
loop {
|
||||
while *reader_thread_connected.read()? {
|
||||
let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
|
||||
|
||||
let in_message = if raw_message.magic != network.magic() {
|
||||
@ -342,7 +381,7 @@ impl Peer {
|
||||
match in_message {
|
||||
NetworkMessage::Ping(nonce) => {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
&mut *reader_thread_writer.lock()?,
|
||||
network.magic(),
|
||||
NetworkMessage::Pong(nonce),
|
||||
));
|
||||
@ -353,19 +392,21 @@ impl Peer {
|
||||
NetworkMessage::GetData(ref inv) => {
|
||||
let (found, not_found): (Vec<_>, Vec<_>) = inv
|
||||
.iter()
|
||||
.map(|item| (*item, reader_thread_mempool.get_tx(item)))
|
||||
.map(|item| (*item, reader_thread_mempool.get_tx(item).unwrap()))
|
||||
.partition(|(_, d)| d.is_some());
|
||||
for (_, found_tx) in found {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
&mut *reader_thread_writer.lock()?,
|
||||
network.magic(),
|
||||
NetworkMessage::Tx(found_tx.unwrap()),
|
||||
NetworkMessage::Tx(found_tx.ok_or_else(|| PeerError::Generic(
|
||||
"Got None while expecting Transaction".to_string()
|
||||
))?),
|
||||
));
|
||||
}
|
||||
|
||||
if !not_found.is_empty() {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
&mut *reader_thread_writer.lock()?,
|
||||
network.magic(),
|
||||
NetworkMessage::NotFound(
|
||||
not_found.into_iter().map(|(i, _)| i).collect(),
|
||||
@ -377,21 +418,23 @@ impl Peer {
|
||||
}
|
||||
|
||||
let message_resp = {
|
||||
let mut lock = reader_thread_responses.write().unwrap();
|
||||
let mut lock = reader_thread_responses.write()?;
|
||||
let message_resp = lock.entry(in_message.cmd()).or_default();
|
||||
Arc::clone(&message_resp)
|
||||
};
|
||||
|
||||
let (lock, cvar) = &*message_resp;
|
||||
let mut messages = lock.lock().unwrap();
|
||||
let mut messages = lock.lock()?;
|
||||
messages.push(in_message);
|
||||
cvar.notify_all();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a raw Bitcoin message to the peer
|
||||
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
|
||||
let mut writer = self.writer.lock().unwrap();
|
||||
pub fn send(&self, payload: NetworkMessage) -> Result<(), PeerError> {
|
||||
let mut writer = self.writer.lock()?;
|
||||
Self::_send(&mut writer, self.network.magic(), payload)
|
||||
}
|
||||
|
||||
@ -400,30 +443,27 @@ impl Peer {
|
||||
&self,
|
||||
wait_for: &'static str,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
|
||||
Ok(Self::_recv(&self.responses, wait_for, timeout))
|
||||
) -> Result<Option<NetworkMessage>, PeerError> {
|
||||
Self::_recv(&self.responses, wait_for, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait CompactFiltersPeer {
|
||||
fn get_cf_checkpt(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFCheckpt, CompactFiltersError>;
|
||||
fn get_cf_checkpt(&self, filter_type: u8, stop_hash: BlockHash)
|
||||
-> Result<CFCheckpt, PeerError>;
|
||||
fn get_cf_headers(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFHeaders, CompactFiltersError>;
|
||||
) -> Result<CFHeaders, PeerError>;
|
||||
fn get_cf_filters(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<(), CompactFiltersError>;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
|
||||
) -> Result<(), PeerError>;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError>;
|
||||
}
|
||||
|
||||
impl CompactFiltersPeer for Peer {
|
||||
@ -431,22 +471,20 @@ impl CompactFiltersPeer for Peer {
|
||||
&self,
|
||||
filter_type: u8,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFCheckpt, CompactFiltersError> {
|
||||
) -> Result<CFCheckpt, PeerError> {
|
||||
self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
|
||||
filter_type,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
let response = self
|
||||
.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = self.recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFCheckpt(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::CFCheckpt(response)) => response,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
if response.filter_type != filter_type {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
return Err(PeerError::InvalidResponse(self.get_address()?));
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
@ -457,35 +495,31 @@ impl CompactFiltersPeer for Peer {
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFHeaders, CompactFiltersError> {
|
||||
) -> Result<CFHeaders, PeerError> {
|
||||
self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
|
||||
filter_type,
|
||||
start_height,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
let response = self
|
||||
.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = self.recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFHeaders(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::CFHeaders(response)) => response,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
if response.filter_type != filter_type {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
return Err(PeerError::InvalidResponse(self.get_address()?));
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
|
||||
let response = self
|
||||
.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, PeerError> {
|
||||
let response = self.recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFilter(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::CFilter(response)) => response,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
@ -496,7 +530,7 @@ impl CompactFiltersPeer for Peer {
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
) -> Result<(), PeerError> {
|
||||
self.send(NetworkMessage::GetCFilters(GetCFilters {
|
||||
filter_type,
|
||||
start_height,
|
||||
@ -508,13 +542,13 @@ impl CompactFiltersPeer for Peer {
|
||||
}
|
||||
|
||||
pub trait InvPeer {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
|
||||
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError>;
|
||||
fn ask_for_mempool(&self) -> Result<(), PeerError>;
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError>;
|
||||
}
|
||||
|
||||
impl InvPeer for Peer {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, PeerError> {
|
||||
self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
|
||||
block_hash,
|
||||
)]))?;
|
||||
@ -522,51 +556,126 @@ impl InvPeer for Peer {
|
||||
match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
|
||||
None => Ok(None),
|
||||
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
|
||||
_ => Err(CompactFiltersError::InvalidResponse),
|
||||
_ => Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
}
|
||||
}
|
||||
|
||||
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
|
||||
fn ask_for_mempool(&self) -> Result<(), PeerError> {
|
||||
if !self.version.services.has(ServiceFlags::BLOOM) {
|
||||
return Err(CompactFiltersError::PeerBloomDisabled);
|
||||
return Err(PeerError::PeerBloomDisabled(self.get_address()?));
|
||||
}
|
||||
|
||||
self.send(NetworkMessage::MemPool)?;
|
||||
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
|
||||
None => return Ok(()), // empty mempool
|
||||
Some(NetworkMessage::Inv(inv)) => inv,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
let getdata = inv
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(
|
||||
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid)),
|
||||
|item| matches!(item, Inventory::Transaction(txid) if !self.mempool.has_tx(txid).unwrap()),
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
let num_txs = getdata.len();
|
||||
self.send(NetworkMessage::GetData(getdata))?;
|
||||
|
||||
for _ in 0..num_txs {
|
||||
let tx = self
|
||||
.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let tx = self.recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?;
|
||||
let tx = match tx {
|
||||
NetworkMessage::Tx(tx) => tx,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
Some(NetworkMessage::Tx(tx)) => tx,
|
||||
_ => return Err(PeerError::InvalidResponse(self.get_address()?)),
|
||||
};
|
||||
|
||||
self.mempool.add_tx(tx);
|
||||
self.mempool.add_tx(tx)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
|
||||
self.mempool.add_tx(tx.clone());
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), PeerError> {
|
||||
self.mempool.add_tx(tx.clone())?;
|
||||
self.send(NetworkMessage::Tx(tx))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Peer Errors
|
||||
#[derive(Debug)]
|
||||
pub enum PeerError {
|
||||
/// Internal I/O error
|
||||
Io(std::io::Error),
|
||||
|
||||
/// Internal system time error
|
||||
Time(std::time::SystemTimeError),
|
||||
|
||||
/// A peer sent an invalid or unexpected response
|
||||
InvalidResponse(SocketAddr),
|
||||
|
||||
/// Peer had bloom filter disabled
|
||||
PeerBloomDisabled(SocketAddr),
|
||||
|
||||
/// Internal Mutex poisoning error
|
||||
MutexPoisoned,
|
||||
|
||||
/// Internal Mutex wait timed out
|
||||
MutexTimedOut,
|
||||
|
||||
/// Internal RW read lock poisoned
|
||||
RwReadLockPoisoned,
|
||||
|
||||
/// Internal RW write lock poisoned
|
||||
RwWriteLockPoisoned,
|
||||
|
||||
/// Mempool Mutex poisoned
|
||||
MempoolPoisoned,
|
||||
|
||||
/// Network address resolution Error
|
||||
AddressResolution,
|
||||
|
||||
/// Generic Errors
|
||||
Generic(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PeerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for PeerError {}
|
||||
|
||||
impl_error!(std::io::Error, Io, PeerError);
|
||||
impl_error!(std::time::SystemTimeError, Time, PeerError);
|
||||
|
||||
impl<T> From<PoisonError<MutexGuard<'_, T>>> for PeerError {
|
||||
fn from(_: PoisonError<MutexGuard<'_, T>>) -> Self {
|
||||
PeerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockWriteGuard<'_, T>>> for PeerError {
|
||||
fn from(_: PoisonError<RwLockWriteGuard<'_, T>>) -> Self {
|
||||
PeerError::RwWriteLockPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<RwLockReadGuard<'_, T>>> for PeerError {
|
||||
fn from(_: PoisonError<RwLockReadGuard<'_, T>>) -> Self {
|
||||
PeerError::RwReadLockPoisoned
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>> for PeerError {
|
||||
fn from(err: PoisonError<(MutexGuard<'_, T>, WaitTimeoutResult)>) -> Self {
|
||||
let (_, wait_result) = err.into_inner();
|
||||
if wait_result.timed_out() {
|
||||
PeerError::MutexTimedOut
|
||||
} else {
|
||||
PeerError::MutexPoisoned
|
||||
}
|
||||
}
|
||||
}
|
||||
|
src/blockchain/compact_filters/peermngr.rs (new file, 578 lines)
@ -0,0 +1,578 @@
|
||||
use super::address_manager::{AddressManager, AddressManagerError, DiscoveryProgress};
|
||||
use super::peer::{Mempool, Peer, PeerError, TIMEOUT_SECS};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time;
|
||||
|
||||
use bitcoin::network::constants::{Network, ServiceFlags};
|
||||
use bitcoin::network::message::NetworkMessage;
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
// Peer Manager Configuration constants
|
||||
const MIN_CBF_PEERS: usize = 2;
|
||||
const MIN_TOTAL_PEERS: usize = 5;
|
||||
const MIN_CRAWLER_THREADS: usize = 20;
|
||||
const BAN_SCORE_THRESHOLD: usize = 100;
|
||||
const RECEIVE_TIMEOUT: time::Duration = time::Duration::from_secs(TIMEOUT_SECS);
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// An error type describing peer management errors
|
||||
#[derive(Debug)]
|
||||
pub enum PeerManagerError {
|
||||
// Internal Peer Error
|
||||
Peer(PeerError),
|
||||
|
||||
// Internal AddressManager Error
|
||||
AddrsManager(AddressManagerError),
|
||||
|
||||
// Os String Error
|
||||
OsString(std::ffi::OsString),
|
||||
|
||||
// Peer not found in directory
|
||||
PeerNotFound,
|
||||
|
||||
// Generic Internal Error
|
||||
Generic(String),
|
||||
}
|
||||
|
||||
impl_error!(PeerError, Peer, PeerManagerError);
|
||||
impl_error!(AddressManagerError, AddrsManager, PeerManagerError);
|
||||
impl_error!(std::ffi::OsString, OsString, PeerManagerError);
|
||||
|
||||
/// Peer Data stored in the manager's directory
|
||||
#[derive(Debug)]
|
||||
struct PeerData {
|
||||
peer: Peer,
|
||||
is_cbf: bool,
|
||||
ban_score: usize,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// A directory structure to hold live peers
///
/// All peers in the directory have a live, ongoing connection.
/// Banning a peer removes it from the directory.
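///
/// Illustrative sketch (not a doctest: the type is private, and `peer`/`address`
/// stand in for an established [Peer] and its [SocketAddr]):
///
/// ```ignore
/// let mut directory = PeerDirectory::new();
/// directory.insert_peer(PeerData { peer, is_cbf: true, ban_score: 0 })?;
/// assert_eq!(directory.get_cbf_count(), 1);
/// // Removing (e.g. when banning) drops the live-connection entry.
/// let removed = directory.remove_peer(&address);
/// ```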
|
||||
#[derive(Default, Debug)]
|
||||
struct PeerDirectory {
|
||||
peers: BTreeMap<SocketAddr, PeerData>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl PeerDirectory {
|
||||
fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn get_cbf_peers(&self) -> Option<Vec<&PeerData>> {
|
||||
let cbf_peers = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter(|(_, peer)| peer.is_cbf)
|
||||
.map(|(_, peer)| peer)
|
||||
.collect::<Vec<&PeerData>>();
|
||||
|
||||
match cbf_peers.len() {
|
||||
0 => None,
|
||||
_ => Some(cbf_peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
let cbf_addresses = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter_map(
|
||||
|(addrs, peerdata)| {
|
||||
if peerdata.is_cbf {
|
||||
Some(addrs)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match cbf_addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(cbf_addresses),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_non_cbf_peers(&self) -> Option<Vec<&PeerData>> {
|
||||
let non_cbf_peers = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter(|(_, peerdata)| !peerdata.is_cbf)
|
||||
.map(|(_, peerdata)| peerdata)
|
||||
.collect::<Vec<&PeerData>>();
|
||||
|
||||
match non_cbf_peers.len() {
|
||||
0 => None,
|
||||
_ => Some(non_cbf_peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
|
||||
let addresses = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter_map(
|
||||
|(addrs, peerdata)| {
|
||||
if !peerdata.is_cbf {
|
||||
Some(addrs)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.copied()
|
||||
.collect::<Vec<SocketAddr>>();
|
||||
|
||||
match addresses.len() {
|
||||
0 => None,
|
||||
_ => Some(addresses),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
|
||||
let peers = self
|
||||
.peers
|
||||
.iter_mut()
|
||||
.filter(|(_, peerdata)| peerdata.is_cbf)
|
||||
.map(|(_, peerdata)| peerdata)
|
||||
.collect::<Vec<&mut PeerData>>();
|
||||
|
||||
match peers.len() {
|
||||
0 => None,
|
||||
_ => Some(peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_non_cbf_peers_mut(&mut self) -> Option<Vec<&mut PeerData>> {
|
||||
let peers = self
|
||||
.peers
|
||||
.iter_mut()
|
||||
.filter(|(_, peerdata)| !peerdata.is_cbf)
|
||||
.map(|(_, peerdata)| peerdata)
|
||||
.collect::<Vec<&mut PeerData>>();
|
||||
|
||||
match peers.len() {
|
||||
0 => None,
|
||||
_ => Some(peers),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cbf_count(&self) -> usize {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(|(_, peerdata)| peerdata.is_cbf)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn get_non_cbf_count(&self) -> usize {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(|(_, peerdata)| !peerdata.is_cbf)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn insert_peer(&mut self, peerdata: PeerData) -> Result<(), PeerManagerError> {
|
||||
let addrs = peerdata.peer.get_address()?;
|
||||
self.peers.entry(addrs).or_insert(peerdata);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_peer(&mut self, addrs: &SocketAddr) -> Option<PeerData> {
|
||||
self.peers.remove(addrs)
|
||||
}
|
||||
|
||||
fn get_peer_banscore(&self, addrs: &SocketAddr) -> Option<usize> {
|
||||
self.peers.get(addrs).map(|peerdata| peerdata.ban_score)
|
||||
}
|
||||
|
||||
fn get_peerdata_mut(&mut self, address: &SocketAddr) -> Option<&mut PeerData> {
|
||||
self.peers.get_mut(address)
|
||||
}
|
||||
|
||||
fn get_peerdata(&self, address: &SocketAddr) -> Option<&PeerData> {
|
||||
self.peers.get(address)
|
||||
}
|
||||
|
||||
fn is_cbf(&self, addrs: &SocketAddr) -> Option<bool> {
    self.peers.get(addrs).map(|peerdata| peerdata.is_cbf)
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct PeerManager<P: DiscoveryProgress> {
|
||||
addrs_mngr: AddressManager<P>,
|
||||
directory: PeerDirectory,
|
||||
mempool: Arc<Mempool>,
|
||||
min_cbf: usize,
|
||||
min_total: usize,
|
||||
network: Network,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl<P: DiscoveryProgress> PeerManager<P> {
|
||||
    pub fn init(
        network: Network,
        cache_dir: &str,
        crawler_threads: Option<usize>,
        progress: P,
        cbf_peers: Option<usize>,
        total_peers: Option<usize>,
    ) -> Result<Self, PeerManagerError> {
        let mut cache_filename = PathBuf::from(cache_dir);
        cache_filename.push("addr_cache");

        // Minimum peer requirements, either supplied by the caller or taken from the defaults
        let min_cbf = cbf_peers.unwrap_or(MIN_CBF_PEERS);

        let min_total = total_peers.unwrap_or(MIN_TOTAL_PEERS);

        let cbf_buff = min_cbf * 2;
        let non_cbf_buff = (min_total - min_cbf) * 2;

        // Create internal items
        let addrs_mngr = AddressManager::new(
            network,
            cache_filename.into_os_string().into_string()?,
            crawler_threads.unwrap_or(MIN_CRAWLER_THREADS),
            Some(cbf_buff),
            Some(non_cbf_buff),
            progress,
        )?;

        let mempool = Arc::new(Mempool::new());

        let peer_dir = PeerDirectory::new();

        // Create the manager and connect to the initial set of peers
        let mut manager = Self {
            addrs_mngr,
            directory: peer_dir,
            mempool,
            min_cbf,
            min_total,
            network,
        };

        manager.update_directory()?;

        Ok(manager)
    }

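    /// Keeps connecting to new addresses from the [`AddressManager`] until the directory
    /// holds at least `min_cbf` CBF peers and `min_total - min_cbf` non-CBF peers,
    /// starting the address crawlers whenever no suitable address is available.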
    fn update_directory(&mut self) -> Result<(), PeerManagerError> {
        while self.directory.get_cbf_count() < self.min_cbf
            || self.directory.get_non_cbf_count() < (self.min_total - self.min_cbf)
        {
            // Connect to CBF peers first, then to non-CBF peers
            let cbf_fetch = self.directory.get_cbf_count() < self.min_cbf;

            // Try to get an address of the required kind; if none is available,
            // start the crawlers and try again
            let target_addrs = match cbf_fetch {
                true => {
                    if let Some(addrs) = self.addrs_mngr.get_new_cbf_address() {
                        addrs
                    } else {
                        self.addrs_mngr.fetch()?;
                        continue;
                    }
                }
                false => {
                    if let Some(addrs) = self.addrs_mngr.get_new_non_cbf_address() {
                        addrs
                    } else {
                        self.addrs_mngr.fetch()?;
                        continue;
                    }
                }
            };

            if let Ok(peer) = Peer::connect(target_addrs, Arc::clone(&self.mempool), self.network) {
                let address = peer.get_address()?;

                assert_eq!(address, target_addrs);

                let is_cbf = peer
                    .get_version()
                    .services
                    .has(ServiceFlags::COMPACT_FILTERS);

                let peerdata = PeerData {
                    peer,
                    is_cbf,
                    ban_score: 0,
                };

                self.directory.insert_peer(peerdata)?;
            } else {
                continue;
            }
        }

        Ok(())
    }

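    /// Increases the ban score of the peer at `address`. Once the score reaches
    /// `BAN_SCORE_THRESHOLD`, the peer is removed, its address is banned in the
    /// address manager, and a replacement connection is established.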
    pub fn set_banscore(
        &mut self,
        increase_by: usize,
        address: &SocketAddr,
    ) -> Result<(), PeerManagerError> {
        // Accumulate the score on the stored peer data so repeated offences add up
        let current_score = if let Some(peer) = self.directory.get_peerdata_mut(address) {
            peer.ban_score += increase_by;
            peer.ban_score
        } else {
            return Err(PeerManagerError::PeerNotFound);
        };

        let mut banned = false;

        if current_score >= BAN_SCORE_THRESHOLD {
            match (
                self.directory.is_cbf(address),
                self.directory.remove_peer(address),
            ) {
                (Some(is_cbf), Some(_)) => {
                    self.addrs_mngr.ban_peer(address, is_cbf)?;
                    banned = true;
                }
                _ => {
                    return Err(PeerManagerError::Generic(
                        "data inconsistency in directory, should not happen".to_string(),
                    ))
                }
            }
        }

        if banned {
            self.update_directory()?;
        }

        Ok(())
    }

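    /// Sends `message` to the connected peer at `address`.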
    pub fn send_to(
        &self,
        address: &SocketAddr,
        message: NetworkMessage,
    ) -> Result<(), PeerManagerError> {
        if let Some(peerdata) = self.directory.get_peerdata(address) {
            peerdata.peer.send(message)?;
            Ok(())
        } else {
            Err(PeerManagerError::PeerNotFound)
        }
    }

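    /// Waits up to `RECEIVE_TIMEOUT` for a message of type `wait_for` from the peer at
    /// `address`, returning `Ok(None)` if nothing is received in time.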
    pub fn receive_from(
        &self,
        address: &SocketAddr,
        wait_for: &'static str,
    ) -> Result<Option<NetworkMessage>, PeerManagerError> {
        if let Some(peerdata) = self.directory.get_peerdata(address) {
            Ok(peerdata.peer.recv(wait_for, Some(RECEIVE_TIMEOUT))?)
        } else {
            Err(PeerManagerError::PeerNotFound)
        }
    }

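    /// Addresses of the currently connected CBF peers, or `None` if there are none.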
    pub fn connected_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
        self.directory.get_cbf_addresses()
    }

    /// Addresses of the currently connected non-CBF peers, or `None` if there are none.
    pub fn connected_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
        self.directory.get_non_cbf_addresses()
    }

    /// CBF peer addresses currently known to the address manager.
    pub fn known_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
        self.addrs_mngr.get_known_cbfs()
    }

    /// Non-CBF peer addresses currently known to the address manager.
    pub fn known_non_cbf_addresses(&self) -> Option<Vec<SocketAddr>> {
        self.addrs_mngr.get_known_non_cbfs()
    }

    /// Addresses that the address manager has previously tried.
    pub fn previously_tried_addresses(&self) -> Option<Vec<SocketAddr>> {
        self.addrs_mngr.get_previously_tried()
    }
}

#[cfg(test)]
mod test {
    use super::super::LogDiscoveryProgress;
    use super::*;

    #[test]
    #[ignore]
    fn test_ban() {
        let mut manager = PeerManager::init(
            Network::Bitcoin,
            ".",
            None,
            LogDiscoveryProgress,
            None,
            None,
        )
        .unwrap();

        let connected_cbfs = manager.connected_cbf_addresses().unwrap();
        let connected_non_cbfs = manager.connected_non_cbf_addresses().unwrap();

        println!("Currently connected CBFs: {:#?}", connected_cbfs);
        assert_eq!(connected_cbfs.len(), 2);
        assert_eq!(connected_non_cbfs.len(), 3);

        let to_ban = &connected_cbfs[0];

        println!("Banning address: {}", to_ban);

        manager.set_banscore(100, to_ban).unwrap();

        let newly_connected = manager.connected_cbf_addresses().unwrap();

        println!("Newly connected CBFs: {:#?}", newly_connected);

        assert_eq!(newly_connected.len(), 2);

        assert_ne!(newly_connected, connected_cbfs);
    }

    #[test]
    #[ignore]
    fn test_send_recv() {
        let manager = PeerManager::init(
            Network::Bitcoin,
            ".",
            None,
            LogDiscoveryProgress,
            None,
            None,
        )
        .unwrap();

        let target_address = manager.connected_cbf_addresses().unwrap()[0];

        let ping = NetworkMessage::Ping(30);

        println!("Pinging peer {}", target_address);

        manager.send_to(&target_address, ping).unwrap();

        let response = manager
            .receive_from(&target_address, "pong")
            .unwrap()
            .unwrap();

        let value = match response {
            NetworkMessage::Pong(v) => Some(v),
            _ => None,
        };

        let value = value.unwrap();

        println!("Got value {:#?}", value);
    }

    #[test]
    #[ignore]
    fn test_connect_all() {
        let manager = PeerManager::init(
            Network::Bitcoin,
            ".",
            None,
            LogDiscoveryProgress,
            None,
            None,
        )
        .unwrap();

        let cbf_pings = vec![100u64; manager.min_cbf];
        let non_cbf_pings = vec![200u64; manager.min_total - manager.min_cbf];

        let cbf_peers = manager.connected_cbf_addresses().unwrap();
        let non_cbf_peers = manager.connected_non_cbf_addresses().unwrap();

        let sent_cbf: Vec<bool> = cbf_pings
            .iter()
            .zip(cbf_peers.iter())
            .map(|(ping, address)| {
                let message = NetworkMessage::Ping(*ping);
                manager.send_to(address, message).unwrap();
                true
            })
            .collect();

        assert_eq!(sent_cbf, vec![true; manager.min_cbf]);

        println!("Sent pings to CBF peers");

        let sent_noncbf: Vec<bool> = non_cbf_pings
            .iter()
            .zip(non_cbf_peers.iter())
            .map(|(ping, address)| {
                let message = NetworkMessage::Ping(*ping);
                manager.send_to(address, message).unwrap();
                true
            })
            .collect();

        assert_eq!(sent_noncbf, vec![true; manager.min_total - manager.min_cbf]);

        println!("Sent pings to non-CBF peers");

        let cbf_received: Vec<u64> = cbf_peers
            .iter()
            .map(|address| {
                let response = manager.receive_from(address, "pong").unwrap().unwrap();

                let value = match response {
                    NetworkMessage::Pong(v) => Some(v),
                    _ => None,
                };

                value.unwrap()
            })
            .collect();

        let non_cbf_received: Vec<u64> = non_cbf_peers
            .iter()
            .map(|address| {
                let response = manager.receive_from(address, "pong").unwrap().unwrap();

                let value = match response {
                    NetworkMessage::Pong(v) => Some(v),
                    _ => None,
                };

                value.unwrap()
            })
            .collect();

        assert_eq!(cbf_pings, cbf_received);

        assert_eq!(non_cbf_pings, non_cbf_received);
    }
}