diff --git a/src/blockchain/compact_filters/mod.rs b/src/blockchain/compact_filters/mod.rs index 99c7820e..649c53df 100644 --- a/src/blockchain/compact_filters/mod.rs +++ b/src/blockchain/compact_filters/mod.rs @@ -383,7 +383,12 @@ impl Blockchain for CompactFiltersBlockchain { } database.commit_batch(updates)?; - first_peer.ask_for_mempool()?; + match first_peer.ask_for_mempool() { + Err(CompactFiltersError::PeerBloomDisabled) => { + log::warn!("Peer has BLOOM disabled, we can't ask for the mempool") + } + e => e?, + }; let mut internal_max_deriv = None; let mut external_max_deriv = None; @@ -537,6 +542,8 @@ pub enum CompactFiltersError { NotConnected, /// A peer took too long to reply to one of our messages Timeout, + /// The peer doesn't advertise the [`BLOOM`](bitcoin::network::constants::ServiceFlags::BLOOM) service flag + PeerBloomDisabled, /// No peers have been specified NoPeers, diff --git a/src/blockchain/compact_filters/peer.rs b/src/blockchain/compact_filters/peer.rs index b0289540..62fa2cb4 100644 --- a/src/blockchain/compact_filters/peer.rs +++ b/src/blockchain/compact_filters/peer.rs @@ -34,7 +34,6 @@ use rand::{thread_rng, Rng}; use bitcoin::consensus::Encodable; use bitcoin::hash_types::BlockHash; -use bitcoin::hashes::Hash; use bitcoin::network::constants::ServiceFlags; use bitcoin::network::message::{NetworkMessage, RawNetworkMessage}; use bitcoin::network::message_blockdata::*; @@ -42,7 +41,7 @@ use bitcoin::network::message_filter::*; use bitcoin::network::message_network::VersionMessage; use bitcoin::network::stream_reader::StreamReader; use bitcoin::network::Address; -use bitcoin::{Block, Network, Transaction, Txid}; +use bitcoin::{Block, Network, Transaction, Txid, Wtxid}; use super::CompactFiltersError; @@ -55,37 +54,71 @@ pub(crate) const TIMEOUT_SECS: u64 = 30; /// It is normally shared between [`Peer`]s with the use of [`Arc`], so that transactions are not /// duplicated in memory. #[derive(Debug, Default)] -pub struct Mempool { - txs: RwLock>, +pub struct Mempool(RwLock); + +#[derive(Debug, Default)] +struct InnerMempool { + txs: HashMap, + wtxids: HashMap, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum TxIdentifier { + Wtxid(Wtxid), + Txid(Txid), } impl Mempool { + /// Create a new empty mempool + pub fn new() -> Self { + Self::default() + } + /// Add a transaction to the mempool /// /// Note that this doesn't propagate the transaction to other /// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used. pub fn add_tx(&self, tx: Transaction) { - self.txs.write().unwrap().insert(tx.txid(), tx); + let mut guard = self.0.write().unwrap(); + + guard.wtxids.insert(tx.wtxid(), tx.txid()); + guard.txs.insert(tx.txid(), tx); } /// Look-up a transaction in the mempool given an [`Inventory`] request pub fn get_tx(&self, inventory: &Inventory) -> Option { - let txid = match inventory { + let identifer = match inventory { Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None, - Inventory::Transaction(txid) => *txid, - Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()), + Inventory::Transaction(txid) => TxIdentifier::Txid(*txid), + Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid), + Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid), + Inventory::Unknown { inv_type, hash } => { + log::warn!( + "Unknown inventory request type `{}`, hash `{:?}`", + inv_type, + hash + ); + return None; + } }; - self.txs.read().unwrap().get(&txid).cloned() + + let txid = match identifer { + TxIdentifier::Txid(txid) => Some(txid), + TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(), + }; + + txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned()) + .flatten() } /// Return whether or not the mempool contains a transaction with a given txid pub fn has_tx(&self, txid: &Txid) -> bool { - self.txs.read().unwrap().contains_key(txid) + self.0.read().unwrap().txs.contains_key(txid) } /// Return the list of transactions contained in the mempool pub fn iter_txs(&self) -> Vec { - self.txs.read().unwrap().values().cloned().collect() + self.0.read().unwrap().txs.values().cloned().collect() } } @@ -508,6 +541,10 @@ impl InvPeer for Peer { } fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> { + if !self.version.services.has(ServiceFlags::BLOOM) { + return Err(CompactFiltersError::PeerBloomDisabled); + } + self.send(NetworkMessage::MemPool)?; let inv = match self.recv("inv", Some(Duration::from_secs(5)))? { None => return Ok(()), // empty mempool diff --git a/src/blockchain/compact_filters/store.rs b/src/blockchain/compact_filters/store.rs index f552fde3..5e278f24 100644 --- a/src/blockchain/compact_filters/store.rs +++ b/src/blockchain/compact_filters/store.rs @@ -36,9 +36,9 @@ use rand::{thread_rng, Rng}; use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB}; use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable}; -use bitcoin::hash_types::FilterHash; +use bitcoin::hash_types::{FilterHash, FilterHeader}; use bitcoin::hashes::hex::FromHex; -use bitcoin::hashes::{sha256d, Hash}; +use bitcoin::hashes::Hash; use bitcoin::util::bip158::BlockFilter; use bitcoin::util::uint::Uint256; use bitcoin::Block; @@ -52,6 +52,7 @@ lazy_static! { static ref MAINNET_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); static ref TESTNET_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); static ref REGTEST_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); + static ref SIGNET_GENESIS: Block = deserialize(&Vec::::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A008F4D5FAE77031E8AD222030101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap(); } pub trait StoreType: Default + fmt::Debug {} @@ -122,34 +123,8 @@ where } } -impl Encodable for FilterHeader { - fn consensus_encode( - &self, - mut e: W, - ) -> Result { - let mut written = self.prev_header_hash.consensus_encode(&mut e)?; - written += self.filter_hash.consensus_encode(&mut e)?; - Ok(written) - } -} - -impl Decodable for FilterHeader { - fn consensus_decode(mut d: D) -> Result { - let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?; - let filter_hash = FilterHash::consensus_decode(&mut d)?; - - Ok(FilterHeader { - prev_header_hash, - filter_hash, - }) - } -} - impl Encodable for BundleStatus { - fn consensus_encode( - &self, - mut e: W, - ) -> Result { + fn consensus_encode(&self, mut e: W) -> Result { let mut written = 0; match self { @@ -264,6 +239,7 @@ impl ChainStore { Network::Bitcoin => MAINNET_GENESIS.deref(), Network::Testnet => TESTNET_GENESIS.deref(), Network::Regtest => REGTEST_GENESIS.deref(), + Network::Signet => SIGNET_GENESIS.deref(), }; let cf_name = "default".to_string(); @@ -658,22 +634,6 @@ impl fmt::Debug for ChainStore { } } -pub type FilterHeaderHash = FilterHash; - -#[derive(Debug, Clone)] -pub struct FilterHeader { - prev_header_hash: FilterHeaderHash, - filter_hash: FilterHash, -} - -impl FilterHeader { - fn header_hash(&self) -> FilterHeaderHash { - let mut hash_data = self.filter_hash.into_inner().to_vec(); - hash_data.extend_from_slice(&self.prev_header_hash); - sha256d::Hash::hash(&hash_data).into() - } -} - pub enum BundleStatus { Init, CFHeaders { cf_headers: Vec }, @@ -688,7 +648,7 @@ pub struct CFStore { filter_type: u8, } -type BundleEntry = (BundleStatus, FilterHeaderHash); +type BundleEntry = (BundleStatus, FilterHeader); impl CFStore { pub fn new( @@ -704,6 +664,7 @@ impl CFStore { Network::Bitcoin => MAINNET_GENESIS.deref(), Network::Testnet => TESTNET_GENESIS.deref(), Network::Regtest => REGTEST_GENESIS.deref(), + Network::Signet => SIGNET_GENESIS.deref(), }; let filter = BlockFilter::new_script_filter(genesis, |utxo| { @@ -717,7 +678,11 @@ impl CFStore { if read_store.get_pinned(&first_key)?.is_none() { read_store.put( &first_key, - (BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(), + ( + BundleStatus::Init, + filter.filter_header(&FilterHeader::from_hash(Default::default())), + ) + .serialize(), )?; } } @@ -743,7 +708,7 @@ impl CFStore { .collect::>() } - pub fn get_checkpoints(&self) -> Result, CompactFiltersError> { + pub fn get_checkpoints(&self) -> Result, CompactFiltersError> { let read_store = self.store.read().unwrap(); let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key(); @@ -760,7 +725,7 @@ impl CFStore { pub fn replace_checkpoints( &self, - checkpoints: Vec, + checkpoints: Vec, ) -> Result<(), CompactFiltersError> { let current_checkpoints = self.get_checkpoints()?; @@ -802,20 +767,16 @@ impl CFStore { pub fn advance_to_cf_headers( &self, bundle: usize, - checkpoint_hash: FilterHeaderHash, - filter_headers: Vec, + checkpoint: FilterHeader, + filter_hashes: Vec, ) -> Result { - let mut last_hash = checkpoint_hash; - let cf_headers = filter_headers + let cf_headers: Vec = filter_hashes .into_iter() - .map(|filter_hash| { - let filter_header = FilterHeader { - prev_header_hash: last_hash, - filter_hash, - }; - last_hash = filter_header.header_hash(); + .scan(checkpoint, |prev_header, filter_hash| { + let filter_header = filter_hash.filter_header(&prev_header); + *prev_header = filter_header; - filter_header + Some(filter_header) }) .collect(); @@ -828,13 +789,13 @@ impl CFStore { .transpose()? { // check connection with the next bundle if present - if last_hash != next_checkpoint { + if cf_headers.iter().last() != Some(&next_checkpoint) { return Err(CompactFiltersError::InvalidFilterHeader); } } let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); - let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash); + let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint); read_store.put(key, value.serialize())?; @@ -844,24 +805,26 @@ impl CFStore { pub fn advance_to_cf_filters( &self, bundle: usize, - checkpoint_hash: FilterHeaderHash, + checkpoint: FilterHeader, headers: Vec, filters: Vec<(usize, Vec)>, ) -> Result { let cf_filters = filters .into_iter() - .zip(headers.iter()) - .map(|((_, filter_content), header)| { - if header.filter_hash != sha256d::Hash::hash(&filter_content).into() { - return Err(CompactFiltersError::InvalidFilter); + .zip(headers.into_iter()) + .scan(checkpoint, |prev_header, ((_, filter_content), header)| { + let filter = BlockFilter::new(&filter_content); + if header != filter.filter_header(&prev_header) { + return Some(Err(CompactFiltersError::InvalidFilter)); } + *prev_header = header; - Ok::<_, CompactFiltersError>(filter_content) + Some(Ok::<_, CompactFiltersError>(filter_content)) }) .collect::>()?; let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); - let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash); + let value = (BundleStatus::CFilters { cf_filters }, checkpoint); let read_store = self.store.read().unwrap(); read_store.put(key, value.serialize())?; @@ -872,10 +835,10 @@ impl CFStore { pub fn prune_filters( &self, bundle: usize, - checkpoint_hash: FilterHeaderHash, + checkpoint: FilterHeader, ) -> Result { let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); - let value = (BundleStatus::Pruned, checkpoint_hash); + let value = (BundleStatus::Pruned, checkpoint); let read_store = self.store.read().unwrap(); read_store.put(key, value.serialize())?; @@ -887,10 +850,10 @@ impl CFStore { &self, bundle: usize, cf_filters: Vec>, - checkpoint_hash: FilterHeaderHash, + checkpoint: FilterHeader, ) -> Result { let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key(); - let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash); + let value = (BundleStatus::Tip { cf_filters }, checkpoint); let read_store = self.store.read().unwrap(); read_store.put(key, value.serialize())?; diff --git a/src/blockchain/compact_filters/sync.rs b/src/blockchain/compact_filters/sync.rs index a935fc1f..5b8a3aaa 100644 --- a/src/blockchain/compact_filters/sync.rs +++ b/src/blockchain/compact_filters/sync.rs @@ -26,7 +26,7 @@ use std::collections::{BTreeMap, HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use bitcoin::hash_types::{BlockHash, FilterHash}; +use bitcoin::hash_types::{BlockHash, FilterHeader}; use bitcoin::network::message::NetworkMessage; use bitcoin::network::message_blockdata::GetHeadersMessage; use bitcoin::util::bip158::BlockFilter; @@ -42,7 +42,7 @@ pub struct CFSync { headers_store: Arc>, cf_store: Arc, skip_blocks: usize, - bundles: Mutex>, + bundles: Mutex>, } impl CFSync { @@ -148,7 +148,7 @@ impl CFSync { let resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?; - assert!(resp.previous_filter == checkpoint); + assert!(resp.previous_filter_header == checkpoint); status = self.cf_store .advance_to_cf_headers(index, checkpoint, resp.filter_hashes)?;