[compact_filters] Use the new rust-bitcoin API

This commit is contained in:
Alekos Filini 2021-02-02 19:52:44 -05:00
parent 023dabd9b2
commit 19eca4e2d1
No known key found for this signature in database
GPG Key ID: 431401E4A4530061
4 changed files with 95 additions and 88 deletions

View File

@ -383,7 +383,12 @@ impl Blockchain for CompactFiltersBlockchain {
}
database.commit_batch(updates)?;
first_peer.ask_for_mempool()?;
match first_peer.ask_for_mempool() {
Err(CompactFiltersError::PeerBloomDisabled) => {
log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
}
e => e?,
};
let mut internal_max_deriv = None;
let mut external_max_deriv = None;
@ -537,6 +542,8 @@ pub enum CompactFiltersError {
NotConnected,
/// A peer took too long to reply to one of our messages
Timeout,
/// The peer doesn't advertise the [`BLOOM`](bitcoin::network::constants::ServiceFlags::BLOOM) service flag
PeerBloomDisabled,
/// No peers have been specified
NoPeers,

View File

@ -34,7 +34,6 @@ use rand::{thread_rng, Rng};
use bitcoin::consensus::Encodable;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::Hash;
use bitcoin::network::constants::ServiceFlags;
use bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
use bitcoin::network::message_blockdata::*;
@ -42,7 +41,7 @@ use bitcoin::network::message_filter::*;
use bitcoin::network::message_network::VersionMessage;
use bitcoin::network::stream_reader::StreamReader;
use bitcoin::network::Address;
use bitcoin::{Block, Network, Transaction, Txid};
use bitcoin::{Block, Network, Transaction, Txid, Wtxid};
use super::CompactFiltersError;
@ -55,37 +54,71 @@ pub(crate) const TIMEOUT_SECS: u64 = 30;
/// It is normally shared between [`Peer`]s with the use of [`Arc`], so that transactions are not
/// duplicated in memory.
#[derive(Debug, Default)]
pub struct Mempool {
txs: RwLock<HashMap<Txid, Transaction>>,
pub struct Mempool(RwLock<InnerMempool>);
#[derive(Debug, Default)]
struct InnerMempool {
txs: HashMap<Txid, Transaction>,
wtxids: HashMap<Wtxid, Txid>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum TxIdentifier {
Wtxid(Wtxid),
Txid(Txid),
}
impl Mempool {
/// Create a new empty mempool
pub fn new() -> Self {
Self::default()
}
/// Add a transaction to the mempool
///
/// Note that this doesn't propagate the transaction to other
/// peers. To do that, [`broadcast`](crate::blockchain::Blockchain::broadcast) should be used.
pub fn add_tx(&self, tx: Transaction) {
self.txs.write().unwrap().insert(tx.txid(), tx);
let mut guard = self.0.write().unwrap();
guard.wtxids.insert(tx.wtxid(), tx.txid());
guard.txs.insert(tx.txid(), tx);
}
/// Look-up a transaction in the mempool given an [`Inventory`] request
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
let txid = match inventory {
let identifer = match inventory {
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
Inventory::Transaction(txid) => *txid,
Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()),
Inventory::Transaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WitnessTransaction(txid) => TxIdentifier::Txid(*txid),
Inventory::WTx(wtxid) => TxIdentifier::Wtxid(*wtxid),
Inventory::Unknown { inv_type, hash } => {
log::warn!(
"Unknown inventory request type `{}`, hash `{:?}`",
inv_type,
hash
);
return None;
}
};
self.txs.read().unwrap().get(&txid).cloned()
let txid = match identifer {
TxIdentifier::Txid(txid) => Some(txid),
TxIdentifier::Wtxid(wtxid) => self.0.read().unwrap().wtxids.get(&wtxid).cloned(),
};
txid.map(|txid| self.0.read().unwrap().txs.get(&txid).cloned())
.flatten()
}
/// Return whether or not the mempool contains a transaction with a given txid
pub fn has_tx(&self, txid: &Txid) -> bool {
self.txs.read().unwrap().contains_key(txid)
self.0.read().unwrap().txs.contains_key(txid)
}
/// Return the list of transactions contained in the mempool
pub fn iter_txs(&self) -> Vec<Transaction> {
self.txs.read().unwrap().values().cloned().collect()
self.0.read().unwrap().txs.values().cloned().collect()
}
}
@ -508,6 +541,10 @@ impl InvPeer for Peer {
}
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
if !self.version.services.has(ServiceFlags::BLOOM) {
return Err(CompactFiltersError::PeerBloomDisabled);
}
self.send(NetworkMessage::MemPool)?;
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
None => return Ok(()), // empty mempool

View File

@ -36,9 +36,9 @@ use rand::{thread_rng, Rng};
use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable};
use bitcoin::hash_types::FilterHash;
use bitcoin::hash_types::{FilterHash, FilterHeader};
use bitcoin::hashes::hex::FromHex;
use bitcoin::hashes::{sha256d, Hash};
use bitcoin::hashes::Hash;
use bitcoin::util::bip158::BlockFilter;
use bitcoin::util::uint::Uint256;
use bitcoin::Block;
@ -52,6 +52,7 @@ lazy_static! {
static ref MAINNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
static ref TESTNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
static ref SIGNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A008F4D5FAE77031E8AD222030101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
}
pub trait StoreType: Default + fmt::Debug {}
@ -122,34 +123,8 @@ where
}
}
impl Encodable for FilterHeader {
fn consensus_encode<W: Write>(
&self,
mut e: W,
) -> Result<usize, bitcoin::consensus::encode::Error> {
let mut written = self.prev_header_hash.consensus_encode(&mut e)?;
written += self.filter_hash.consensus_encode(&mut e)?;
Ok(written)
}
}
impl Decodable for FilterHeader {
fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?;
let filter_hash = FilterHash::consensus_decode(&mut d)?;
Ok(FilterHeader {
prev_header_hash,
filter_hash,
})
}
}
impl Encodable for BundleStatus {
fn consensus_encode<W: Write>(
&self,
mut e: W,
) -> Result<usize, bitcoin::consensus::encode::Error> {
fn consensus_encode<W: Write>(&self, mut e: W) -> Result<usize, std::io::Error> {
let mut written = 0;
match self {
@ -264,6 +239,7 @@ impl ChainStore<Full> {
Network::Bitcoin => MAINNET_GENESIS.deref(),
Network::Testnet => TESTNET_GENESIS.deref(),
Network::Regtest => REGTEST_GENESIS.deref(),
Network::Signet => SIGNET_GENESIS.deref(),
};
let cf_name = "default".to_string();
@ -658,22 +634,6 @@ impl<T: StoreType> fmt::Debug for ChainStore<T> {
}
}
pub type FilterHeaderHash = FilterHash;
#[derive(Debug, Clone)]
pub struct FilterHeader {
prev_header_hash: FilterHeaderHash,
filter_hash: FilterHash,
}
impl FilterHeader {
fn header_hash(&self) -> FilterHeaderHash {
let mut hash_data = self.filter_hash.into_inner().to_vec();
hash_data.extend_from_slice(&self.prev_header_hash);
sha256d::Hash::hash(&hash_data).into()
}
}
pub enum BundleStatus {
Init,
CFHeaders { cf_headers: Vec<FilterHeader> },
@ -688,7 +648,7 @@ pub struct CFStore {
filter_type: u8,
}
type BundleEntry = (BundleStatus, FilterHeaderHash);
type BundleEntry = (BundleStatus, FilterHeader);
impl CFStore {
pub fn new(
@ -704,6 +664,7 @@ impl CFStore {
Network::Bitcoin => MAINNET_GENESIS.deref(),
Network::Testnet => TESTNET_GENESIS.deref(),
Network::Regtest => REGTEST_GENESIS.deref(),
Network::Signet => SIGNET_GENESIS.deref(),
};
let filter = BlockFilter::new_script_filter(genesis, |utxo| {
@ -717,7 +678,11 @@ impl CFStore {
if read_store.get_pinned(&first_key)?.is_none() {
read_store.put(
&first_key,
(BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(),
(
BundleStatus::Init,
filter.filter_header(&FilterHeader::from_hash(Default::default())),
)
.serialize(),
)?;
}
}
@ -743,7 +708,7 @@ impl CFStore {
.collect::<Result<_, _>>()
}
pub fn get_checkpoints(&self) -> Result<Vec<FilterHash>, CompactFiltersError> {
pub fn get_checkpoints(&self) -> Result<Vec<FilterHeader>, CompactFiltersError> {
let read_store = self.store.read().unwrap();
let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
@ -760,7 +725,7 @@ impl CFStore {
pub fn replace_checkpoints(
&self,
checkpoints: Vec<FilterHash>,
checkpoints: Vec<FilterHeader>,
) -> Result<(), CompactFiltersError> {
let current_checkpoints = self.get_checkpoints()?;
@ -802,20 +767,16 @@ impl CFStore {
pub fn advance_to_cf_headers(
&self,
bundle: usize,
checkpoint_hash: FilterHeaderHash,
filter_headers: Vec<FilterHash>,
checkpoint: FilterHeader,
filter_hashes: Vec<FilterHash>,
) -> Result<BundleStatus, CompactFiltersError> {
let mut last_hash = checkpoint_hash;
let cf_headers = filter_headers
let cf_headers: Vec<FilterHeader> = filter_hashes
.into_iter()
.map(|filter_hash| {
let filter_header = FilterHeader {
prev_header_hash: last_hash,
filter_hash,
};
last_hash = filter_header.header_hash();
.scan(checkpoint, |prev_header, filter_hash| {
let filter_header = filter_hash.filter_header(&prev_header);
*prev_header = filter_header;
filter_header
Some(filter_header)
})
.collect();
@ -828,13 +789,13 @@ impl CFStore {
.transpose()?
{
// check connection with the next bundle if present
if last_hash != next_checkpoint {
if cf_headers.iter().last() != Some(&next_checkpoint) {
return Err(CompactFiltersError::InvalidFilterHeader);
}
}
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash);
let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint);
read_store.put(key, value.serialize())?;
@ -844,24 +805,26 @@ impl CFStore {
pub fn advance_to_cf_filters(
&self,
bundle: usize,
checkpoint_hash: FilterHeaderHash,
checkpoint: FilterHeader,
headers: Vec<FilterHeader>,
filters: Vec<(usize, Vec<u8>)>,
) -> Result<BundleStatus, CompactFiltersError> {
let cf_filters = filters
.into_iter()
.zip(headers.iter())
.map(|((_, filter_content), header)| {
if header.filter_hash != sha256d::Hash::hash(&filter_content).into() {
return Err(CompactFiltersError::InvalidFilter);
.zip(headers.into_iter())
.scan(checkpoint, |prev_header, ((_, filter_content), header)| {
let filter = BlockFilter::new(&filter_content);
if header != filter.filter_header(&prev_header) {
return Some(Err(CompactFiltersError::InvalidFilter));
}
*prev_header = header;
Ok::<_, CompactFiltersError>(filter_content)
Some(Ok::<_, CompactFiltersError>(filter_content))
})
.collect::<Result<_, _>>()?;
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash);
let value = (BundleStatus::CFilters { cf_filters }, checkpoint);
let read_store = self.store.read().unwrap();
read_store.put(key, value.serialize())?;
@ -872,10 +835,10 @@ impl CFStore {
pub fn prune_filters(
&self,
bundle: usize,
checkpoint_hash: FilterHeaderHash,
checkpoint: FilterHeader,
) -> Result<BundleStatus, CompactFiltersError> {
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
let value = (BundleStatus::Pruned, checkpoint_hash);
let value = (BundleStatus::Pruned, checkpoint);
let read_store = self.store.read().unwrap();
read_store.put(key, value.serialize())?;
@ -887,10 +850,10 @@ impl CFStore {
&self,
bundle: usize,
cf_filters: Vec<Vec<u8>>,
checkpoint_hash: FilterHeaderHash,
checkpoint: FilterHeader,
) -> Result<BundleStatus, CompactFiltersError> {
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash);
let value = (BundleStatus::Tip { cf_filters }, checkpoint);
let read_store = self.store.read().unwrap();
read_store.put(key, value.serialize())?;

View File

@ -26,7 +26,7 @@ use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bitcoin::hash_types::{BlockHash, FilterHash};
use bitcoin::hash_types::{BlockHash, FilterHeader};
use bitcoin::network::message::NetworkMessage;
use bitcoin::network::message_blockdata::GetHeadersMessage;
use bitcoin::util::bip158::BlockFilter;
@ -42,7 +42,7 @@ pub struct CFSync {
headers_store: Arc<ChainStore<Full>>,
cf_store: Arc<CFStore>,
skip_blocks: usize,
bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
bundles: Mutex<VecDeque<(BundleStatus, FilterHeader, usize)>>,
}
impl CFSync {
@ -148,7 +148,7 @@ impl CFSync {
let resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?;
assert!(resp.previous_filter == checkpoint);
assert!(resp.previous_filter_header == checkpoint);
status =
self.cf_store
.advance_to_cf_headers(index, checkpoint, resp.filter_hashes)?;