[compact_filters] Add support for Tor

Author: Alekos Filini
Date:   2020-08-29 19:40:45 +02:00
Parent: 77c95b93ac
Commit: ddc2bded99

No known key found for this signature in database
GPG Key ID: 5E8AFC3034FDFA4F

5 changed files with 138 additions and 100 deletions
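The API change at a glance: `CompactFiltersBlockchain::new` now takes a list of `Peer`s instead of a single address (one sync thread is spawned per peer), `skip_blocks` becomes optional, and the network is inferred from the first peer. A minimal usage sketch under those assumptions — the addresses, storage path, and module path are illustrative, not from the commit:

```rust
use std::sync::Arc;

use bitcoin::Network;

// `Mempool` and `Peer` are re-exported by this commit as
// `pub use peer::{Mempool, Peer}`; the enclosing module path is assumed.
use crate::blockchain::compact_filters::*;

fn build() -> Result<CompactFiltersBlockchain, CompactFiltersError> {
    let mempool = Arc::new(Mempool::default());

    // Direct TCP connection, as before.
    let clearnet = Peer::new("10.0.0.1:18333", Arc::clone(&mempool), Network::Testnet)?;

    // A peer reached through a local Tor SOCKS5 proxy. The proxy resolves
    // the target itself, so .onion hosts work too. No credentials here.
    let tor = Peer::new_proxy(
        "example1234567890ab.onion:18333", // placeholder target
        "127.0.0.1:9050",                  // Tor's default SOCKS5 port
        None,
        Arc::clone(&mempool),
        Network::Testnet,
    )?;

    // One sync thread per peer; skip filters below block 500_000.
    CompactFiltersBlockchain::new(vec![clearnet, tor], "./cf-db", Some(500_000))
}
```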

Cargo.toml

@@ -22,6 +22,7 @@ clap = { version = "2.33", optional = true }
 base64 = { version = "^0.11", optional = true }
 async-trait = { version = "0.1", optional = true }
 rocksdb = { version = "0.14", optional = true }
+socks = { version = "0.3", optional = true }
 lazy_static = { version = "1.4", optional = true }

 # Platform-specific dependencies
@@ -38,7 +39,7 @@ compiler = ["clap", "miniscript/compiler"]
 default = ["key-value-db", "electrum"]
 electrum = ["electrum-client"]
 esplora = ["reqwest", "futures"]
-compact_filters = ["rocksdb", "lazy_static"]
+compact_filters = ["rocksdb", "socks", "lazy_static"]
 key-value-db = ["sled"]
 cli-utils = ["clap", "base64"]
 async-interface = ["async-trait"]
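With this change, enabling `compact_filters` transitively enables the optional `socks` dependency. A downstream `Cargo.toml` might activate it like this (the crate name and version are placeholders; only the feature name comes from the diff):

```toml
[dependencies]
# Hypothetical consumer; swap in the real crate name and version.
your-wallet-crate = { version = "0.1", default-features = false, features = ["compact_filters"] }
```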

src/blockchain/compact_filters/mod.rs

@@ -1,6 +1,4 @@
 use std::collections::HashSet;
-use std::fmt;
-use std::net::ToSocketAddrs;
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
@@ -9,7 +7,7 @@ use std::sync::{Arc, Mutex};
 use log::{debug, error, info, trace};

 use bitcoin::network::message_blockdata::Inventory;
-use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid};
+use bitcoin::{BitcoinHash, OutPoint, Transaction, Txid};

 use rocksdb::{Options, SliceTransform, DB};
@@ -27,6 +25,8 @@ use peer::*;
 use store::*;
 use sync::*;

+pub use peer::{Mempool, Peer};
+
 const SYNC_HEADERS_COST: f32 = 1.0;
 const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
 const PROCESS_BLOCKS_COST: f32 = 20_000.0;
@@ -35,45 +35,45 @@ const PROCESS_BLOCKS_COST: f32 = 20_000.0;
 pub struct CompactFiltersBlockchain(Option<CompactFilters>);

 impl CompactFiltersBlockchain {
-    pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
-        address: A,
+    pub fn new<P: AsRef<Path>>(
+        peers: Vec<Peer>,
         storage_dir: P,
-        num_threads: usize,
-        skip_blocks: usize,
-        network: Network,
+        skip_blocks: Option<usize>,
     ) -> Result<Self, CompactFiltersError> {
         Ok(CompactFiltersBlockchain(Some(CompactFilters::new(
-            address,
+            peers,
             storage_dir,
-            num_threads,
             skip_blocks,
-            network,
         )?)))
     }
 }

+#[derive(Debug)]
 struct CompactFilters {
-    peer: Arc<Peer>,
-    headers: Arc<HeadersStore<Full>>,
-    skip_blocks: usize,
-    num_threads: usize,
+    peers: Vec<Arc<Peer>>,
+    headers: Arc<ChainStore<Full>>,
+    skip_blocks: Option<usize>,
 }

 impl CompactFilters {
-    pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
-        address: A,
+    pub fn new<P: AsRef<Path>>(
+        peers: Vec<Peer>,
         storage_dir: P,
-        num_threads: usize,
-        skip_blocks: usize,
-        network: Network,
+        skip_blocks: Option<usize>,
     ) -> Result<Self, CompactFiltersError> {
+        if peers.is_empty() {
+            return Err(CompactFiltersError::NoPeers);
+        }
+
         let mut opts = Options::default();
         opts.create_if_missing(true);
         opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));

+        let network = peers[0].get_network();
+
         let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]);
         let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
-        let headers = Arc::new(HeadersStore::new(db, network)?);
+        let headers = Arc::new(ChainStore::new(db, network)?);

         // try to recover partial snapshots
         for cf_name in &cfs {
@@ -86,8 +86,7 @@ impl CompactFilters {
         }

         Ok(CompactFilters {
-            peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?),
-            num_threads,
+            peers: peers.into_iter().map(Arc::new).collect(),
             headers,
             skip_blocks,
         })
@@ -191,16 +190,15 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         progress_update: P,
     ) -> Result<(), Error> {
         let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
+        let first_peer = &inner.peers[0];

-        let cf_sync = Arc::new(CFSync::new(
-            Arc::clone(&inner.headers),
-            inner.skip_blocks,
-            0x00,
-        )?);
+        let skip_blocks = inner.skip_blocks.unwrap_or(0);
+        let cf_sync = Arc::new(CFSync::new(Arc::clone(&inner.headers), skip_blocks, 0x00)?);

         let initial_height = inner.headers.get_height()?;
-        let total_bundles = (inner.peer.get_version().start_height as usize)
-            .checked_sub(inner.skip_blocks)
+        let total_bundles = (first_peer.get_version().start_height as usize)
+            .checked_sub(skip_blocks)
             .map(|x| x / 1000)
             .unwrap_or(0)
             + 1;
@@ -208,7 +206,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
             .checked_sub(cf_sync.pruned_bundles()?)
             .unwrap_or(0);

-        let headers_cost = (inner.peer.get_version().start_height as usize)
+        let headers_cost = (first_peer.get_version().start_height as usize)
             .checked_sub(initial_height)
             .unwrap_or(0) as f32
             * SYNC_HEADERS_COST;
@@ -217,7 +215,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;

         if let Some(snapshot) = sync::sync_headers(
-            Arc::clone(&inner.peer),
+            Arc::clone(&first_peer),
             Arc::clone(&inner.headers),
             |new_height| {
                 let local_headers_cost =
@@ -240,7 +238,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
             .unwrap_or(0);
         info!("Synced headers to height: {}", synced_height);

-        cf_sync.prepare_sync(Arc::clone(&inner.peer))?;
+        cf_sync.prepare_sync(Arc::clone(&first_peer))?;

         let all_scripts = Arc::new(
             database
@@ -254,10 +252,10 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         let synced_bundles = Arc::new(AtomicUsize::new(0));
         let progress_update = Arc::new(Mutex::new(progress_update));

-        let mut threads = Vec::with_capacity(inner.num_threads);
-        for _ in 0..inner.num_threads {
+        let mut threads = Vec::with_capacity(inner.peers.len());
+        for peer in &inner.peers {
             let cf_sync = Arc::clone(&cf_sync);
-            let peer = Arc::new(inner.peer.new_connection()?);
+            let peer = Arc::clone(&peer);
             let headers = Arc::clone(&inner.headers);
             let all_scripts = Arc::clone(&all_scripts);
             let last_synced_block = Arc::clone(&last_synced_block);
@@ -336,7 +334,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
         }
         database.commit_batch(updates)?;

-        inner.peer.ask_for_mempool()?;
+        first_peer.ask_for_mempool()?;

         let mut internal_max_deriv = 0;
         let mut external_max_deriv = 0;
@@ -353,7 +351,7 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
                 )?;
             }
         }
-        for tx in inner.peer.get_mempool().iter_txs().iter() {
+        for tx in first_peer.get_mempool().iter_txs().iter() {
             inner.process_tx(
                 database,
                 tx,
@@ -392,15 +390,14 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
     fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
         let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;

-        Ok(inner
-            .peer
+        Ok(inner.peers[0]
             .get_mempool()
             .get_tx(&Inventory::Transaction(*txid)))
     }

     fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
         let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;

-        inner.peer.broadcast_tx(tx.clone())?;
+        inner.peers[0].broadcast_tx(tx.clone())?;

         Ok(())
     }
@@ -417,14 +414,6 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
     }
 }

-impl fmt::Debug for CompactFilters {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("CompactFilters")
-            .field("peer", &self.peer)
-            .finish()
-    }
-}
-
 #[derive(Debug)]
 pub enum CompactFiltersError {
     InvalidResponse,
@@ -433,8 +422,12 @@ pub enum CompactFiltersError {
     InvalidFilter,
     MissingBlock,
     DataCorruption,
+
+    NotConnected,
     Timeout,
+
+    NoPeers,

     DB(rocksdb::Error),
     IO(std::io::Error),
     BIP158(bitcoin::util::bip158::Error),
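The sync loop above now spawns one worker per configured peer instead of `num_threads` fresh connections cloned from a single peer. A minimal sketch of that fan-out shape — a shared atomic counter stands in for `CFSync`'s bundle queue, and a string label stands in for `Arc<Peer>`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Each worker owns one peer and pulls the next bundle index from a shared
// counter until the work runs out, mirroring `for peer in &inner.peers`.
fn fan_out(peers: Vec<Arc<str>>, total_bundles: usize) {
    let next = Arc::new(AtomicUsize::new(0));

    let threads: Vec<_> = peers
        .into_iter()
        .map(|peer| {
            let next = Arc::clone(&next);
            thread::spawn(move || loop {
                let bundle = next.fetch_add(1, Ordering::SeqCst);
                if bundle >= total_bundles {
                    break;
                }
                println!("{} syncing bundle {}", peer, bundle);
            })
        })
        .collect();

    for t in threads {
        t.join().unwrap();
    }
}
```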

src/blockchain/compact_filters/peer.rs

@@ -4,6 +4,8 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
 use std::thread;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};

+use socks::{Socks5Stream, ToTargetAddr};
+
 use rand::{thread_rng, Rng};

 use bitcoin::consensus::Encodable;
@@ -22,6 +24,8 @@ use super::CompactFiltersError;
 type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;

+pub(crate) const TIMEOUT_SECS: u64 = 10;
+
 #[derive(Debug, Default)]
 pub struct Mempool {
     txs: RwLock<HashMap<Txid, Transaction>>,
@@ -70,9 +74,33 @@ impl Peer {
         mempool: Arc<Mempool>,
         network: Network,
     ) -> Result<Self, CompactFiltersError> {
-        let connection = TcpStream::connect(address)?;
+        let stream = TcpStream::connect(address)?;

-        let writer = Arc::new(Mutex::new(connection.try_clone()?));
+        Peer::from_stream(stream, mempool, network)
+    }
+
+    pub fn new_proxy<T: ToTargetAddr, P: ToSocketAddrs>(
+        target: T,
+        proxy: P,
+        credentials: Option<(&str, &str)>,
+        mempool: Arc<Mempool>,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        let socks_stream = if let Some((username, password)) = credentials {
+            Socks5Stream::connect_with_password(proxy, target, username, password)?
+        } else {
+            Socks5Stream::connect(proxy, target)?
+        };
+
+        Peer::from_stream(socks_stream.into_inner(), mempool, network)
+    }
+
+    fn from_stream(
+        stream: TcpStream,
+        mempool: Arc<Mempool>,
+        network: Network,
+    ) -> Result<Self, CompactFiltersError> {
+        let writer = Arc::new(Mutex::new(stream.try_clone()?));
         let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));

         let connected = Arc::new(RwLock::new(true));
@@ -85,7 +113,7 @@ impl Peer {
         let reader_thread = thread::spawn(move || {
             Self::reader_thread(
                 network,
-                connection,
+                stream,
                 reader_thread_responses,
                 reader_thread_writer,
                 reader_thread_mempool,
@@ -142,11 +170,6 @@ impl Peer {
         })
     }

-    pub fn new_connection(&self) -> Result<Self, CompactFiltersError> {
-        let socket_addr = self.writer.lock().unwrap().peer_addr()?;
-
-        Self::new(socket_addr, Arc::clone(&self.mempool), self.network)
-    }
-
     fn _send(
         writer: &mut TcpStream,
         magic: u32,
@@ -198,6 +221,10 @@ impl Peer {
         &self.version
     }

+    pub fn get_network(&self) -> Network {
+        self.network
+    }
+
     pub fn get_mempool(&self) -> Arc<Mempool> {
         Arc::clone(&self.mempool)
     }
@@ -337,7 +364,7 @@ impl CompactFiltersPeer for Peer {
         }))?;

         let response = self
-            .recv("cfcheckpt", Some(Duration::from_secs(10)))?
+            .recv("cfcheckpt", Some(Duration::from_secs(TIMEOUT_SECS)))?
             .ok_or(CompactFiltersError::Timeout)?;

         let response = match response {
             NetworkMessage::CFCheckpt(response) => response,
@@ -364,7 +391,7 @@
         }))?;

         let response = self
-            .recv("cfheaders", Some(Duration::from_secs(10)))?
+            .recv("cfheaders", Some(Duration::from_secs(TIMEOUT_SECS)))?
             .ok_or(CompactFiltersError::Timeout)?;

         let response = match response {
             NetworkMessage::CFHeaders(response) => response,
@@ -380,7 +407,7 @@
     fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
         let response = self
-            .recv("cfilter", Some(Duration::from_secs(10)))?
+            .recv("cfilter", Some(Duration::from_secs(TIMEOUT_SECS)))?
             .ok_or(CompactFiltersError::Timeout)?;

         let response = match response {
             NetworkMessage::CFilter(response) => response,
@@ -418,7 +445,7 @@ impl InvPeer for Peer {
             block_hash,
         )]))?;

-        match self.recv("block", Some(Duration::from_secs(10)))? {
+        match self.recv("block", Some(Duration::from_secs(TIMEOUT_SECS)))? {
             None => Ok(None),
             Some(NetworkMessage::Block(response)) => Ok(Some(response)),
             _ => Err(CompactFiltersError::InvalidResponse),
@@ -446,7 +473,7 @@
         for _ in 0..num_txs {
             let tx = self
-                .recv("tx", Some(Duration::from_secs(10)))?
+                .recv("tx", Some(Duration::from_secs(TIMEOUT_SECS)))?
                 .ok_or(CompactFiltersError::Timeout)?;

             let tx = match tx {
                 NetworkMessage::Tx(tx) => tx,
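`Peer::new_proxy` above is a thin wrapper over the `socks` crate: it opens a `Socks5Stream` (with or without credentials) and hands the inner `TcpStream` to the same `from_stream` path used for direct connections. A standalone sketch of that handshake, with placeholder addresses:

```rust
use std::net::TcpStream;

use socks::Socks5Stream;

fn connect_via_tor() -> std::io::Result<TcpStream> {
    // The proxy resolves the target hostname itself, which is what makes
    // .onion endpoints reachable without any local DNS lookup.
    let stream = Socks5Stream::connect("127.0.0.1:9050", "node.example.com:8333")?;

    // After the SOCKS5 handshake this behaves as a plain TcpStream.
    Ok(stream.into_inner())
}
```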

src/blockchain/compact_filters/store.rs

@@ -1,4 +1,5 @@
 use std::convert::TryInto;
+use std::fmt;
 use std::io::{Read, Write};
 use std::marker::PhantomData;
 use std::ops::Deref;
@@ -30,12 +31,12 @@ lazy_static! {
     static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
 }

-pub trait StoreType: Default {}
+pub trait StoreType: Default + fmt::Debug {}

-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct Full;
 impl StoreType for Full {}

-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct Snapshot;
 impl StoreType for Snapshot {}
@@ -226,7 +227,7 @@ impl Decodable for BundleStatus {
     }
 }

-pub struct HeadersStore<T: StoreType> {
+pub struct ChainStore<T: StoreType> {
     store: Arc<RwLock<DB>>,
     cf_name: String,
     min_height: usize,
@@ -234,7 +235,7 @@ pub struct HeadersStore<T: StoreType> {
     phantom: PhantomData<T>,
 }

-impl HeadersStore<Full> {
+impl ChainStore<Full> {
     pub fn new(store: DB, network: Network) -> Result<Self, CompactFiltersError> {
         let genesis = match network {
             Network::Bitcoin => MAINNET_GENESIS.deref(),
@@ -262,7 +263,7 @@ impl HeadersStore<Full> {
             store.write(batch)?;
         }

-        Ok(HeadersStore {
+        Ok(ChainStore {
             store: Arc::new(RwLock::new(store)),
             cf_name,
             min_height: 0,
@@ -301,10 +302,7 @@ impl HeadersStore<Full> {
         Ok(answer)
     }

-    pub fn start_snapshot(
-        &self,
-        from: usize,
-    ) -> Result<HeadersStore<Snapshot>, CompactFiltersError> {
+    pub fn start_snapshot(&self, from: usize) -> Result<ChainStore<Snapshot>, CompactFiltersError> {
         let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect();
         let new_cf_name = format!("_headers:{}", new_cf_name);
@@ -335,7 +333,7 @@ impl HeadersStore<Full> {
         write_store.write(batch)?;

         let store = Arc::clone(&self.store);
-        Ok(HeadersStore {
+        Ok(ChainStore {
             store,
             cf_name: new_cf_name,
             min_height: from,
@@ -367,7 +365,7 @@ impl HeadersStore<Full> {
         std::mem::drop(iterator);
         std::mem::drop(write_store);

-        let snapshot = HeadersStore {
+        let snapshot = ChainStore {
             store: Arc::clone(&self.store),
             cf_name: cf_name.into(),
             min_height,
@@ -383,7 +381,7 @@ impl HeadersStore<Full> {
     pub fn apply_snapshot(
         &self,
-        snaphost: HeadersStore<Snapshot>,
+        snaphost: ChainStore<Snapshot>,
     ) -> Result<(), CompactFiltersError> {
         let mut batch = WriteBatch::default();
@@ -523,7 +521,7 @@ impl HeadersStore<Full> {
     }
 }

-impl<T: StoreType> HeadersStore<T> {
+impl<T: StoreType> ChainStore<T> {
     pub fn work(&self) -> Result<Uint256, CompactFiltersError> {
         let read_store = self.store.read().unwrap();
         let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
@@ -629,6 +627,18 @@ impl<T: StoreType> HeadersStore<T> {
     }
 }

+impl<T: StoreType> fmt::Debug for ChainStore<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct(&format!("ChainStore<{:?}>", T::default()))
+            .field("cf_name", &self.cf_name)
+            .field("min_height", &self.min_height)
+            .field("network", &self.network)
+            .field("headers_height", &self.get_height())
+            .field("tip_hash", &self.get_tip_hash())
+            .finish()
+    }
+}
+
 pub type FilterHeaderHash = FilterHash;

 #[derive(Debug, Clone)]
@@ -663,7 +673,7 @@ type BundleEntry = (BundleStatus, FilterHeaderHash);
 impl CFStore {
     pub fn new(
-        headers_store: &HeadersStore<Full>,
+        headers_store: &ChainStore<Full>,
         filter_type: u8,
     ) -> Result<Self, CompactFiltersError> {
         let cf_store = CFStore {

src/blockchain/compact_filters/sync.rs

@@ -1,5 +1,6 @@
 use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::sync::{Arc, Mutex};
+use std::time::Duration;

 use bitcoin::hash_types::{BlockHash, FilterHash};
 use bitcoin::network::message::NetworkMessage;
@@ -9,11 +10,12 @@ use bitcoin::util::bip158::BlockFilter;
 use super::peer::*;
 use super::store::*;
 use super::CompactFiltersError;
+use crate::error::Error;

 pub(crate) const BURIED_CONFIRMATIONS: usize = 100;

 pub struct CFSync {
-    headers_store: Arc<HeadersStore<Full>>,
+    headers_store: Arc<ChainStore<Full>>,
     cf_store: Arc<CFStore>,
     skip_blocks: usize,
     bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
@@ -21,7 +23,7 @@ pub struct CFSync {
 impl CFSync {
     pub fn new(
-        headers_store: Arc<HeadersStore<Full>>,
+        headers_store: Arc<ChainStore<Full>>,
         skip_blocks: usize,
         filter_type: u8,
     ) -> Result<Self, CompactFiltersError> {
@@ -72,7 +74,7 @@ impl CFSync {
     ) -> Result<(), CompactFiltersError>
     where
         F: Fn(&BlockHash, &BlockFilter) -> Result<bool, CompactFiltersError>,
-        Q: Fn(usize) -> Result<(), crate::error::Error>,
+        Q: Fn(usize) -> Result<(), Error>,
     {
         let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated
@@ -230,11 +232,11 @@ impl CFSync {
 pub fn sync_headers<F>(
     peer: Arc<Peer>,
-    store: Arc<HeadersStore<Full>>,
+    store: Arc<ChainStore<Full>>,
     sync_fn: F,
-) -> Result<Option<HeadersStore<Snapshot>>, CompactFiltersError>
+) -> Result<Option<ChainStore<Snapshot>>, CompactFiltersError>
 where
-    F: Fn(usize) -> Result<(), crate::error::Error>,
+    F: Fn(usize) -> Result<(), Error>,
 {
     let locators = store.get_locators()?;
     let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect();
@@ -244,8 +246,10 @@
         locators_vec,
         Default::default(),
     )))?;

-    let (mut snapshot, mut last_hash) =
-        if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
+    let (mut snapshot, mut last_hash) = if let NetworkMessage::Headers(headers) = peer
+        .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))?
+        .ok_or(CompactFiltersError::Timeout)?
+    {
         if headers.is_empty() {
             return Ok(None);
         }
@@ -267,7 +271,10 @@
             vec![last_hash],
             Default::default(),
         )))?;

-        if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
+        if let NetworkMessage::Headers(headers) = peer
+            .recv("headers", Some(Duration::from_secs(TIMEOUT_SECS)))?
+            .ok_or(CompactFiltersError::Timeout)?
+        {
             let batch_len = headers.len();
             last_hash = snapshot.apply(sync_height, headers)?;
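Both header fetches now use the bounded `recv(..., Some(Duration))` form and map `None` to `CompactFiltersError::Timeout`, so a stalled peer can no longer hang header sync indefinitely. The `recv` body itself is not shown in this diff; a plausible sketch of the condvar wait it performs over the per-message queues in `ResponsesMap` (names simplified, and a full implementation would also track the remaining time across spurious wakeups):

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Pop a queued message, waiting up to `timeout` (or indefinitely for `None`).
// Returning `None` on expiry is what callers map to CompactFiltersError::Timeout.
fn recv_queued<T>(
    queue: &Mutex<VecDeque<T>>,
    available: &Condvar,
    timeout: Option<Duration>,
) -> Option<T> {
    let mut guard = queue.lock().unwrap();
    loop {
        if let Some(msg) = guard.pop_front() {
            return Some(msg);
        }
        match timeout {
            None => guard = available.wait(guard).unwrap(),
            Some(t) => {
                let (g, result) = available.wait_timeout(guard, t).unwrap();
                guard = g;
                if result.timed_out() {
                    return guard.pop_front(); // usually None on a timeout
                }
            }
        }
    }
}
```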