From ddc2bded99f9eaf9ce379aacb4ea88e9cb5314d9 Mon Sep 17 00:00:00 2001 From: Alekos Filini Date: Sat, 29 Aug 2020 19:40:45 +0200 Subject: [PATCH] [compact_filters] Add support for Tor --- Cargo.toml | 3 +- src/blockchain/compact_filters/mod.rs | 91 ++++++++++++------------- src/blockchain/compact_filters/peer.rs | 53 ++++++++++---- src/blockchain/compact_filters/store.rs | 40 +++++++---- src/blockchain/compact_filters/sync.rs | 51 ++++++++------ 5 files changed, 138 insertions(+), 100 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 91caadf9..8b5bea3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ clap = { version = "2.33", optional = true } base64 = { version = "^0.11", optional = true } async-trait = { version = "0.1", optional = true } rocksdb = { version = "0.14", optional = true } +socks = { version = "0.3", optional = true } lazy_static = { version = "1.4", optional = true } # Platform-specific dependencies @@ -38,7 +39,7 @@ compiler = ["clap", "miniscript/compiler"] default = ["key-value-db", "electrum"] electrum = ["electrum-client"] esplora = ["reqwest", "futures"] -compact_filters = ["rocksdb", "lazy_static"] +compact_filters = ["rocksdb", "socks", "lazy_static"] key-value-db = ["sled"] cli-utils = ["clap", "base64"] async-interface = ["async-trait"] diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index f32b28b2..4e33af46 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -1,6 +1,4 @@ use std::collections::HashSet; -use std::fmt; -use std::net::ToSocketAddrs; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -9,7 +7,7 @@ use std::sync::{Arc, Mutex}; use log::{debug, error, info, trace}; use bitcoin::network::message_blockdata::Inventory; -use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid}; +use bitcoin::{BitcoinHash, OutPoint, Transaction, Txid}; use rocksdb::{Options, SliceTransform, DB}; @@ -27,6 +25,8 @@ use peer::*; use store::*; use sync::*; +pub use peer::{Mempool, Peer}; + const SYNC_HEADERS_COST: f32 = 1.0; const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0; const PROCESS_BLOCKS_COST: f32 = 20_000.0; @@ -35,45 +35,45 @@ const PROCESS_BLOCKS_COST: f32 = 20_000.0; pub struct CompactFiltersBlockchain(Option); impl CompactFiltersBlockchain { - pub fn new>( - address: A, + pub fn new>( + peers: Vec, storage_dir: P, - num_threads: usize, - skip_blocks: usize, - network: Network, + skip_blocks: Option, ) -> Result { Ok(CompactFiltersBlockchain(Some(CompactFilters::new( - address, + peers, storage_dir, - num_threads, skip_blocks, - network, )?))) } } +#[derive(Debug)] struct CompactFilters { - peer: Arc, - headers: Arc>, - skip_blocks: usize, - num_threads: usize, + peers: Vec>, + headers: Arc>, + skip_blocks: Option, } impl CompactFilters { - pub fn new>( - address: A, + pub fn new>( + peers: Vec, storage_dir: P, - num_threads: usize, - skip_blocks: usize, - network: Network, + skip_blocks: Option, ) -> Result { + if peers.is_empty() { + return Err(CompactFiltersError::NoPeers); + } + let mut opts = Options::default(); opts.create_if_missing(true); opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16)); + let network = peers[0].get_network(); + let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]); let db = DB::open_cf(&opts, &storage_dir, &cfs)?; - let headers = Arc::new(HeadersStore::new(db, network)?); + let headers = Arc::new(ChainStore::new(db, network)?); // try to recover partial snapshots for cf_name in &cfs { @@ -86,8 +86,7 @@ impl CompactFilters { } Ok(CompactFilters { - peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?), - num_threads, + peers: peers.into_iter().map(Arc::new).collect(), headers, skip_blocks, }) @@ -191,16 +190,15 @@ impl OnlineBlockchain for CompactFiltersBlockchain { progress_update: P, ) -> Result<(), Error> { let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; + let first_peer = &inner.peers[0]; - let cf_sync = Arc::new(CFSync::new( - Arc::clone(&inner.headers), - inner.skip_blocks, - 0x00, - )?); + let skip_blocks = inner.skip_blocks.unwrap_or(0); + + let cf_sync = Arc::new(CFSync::new(Arc::clone(&inner.headers), skip_blocks, 0x00)?); let initial_height = inner.headers.get_height()?; - let total_bundles = (inner.peer.get_version().start_height as usize) - .checked_sub(inner.skip_blocks) + let total_bundles = (first_peer.get_version().start_height as usize) + .checked_sub(skip_blocks) .map(|x| x / 1000) .unwrap_or(0) + 1; @@ -208,7 +206,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain { .checked_sub(cf_sync.pruned_bundles()?) .unwrap_or(0); - let headers_cost = (inner.peer.get_version().start_height as usize) + let headers_cost = (first_peer.get_version().start_height as usize) .checked_sub(initial_height) .unwrap_or(0) as f32 * SYNC_HEADERS_COST; @@ -217,7 +215,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain { let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST; if let Some(snapshot) = sync::sync_headers( - Arc::clone(&inner.peer), + Arc::clone(&first_peer), Arc::clone(&inner.headers), |new_height| { let local_headers_cost = @@ -240,7 +238,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain { .unwrap_or(0); info!("Synced headers to height: {}", synced_height); - cf_sync.prepare_sync(Arc::clone(&inner.peer))?; + cf_sync.prepare_sync(Arc::clone(&first_peer))?; let all_scripts = Arc::new( database @@ -254,10 +252,10 @@ impl OnlineBlockchain for CompactFiltersBlockchain { let synced_bundles = Arc::new(AtomicUsize::new(0)); let progress_update = Arc::new(Mutex::new(progress_update)); - let mut threads = Vec::with_capacity(inner.num_threads); - for _ in 0..inner.num_threads { + let mut threads = Vec::with_capacity(inner.peers.len()); + for peer in &inner.peers { let cf_sync = Arc::clone(&cf_sync); - let peer = Arc::new(inner.peer.new_connection()?); + let peer = Arc::clone(&peer); let headers = Arc::clone(&inner.headers); let all_scripts = Arc::clone(&all_scripts); let last_synced_block = Arc::clone(&last_synced_block); @@ -336,7 +334,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain { } database.commit_batch(updates)?; - inner.peer.ask_for_mempool()?; + first_peer.ask_for_mempool()?; let mut internal_max_deriv = 0; let mut external_max_deriv = 0; @@ -353,7 +351,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain { )?; } } - for tx in inner.peer.get_mempool().iter_txs().iter() { + for tx in first_peer.get_mempool().iter_txs().iter() { inner.process_tx( database, tx, @@ -392,15 +390,14 @@ impl OnlineBlockchain for CompactFiltersBlockchain { fn get_tx(&self, txid: &Txid) -> Result, Error> { let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; - Ok(inner - .peer + Ok(inner.peers[0] .get_mempool() .get_tx(&Inventory::Transaction(*txid))) } fn broadcast(&self, tx: &Transaction) -> Result<(), Error> { let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; - inner.peer.broadcast_tx(tx.clone())?; + inner.peers[0].broadcast_tx(tx.clone())?; Ok(()) } @@ -417,14 +414,6 @@ impl OnlineBlockchain for CompactFiltersBlockchain { } } -impl fmt::Debug for CompactFilters { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CompactFilters") - .field("peer", &self.peer) - .finish() - } -} - #[derive(Debug)] pub enum CompactFiltersError { InvalidResponse, @@ -433,8 +422,12 @@ pub enum CompactFiltersError { InvalidFilter, MissingBlock, DataCorruption, + + NotConnected, Timeout, + NoPeers, + DB(rocksdb::Error), IO(std::io::Error), BIP158(bitcoin::util::bip158::Error), diff --git a/src/blockchain/compact_filters/peer.rs b/src/blockchain/compact_filters/peer.rs index 58932833..33120096 100644 --- a/src/blockchain/compact_filters/peer.rs +++ b/src/blockchain/compact_filters/peer.rs @@ -4,6 +4,8 @@ use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use socks::{Socks5Stream, ToTargetAddr}; + use rand::{thread_rng, Rng}; use bitcoin::consensus::Encodable; @@ -22,6 +24,8 @@ use super::CompactFiltersError; type ResponsesMap = HashMap<&'static str, Arc<(Mutex>, Condvar)>>; +pub(crate) const TIMEOUT_SECS: u64 = 10; + #[derive(Debug, Default)] pub struct Mempool { txs: RwLock>, @@ -70,9 +74,33 @@ impl Peer { mempool: Arc, network: Network, ) -> Result { - let connection = TcpStream::connect(address)?; + let stream = TcpStream::connect(address)?; - let writer = Arc::new(Mutex::new(connection.try_clone()?)); + Peer::from_stream(stream, mempool, network) + } + + pub fn new_proxy( + target: T, + proxy: P, + credentials: Option<(&str, &str)>, + mempool: Arc, + network: Network, + ) -> Result { + let socks_stream = if let Some((username, password)) = credentials { + Socks5Stream::connect_with_password(proxy, target, username, password)? + } else { + Socks5Stream::connect(proxy, target)? + }; + + Peer::from_stream(socks_stream.into_inner(), mempool, network) + } + + fn from_stream( + stream: TcpStream, + mempool: Arc, + network: Network, + ) -> Result { + let writer = Arc::new(Mutex::new(stream.try_clone()?)); let responses: Arc> = Arc::new(RwLock::new(HashMap::new())); let connected = Arc::new(RwLock::new(true)); @@ -85,7 +113,7 @@ impl Peer { let reader_thread = thread::spawn(move || { Self::reader_thread( network, - connection, + stream, reader_thread_responses, reader_thread_writer, reader_thread_mempool, @@ -142,11 +170,6 @@ impl Peer { }) } - pub fn new_connection(&self) -> Result { - let socket_addr = self.writer.lock().unwrap().peer_addr()?; - Self::new(socket_addr, Arc::clone(&self.mempool), self.network) - } - fn _send( writer: &mut TcpStream, magic: u32, @@ -198,6 +221,10 @@ impl Peer { &self.version } + pub fn get_network(&self) -> Network { + self.network + } + pub fn get_mempool(&self) -> Arc { Arc::clone(&self.mempool) } @@ -337,7 +364,7 @@ impl CompactFiltersPeer for Peer { }))?; let response = self - .recv("cfcheckpt", Some(Duration::from_secs(10)))? + .recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))? .ok_or(CompactFiltersError::Timeout)?; let response = match response { NetworkMessage::CFCheckpt(response) => response, @@ -364,7 +391,7 @@ impl CompactFiltersPeer for Peer { }))?; let response = self - .recv("cfheaders", Some(Duration::from_secs(10)))? + .recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))? .ok_or(CompactFiltersError::Timeout)?; let response = match response { NetworkMessage::CFHeaders(response) => response, @@ -380,7 +407,7 @@ impl CompactFiltersPeer for Peer { fn pop_cf_filter_resp(&self) -> Result { let response = self - .recv("cfilter", Some(Duration::from_secs(10)))? + .recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))? .ok_or(CompactFiltersError::Timeout)?; let response = match response { NetworkMessage::CFilter(response) => response, @@ -418,7 +445,7 @@ impl InvPeer for Peer { block_hash, )]))?; - match self.recv("block", Some(Duration::from_secs(10)))? { + match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? { None => Ok(None), Some(NetworkMessage::Block(response)) => Ok(Some(response)), _ => Err(CompactFiltersError::InvalidResponse), @@ -446,7 +473,7 @@ impl InvPeer for Peer { for _ in 0..num_txs { let tx = self - .recv("tx", Some(Duration::from_secs(10)))? + .recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))? .ok_or(CompactFiltersError::Timeout)?; let tx = match tx { NetworkMessage::Tx(tx) => tx, diff --git a/src/blockchain/compact_filters/store.rs b/src/blockchain/compact_filters/store.rs index 81f24d93..bfd570cd 100644 --- a/src/blockchain/compact_filters/store.rs +++ b/src/blockchain/compact_filters/store.rs @@ -1,4 +1,5 @@ use std::convert::TryInto; +use std::fmt; use std::io::{Read, Write}; use std::marker::PhantomData; use std::ops::Deref; @@ -30,12 +31,12 @@ lazy_static! { static ref REGTEST_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); } -pub trait StoreType: Default {} +pub trait StoreType: Default + fmt::Debug {} -#[derive(Default)] +#[derive(Default, Debug)] pub struct Full; impl StoreType for Full {} -#[derive(Default)] +#[derive(Default, Debug)] pub struct Snapshot; impl StoreType for Snapshot {} @@ -226,7 +227,7 @@ impl Decodable for BundleStatus { } } -pub struct HeadersStore { +pub struct ChainStore { store: Arc>, cf_name: String, min_height: usize, @@ -234,7 +235,7 @@ pub struct HeadersStore { phantom: PhantomData, } -impl HeadersStore { +impl ChainStore { pub fn new(store: DB, network: Network) -> Result { let genesis = match network { Network::Bitcoin => MAINNET_GENESIS.deref(), @@ -262,7 +263,7 @@ impl HeadersStore { store.write(batch)?; } - Ok(HeadersStore { + Ok(ChainStore { store: Arc::new(RwLock::new(store)), cf_name, min_height: 0, @@ -301,10 +302,7 @@ impl HeadersStore { Ok(answer) } - pub fn start_snapshot( - &self, - from: usize, - ) -> Result, CompactFiltersError> { + pub fn start_snapshot(&self, from: usize) -> Result, CompactFiltersError> { let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect(); let new_cf_name = format!("_headers:{}", new_cf_name); @@ -335,7 +333,7 @@ impl HeadersStore { write_store.write(batch)?; let store = Arc::clone(&self.store); - Ok(HeadersStore { + Ok(ChainStore { store, cf_name: new_cf_name, min_height: from, @@ -367,7 +365,7 @@ impl HeadersStore { std::mem::drop(iterator); std::mem::drop(write_store); - let snapshot = HeadersStore { + let snapshot = ChainStore { store: Arc::clone(&self.store), cf_name: cf_name.into(), min_height, @@ -383,7 +381,7 @@ impl HeadersStore { pub fn apply_snapshot( &self, - snaphost: HeadersStore, + snaphost: ChainStore, ) -> Result<(), CompactFiltersError> { let mut batch = WriteBatch::default(); @@ -523,7 +521,7 @@ impl HeadersStore { } } -impl HeadersStore { +impl ChainStore { pub fn work(&self) -> Result { let read_store = self.store.read().unwrap(); let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); @@ -629,6 +627,18 @@ impl HeadersStore { } } +impl fmt::Debug for ChainStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(&format!("ChainStore<{:?}>", T::default())) + .field("cf_name", &self.cf_name) + .field("min_height", &self.min_height) + .field("network", &self.network) + .field("headers_height", &self.get_height()) + .field("tip_hash", &self.get_tip_hash()) + .finish() + } +} + pub type FilterHeaderHash = FilterHash; #[derive(Debug, Clone)] @@ -663,7 +673,7 @@ type BundleEntry = (BundleStatus, FilterHeaderHash); impl CFStore { pub fn new( - headers_store: &HeadersStore, + headers_store: &ChainStore, filter_type: u8, ) -> Result { let cf_store = CFStore { diff --git a/src/blockchain/compact_filters/sync.rs b/src/blockchain/compact_filters/sync.rs index 57b65f8b..3c09f687 100644 --- a/src/blockchain/compact_filters/sync.rs +++ b/src/blockchain/compact_filters/sync.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, HashMap, VecDeque}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use bitcoin::hash_types::{BlockHash, FilterHash}; use bitcoin::network::message::NetworkMessage; @@ -9,11 +10,12 @@ use bitcoin::util::bip158::BlockFilter; use super::peer::*; use super::store::*; use super::CompactFiltersError; +use crate::error::Error; pub(crate) const BURIED_CONFIRMATIONS: usize = 100; pub struct CFSync { - headers_store: Arc>, + headers_store: Arc>, cf_store: Arc, skip_blocks: usize, bundles: Mutex>, @@ -21,7 +23,7 @@ pub struct CFSync { impl CFSync { pub fn new( - headers_store: Arc>, + headers_store: Arc>, skip_blocks: usize, filter_type: u8, ) -> Result { @@ -72,7 +74,7 @@ impl CFSync { ) -> Result<(), CompactFiltersError> where F: Fn(&BlockHash, &BlockFilter) -> Result, - Q: Fn(usize) -> Result<(), crate::error::Error>, + Q: Fn(usize) -> Result<(), Error>, { let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated @@ -230,11 +232,11 @@ impl CFSync { pub fn sync_headers( peer: Arc, - store: Arc>, + store: Arc>, sync_fn: F, -) -> Result>, CompactFiltersError> +) -> Result>, CompactFiltersError> where - F: Fn(usize) -> Result<(), crate::error::Error>, + F: Fn(usize) -> Result<(), Error>, { let locators = store.get_locators()?; let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect(); @@ -244,22 +246,24 @@ where locators_vec, Default::default(), )))?; - let (mut snapshot, mut last_hash) = - if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? { - if headers.is_empty() { - return Ok(None); - } + let (mut snapshot, mut last_hash) = if let NetworkMessage::Headers(headers) = peer + .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))? + .ok_or(CompactFiltersError::Timeout)? + { + if headers.is_empty() { + return Ok(None); + } - match locators_map.get(&headers[0].prev_blockhash) { - None => return Err(CompactFiltersError::InvalidHeaders), - Some(from) => ( - store.start_snapshot(*from)?, - headers[0].prev_blockhash.clone(), - ), - } - } else { - return Err(CompactFiltersError::InvalidResponse); - }; + match locators_map.get(&headers[0].prev_blockhash) { + None => return Err(CompactFiltersError::InvalidHeaders), + Some(from) => ( + store.start_snapshot(*from)?, + headers[0].prev_blockhash.clone(), + ), + } + } else { + return Err(CompactFiltersError::InvalidResponse); + }; let mut sync_height = store.get_height()?; while sync_height < peer.get_version().start_height as usize { @@ -267,7 +271,10 @@ where vec![last_hash], Default::default(), )))?; - if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? { + if let NetworkMessage::Headers(headers) = peer + .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))? + .ok_or(CompactFiltersError::Timeout)? + { let batch_len = headers.len(); last_hash = snapshot.apply(sync_height, headers)?;