diff --git a/.travis.yml b/.travis.yml index d3718fd2..6a6263f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,7 @@ env: - TARGET=x86_64-unknown-linux-gnu FEATURES=minimal,esplora NO_DEFAULT_FEATURES=1 - TARGET=x86_64-unknown-linux-gnu FEATURES=key-value-db NO_DEFAULT_FEATURES=1 - TARGET=x86_64-unknown-linux-gnu FEATURES=electrum NO_DEFAULT_FEATURES=1 + - TARGET=x86_64-unknown-linux-gnu FEATURES=compact_filters NO_DEFAULT_FEATURES=1 - TARGET=x86_64-unknown-linux-gnu FEATURES=cli-utils,esplora NO_DEFAULT_FEATURES=1 - TARGET=x86_64-unknown-linux-gnu FEATURES=compiler NO_DEFAULT_FEATURES=1 RUN_TESTS=1 # Test the `miniscriptc` example - TARGET=x86_64-unknown-linux-gnu FEATURES=test-electrum NO_DEFAULT_FEATURES=1 RUN_TESTS=1 RUN_CORE=1 diff --git a/Cargo.toml b/Cargo.toml index 71af2dea..91caadf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,13 +14,15 @@ serde_json = { version = "^1.0" } rand = "^0.7" # Optional dependencies -sled = { version = "0.31.0", optional = true } +sled = { version = "0.34", optional = true } electrum-client = { version = "0.2.0-beta.1", optional = true } reqwest = { version = "0.10", optional = true, features = ["json"] } futures = { version = "0.3", optional = true } clap = { version = "2.33", optional = true } base64 = { version = "^0.11", optional = true } async-trait = { version = "0.1", optional = true } +rocksdb = { version = "0.14", optional = true } +lazy_static = { version = "1.4", optional = true } # Platform-specific dependencies [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -36,6 +38,7 @@ compiler = ["clap", "miniscript/compiler"] default = ["key-value-db", "electrum"] electrum = ["electrum-client"] esplora = ["reqwest", "futures"] +compact_filters = ["rocksdb", "lazy_static"] key-value-db = ["sled"] cli-utils = ["clap", "base64"] async-interface = ["async-trait"] diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs new file mode 100644 index 00000000..f32b28b2 --- /dev/null +++ b/src/blockchain/compact_filters/mod.rs @@ -0,0 +1,465 @@ +use std::collections::HashSet; +use std::fmt; +use std::net::ToSocketAddrs; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + +#[allow(unused_imports)] +use log::{debug, error, info, trace}; + +use bitcoin::network::message_blockdata::Inventory; +use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid}; + +use rocksdb::{Options, SliceTransform, DB}; + +mod peer; +mod store; +mod sync; + +use super::{Blockchain, Capability, OnlineBlockchain, Progress}; +use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; +use crate::error::Error; +use crate::types::{ScriptType, TransactionDetails, UTXO}; +use crate::FeeRate; + +use peer::*; +use store::*; +use sync::*; + +const SYNC_HEADERS_COST: f32 = 1.0; +const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0; +const PROCESS_BLOCKS_COST: f32 = 20_000.0; + +#[derive(Debug)] +pub struct CompactFiltersBlockchain(Option); + +impl CompactFiltersBlockchain { + pub fn new>( + address: A, + storage_dir: P, + num_threads: usize, + skip_blocks: usize, + network: Network, + ) -> Result { + Ok(CompactFiltersBlockchain(Some(CompactFilters::new( + address, + storage_dir, + num_threads, + skip_blocks, + network, + )?))) + } +} + +struct CompactFilters { + peer: Arc, + headers: Arc>, + skip_blocks: usize, + num_threads: usize, +} + +impl CompactFilters { + pub fn new>( + address: A, + storage_dir: P, + num_threads: usize, + skip_blocks: usize, + network: Network, + ) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16)); + + let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]); + let db = DB::open_cf(&opts, &storage_dir, &cfs)?; + let headers = Arc::new(HeadersStore::new(db, network)?); + + // try to recover partial snapshots + for cf_name in &cfs { + if !cf_name.starts_with("_headers:") { + continue; + } + + info!("Trying to recover: {:?}", cf_name); + headers.recover_snapshot(cf_name)?; + } + + Ok(CompactFilters { + peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?), + num_threads, + headers, + skip_blocks, + }) + } + + fn process_tx( + &self, + database: &mut D, + tx: &Transaction, + height: Option, + timestamp: u64, + internal_max_deriv: &mut u32, + external_max_deriv: &mut u32, + ) -> Result<(), Error> { + let mut updates = database.begin_batch(); + + let mut incoming: u64 = 0; + let mut outgoing: u64 = 0; + + let mut inputs_sum: u64 = 0; + let mut outputs_sum: u64 = 0; + + // look for our own inputs + for (i, input) in tx.input.iter().enumerate() { + if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { + inputs_sum += previous_output.value; + + if database.is_mine(&previous_output.script_pubkey)? { + outgoing += previous_output.value; + + debug!("{} input #{} is mine, removing from utxo", tx.txid(), i); + updates.del_utxo(&input.previous_output)?; + } + } + } + + for (i, output) in tx.output.iter().enumerate() { + // to compute the fees later + outputs_sum += output.value; + + // this output is ours, we have a path to derive it + if let Some((script_type, child)) = + database.get_path_from_script_pubkey(&output.script_pubkey)? + { + debug!("{} output #{} is mine, adding utxo", tx.txid(), i); + updates.set_utxo(&UTXO { + outpoint: OutPoint::new(tx.txid(), i as u32), + txout: output.clone(), + is_internal: script_type.is_internal(), + })?; + incoming += output.value; + + if script_type == ScriptType::Internal && child > *internal_max_deriv { + *internal_max_deriv = child; + } else if script_type == ScriptType::External && child > *external_max_deriv { + *external_max_deriv = child; + } + } + } + + if incoming > 0 || outgoing > 0 { + let tx = TransactionDetails { + txid: tx.txid(), + transaction: Some(tx.clone()), + received: incoming, + sent: outgoing, + height, + timestamp, + fees: inputs_sum.checked_sub(outputs_sum).unwrap_or(0), + }; + + info!("Saving tx {}", tx.txid); + updates.set_tx(&tx)?; + } + + database.commit_batch(updates)?; + + Ok(()) + } +} + +impl Blockchain for CompactFiltersBlockchain { + fn offline() -> Self { + CompactFiltersBlockchain(None) + } + + fn is_online(&self) -> bool { + self.0.is_some() + } +} + +impl OnlineBlockchain for CompactFiltersBlockchain { + fn get_capabilities(&self) -> HashSet { + vec![Capability::FullHistory].into_iter().collect() + } + + fn setup( + &self, + _stop_gap: Option, // TODO: move to electrum and esplora only + database: &mut D, + progress_update: P, + ) -> Result<(), Error> { + let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; + + let cf_sync = Arc::new(CFSync::new( + Arc::clone(&inner.headers), + inner.skip_blocks, + 0x00, + )?); + + let initial_height = inner.headers.get_height()?; + let total_bundles = (inner.peer.get_version().start_height as usize) + .checked_sub(inner.skip_blocks) + .map(|x| x / 1000) + .unwrap_or(0) + + 1; + let expected_bundles_to_sync = total_bundles + .checked_sub(cf_sync.pruned_bundles()?) + .unwrap_or(0); + + let headers_cost = (inner.peer.get_version().start_height as usize) + .checked_sub(initial_height) + .unwrap_or(0) as f32 + * SYNC_HEADERS_COST; + let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST; + + let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST; + + if let Some(snapshot) = sync::sync_headers( + Arc::clone(&inner.peer), + Arc::clone(&inner.headers), + |new_height| { + let local_headers_cost = + new_height.checked_sub(initial_height).unwrap_or(0) as f32 * SYNC_HEADERS_COST; + progress_update.update( + local_headers_cost / total_cost * 100.0, + Some(format!("Synced headers to {}", new_height)), + ) + }, + )? { + if snapshot.work()? > inner.headers.work()? { + info!("Applying snapshot with work: {}", snapshot.work()?); + inner.headers.apply_snapshot(snapshot)?; + } + } + + let synced_height = inner.headers.get_height()?; + let buried_height = synced_height + .checked_sub(sync::BURIED_CONFIRMATIONS) + .unwrap_or(0); + info!("Synced headers to height: {}", synced_height); + + cf_sync.prepare_sync(Arc::clone(&inner.peer))?; + + let all_scripts = Arc::new( + database + .iter_script_pubkeys(None)? + .into_iter() + .map(|s| s.to_bytes()) + .collect::>(), + ); + + let last_synced_block = Arc::new(Mutex::new(synced_height)); + let synced_bundles = Arc::new(AtomicUsize::new(0)); + let progress_update = Arc::new(Mutex::new(progress_update)); + + let mut threads = Vec::with_capacity(inner.num_threads); + for _ in 0..inner.num_threads { + let cf_sync = Arc::clone(&cf_sync); + let peer = Arc::new(inner.peer.new_connection()?); + let headers = Arc::clone(&inner.headers); + let all_scripts = Arc::clone(&all_scripts); + let last_synced_block = Arc::clone(&last_synced_block); + let progress_update = Arc::clone(&progress_update); + let synced_bundles = Arc::clone(&synced_bundles); + + let thread = std::thread::spawn(move || { + cf_sync.capture_thread_for_sync( + peer, + |block_hash, filter| { + if !filter + .match_any(block_hash, &mut all_scripts.iter().map(AsRef::as_ref))? + { + return Ok(false); + } + + let block_height = headers.get_height_for(block_hash)?.unwrap_or(0); + let saved_correct_block = match headers.get_full_block(block_height)? { + Some(block) if &block.bitcoin_hash() == block_hash => true, + _ => false, + }; + + if saved_correct_block { + Ok(false) + } else { + let mut last_synced_block = last_synced_block.lock().unwrap(); + + // If we download a block older than `last_synced_block`, we update it so that + // we know to delete and re-process all txs starting from that height + if block_height < *last_synced_block { + *last_synced_block = block_height; + } + + Ok(true) + } + }, + |index| { + let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst); + let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST; + progress_update.lock().unwrap().update( + (headers_cost + local_filters_cost) / total_cost * 100.0, + Some(format!( + "Synced filters {} - {}", + index * 1000 + 1, + (index + 1) * 1000 + )), + ) + }, + ) + }); + + threads.push(thread); + } + + for t in threads { + t.join().unwrap()?; + } + + progress_update.lock().unwrap().update( + (headers_cost + filters_cost) / total_cost * 100.0, + Some("Processing downloaded blocks and mempool".into()), + )?; + + // delete all txs newer than last_synced_block + let last_synced_block = *last_synced_block.lock().unwrap(); + log::debug!( + "Dropping transactions newer than `last_synced_block` = {}", + last_synced_block + ); + let mut updates = database.begin_batch(); + for details in database.iter_txs(false)? { + match details.height { + Some(height) if (height as usize) < last_synced_block => continue, + _ => updates.del_tx(&details.txid, false)?, + }; + } + database.commit_batch(updates)?; + + inner.peer.ask_for_mempool()?; + + let mut internal_max_deriv = 0; + let mut external_max_deriv = 0; + + for (height, block) in inner.headers.iter_full_blocks()? { + for tx in &block.txdata { + inner.process_tx( + database, + tx, + Some(height as u32), + 0, + &mut internal_max_deriv, + &mut external_max_deriv, + )?; + } + } + for tx in inner.peer.get_mempool().iter_txs().iter() { + inner.process_tx( + database, + tx, + None, + 0, + &mut internal_max_deriv, + &mut external_max_deriv, + )?; + } + + let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0); + let first_ext_new = external_max_deriv as u32 + 1; + if first_ext_new > current_ext { + info!("Setting external index to {}", first_ext_new); + database.set_last_index(ScriptType::External, first_ext_new)?; + } + + let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0); + let first_int_new = internal_max_deriv + 1; + if first_int_new > current_int { + info!("Setting internal index to {}", first_int_new); + database.set_last_index(ScriptType::Internal, first_int_new)?; + } + + info!("Dropping blocks until {}", buried_height); + inner.headers.delete_blocks_until(buried_height)?; + + progress_update + .lock() + .unwrap() + .update(100.0, Some("Done".into()))?; + + Ok(()) + } + + fn get_tx(&self, txid: &Txid) -> Result, Error> { + let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; + + Ok(inner + .peer + .get_mempool() + .get_tx(&Inventory::Transaction(*txid))) + } + + fn broadcast(&self, tx: &Transaction) -> Result<(), Error> { + let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; + inner.peer.broadcast_tx(tx.clone())?; + + Ok(()) + } + + fn get_height(&self) -> Result { + let inner = self.0.as_ref().ok_or(Error::OfflineClient)?; + + Ok(inner.headers.get_height()? as u32) + } + + fn estimate_fee(&self, _target: usize) -> Result { + // TODO + Ok(FeeRate::default()) + } +} + +impl fmt::Debug for CompactFilters { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CompactFilters") + .field("peer", &self.peer) + .finish() + } +} + +#[derive(Debug)] +pub enum CompactFiltersError { + InvalidResponse, + InvalidHeaders, + InvalidFilterHeader, + InvalidFilter, + MissingBlock, + DataCorruption, + Timeout, + + DB(rocksdb::Error), + IO(std::io::Error), + BIP158(bitcoin::util::bip158::Error), + Time(std::time::SystemTimeError), + + Global(Box), +} + +macro_rules! impl_error { + ( $from:ty, $to:ident ) => { + impl std::convert::From<$from> for CompactFiltersError { + fn from(err: $from) -> Self { + CompactFiltersError::$to(err) + } + } + }; +} + +impl_error!(rocksdb::Error, DB); +impl_error!(std::io::Error, IO); +impl_error!(bitcoin::util::bip158::Error, BIP158); +impl_error!(std::time::SystemTimeError, Time); + +impl From for CompactFiltersError { + fn from(err: crate::error::Error) -> Self { + CompactFiltersError::Global(Box::new(err)) + } +} diff --git a/src/blockchain/compact_filters/peer.rs b/src/blockchain/compact_filters/peer.rs new file mode 100644 index 00000000..58932833 --- /dev/null +++ b/src/blockchain/compact_filters/peer.rs @@ -0,0 +1,468 @@ +use std::collections::HashMap; +use std::net::{TcpStream, ToSocketAddrs}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::thread; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use rand::{thread_rng, Rng}; + +use bitcoin::consensus::Encodable; +use bitcoin::hash_types::BlockHash; +use bitcoin::hashes::Hash; +use bitcoin::network::constants::ServiceFlags; +use bitcoin::network::message::{NetworkMessage, RawNetworkMessage}; +use bitcoin::network::message_blockdata::*; +use bitcoin::network::message_filter::*; +use bitcoin::network::message_network::VersionMessage; +use bitcoin::network::stream_reader::StreamReader; +use bitcoin::network::Address; +use bitcoin::{Block, Network, Transaction, Txid}; + +use super::CompactFiltersError; + +type ResponsesMap = HashMap<&'static str, Arc<(Mutex>, Condvar)>>; + +#[derive(Debug, Default)] +pub struct Mempool { + txs: RwLock>, +} + +impl Mempool { + pub fn add_tx(&self, tx: Transaction) { + self.txs.write().unwrap().insert(tx.txid(), tx); + } + + pub fn get_tx(&self, inventory: &Inventory) -> Option { + let txid = match inventory { + Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None, + Inventory::Transaction(txid) => *txid, + Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()), + }; + self.txs.read().unwrap().get(&txid).cloned() + } + + pub fn has_tx(&self, txid: &Txid) -> bool { + self.txs.read().unwrap().contains_key(txid) + } + + pub fn iter_txs(&self) -> Vec { + self.txs.read().unwrap().values().cloned().collect() + } +} + +#[derive(Debug)] +pub struct Peer { + writer: Arc>, + responses: Arc>, + + reader_thread: thread::JoinHandle<()>, + connected: Arc>, + + mempool: Arc, + + version: VersionMessage, + network: Network, +} + +impl Peer { + pub fn new( + address: A, + mempool: Arc, + network: Network, + ) -> Result { + let connection = TcpStream::connect(address)?; + + let writer = Arc::new(Mutex::new(connection.try_clone()?)); + let responses: Arc> = Arc::new(RwLock::new(HashMap::new())); + let connected = Arc::new(RwLock::new(true)); + + let mut locked_writer = writer.lock().unwrap(); + + let reader_thread_responses = Arc::clone(&responses); + let reader_thread_writer = Arc::clone(&writer); + let reader_thread_mempool = Arc::clone(&mempool); + let reader_thread_connected = Arc::clone(&connected); + let reader_thread = thread::spawn(move || { + Self::reader_thread( + network, + connection, + reader_thread_responses, + reader_thread_writer, + reader_thread_mempool, + reader_thread_connected, + ) + }); + + let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; + let nonce = thread_rng().gen(); + let receiver = Address::new(&locked_writer.peer_addr()?, ServiceFlags::NONE); + let sender = Address { + services: ServiceFlags::NONE, + address: [0u16; 8], + port: 0, + }; + + Self::_send( + &mut locked_writer, + network.magic(), + NetworkMessage::Version(VersionMessage::new( + ServiceFlags::WITNESS, + timestamp, + receiver, + sender, + nonce, + "MagicalBitcoinWallet".into(), + 0, + )), + )?; + let version = if let NetworkMessage::Version(version) = + Self::_recv(&responses, "version", None)?.unwrap() + { + version + } else { + return Err(CompactFiltersError::InvalidResponse); + }; + + if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None)?.unwrap() { + Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?; + } else { + return Err(CompactFiltersError::InvalidResponse); + } + + std::mem::drop(locked_writer); + + Ok(Peer { + writer, + reader_thread, + responses, + connected, + mempool, + network, + version, + }) + } + + pub fn new_connection(&self) -> Result { + let socket_addr = self.writer.lock().unwrap().peer_addr()?; + Self::new(socket_addr, Arc::clone(&self.mempool), self.network) + } + + fn _send( + writer: &mut TcpStream, + magic: u32, + payload: NetworkMessage, + ) -> Result<(), CompactFiltersError> { + log::trace!("==> {:?}", payload); + + let raw_message = RawNetworkMessage { magic, payload }; + + raw_message + .consensus_encode(writer) + .map_err(|_| CompactFiltersError::DataCorruption)?; + + Ok(()) + } + + fn _recv( + responses: &Arc>, + wait_for: &'static str, + timeout: Option, + ) -> Result, CompactFiltersError> { + let message_resp = { + let mut lock = responses.write().unwrap(); + let message_resp = lock.entry(wait_for).or_default(); + Arc::clone(&message_resp) + }; + + let (lock, cvar) = &*message_resp; + + let mut messages = lock.lock().unwrap(); + while messages.is_empty() { + match timeout { + None => messages = cvar.wait(messages).unwrap(), + Some(t) => { + let result = cvar.wait_timeout(messages, t).unwrap(); + if result.1.timed_out() { + return Ok(None); + } + + messages = result.0; + } + } + } + + Ok(messages.pop()) + } + + pub fn get_version(&self) -> &VersionMessage { + &self.version + } + + pub fn get_mempool(&self) -> Arc { + Arc::clone(&self.mempool) + } + + pub fn is_connected(&self) -> bool { + *self.connected.read().unwrap() + } + + pub fn reader_thread( + network: Network, + connection: TcpStream, + reader_thread_responses: Arc>, + reader_thread_writer: Arc>, + reader_thread_mempool: Arc, + reader_thread_connected: Arc>, + ) { + macro_rules! check_disconnect { + ($call:expr) => { + match $call { + Ok(good) => good, + Err(e) => { + log::debug!("Error {:?}", e); + *reader_thread_connected.write().unwrap() = false; + + break; + } + } + }; + } + + let mut reader = StreamReader::new(connection, None); + loop { + let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next()); + + let in_message = if raw_message.magic != network.magic() { + continue; + } else { + raw_message.payload + }; + + log::trace!("<== {:?}", in_message); + + match in_message { + NetworkMessage::Ping(nonce) => { + check_disconnect!(Self::_send( + &mut reader_thread_writer.lock().unwrap(), + network.magic(), + NetworkMessage::Pong(nonce), + )); + + continue; + } + NetworkMessage::Alert(_) => continue, + NetworkMessage::GetData(ref inv) => { + let (found, not_found): (Vec<_>, Vec<_>) = inv + .into_iter() + .map(|item| (*item, reader_thread_mempool.get_tx(item))) + .partition(|(_, d)| d.is_some()); + for (_, found_tx) in found { + check_disconnect!(Self::_send( + &mut reader_thread_writer.lock().unwrap(), + network.magic(), + NetworkMessage::Tx(found_tx.unwrap()), + )); + } + + if !not_found.is_empty() { + check_disconnect!(Self::_send( + &mut reader_thread_writer.lock().unwrap(), + network.magic(), + NetworkMessage::NotFound( + not_found.into_iter().map(|(i, _)| i).collect(), + ), + )); + } + } + _ => {} + } + + let message_resp = { + let mut lock = reader_thread_responses.write().unwrap(); + let message_resp = lock.entry(in_message.cmd()).or_default(); + Arc::clone(&message_resp) + }; + + let (lock, cvar) = &*message_resp; + let mut messages = lock.lock().unwrap(); + messages.push(in_message); + cvar.notify_all(); + } + } + + pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> { + let mut writer = self.writer.lock().unwrap(); + Self::_send(&mut writer, self.network.magic(), payload) + } + + pub fn recv( + &self, + wait_for: &'static str, + timeout: Option, + ) -> Result, CompactFiltersError> { + Self::_recv(&self.responses, wait_for, timeout) + } +} + +pub trait CompactFiltersPeer { + fn get_cf_checkpt( + &self, + filter_type: u8, + stop_hash: BlockHash, + ) -> Result; + fn get_cf_headers( + &self, + filter_type: u8, + start_height: u32, + stop_hash: BlockHash, + ) -> Result; + fn get_cf_filters( + &self, + filter_type: u8, + start_height: u32, + stop_hash: BlockHash, + ) -> Result<(), CompactFiltersError>; + fn pop_cf_filter_resp(&self) -> Result; +} + +impl CompactFiltersPeer for Peer { + fn get_cf_checkpt( + &self, + filter_type: u8, + stop_hash: BlockHash, + ) -> Result { + self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt { + filter_type, + stop_hash, + }))?; + + let response = self + .recv("cfcheckpt", Some(Duration::from_secs(10)))? + .ok_or(CompactFiltersError::Timeout)?; + let response = match response { + NetworkMessage::CFCheckpt(response) => response, + _ => return Err(CompactFiltersError::InvalidResponse), + }; + + if response.filter_type != filter_type { + return Err(CompactFiltersError::InvalidResponse); + } + + Ok(response) + } + + fn get_cf_headers( + &self, + filter_type: u8, + start_height: u32, + stop_hash: BlockHash, + ) -> Result { + self.send(NetworkMessage::GetCFHeaders(GetCFHeaders { + filter_type, + start_height, + stop_hash, + }))?; + + let response = self + .recv("cfheaders", Some(Duration::from_secs(10)))? + .ok_or(CompactFiltersError::Timeout)?; + let response = match response { + NetworkMessage::CFHeaders(response) => response, + _ => return Err(CompactFiltersError::InvalidResponse), + }; + + if response.filter_type != filter_type { + return Err(CompactFiltersError::InvalidResponse); + } + + Ok(response) + } + + fn pop_cf_filter_resp(&self) -> Result { + let response = self + .recv("cfilter", Some(Duration::from_secs(10)))? + .ok_or(CompactFiltersError::Timeout)?; + let response = match response { + NetworkMessage::CFilter(response) => response, + _ => return Err(CompactFiltersError::InvalidResponse), + }; + + Ok(response) + } + + fn get_cf_filters( + &self, + filter_type: u8, + start_height: u32, + stop_hash: BlockHash, + ) -> Result<(), CompactFiltersError> { + self.send(NetworkMessage::GetCFilters(GetCFilters { + filter_type, + start_height, + stop_hash, + }))?; + + Ok(()) + } +} + +pub trait InvPeer { + fn get_block(&self, block_hash: BlockHash) -> Result, CompactFiltersError>; + fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>; + fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>; +} + +impl InvPeer for Peer { + fn get_block(&self, block_hash: BlockHash) -> Result, CompactFiltersError> { + self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock( + block_hash, + )]))?; + + match self.recv("block", Some(Duration::from_secs(10)))? { + None => Ok(None), + Some(NetworkMessage::Block(response)) => Ok(Some(response)), + _ => Err(CompactFiltersError::InvalidResponse), + } + } + + fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> { + self.send(NetworkMessage::MemPool)?; + let inv = match self.recv("inv", Some(Duration::from_secs(5)))? { + None => return Ok(()), // empty mempool + Some(NetworkMessage::Inv(inv)) => inv, + _ => return Err(CompactFiltersError::InvalidResponse), + }; + + let getdata = inv + .iter() + .cloned() + .filter(|item| match item { + Inventory::Transaction(txid) if !self.mempool.has_tx(txid) => true, + _ => false, + }) + .collect::>(); + let num_txs = getdata.len(); + self.send(NetworkMessage::GetData(getdata))?; + + for _ in 0..num_txs { + let tx = self + .recv("tx", Some(Duration::from_secs(10)))? + .ok_or(CompactFiltersError::Timeout)?; + let tx = match tx { + NetworkMessage::Tx(tx) => tx, + _ => return Err(CompactFiltersError::InvalidResponse), + }; + + self.mempool.add_tx(tx); + } + + Ok(()) + } + + fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> { + self.mempool.add_tx(tx.clone()); + self.send(NetworkMessage::Tx(tx))?; + + Ok(()) + } +} diff --git a/src/blockchain/compact_filters/store.rs b/src/blockchain/compact_filters/store.rs new file mode 100644 index 00000000..81f24d93 --- /dev/null +++ b/src/blockchain/compact_filters/store.rs @@ -0,0 +1,871 @@ +use std::convert::TryInto; +use std::io::{Read, Write}; +use std::marker::PhantomData; +use std::ops::Deref; +use std::sync::Arc; +use std::sync::RwLock; + +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + +use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB}; + +use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable}; +use bitcoin::hash_types::FilterHash; +use bitcoin::hashes::hex::FromHex; +use bitcoin::hashes::{sha256d, Hash}; +use bitcoin::util::bip158::BlockFilter; +use bitcoin::util::hash::BitcoinHash; +use bitcoin::util::uint::Uint256; +use bitcoin::Block; +use bitcoin::BlockHash; +use bitcoin::BlockHeader; +use bitcoin::Network; + +use super::CompactFiltersError; + +lazy_static! { + static ref MAINNET_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); + static ref TESTNET_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); + static ref REGTEST_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); +} + +pub trait StoreType: Default {} + +#[derive(Default)] +pub struct Full; +impl StoreType for Full {} +#[derive(Default)] +pub struct Snapshot; +impl StoreType for Snapshot {} + +pub enum StoreEntry { + BlockHeader(Option), + Block(Option), + BlockHeaderIndex(Option), + CFilterTable((u8, Option)), +} + +impl StoreEntry { + pub fn get_prefix(&self) -> Vec { + match self { + StoreEntry::BlockHeader(_) => b"z", + StoreEntry::Block(_) => b"x", + StoreEntry::BlockHeaderIndex(_) => b"i", + StoreEntry::CFilterTable(_) => b"t", + } + .to_vec() + } + + pub fn get_key(&self) -> Vec { + let mut prefix = self.get_prefix(); + match self { + StoreEntry::BlockHeader(Some(height)) => { + prefix.extend_from_slice(&height.to_be_bytes()) + } + StoreEntry::Block(Some(height)) => prefix.extend_from_slice(&height.to_be_bytes()), + StoreEntry::BlockHeaderIndex(Some(hash)) => { + prefix.extend_from_slice(&hash.into_inner()) + } + StoreEntry::CFilterTable((filter_type, bundle_index)) => { + prefix.push(*filter_type); + if let Some(bundle_index) = bundle_index { + prefix.extend_from_slice(&bundle_index.to_be_bytes()); + } + } + _ => {} + } + + prefix + } +} + +pub trait SerializeDb: Sized { + fn serialize(&self) -> Vec; + fn deserialize(data: &[u8]) -> Result; +} + +impl SerializeDb for T +where + T: Encodable + Decodable, +{ + fn serialize(&self) -> Vec { + serialize(self) + } + + fn deserialize(data: &[u8]) -> Result { + Ok(deserialize(data).map_err(|_| CompactFiltersError::DataCorruption)?) + } +} + +impl Encodable for FilterHeader { + fn consensus_encode( + &self, + mut e: W, + ) -> Result { + let mut written = self.prev_header_hash.consensus_encode(&mut e)?; + written += self.filter_hash.consensus_encode(&mut e)?; + Ok(written) + } +} + +impl Decodable for FilterHeader { + fn consensus_decode(mut d: D) -> Result { + let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?; + let filter_hash = FilterHash::consensus_decode(&mut d)?; + + Ok(FilterHeader { + prev_header_hash, + filter_hash, + }) + } +} + +impl Encodable for BundleStatus { + fn consensus_encode( + &self, + mut e: W, + ) -> Result { + let mut written = 0; + + match self { + BundleStatus::Init => { + written += 0x00u8.consensus_encode(&mut e)?; + } + BundleStatus::CFHeaders { cf_headers } => { + written += 0x01u8.consensus_encode(&mut e)?; + written += VarInt(cf_headers.len() as u64).consensus_encode(&mut e)?; + for header in cf_headers { + written += header.consensus_encode(&mut e)?; + } + } + BundleStatus::CFilters { cf_filters } => { + written += 0x02u8.consensus_encode(&mut e)?; + written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?; + for filter in cf_filters { + written += filter.consensus_encode(&mut e)?; + } + } + BundleStatus::Processed { cf_filters } => { + written += 0x03u8.consensus_encode(&mut e)?; + written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?; + for filter in cf_filters { + written += filter.consensus_encode(&mut e)?; + } + } + BundleStatus::Pruned => { + written += 0x04u8.consensus_encode(&mut e)?; + } + BundleStatus::Tip { cf_filters } => { + written += 0x05u8.consensus_encode(&mut e)?; + written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?; + for filter in cf_filters { + written += filter.consensus_encode(&mut e)?; + } + } + } + + Ok(written) + } +} + +impl Decodable for BundleStatus { + fn consensus_decode(mut d: D) -> Result { + let byte_type = u8::consensus_decode(&mut d)?; + match byte_type { + 0x00 => Ok(BundleStatus::Init), + 0x01 => { + let num = VarInt::consensus_decode(&mut d)?; + let num = num.0 as usize; + + let mut cf_headers = Vec::with_capacity(num); + for _ in 0..num { + cf_headers.push(FilterHeader::consensus_decode(&mut d)?); + } + + Ok(BundleStatus::CFHeaders { cf_headers }) + } + 0x02 => { + let num = VarInt::consensus_decode(&mut d)?; + let num = num.0 as usize; + + let mut cf_filters = Vec::with_capacity(num); + for _ in 0..num { + cf_filters.push(Vec::::consensus_decode(&mut d)?); + } + + Ok(BundleStatus::CFilters { cf_filters }) + } + 0x03 => { + let num = VarInt::consensus_decode(&mut d)?; + let num = num.0 as usize; + + let mut cf_filters = Vec::with_capacity(num); + for _ in 0..num { + cf_filters.push(Vec::::consensus_decode(&mut d)?); + } + + Ok(BundleStatus::Processed { cf_filters }) + } + 0x04 => Ok(BundleStatus::Pruned), + 0x05 => { + let num = VarInt::consensus_decode(&mut d)?; + let num = num.0 as usize; + + let mut cf_filters = Vec::with_capacity(num); + for _ in 0..num { + cf_filters.push(Vec::::consensus_decode(&mut d)?); + } + + Ok(BundleStatus::Tip { cf_filters }) + } + _ => Err(bitcoin::consensus::encode::Error::ParseFailed( + "Invalid byte type", + )), + } + } +} + +pub struct HeadersStore { + store: Arc>, + cf_name: String, + min_height: usize, + network: Network, + phantom: PhantomData, +} + +impl HeadersStore { + pub fn new(store: DB, network: Network) -> Result { + let genesis = match network { + Network::Bitcoin => MAINNET_GENESIS.deref(), + Network::Testnet => TESTNET_GENESIS.deref(), + Network::Regtest => REGTEST_GENESIS.deref(), + }; + + let cf_name = "default".to_string(); + let cf_handle = store.cf_handle(&cf_name).unwrap(); + + let genesis_key = StoreEntry::BlockHeader(Some(0)).get_key(); + + if store.get_pinned_cf(cf_handle, &genesis_key)?.is_none() { + let mut batch = WriteBatch::default(); + batch.put_cf( + cf_handle, + genesis_key, + (genesis.header, genesis.header.work()).serialize(), + ); + batch.put_cf( + cf_handle, + StoreEntry::BlockHeaderIndex(Some(genesis.bitcoin_hash())).get_key(), + &0usize.to_be_bytes(), + ); + store.write(batch)?; + } + + Ok(HeadersStore { + store: Arc::new(RwLock::new(store)), + cf_name, + min_height: 0, + network, + phantom: PhantomData, + }) + } + + pub fn get_locators(&self) -> Result, CompactFiltersError> { + let mut step = 1; + let mut index = self.get_height()?; + let mut answer = Vec::new(); + + let store_read = self.store.read().unwrap(); + let cf_handle = store_read.cf_handle(&self.cf_name).unwrap(); + + loop { + if answer.len() > 10 { + step *= 2; + } + + let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize( + &store_read + .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(index)).get_key())? + .unwrap(), + )?; + answer.push((header.bitcoin_hash(), index)); + + if let Some(new_index) = index.checked_sub(step) { + index = new_index; + } else { + break; + } + } + + Ok(answer) + } + + pub fn start_snapshot( + &self, + from: usize, + ) -> Result, CompactFiltersError> { + let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect(); + let new_cf_name = format!("_headers:{}", new_cf_name); + + let mut write_store = self.store.write().unwrap(); + + write_store.create_cf(&new_cf_name, &Default::default())?; + + let cf_handle = write_store.cf_handle(&self.cf_name).unwrap(); + let new_cf_handle = write_store.cf_handle(&new_cf_name).unwrap(); + + let (header, work): (BlockHeader, Uint256) = SerializeDb::deserialize( + &write_store + .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())? + .ok_or(CompactFiltersError::DataCorruption)?, + )?; + + let mut batch = WriteBatch::default(); + batch.put_cf( + new_cf_handle, + StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(), + &from.to_be_bytes(), + ); + batch.put_cf( + new_cf_handle, + StoreEntry::BlockHeader(Some(from)).get_key(), + (header, work).serialize(), + ); + write_store.write(batch)?; + + let store = Arc::clone(&self.store); + Ok(HeadersStore { + store, + cf_name: new_cf_name, + min_height: from, + network: self.network, + phantom: PhantomData, + }) + } + + pub fn recover_snapshot(&self, cf_name: &str) -> Result<(), CompactFiltersError> { + let mut write_store = self.store.write().unwrap(); + let snapshot_cf_handle = write_store.cf_handle(cf_name).unwrap(); + + let prefix = StoreEntry::BlockHeader(None).get_key(); + let mut iterator = write_store.prefix_iterator_cf(snapshot_cf_handle, prefix); + + let min_height = match iterator + .next() + .and_then(|(k, _)| k[1..].try_into().ok()) + .map(|bytes| usize::from_be_bytes(bytes)) + { + None => { + std::mem::drop(iterator); + write_store.drop_cf(cf_name).ok(); + + return Ok(()); + } + Some(x) => x, + }; + std::mem::drop(iterator); + std::mem::drop(write_store); + + let snapshot = HeadersStore { + store: Arc::clone(&self.store), + cf_name: cf_name.into(), + min_height, + network: self.network, + phantom: PhantomData, + }; + if snapshot.work()? > self.work()? { + self.apply_snapshot(snapshot)?; + } + + Ok(()) + } + + pub fn apply_snapshot( + &self, + snaphost: HeadersStore, + ) -> Result<(), CompactFiltersError> { + let mut batch = WriteBatch::default(); + + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + let snapshot_cf_handle = read_store.cf_handle(&snaphost.cf_name).unwrap(); + + let from_key = StoreEntry::BlockHeader(Some(snaphost.min_height)).get_key(); + let to_key = StoreEntry::BlockHeader(Some(usize::MAX)).get_key(); + + let mut opts = ReadOptions::default(); + opts.set_iterate_upper_bound(to_key.clone()); + + log::debug!("Removing items"); + batch.delete_range_cf(cf_handle, &from_key, &to_key); + for (_, v) in read_store.iterator_cf_opt( + cf_handle, + opts, + IteratorMode::From(&from_key, Direction::Forward), + ) { + let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?; + + batch.delete_cf( + cf_handle, + StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(), + ); + } + + // Delete full blocks overriden by snapshot + let from_key = StoreEntry::Block(Some(snaphost.min_height)).get_key(); + let to_key = StoreEntry::Block(Some(usize::MAX)).get_key(); + batch.delete_range(&from_key, &to_key); + + log::debug!("Copying over new items"); + for (k, v) in read_store.iterator_cf(snapshot_cf_handle, IteratorMode::Start) { + batch.put_cf(cf_handle, k, v); + } + + read_store.write(batch)?; + + std::mem::drop(snapshot_cf_handle); + std::mem::drop(cf_handle); + std::mem::drop(read_store); + + self.store.write().unwrap().drop_cf(&snaphost.cf_name)?; + + Ok(()) + } + + pub fn get_height_for( + &self, + block_hash: &BlockHash, + ) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + + let key = StoreEntry::BlockHeaderIndex(Some(block_hash.clone())).get_key(); + let data = read_store.get_pinned_cf(cf_handle, key)?; + Ok(data + .map(|data| { + Ok::<_, CompactFiltersError>(usize::from_be_bytes( + data.as_ref() + .try_into() + .map_err(|_| CompactFiltersError::DataCorruption)?, + )) + }) + .transpose()?) + } + + pub fn get_block_hash(&self, height: usize) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + + let key = StoreEntry::BlockHeader(Some(height)).get_key(); + let data = read_store.get_pinned_cf(cf_handle, key)?; + Ok(data + .map(|data| { + let (header, _): (BlockHeader, Uint256) = + deserialize(&data).map_err(|_| CompactFiltersError::DataCorruption)?; + Ok::<_, CompactFiltersError>(header.bitcoin_hash()) + }) + .transpose()?) + } + + pub fn save_full_block(&self, block: &Block, height: usize) -> Result<(), CompactFiltersError> { + let key = StoreEntry::Block(Some(height)).get_key(); + self.store.read().unwrap().put(key, block.serialize())?; + + Ok(()) + } + + pub fn get_full_block(&self, height: usize) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + + let key = StoreEntry::Block(Some(height)).get_key(); + let opt_block = read_store.get_pinned(key)?; + + Ok(opt_block + .map(|data| deserialize(&data)) + .transpose() + .map_err(|_| CompactFiltersError::DataCorruption)?) + } + + pub fn delete_blocks_until(&self, height: usize) -> Result<(), CompactFiltersError> { + let from_key = StoreEntry::Block(Some(0)).get_key(); + let to_key = StoreEntry::Block(Some(height)).get_key(); + + let mut batch = WriteBatch::default(); + batch.delete_range(&from_key, &to_key); + + self.store.read().unwrap().write(batch)?; + + Ok(()) + } + + pub fn iter_full_blocks(&self) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + + let prefix = StoreEntry::Block(None).get_key(); + + let iterator = read_store.prefix_iterator(&prefix); + // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't + // have the right prefix + iterator + .filter(|(k, _)| k.starts_with(&prefix)) + .map(|(k, v)| { + let height: usize = usize::from_be_bytes( + k[1..] + .try_into() + .map_err(|_| CompactFiltersError::DataCorruption)?, + ); + let block = SerializeDb::deserialize(&v)?; + + Ok((height, block)) + }) + .collect::>() + } +} + +impl HeadersStore { + pub fn work(&self) -> Result { + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + + let prefix = StoreEntry::BlockHeader(None).get_key(); + let iterator = read_store.prefix_iterator_cf(cf_handle, prefix); + + Ok(iterator + .last() + .map(|(_, v)| -> Result<_, CompactFiltersError> { + let (_, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?; + + Ok(work) + }) + .transpose()? + .unwrap_or_default()) + } + + pub fn get_height(&self) -> Result { + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + + let prefix = StoreEntry::BlockHeader(None).get_key(); + let iterator = read_store.prefix_iterator_cf(cf_handle, prefix); + + Ok(iterator + .last() + .map(|(k, _)| -> Result<_, CompactFiltersError> { + let height = usize::from_be_bytes( + k[1..] + .try_into() + .map_err(|_| CompactFiltersError::DataCorruption)?, + ); + + Ok(height) + }) + .transpose()? + .unwrap_or_default()) + } + + pub fn get_tip_hash(&self) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + + let prefix = StoreEntry::BlockHeader(None).get_key(); + let iterator = read_store.prefix_iterator_cf(cf_handle, prefix); + + Ok(iterator + .last() + .map(|(_, v)| -> Result<_, CompactFiltersError> { + let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?; + + Ok(header.bitcoin_hash()) + }) + .transpose()?) + } + + pub fn apply( + &mut self, + from: usize, + headers: Vec, + ) -> Result { + let mut batch = WriteBatch::default(); + + let read_store = self.store.read().unwrap(); + let cf_handle = read_store.cf_handle(&self.cf_name).unwrap(); + + let (mut last_hash, mut accumulated_work) = read_store + .get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())? + .map(|result| { + let (header, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&result)?; + Ok::<_, CompactFiltersError>((header.bitcoin_hash(), work)) + }) + .transpose()? + .ok_or(CompactFiltersError::DataCorruption)?; + + for (index, header) in headers.into_iter().enumerate() { + if header.prev_blockhash != last_hash { + return Err(CompactFiltersError::InvalidHeaders); + } + + last_hash = header.bitcoin_hash(); + accumulated_work = accumulated_work + header.work(); + + let height = from + index + 1; + batch.put_cf( + cf_handle, + StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(), + &(height).to_be_bytes(), + ); + batch.put_cf( + cf_handle, + StoreEntry::BlockHeader(Some(height)).get_key(), + (header, accumulated_work).serialize(), + ); + } + + std::mem::drop(cf_handle); + std::mem::drop(read_store); + + self.store.write().unwrap().write(batch)?; + Ok(last_hash) + } +} + +pub type FilterHeaderHash = FilterHash; + +#[derive(Debug, Clone)] +pub struct FilterHeader { + prev_header_hash: FilterHeaderHash, + filter_hash: FilterHash, +} + +impl BitcoinHash for FilterHeader { + fn bitcoin_hash(&self) -> FilterHeaderHash { + let mut hash_data = self.filter_hash.into_inner().to_vec(); + hash_data.extend_from_slice(&self.prev_header_hash); + sha256d::Hash::hash(&hash_data).into() + } +} + +pub enum BundleStatus { + Init, + CFHeaders { cf_headers: Vec }, + CFilters { cf_filters: Vec> }, + Processed { cf_filters: Vec> }, + Tip { cf_filters: Vec> }, + Pruned, +} + +pub struct CFStore { + store: Arc>, + filter_type: u8, +} + +type BundleEntry = (BundleStatus, FilterHeaderHash); + +impl CFStore { + pub fn new( + headers_store: &HeadersStore, + filter_type: u8, + ) -> Result { + let cf_store = CFStore { + store: Arc::clone(&headers_store.store), + filter_type, + }; + + let genesis = match headers_store.network { + Network::Bitcoin => MAINNET_GENESIS.deref(), + Network::Testnet => TESTNET_GENESIS.deref(), + Network::Regtest => REGTEST_GENESIS.deref(), + }; + + let filter = BlockFilter::new_script_filter(genesis, |utxo| { + Err(bitcoin::util::bip158::Error::UtxoMissing(*utxo)) + })?; + let first_key = StoreEntry::CFilterTable((filter_type, Some(0))).get_key(); + + // Add the genesis' filter + { + let read_store = cf_store.store.read().unwrap(); + if read_store.get_pinned(&first_key)?.is_none() { + read_store.put( + &first_key, + (BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(), + )?; + } + } + + Ok(cf_store) + } + + pub fn get_filter_type(&self) -> u8 { + self.filter_type + } + + pub fn get_bundles(&self) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + + let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key(); + let iterator = read_store.prefix_iterator(&prefix); + + // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't + // have the right prefix + iterator + .filter(|(k, _)| k.starts_with(&prefix)) + .map(|(_, data)| BundleEntry::deserialize(&data)) + .collect::>() + } + + pub fn get_checkpoints(&self) -> Result, CompactFiltersError> { + let read_store = self.store.read().unwrap(); + + let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key(); + let iterator = read_store.prefix_iterator(&prefix); + + // FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't + // have the right prefix + Ok(iterator + .filter(|(k, _)| k.starts_with(&prefix)) + .skip(1) + .map(|(_, data)| Ok::<_, CompactFiltersError>(BundleEntry::deserialize(&data)?.1)) + .collect::>()?) + } + + pub fn replace_checkpoints( + &self, + checkpoints: Vec, + ) -> Result<(), CompactFiltersError> { + let current_checkpoints = self.get_checkpoints()?; + + let mut equal_bundles = 0; + for (index, (our, their)) in current_checkpoints + .iter() + .zip(checkpoints.iter()) + .enumerate() + { + equal_bundles = index; + + if our != their { + break; + } + } + + let read_store = self.store.read().unwrap(); + let mut batch = WriteBatch::default(); + + for (index, filter_hash) in checkpoints.iter().enumerate().skip(equal_bundles) { + let key = StoreEntry::CFilterTable((self.filter_type, Some(index + 1))).get_key(); // +1 to skip the genesis' filter + + if let Some((BundleStatus::Tip { .. }, _)) = read_store + .get_pinned(&key)? + .map(|data| BundleEntry::deserialize(&data)) + .transpose()? + { + println!("Keeping bundle #{} as Tip", index); + } else { + batch.put(&key, (BundleStatus::Init, *filter_hash).serialize()); + } + } + + read_store.write(batch)?; + + Ok(()) + } + + pub fn advance_to_cf_headers( + &self, + bundle: usize, + checkpoint_hash: FilterHeaderHash, + filter_headers: Vec, + ) -> Result { + let mut last_hash = checkpoint_hash; + let cf_headers = filter_headers + .into_iter() + .map(|filter_hash| { + let filter_header = FilterHeader { + prev_header_hash: last_hash, + filter_hash, + }; + last_hash = filter_header.bitcoin_hash(); + + filter_header + }) + .collect(); + + let read_store = self.store.read().unwrap(); + + let next_key = StoreEntry::CFilterTable((self.filter_type, Some(bundle + 1))).get_key(); // +1 to skip the genesis' filter + if let Some((_, next_checkpoint)) = read_store + .get_pinned(&next_key)? + .map(|data| BundleEntry::deserialize(&data)) + .transpose()? + { + // check connection with the next bundle if present + if last_hash != next_checkpoint { + return Err(CompactFiltersError::InvalidFilterHeader); + } + } + + let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); + let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash); + + read_store.put(key, value.serialize())?; + + Ok(value.0) + } + + pub fn advance_to_cf_filters( + &self, + bundle: usize, + checkpoint_hash: FilterHeaderHash, + headers: Vec, + filters: Vec<(usize, Vec)>, + ) -> Result { + let cf_filters = filters + .into_iter() + .zip(headers.iter()) + .map(|((_, filter_content), header)| { + if header.filter_hash != sha256d::Hash::hash(&filter_content).into() { + return Err(CompactFiltersError::InvalidFilter); + } + + Ok::<_, CompactFiltersError>(filter_content) + }) + .collect::>()?; + + let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); + let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash); + + let read_store = self.store.read().unwrap(); + read_store.put(key, value.serialize())?; + + Ok(value.0) + } + + pub fn prune_filters( + &self, + bundle: usize, + checkpoint_hash: FilterHeaderHash, + ) -> Result { + let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); + let value = (BundleStatus::Pruned, checkpoint_hash); + + let read_store = self.store.read().unwrap(); + read_store.put(key, value.serialize())?; + + Ok(value.0) + } + + pub fn mark_as_tip( + &self, + bundle: usize, + cf_filters: Vec>, + checkpoint_hash: FilterHeaderHash, + ) -> Result { + let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); + let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash); + + let read_store = self.store.read().unwrap(); + read_store.put(key, value.serialize())?; + + Ok(value.0) + } +} diff --git a/src/blockchain/compact_filters/sync.rs b/src/blockchain/compact_filters/sync.rs new file mode 100644 index 00000000..57b65f8b --- /dev/null +++ b/src/blockchain/compact_filters/sync.rs @@ -0,0 +1,282 @@ +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::sync::{Arc, Mutex}; + +use bitcoin::hash_types::{BlockHash, FilterHash}; +use bitcoin::network::message::NetworkMessage; +use bitcoin::network::message_blockdata::GetHeadersMessage; +use bitcoin::util::bip158::BlockFilter; + +use super::peer::*; +use super::store::*; +use super::CompactFiltersError; + +pub(crate) const BURIED_CONFIRMATIONS: usize = 100; + +pub struct CFSync { + headers_store: Arc>, + cf_store: Arc, + skip_blocks: usize, + bundles: Mutex>, +} + +impl CFSync { + pub fn new( + headers_store: Arc>, + skip_blocks: usize, + filter_type: u8, + ) -> Result { + let cf_store = Arc::new(CFStore::new(&headers_store, filter_type)?); + + Ok(CFSync { + headers_store, + cf_store, + skip_blocks, + bundles: Mutex::new(VecDeque::new()), + }) + } + + pub fn pruned_bundles(&self) -> Result { + Ok(self + .cf_store + .get_bundles()? + .into_iter() + .skip(self.skip_blocks / 1000) + .fold(0, |acc, (status, _)| match status { + BundleStatus::Pruned => acc + 1, + _ => acc, + })) + } + + pub fn prepare_sync(&self, peer: Arc) -> Result<(), CompactFiltersError> { + let mut bundles_lock = self.bundles.lock().unwrap(); + + let resp = peer.get_cf_checkpt( + self.cf_store.get_filter_type(), + self.headers_store.get_tip_hash()?.unwrap(), + )?; + self.cf_store.replace_checkpoints(resp.filter_headers)?; + + bundles_lock.clear(); + for (index, (status, checkpoint)) in self.cf_store.get_bundles()?.into_iter().enumerate() { + bundles_lock.push_back((status, checkpoint, index)); + } + + Ok(()) + } + + pub fn capture_thread_for_sync( + &self, + peer: Arc, + process: F, + completed_bundle: Q, + ) -> Result<(), CompactFiltersError> + where + F: Fn(&BlockHash, &BlockFilter) -> Result, + Q: Fn(usize) -> Result<(), crate::error::Error>, + { + let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated + + loop { + let (mut status, checkpoint, index) = match self.bundles.lock().unwrap().pop_front() { + None => break, + Some(x) => x, + }; + + log::debug!( + "Processing bundle #{} - height {} to {}", + index, + index * 1000 + 1, + (index + 1) * 1000 + ); + + let process_received_filters = + |expected_filters| -> Result>, CompactFiltersError> { + let mut filters_map = BTreeMap::new(); + for _ in 0..expected_filters { + let filter = peer.pop_cf_filter_resp()?; + if filter.filter_type != self.cf_store.get_filter_type() { + return Err(CompactFiltersError::InvalidResponse); + } + + match self.headers_store.get_height_for(&filter.block_hash)? { + Some(height) => filters_map.insert(height, filter.filter), + None => return Err(CompactFiltersError::InvalidFilter), + }; + } + + Ok(filters_map) + }; + + let start_height = index * 1000 + 1; + let mut already_processed = 0; + + if start_height < self.skip_blocks { + status = self.cf_store.prune_filters(index, checkpoint)?; + } + + let stop_height = std::cmp::min(current_height, start_height + 999); + let stop_hash = self.headers_store.get_block_hash(stop_height)?.unwrap(); + + if let BundleStatus::Init = status { + log::trace!("status: Init"); + + let resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?; + + assert!(resp.previous_filter == checkpoint); + status = + self.cf_store + .advance_to_cf_headers(index, checkpoint, resp.filter_hashes)?; + } + if let BundleStatus::Tip { cf_filters } = status { + log::trace!("status: Tip (beginning) "); + + already_processed = cf_filters.len(); + let headers_resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?; + + let cf_headers = match self.cf_store.advance_to_cf_headers( + index, + checkpoint, + headers_resp.filter_hashes, + )? { + BundleStatus::CFHeaders { cf_headers } => cf_headers, + _ => return Err(CompactFiltersError::InvalidResponse), + }; + + peer.get_cf_filters( + self.cf_store.get_filter_type(), + (start_height + cf_filters.len()) as u32, + stop_hash, + )?; + let expected_filters = stop_height - start_height + 1 - cf_filters.len(); + let filters_map = process_received_filters(expected_filters)?; + let filters = cf_filters + .into_iter() + .enumerate() + .chain(filters_map.into_iter()) + .collect(); + status = self + .cf_store + .advance_to_cf_filters(index, checkpoint, cf_headers, filters)?; + } + if let BundleStatus::CFHeaders { cf_headers } = status { + log::trace!("status: CFHeaders"); + + peer.get_cf_filters( + self.cf_store.get_filter_type(), + start_height as u32, + stop_hash, + )?; + let expected_filters = stop_height - start_height + 1; + let filters_map = process_received_filters(expected_filters)?; + status = self.cf_store.advance_to_cf_filters( + index, + checkpoint, + cf_headers, + filters_map.into_iter().collect(), + )?; + } + if let BundleStatus::CFilters { cf_filters } = status { + log::trace!("status: CFilters"); + + let last_sync_buried_height = (start_height + already_processed) + .checked_sub(BURIED_CONFIRMATIONS) + .unwrap_or(0); + + for (filter_index, filter) in cf_filters.iter().enumerate() { + let height = filter_index + start_height; + + // do not download blocks that were already "buried" since the last sync + if height < last_sync_buried_height { + continue; + } + + let block_hash = self.headers_store.get_block_hash(height)?.unwrap(); + + // TODO: also download random blocks? + if process(&block_hash, &BlockFilter::new(&filter))? { + log::debug!("Downloading block {}", block_hash); + + let block = peer + .get_block(block_hash)? + .ok_or(CompactFiltersError::MissingBlock)?; + self.headers_store.save_full_block(&block, height)?; + } + } + + status = BundleStatus::Processed { cf_filters }; + } + if let BundleStatus::Processed { cf_filters } = status { + log::trace!("status: Processed"); + + if current_height - stop_height > 1000 { + status = self.cf_store.prune_filters(index, checkpoint)?; + } else { + status = self.cf_store.mark_as_tip(index, cf_filters, checkpoint)?; + } + + completed_bundle(index)?; + } + if let BundleStatus::Pruned = status { + log::trace!("status: Pruned"); + } + if let BundleStatus::Tip { .. } = status { + log::trace!("status: Tip"); + } + } + + Ok(()) + } +} + +pub fn sync_headers( + peer: Arc, + store: Arc>, + sync_fn: F, +) -> Result>, CompactFiltersError> +where + F: Fn(usize) -> Result<(), crate::error::Error>, +{ + let locators = store.get_locators()?; + let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect(); + let locators_map: HashMap<_, _> = locators.into_iter().collect(); + + peer.send(NetworkMessage::GetHeaders(GetHeadersMessage::new( + locators_vec, + Default::default(), + )))?; + let (mut snapshot, mut last_hash) = + if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? { + if headers.is_empty() { + return Ok(None); + } + + match locators_map.get(&headers[0].prev_blockhash) { + None => return Err(CompactFiltersError::InvalidHeaders), + Some(from) => ( + store.start_snapshot(*from)?, + headers[0].prev_blockhash.clone(), + ), + } + } else { + return Err(CompactFiltersError::InvalidResponse); + }; + + let mut sync_height = store.get_height()?; + while sync_height < peer.get_version().start_height as usize { + peer.send(NetworkMessage::GetHeaders(GetHeadersMessage::new( + vec![last_hash], + Default::default(), + )))?; + if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? { + let batch_len = headers.len(); + last_hash = snapshot.apply(sync_height, headers)?; + + sync_height += batch_len; + sync_fn(sync_height)?; + } else { + return Err(CompactFiltersError::InvalidResponse); + } + } + + Ok(Some(snapshot)) +} diff --git a/src/blockchain/electrum.rs b/src/blockchain/electrum.rs index 44331e2c..c4ed4cd4 100644 --- a/src/blockchain/electrum.rs +++ b/src/blockchain/electrum.rs @@ -40,9 +40,13 @@ impl Blockchain for ElectrumBlockchain { impl OnlineBlockchain for ElectrumBlockchain { fn get_capabilities(&self) -> HashSet { - vec![Capability::FullHistory, Capability::GetAnyTx] - .into_iter() - .collect() + vec![ + Capability::FullHistory, + Capability::GetAnyTx, + Capability::AccurateFees, + ] + .into_iter() + .collect() } fn setup( diff --git a/src/blockchain/esplora.rs b/src/blockchain/esplora.rs index 10eb5cb4..4214a0fa 100644 --- a/src/blockchain/esplora.rs +++ b/src/blockchain/esplora.rs @@ -59,9 +59,13 @@ impl Blockchain for EsploraBlockchain { #[maybe_async] impl OnlineBlockchain for EsploraBlockchain { fn get_capabilities(&self) -> HashSet { - vec![Capability::FullHistory, Capability::GetAnyTx] - .into_iter() - .collect() + vec![ + Capability::FullHistory, + Capability::GetAnyTx, + Capability::AccurateFees, + ] + .into_iter() + .collect() } fn setup( diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index da267335..a38f38b5 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -19,10 +19,14 @@ pub mod esplora; #[cfg(feature = "esplora")] pub use self::esplora::EsploraBlockchain; +#[cfg(feature = "compact_filters")] +pub mod compact_filters; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Capability { FullHistory, GetAnyTx, + AccurateFees, } pub trait Blockchain { @@ -46,13 +50,13 @@ impl Blockchain for OfflineBlockchain { pub trait OnlineBlockchain: Blockchain { fn get_capabilities(&self) -> HashSet; - fn setup( + fn setup( &self, stop_gap: Option, database: &mut D, progress_update: P, ) -> Result<(), Error>; - fn sync( + fn sync( &self, stop_gap: Option, database: &mut D, @@ -70,7 +74,7 @@ pub trait OnlineBlockchain: Blockchain { pub type ProgressData = (f32, Option); -pub trait Progress { +pub trait Progress: Send { fn update(&self, progress: f32, message: Option) -> Result<(), Error>; } @@ -89,6 +93,7 @@ impl Progress for Sender { } } +#[derive(Clone)] pub struct NoopProgress; pub fn noop_progress() -> NoopProgress { @@ -100,3 +105,18 @@ impl Progress for NoopProgress { Ok(()) } } + +#[derive(Clone)] +pub struct LogProgress; + +pub fn log_progress() -> LogProgress { + LogProgress +} + +impl Progress for LogProgress { + fn update(&self, progress: f32, message: Option) -> Result<(), Error> { + log::info!("Sync {:.3}%: `{}`", progress, message.unwrap_or("".into())); + + Ok(()) + } +} diff --git a/src/cli.rs b/src/cli.rs index 3da4d49e..2b0c5210 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -11,6 +11,7 @@ use bitcoin::hashes::hex::{FromHex, ToHex}; use bitcoin::util::psbt::PartiallySignedTransaction; use bitcoin::{Address, OutPoint, Txid}; +use crate::blockchain::log_progress; use crate::error::Error; use crate::types::ScriptType; use crate::{FeeRate, TxBuilder, Wallet}; @@ -344,7 +345,7 @@ where if let Some(_sub_matches) = matches.subcommand_matches("get_new_address") { Ok(Some(format!("{}", wallet.get_new_address()?))) } else if let Some(_sub_matches) = matches.subcommand_matches("sync") { - maybe_await!(wallet.sync(None))?; + maybe_await!(wallet.sync(log_progress(), None))?; Ok(None) } else if let Some(_sub_matches) = matches.subcommand_matches("list_unspent") { let mut res = String::new(); diff --git a/src/error.rs b/src/error.rs index f7641bdd..77cb92c3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,6 +54,8 @@ pub enum Error { Electrum(electrum_client::Error), #[cfg(feature = "esplora")] Esplora(crate::blockchain::esplora::EsploraError), + #[cfg(feature = "compact_filters")] + CompactFilters(crate::blockchain::compact_filters::CompactFiltersError), #[cfg(feature = "key-value-db")] Sled(sled::Error), } @@ -87,3 +89,13 @@ impl_error!(electrum_client::Error, Electrum); impl_error!(crate::blockchain::esplora::EsploraError, Esplora); #[cfg(feature = "key-value-db")] impl_error!(sled::Error, Sled); + +#[cfg(feature = "compact_filters")] +impl From for Error { + fn from(other: crate::blockchain::compact_filters::CompactFiltersError) -> Self { + match other { + crate::blockchain::compact_filters::CompactFiltersError::Global(e) => *e, + err @ _ => Error::CompactFilters(err), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 8143052d..f727c109 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ extern crate async_trait; #[macro_use] extern crate magical_macros; -#[cfg(test)] +#[cfg(any(test, feature = "compact_filters"))] #[macro_use] extern crate lazy_static; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index edc80360..24c6e1fc 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -27,7 +27,7 @@ pub mod utils; use tx_builder::TxBuilder; use utils::{FeeRate, IsDust}; -use crate::blockchain::{noop_progress, Blockchain, OfflineBlockchain, OnlineBlockchain}; +use crate::blockchain::{Blockchain, OfflineBlockchain, OnlineBlockchain, Progress}; use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; use crate::descriptor::{get_checksum, DescriptorMeta, ExtendedDescriptor, ExtractPolicy, Policy}; use crate::error::Error; @@ -1015,7 +1015,11 @@ where } #[maybe_async] - pub fn sync(&self, max_address_param: Option) -> Result<(), Error> { + pub fn sync( + &self, + progress_update: P, + max_address_param: Option, + ) -> Result<(), Error> { debug!("Begin sync..."); let mut run_setup = false; @@ -1057,13 +1061,13 @@ where maybe_await!(self.client.setup( None, self.database.borrow_mut().deref_mut(), - noop_progress(), + progress_update, )) } else { maybe_await!(self.client.sync( None, self.database.borrow_mut().deref_mut(), - noop_progress(), + progress_update, )) } } diff --git a/testutils-macros/src/lib.rs b/testutils-macros/src/lib.rs index b70a37ec..0722c7cf 100644 --- a/testutils-macros/src/lib.rs +++ b/testutils-macros/src/lib.rs @@ -55,7 +55,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt use testutils::{TestClient, serial}; - use #root_ident::blockchain::OnlineBlockchain; + use #root_ident::blockchain::{OnlineBlockchain, noop_progress}; use #root_ident::descriptor::ExtendedDescriptor; use #root_ident::database::MemoryDatabase; use #root_ident::types::ScriptType; @@ -92,7 +92,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt }; let txid = test_client.receive(tx); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); assert_eq!(wallet.list_unspent().unwrap()[0].is_internal, false); @@ -116,7 +116,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 25) => 50_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 100_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 2); @@ -127,14 +127,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt fn test_sync_before_and_after_receive() { let (wallet, descriptors, mut test_client) = init_single_sig(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 0); test_client.receive(testutils! { @tx ( (@external descriptors, 0) => 50_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 1); @@ -149,7 +149,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000, (@external descriptors, 5) => 30_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 105_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 1); @@ -174,7 +174,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 5) => 25_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 75_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 2); @@ -190,14 +190,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); test_client.receive(testutils! { @tx ( (@external descriptors, 0) => 25_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 75_000); } @@ -210,7 +210,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) ( @replaceable true ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 1); @@ -224,7 +224,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt let new_txid = test_client.bump_fee(&txid); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 1); @@ -246,7 +246,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) ( @confirmations 1 ) ( @replaceable true ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); assert_eq!(wallet.list_transactions(false).unwrap().len(), 1); @@ -259,7 +259,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt // Invalidate 1 block test_client.invalidate(1); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); @@ -278,7 +278,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr, 25_000)])).unwrap(); @@ -286,7 +286,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), details.received); assert_eq!(wallet.list_transactions(false).unwrap().len(), 2); @@ -303,7 +303,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr, 25_000)])).unwrap(); @@ -311,12 +311,12 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt assert!(finalized, "Cannot finalize transaction"); let sent_txid = wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), details.received); // empty wallet let wallet = get_wallet_from_descriptors(&descriptors); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); let tx_map = wallet.list_transactions(false).unwrap().into_iter().map(|tx| (tx.txid, tx)).collect::>(); @@ -340,7 +340,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); let mut total_sent = 0; @@ -350,17 +350,17 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); total_sent += 5_000 + details.fees; } - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000 - total_sent); // empty wallet let wallet = get_wallet_from_descriptors(&descriptors); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000 - total_sent); } @@ -374,14 +374,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) (@confirmations 1) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 5_000)]).enable_rbf()).unwrap(); let (psbt, finalized) = wallet.sign(psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000 - details.fees - 5_000); assert_eq!(wallet.get_balance().unwrap(), details.received); @@ -389,7 +389,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(new_psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000 - new_details.fees - 5_000); assert_eq!(wallet.get_balance().unwrap(), new_details.received); @@ -406,14 +406,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000 ) (@confirmations 1) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 50_000); let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap(); let (psbt, finalized) = wallet.sign(psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 1_000 - details.fees); assert_eq!(wallet.get_balance().unwrap(), details.received); @@ -422,7 +422,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(new_psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 0); assert_eq!(new_details.received, 0); @@ -439,14 +439,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 75_000); let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap(); let (psbt, finalized) = wallet.sign(psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 26_000 - details.fees); assert_eq!(details.received, 1_000 - details.fees); @@ -455,7 +455,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(new_psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(new_details.sent, 75_000); assert_eq!(wallet.get_balance().unwrap(), new_details.received); } @@ -470,14 +470,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt @tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1) }); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 75_000); let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap(); let (psbt, finalized) = wallet.sign(psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(wallet.get_balance().unwrap(), 26_000 - details.fees); assert_eq!(details.received, 1_000 - details.fees); @@ -487,7 +487,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap(); assert!(finalized, "Cannot finalize transaction"); wallet.broadcast(new_psbt.extract_tx()).unwrap(); - wallet.sync(None).unwrap(); + wallet.sync(noop_progress(), None).unwrap(); assert_eq!(new_details.sent, 75_000); assert_eq!(wallet.get_balance().unwrap(), 0); assert_eq!(new_details.received, 0);