Compact Filters blockchain implementation
This commit is contained in:
parent
c12aa3d327
commit
77c95b93ac
@ -13,6 +13,7 @@ env:
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=minimal,esplora NO_DEFAULT_FEATURES=1
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=key-value-db NO_DEFAULT_FEATURES=1
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=electrum NO_DEFAULT_FEATURES=1
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=compact_filters NO_DEFAULT_FEATURES=1
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=cli-utils,esplora NO_DEFAULT_FEATURES=1
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=compiler NO_DEFAULT_FEATURES=1 RUN_TESTS=1 # Test the `miniscriptc` example
|
||||
- TARGET=x86_64-unknown-linux-gnu FEATURES=test-electrum NO_DEFAULT_FEATURES=1 RUN_TESTS=1 RUN_CORE=1
|
||||
|
@ -14,13 +14,15 @@ serde_json = { version = "^1.0" }
|
||||
rand = "^0.7"
|
||||
|
||||
# Optional dependencies
|
||||
sled = { version = "0.31.0", optional = true }
|
||||
sled = { version = "0.34", optional = true }
|
||||
electrum-client = { version = "0.2.0-beta.1", optional = true }
|
||||
reqwest = { version = "0.10", optional = true, features = ["json"] }
|
||||
futures = { version = "0.3", optional = true }
|
||||
clap = { version = "2.33", optional = true }
|
||||
base64 = { version = "^0.11", optional = true }
|
||||
async-trait = { version = "0.1", optional = true }
|
||||
rocksdb = { version = "0.14", optional = true }
|
||||
lazy_static = { version = "1.4", optional = true }
|
||||
|
||||
# Platform-specific dependencies
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
@ -36,6 +38,7 @@ compiler = ["clap", "miniscript/compiler"]
|
||||
default = ["key-value-db", "electrum"]
|
||||
electrum = ["electrum-client"]
|
||||
esplora = ["reqwest", "futures"]
|
||||
compact_filters = ["rocksdb", "lazy_static"]
|
||||
key-value-db = ["sled"]
|
||||
cli-utils = ["clap", "base64"]
|
||||
async-interface = ["async-trait"]
|
||||
|
465
src/blockchain/compact_filters/mod.rs
Normal file
465
src/blockchain/compact_filters/mod.rs
Normal file
@ -0,0 +1,465 @@
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use log::{debug, error, info, trace};
|
||||
|
||||
use bitcoin::network::message_blockdata::Inventory;
|
||||
use bitcoin::{BitcoinHash, Network, OutPoint, Transaction, Txid};
|
||||
|
||||
use rocksdb::{Options, SliceTransform, DB};
|
||||
|
||||
mod peer;
|
||||
mod store;
|
||||
mod sync;
|
||||
|
||||
use super::{Blockchain, Capability, OnlineBlockchain, Progress};
|
||||
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
|
||||
use crate::error::Error;
|
||||
use crate::types::{ScriptType, TransactionDetails, UTXO};
|
||||
use crate::FeeRate;
|
||||
|
||||
use peer::*;
|
||||
use store::*;
|
||||
use sync::*;
|
||||
|
||||
const SYNC_HEADERS_COST: f32 = 1.0;
|
||||
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
|
||||
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CompactFiltersBlockchain(Option<CompactFilters>);
|
||||
|
||||
impl CompactFiltersBlockchain {
|
||||
pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
|
||||
address: A,
|
||||
storage_dir: P,
|
||||
num_threads: usize,
|
||||
skip_blocks: usize,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
Ok(CompactFiltersBlockchain(Some(CompactFilters::new(
|
||||
address,
|
||||
storage_dir,
|
||||
num_threads,
|
||||
skip_blocks,
|
||||
network,
|
||||
)?)))
|
||||
}
|
||||
}
|
||||
|
||||
struct CompactFilters {
|
||||
peer: Arc<Peer>,
|
||||
headers: Arc<HeadersStore<Full>>,
|
||||
skip_blocks: usize,
|
||||
num_threads: usize,
|
||||
}
|
||||
|
||||
impl CompactFilters {
|
||||
pub fn new<A: ToSocketAddrs, P: AsRef<Path>>(
|
||||
address: A,
|
||||
storage_dir: P,
|
||||
num_threads: usize,
|
||||
skip_blocks: usize,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
let mut opts = Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));
|
||||
|
||||
let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or(vec!["default".to_string()]);
|
||||
let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
|
||||
let headers = Arc::new(HeadersStore::new(db, network)?);
|
||||
|
||||
// try to recover partial snapshots
|
||||
for cf_name in &cfs {
|
||||
if !cf_name.starts_with("_headers:") {
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("Trying to recover: {:?}", cf_name);
|
||||
headers.recover_snapshot(cf_name)?;
|
||||
}
|
||||
|
||||
Ok(CompactFilters {
|
||||
peer: Arc::new(Peer::new(address, Arc::new(Mempool::default()), network)?),
|
||||
num_threads,
|
||||
headers,
|
||||
skip_blocks,
|
||||
})
|
||||
}
|
||||
|
||||
fn process_tx<D: BatchDatabase + DatabaseUtils>(
|
||||
&self,
|
||||
database: &mut D,
|
||||
tx: &Transaction,
|
||||
height: Option<u32>,
|
||||
timestamp: u64,
|
||||
internal_max_deriv: &mut u32,
|
||||
external_max_deriv: &mut u32,
|
||||
) -> Result<(), Error> {
|
||||
let mut updates = database.begin_batch();
|
||||
|
||||
let mut incoming: u64 = 0;
|
||||
let mut outgoing: u64 = 0;
|
||||
|
||||
let mut inputs_sum: u64 = 0;
|
||||
let mut outputs_sum: u64 = 0;
|
||||
|
||||
// look for our own inputs
|
||||
for (i, input) in tx.input.iter().enumerate() {
|
||||
if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
|
||||
inputs_sum += previous_output.value;
|
||||
|
||||
if database.is_mine(&previous_output.script_pubkey)? {
|
||||
outgoing += previous_output.value;
|
||||
|
||||
debug!("{} input #{} is mine, removing from utxo", tx.txid(), i);
|
||||
updates.del_utxo(&input.previous_output)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (i, output) in tx.output.iter().enumerate() {
|
||||
// to compute the fees later
|
||||
outputs_sum += output.value;
|
||||
|
||||
// this output is ours, we have a path to derive it
|
||||
if let Some((script_type, child)) =
|
||||
database.get_path_from_script_pubkey(&output.script_pubkey)?
|
||||
{
|
||||
debug!("{} output #{} is mine, adding utxo", tx.txid(), i);
|
||||
updates.set_utxo(&UTXO {
|
||||
outpoint: OutPoint::new(tx.txid(), i as u32),
|
||||
txout: output.clone(),
|
||||
is_internal: script_type.is_internal(),
|
||||
})?;
|
||||
incoming += output.value;
|
||||
|
||||
if script_type == ScriptType::Internal && child > *internal_max_deriv {
|
||||
*internal_max_deriv = child;
|
||||
} else if script_type == ScriptType::External && child > *external_max_deriv {
|
||||
*external_max_deriv = child;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if incoming > 0 || outgoing > 0 {
|
||||
let tx = TransactionDetails {
|
||||
txid: tx.txid(),
|
||||
transaction: Some(tx.clone()),
|
||||
received: incoming,
|
||||
sent: outgoing,
|
||||
height,
|
||||
timestamp,
|
||||
fees: inputs_sum.checked_sub(outputs_sum).unwrap_or(0),
|
||||
};
|
||||
|
||||
info!("Saving tx {}", tx.txid);
|
||||
updates.set_tx(&tx)?;
|
||||
}
|
||||
|
||||
database.commit_batch(updates)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Blockchain for CompactFiltersBlockchain {
|
||||
fn offline() -> Self {
|
||||
CompactFiltersBlockchain(None)
|
||||
}
|
||||
|
||||
fn is_online(&self) -> bool {
|
||||
self.0.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl OnlineBlockchain for CompactFiltersBlockchain {
|
||||
fn get_capabilities(&self) -> HashSet<Capability> {
|
||||
vec![Capability::FullHistory].into_iter().collect()
|
||||
}
|
||||
|
||||
fn setup<D: BatchDatabase + DatabaseUtils, P: 'static + Progress>(
|
||||
&self,
|
||||
_stop_gap: Option<usize>, // TODO: move to electrum and esplora only
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error> {
|
||||
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
|
||||
|
||||
let cf_sync = Arc::new(CFSync::new(
|
||||
Arc::clone(&inner.headers),
|
||||
inner.skip_blocks,
|
||||
0x00,
|
||||
)?);
|
||||
|
||||
let initial_height = inner.headers.get_height()?;
|
||||
let total_bundles = (inner.peer.get_version().start_height as usize)
|
||||
.checked_sub(inner.skip_blocks)
|
||||
.map(|x| x / 1000)
|
||||
.unwrap_or(0)
|
||||
+ 1;
|
||||
let expected_bundles_to_sync = total_bundles
|
||||
.checked_sub(cf_sync.pruned_bundles()?)
|
||||
.unwrap_or(0);
|
||||
|
||||
let headers_cost = (inner.peer.get_version().start_height as usize)
|
||||
.checked_sub(initial_height)
|
||||
.unwrap_or(0) as f32
|
||||
* SYNC_HEADERS_COST;
|
||||
let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST;
|
||||
|
||||
let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;
|
||||
|
||||
if let Some(snapshot) = sync::sync_headers(
|
||||
Arc::clone(&inner.peer),
|
||||
Arc::clone(&inner.headers),
|
||||
|new_height| {
|
||||
let local_headers_cost =
|
||||
new_height.checked_sub(initial_height).unwrap_or(0) as f32 * SYNC_HEADERS_COST;
|
||||
progress_update.update(
|
||||
local_headers_cost / total_cost * 100.0,
|
||||
Some(format!("Synced headers to {}", new_height)),
|
||||
)
|
||||
},
|
||||
)? {
|
||||
if snapshot.work()? > inner.headers.work()? {
|
||||
info!("Applying snapshot with work: {}", snapshot.work()?);
|
||||
inner.headers.apply_snapshot(snapshot)?;
|
||||
}
|
||||
}
|
||||
|
||||
let synced_height = inner.headers.get_height()?;
|
||||
let buried_height = synced_height
|
||||
.checked_sub(sync::BURIED_CONFIRMATIONS)
|
||||
.unwrap_or(0);
|
||||
info!("Synced headers to height: {}", synced_height);
|
||||
|
||||
cf_sync.prepare_sync(Arc::clone(&inner.peer))?;
|
||||
|
||||
let all_scripts = Arc::new(
|
||||
database
|
||||
.iter_script_pubkeys(None)?
|
||||
.into_iter()
|
||||
.map(|s| s.to_bytes())
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
|
||||
let last_synced_block = Arc::new(Mutex::new(synced_height));
|
||||
let synced_bundles = Arc::new(AtomicUsize::new(0));
|
||||
let progress_update = Arc::new(Mutex::new(progress_update));
|
||||
|
||||
let mut threads = Vec::with_capacity(inner.num_threads);
|
||||
for _ in 0..inner.num_threads {
|
||||
let cf_sync = Arc::clone(&cf_sync);
|
||||
let peer = Arc::new(inner.peer.new_connection()?);
|
||||
let headers = Arc::clone(&inner.headers);
|
||||
let all_scripts = Arc::clone(&all_scripts);
|
||||
let last_synced_block = Arc::clone(&last_synced_block);
|
||||
let progress_update = Arc::clone(&progress_update);
|
||||
let synced_bundles = Arc::clone(&synced_bundles);
|
||||
|
||||
let thread = std::thread::spawn(move || {
|
||||
cf_sync.capture_thread_for_sync(
|
||||
peer,
|
||||
|block_hash, filter| {
|
||||
if !filter
|
||||
.match_any(block_hash, &mut all_scripts.iter().map(AsRef::as_ref))?
|
||||
{
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let block_height = headers.get_height_for(block_hash)?.unwrap_or(0);
|
||||
let saved_correct_block = match headers.get_full_block(block_height)? {
|
||||
Some(block) if &block.bitcoin_hash() == block_hash => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if saved_correct_block {
|
||||
Ok(false)
|
||||
} else {
|
||||
let mut last_synced_block = last_synced_block.lock().unwrap();
|
||||
|
||||
// If we download a block older than `last_synced_block`, we update it so that
|
||||
// we know to delete and re-process all txs starting from that height
|
||||
if block_height < *last_synced_block {
|
||||
*last_synced_block = block_height;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
},
|
||||
|index| {
|
||||
let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst);
|
||||
let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST;
|
||||
progress_update.lock().unwrap().update(
|
||||
(headers_cost + local_filters_cost) / total_cost * 100.0,
|
||||
Some(format!(
|
||||
"Synced filters {} - {}",
|
||||
index * 1000 + 1,
|
||||
(index + 1) * 1000
|
||||
)),
|
||||
)
|
||||
},
|
||||
)
|
||||
});
|
||||
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
for t in threads {
|
||||
t.join().unwrap()?;
|
||||
}
|
||||
|
||||
progress_update.lock().unwrap().update(
|
||||
(headers_cost + filters_cost) / total_cost * 100.0,
|
||||
Some("Processing downloaded blocks and mempool".into()),
|
||||
)?;
|
||||
|
||||
// delete all txs newer than last_synced_block
|
||||
let last_synced_block = *last_synced_block.lock().unwrap();
|
||||
log::debug!(
|
||||
"Dropping transactions newer than `last_synced_block` = {}",
|
||||
last_synced_block
|
||||
);
|
||||
let mut updates = database.begin_batch();
|
||||
for details in database.iter_txs(false)? {
|
||||
match details.height {
|
||||
Some(height) if (height as usize) < last_synced_block => continue,
|
||||
_ => updates.del_tx(&details.txid, false)?,
|
||||
};
|
||||
}
|
||||
database.commit_batch(updates)?;
|
||||
|
||||
inner.peer.ask_for_mempool()?;
|
||||
|
||||
let mut internal_max_deriv = 0;
|
||||
let mut external_max_deriv = 0;
|
||||
|
||||
for (height, block) in inner.headers.iter_full_blocks()? {
|
||||
for tx in &block.txdata {
|
||||
inner.process_tx(
|
||||
database,
|
||||
tx,
|
||||
Some(height as u32),
|
||||
0,
|
||||
&mut internal_max_deriv,
|
||||
&mut external_max_deriv,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
for tx in inner.peer.get_mempool().iter_txs().iter() {
|
||||
inner.process_tx(
|
||||
database,
|
||||
tx,
|
||||
None,
|
||||
0,
|
||||
&mut internal_max_deriv,
|
||||
&mut external_max_deriv,
|
||||
)?;
|
||||
}
|
||||
|
||||
let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0);
|
||||
let first_ext_new = external_max_deriv as u32 + 1;
|
||||
if first_ext_new > current_ext {
|
||||
info!("Setting external index to {}", first_ext_new);
|
||||
database.set_last_index(ScriptType::External, first_ext_new)?;
|
||||
}
|
||||
|
||||
let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0);
|
||||
let first_int_new = internal_max_deriv + 1;
|
||||
if first_int_new > current_int {
|
||||
info!("Setting internal index to {}", first_int_new);
|
||||
database.set_last_index(ScriptType::Internal, first_int_new)?;
|
||||
}
|
||||
|
||||
info!("Dropping blocks until {}", buried_height);
|
||||
inner.headers.delete_blocks_until(buried_height)?;
|
||||
|
||||
progress_update
|
||||
.lock()
|
||||
.unwrap()
|
||||
.update(100.0, Some("Done".into()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
|
||||
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
|
||||
|
||||
Ok(inner
|
||||
.peer
|
||||
.get_mempool()
|
||||
.get_tx(&Inventory::Transaction(*txid)))
|
||||
}
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
|
||||
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
|
||||
inner.peer.broadcast_tx(tx.clone())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_height(&self) -> Result<u32, Error> {
|
||||
let inner = self.0.as_ref().ok_or(Error::OfflineClient)?;
|
||||
|
||||
Ok(inner.headers.get_height()? as u32)
|
||||
}
|
||||
|
||||
fn estimate_fee(&self, _target: usize) -> Result<FeeRate, Error> {
|
||||
// TODO
|
||||
Ok(FeeRate::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for CompactFilters {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("CompactFilters")
|
||||
.field("peer", &self.peer)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum CompactFiltersError {
|
||||
InvalidResponse,
|
||||
InvalidHeaders,
|
||||
InvalidFilterHeader,
|
||||
InvalidFilter,
|
||||
MissingBlock,
|
||||
DataCorruption,
|
||||
Timeout,
|
||||
|
||||
DB(rocksdb::Error),
|
||||
IO(std::io::Error),
|
||||
BIP158(bitcoin::util::bip158::Error),
|
||||
Time(std::time::SystemTimeError),
|
||||
|
||||
Global(Box<crate::error::Error>),
|
||||
}
|
||||
|
||||
macro_rules! impl_error {
|
||||
( $from:ty, $to:ident ) => {
|
||||
impl std::convert::From<$from> for CompactFiltersError {
|
||||
fn from(err: $from) -> Self {
|
||||
CompactFiltersError::$to(err)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_error!(rocksdb::Error, DB);
|
||||
impl_error!(std::io::Error, IO);
|
||||
impl_error!(bitcoin::util::bip158::Error, BIP158);
|
||||
impl_error!(std::time::SystemTimeError, Time);
|
||||
|
||||
impl From<crate::error::Error> for CompactFiltersError {
|
||||
fn from(err: crate::error::Error) -> Self {
|
||||
CompactFiltersError::Global(Box::new(err))
|
||||
}
|
||||
}
|
468
src/blockchain/compact_filters/peer.rs
Normal file
468
src/blockchain/compact_filters/peer.rs
Normal file
@ -0,0 +1,468 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::thread;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
use bitcoin::consensus::Encodable;
|
||||
use bitcoin::hash_types::BlockHash;
|
||||
use bitcoin::hashes::Hash;
|
||||
use bitcoin::network::constants::ServiceFlags;
|
||||
use bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
|
||||
use bitcoin::network::message_blockdata::*;
|
||||
use bitcoin::network::message_filter::*;
|
||||
use bitcoin::network::message_network::VersionMessage;
|
||||
use bitcoin::network::stream_reader::StreamReader;
|
||||
use bitcoin::network::Address;
|
||||
use bitcoin::{Block, Network, Transaction, Txid};
|
||||
|
||||
use super::CompactFiltersError;
|
||||
|
||||
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Mempool {
|
||||
txs: RwLock<HashMap<Txid, Transaction>>,
|
||||
}
|
||||
|
||||
impl Mempool {
|
||||
pub fn add_tx(&self, tx: Transaction) {
|
||||
self.txs.write().unwrap().insert(tx.txid(), tx);
|
||||
}
|
||||
|
||||
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
|
||||
let txid = match inventory {
|
||||
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
|
||||
Inventory::Transaction(txid) => *txid,
|
||||
Inventory::WitnessTransaction(wtxid) => Txid::from_inner(wtxid.into_inner()),
|
||||
};
|
||||
self.txs.read().unwrap().get(&txid).cloned()
|
||||
}
|
||||
|
||||
pub fn has_tx(&self, txid: &Txid) -> bool {
|
||||
self.txs.read().unwrap().contains_key(txid)
|
||||
}
|
||||
|
||||
pub fn iter_txs(&self) -> Vec<Transaction> {
|
||||
self.txs.read().unwrap().values().cloned().collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Peer {
|
||||
writer: Arc<Mutex<TcpStream>>,
|
||||
responses: Arc<RwLock<ResponsesMap>>,
|
||||
|
||||
reader_thread: thread::JoinHandle<()>,
|
||||
connected: Arc<RwLock<bool>>,
|
||||
|
||||
mempool: Arc<Mempool>,
|
||||
|
||||
version: VersionMessage,
|
||||
network: Network,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn new<A: ToSocketAddrs>(
|
||||
address: A,
|
||||
mempool: Arc<Mempool>,
|
||||
network: Network,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
let connection = TcpStream::connect(address)?;
|
||||
|
||||
let writer = Arc::new(Mutex::new(connection.try_clone()?));
|
||||
let responses: Arc<RwLock<ResponsesMap>> = Arc::new(RwLock::new(HashMap::new()));
|
||||
let connected = Arc::new(RwLock::new(true));
|
||||
|
||||
let mut locked_writer = writer.lock().unwrap();
|
||||
|
||||
let reader_thread_responses = Arc::clone(&responses);
|
||||
let reader_thread_writer = Arc::clone(&writer);
|
||||
let reader_thread_mempool = Arc::clone(&mempool);
|
||||
let reader_thread_connected = Arc::clone(&connected);
|
||||
let reader_thread = thread::spawn(move || {
|
||||
Self::reader_thread(
|
||||
network,
|
||||
connection,
|
||||
reader_thread_responses,
|
||||
reader_thread_writer,
|
||||
reader_thread_mempool,
|
||||
reader_thread_connected,
|
||||
)
|
||||
});
|
||||
|
||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
|
||||
let nonce = thread_rng().gen();
|
||||
let receiver = Address::new(&locked_writer.peer_addr()?, ServiceFlags::NONE);
|
||||
let sender = Address {
|
||||
services: ServiceFlags::NONE,
|
||||
address: [0u16; 8],
|
||||
port: 0,
|
||||
};
|
||||
|
||||
Self::_send(
|
||||
&mut locked_writer,
|
||||
network.magic(),
|
||||
NetworkMessage::Version(VersionMessage::new(
|
||||
ServiceFlags::WITNESS,
|
||||
timestamp,
|
||||
receiver,
|
||||
sender,
|
||||
nonce,
|
||||
"MagicalBitcoinWallet".into(),
|
||||
0,
|
||||
)),
|
||||
)?;
|
||||
let version = if let NetworkMessage::Version(version) =
|
||||
Self::_recv(&responses, "version", None)?.unwrap()
|
||||
{
|
||||
version
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
};
|
||||
|
||||
if let NetworkMessage::Verack = Self::_recv(&responses, "verack", None)?.unwrap() {
|
||||
Self::_send(&mut locked_writer, network.magic(), NetworkMessage::Verack)?;
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
}
|
||||
|
||||
std::mem::drop(locked_writer);
|
||||
|
||||
Ok(Peer {
|
||||
writer,
|
||||
reader_thread,
|
||||
responses,
|
||||
connected,
|
||||
mempool,
|
||||
network,
|
||||
version,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_connection(&self) -> Result<Self, CompactFiltersError> {
|
||||
let socket_addr = self.writer.lock().unwrap().peer_addr()?;
|
||||
Self::new(socket_addr, Arc::clone(&self.mempool), self.network)
|
||||
}
|
||||
|
||||
fn _send(
|
||||
writer: &mut TcpStream,
|
||||
magic: u32,
|
||||
payload: NetworkMessage,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
log::trace!("==> {:?}", payload);
|
||||
|
||||
let raw_message = RawNetworkMessage { magic, payload };
|
||||
|
||||
raw_message
|
||||
.consensus_encode(writer)
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn _recv(
|
||||
responses: &Arc<RwLock<ResponsesMap>>,
|
||||
wait_for: &'static str,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
|
||||
let message_resp = {
|
||||
let mut lock = responses.write().unwrap();
|
||||
let message_resp = lock.entry(wait_for).or_default();
|
||||
Arc::clone(&message_resp)
|
||||
};
|
||||
|
||||
let (lock, cvar) = &*message_resp;
|
||||
|
||||
let mut messages = lock.lock().unwrap();
|
||||
while messages.is_empty() {
|
||||
match timeout {
|
||||
None => messages = cvar.wait(messages).unwrap(),
|
||||
Some(t) => {
|
||||
let result = cvar.wait_timeout(messages, t).unwrap();
|
||||
if result.1.timed_out() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
messages = result.0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(messages.pop())
|
||||
}
|
||||
|
||||
pub fn get_version(&self) -> &VersionMessage {
|
||||
&self.version
|
||||
}
|
||||
|
||||
pub fn get_mempool(&self) -> Arc<Mempool> {
|
||||
Arc::clone(&self.mempool)
|
||||
}
|
||||
|
||||
pub fn is_connected(&self) -> bool {
|
||||
*self.connected.read().unwrap()
|
||||
}
|
||||
|
||||
pub fn reader_thread(
|
||||
network: Network,
|
||||
connection: TcpStream,
|
||||
reader_thread_responses: Arc<RwLock<ResponsesMap>>,
|
||||
reader_thread_writer: Arc<Mutex<TcpStream>>,
|
||||
reader_thread_mempool: Arc<Mempool>,
|
||||
reader_thread_connected: Arc<RwLock<bool>>,
|
||||
) {
|
||||
macro_rules! check_disconnect {
|
||||
($call:expr) => {
|
||||
match $call {
|
||||
Ok(good) => good,
|
||||
Err(e) => {
|
||||
log::debug!("Error {:?}", e);
|
||||
*reader_thread_connected.write().unwrap() = false;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let mut reader = StreamReader::new(connection, None);
|
||||
loop {
|
||||
let raw_message: RawNetworkMessage = check_disconnect!(reader.read_next());
|
||||
|
||||
let in_message = if raw_message.magic != network.magic() {
|
||||
continue;
|
||||
} else {
|
||||
raw_message.payload
|
||||
};
|
||||
|
||||
log::trace!("<== {:?}", in_message);
|
||||
|
||||
match in_message {
|
||||
NetworkMessage::Ping(nonce) => {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
network.magic(),
|
||||
NetworkMessage::Pong(nonce),
|
||||
));
|
||||
|
||||
continue;
|
||||
}
|
||||
NetworkMessage::Alert(_) => continue,
|
||||
NetworkMessage::GetData(ref inv) => {
|
||||
let (found, not_found): (Vec<_>, Vec<_>) = inv
|
||||
.into_iter()
|
||||
.map(|item| (*item, reader_thread_mempool.get_tx(item)))
|
||||
.partition(|(_, d)| d.is_some());
|
||||
for (_, found_tx) in found {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
network.magic(),
|
||||
NetworkMessage::Tx(found_tx.unwrap()),
|
||||
));
|
||||
}
|
||||
|
||||
if !not_found.is_empty() {
|
||||
check_disconnect!(Self::_send(
|
||||
&mut reader_thread_writer.lock().unwrap(),
|
||||
network.magic(),
|
||||
NetworkMessage::NotFound(
|
||||
not_found.into_iter().map(|(i, _)| i).collect(),
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let message_resp = {
|
||||
let mut lock = reader_thread_responses.write().unwrap();
|
||||
let message_resp = lock.entry(in_message.cmd()).or_default();
|
||||
Arc::clone(&message_resp)
|
||||
};
|
||||
|
||||
let (lock, cvar) = &*message_resp;
|
||||
let mut messages = lock.lock().unwrap();
|
||||
messages.push(in_message);
|
||||
cvar.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
|
||||
let mut writer = self.writer.lock().unwrap();
|
||||
Self::_send(&mut writer, self.network.magic(), payload)
|
||||
}
|
||||
|
||||
pub fn recv(
|
||||
&self,
|
||||
wait_for: &'static str,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Option<NetworkMessage>, CompactFiltersError> {
|
||||
Self::_recv(&self.responses, wait_for, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait CompactFiltersPeer {
|
||||
fn get_cf_checkpt(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFCheckpt, CompactFiltersError>;
|
||||
fn get_cf_headers(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFHeaders, CompactFiltersError>;
|
||||
fn get_cf_filters(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<(), CompactFiltersError>;
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError>;
|
||||
}
|
||||
|
||||
impl CompactFiltersPeer for Peer {
|
||||
fn get_cf_checkpt(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFCheckpt, CompactFiltersError> {
|
||||
self.send(NetworkMessage::GetCFCheckpt(GetCFCheckpt {
|
||||
filter_type,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
let response = self
|
||||
.recv("cfcheckpt", Some(Duration::from_secs(10)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFCheckpt(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
};
|
||||
|
||||
if response.filter_type != filter_type {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn get_cf_headers(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<CFHeaders, CompactFiltersError> {
|
||||
self.send(NetworkMessage::GetCFHeaders(GetCFHeaders {
|
||||
filter_type,
|
||||
start_height,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
let response = self
|
||||
.recv("cfheaders", Some(Duration::from_secs(10)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFHeaders(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
};
|
||||
|
||||
if response.filter_type != filter_type {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn pop_cf_filter_resp(&self) -> Result<CFilter, CompactFiltersError> {
|
||||
let response = self
|
||||
.recv("cfilter", Some(Duration::from_secs(10)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let response = match response {
|
||||
NetworkMessage::CFilter(response) => response,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn get_cf_filters(
|
||||
&self,
|
||||
filter_type: u8,
|
||||
start_height: u32,
|
||||
stop_hash: BlockHash,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
self.send(NetworkMessage::GetCFilters(GetCFilters {
|
||||
filter_type,
|
||||
start_height,
|
||||
stop_hash,
|
||||
}))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait InvPeer {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError>;
|
||||
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError>;
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError>;
|
||||
}
|
||||
|
||||
impl InvPeer for Peer {
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Option<Block>, CompactFiltersError> {
|
||||
self.send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(
|
||||
block_hash,
|
||||
)]))?;
|
||||
|
||||
match self.recv("block", Some(Duration::from_secs(10)))? {
|
||||
None => Ok(None),
|
||||
Some(NetworkMessage::Block(response)) => Ok(Some(response)),
|
||||
_ => Err(CompactFiltersError::InvalidResponse),
|
||||
}
|
||||
}
|
||||
|
||||
fn ask_for_mempool(&self) -> Result<(), CompactFiltersError> {
|
||||
self.send(NetworkMessage::MemPool)?;
|
||||
let inv = match self.recv("inv", Some(Duration::from_secs(5)))? {
|
||||
None => return Ok(()), // empty mempool
|
||||
Some(NetworkMessage::Inv(inv)) => inv,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
};
|
||||
|
||||
let getdata = inv
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(|item| match item {
|
||||
Inventory::Transaction(txid) if !self.mempool.has_tx(txid) => true,
|
||||
_ => false,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let num_txs = getdata.len();
|
||||
self.send(NetworkMessage::GetData(getdata))?;
|
||||
|
||||
for _ in 0..num_txs {
|
||||
let tx = self
|
||||
.recv("tx", Some(Duration::from_secs(10)))?
|
||||
.ok_or(CompactFiltersError::Timeout)?;
|
||||
let tx = match tx {
|
||||
NetworkMessage::Tx(tx) => tx,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
};
|
||||
|
||||
self.mempool.add_tx(tx);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn broadcast_tx(&self, tx: Transaction) -> Result<(), CompactFiltersError> {
|
||||
self.mempool.add_tx(tx.clone());
|
||||
self.send(NetworkMessage::Tx(tx))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
871
src/blockchain/compact_filters/store.rs
Normal file
871
src/blockchain/compact_filters/store.rs
Normal file
@ -0,0 +1,871 @@
|
||||
use std::convert::TryInto;
|
||||
use std::io::{Read, Write};
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use rand::distributions::Alphanumeric;
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
|
||||
|
||||
use bitcoin::consensus::{deserialize, encode::VarInt, serialize, Decodable, Encodable};
|
||||
use bitcoin::hash_types::FilterHash;
|
||||
use bitcoin::hashes::hex::FromHex;
|
||||
use bitcoin::hashes::{sha256d, Hash};
|
||||
use bitcoin::util::bip158::BlockFilter;
|
||||
use bitcoin::util::hash::BitcoinHash;
|
||||
use bitcoin::util::uint::Uint256;
|
||||
use bitcoin::Block;
|
||||
use bitcoin::BlockHash;
|
||||
use bitcoin::BlockHeader;
|
||||
use bitcoin::Network;
|
||||
|
||||
use super::CompactFiltersError;
|
||||
|
||||
lazy_static! {
|
||||
static ref MAINNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4A29AB5F49FFFF001D1DAC2B7C0101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
|
||||
static ref TESTNET_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF001D1AA4AE180101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
|
||||
static ref REGTEST_GENESIS: Block = deserialize(&Vec::<u8>::from_hex("0100000000000000000000000000000000000000000000000000000000000000000000003BA3EDFD7A7B12B27AC72C3E67768F617FC81BC3888A51323A9FB8AA4B1E5E4ADAE5494DFFFF7F20020000000101000000010000000000000000000000000000000000000000000000000000000000000000FFFFFFFF4D04FFFF001D0104455468652054696D65732030332F4A616E2F32303039204368616E63656C6C6F72206F6E206272696E6B206F66207365636F6E64206261696C6F757420666F722062616E6B73FFFFFFFF0100F2052A01000000434104678AFDB0FE5548271967F1A67130B7105CD6A828E03909A67962E0EA1F61DEB649F6BC3F4CEF38C4F35504E51EC112DE5C384DF7BA0B8D578A4C702B6BF11D5FAC00000000").unwrap()).unwrap();
|
||||
}
|
||||
|
||||
pub trait StoreType: Default {}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Full;
|
||||
impl StoreType for Full {}
|
||||
#[derive(Default)]
|
||||
pub struct Snapshot;
|
||||
impl StoreType for Snapshot {}
|
||||
|
||||
pub enum StoreEntry {
|
||||
BlockHeader(Option<usize>),
|
||||
Block(Option<usize>),
|
||||
BlockHeaderIndex(Option<BlockHash>),
|
||||
CFilterTable((u8, Option<usize>)),
|
||||
}
|
||||
|
||||
impl StoreEntry {
|
||||
pub fn get_prefix(&self) -> Vec<u8> {
|
||||
match self {
|
||||
StoreEntry::BlockHeader(_) => b"z",
|
||||
StoreEntry::Block(_) => b"x",
|
||||
StoreEntry::BlockHeaderIndex(_) => b"i",
|
||||
StoreEntry::CFilterTable(_) => b"t",
|
||||
}
|
||||
.to_vec()
|
||||
}
|
||||
|
||||
pub fn get_key(&self) -> Vec<u8> {
|
||||
let mut prefix = self.get_prefix();
|
||||
match self {
|
||||
StoreEntry::BlockHeader(Some(height)) => {
|
||||
prefix.extend_from_slice(&height.to_be_bytes())
|
||||
}
|
||||
StoreEntry::Block(Some(height)) => prefix.extend_from_slice(&height.to_be_bytes()),
|
||||
StoreEntry::BlockHeaderIndex(Some(hash)) => {
|
||||
prefix.extend_from_slice(&hash.into_inner())
|
||||
}
|
||||
StoreEntry::CFilterTable((filter_type, bundle_index)) => {
|
||||
prefix.push(*filter_type);
|
||||
if let Some(bundle_index) = bundle_index {
|
||||
prefix.extend_from_slice(&bundle_index.to_be_bytes());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
prefix
|
||||
}
|
||||
}
|
||||
|
||||
pub trait SerializeDb: Sized {
|
||||
fn serialize(&self) -> Vec<u8>;
|
||||
fn deserialize(data: &[u8]) -> Result<Self, CompactFiltersError>;
|
||||
}
|
||||
|
||||
impl<T> SerializeDb for T
|
||||
where
|
||||
T: Encodable + Decodable,
|
||||
{
|
||||
fn serialize(&self) -> Vec<u8> {
|
||||
serialize(self)
|
||||
}
|
||||
|
||||
fn deserialize(data: &[u8]) -> Result<Self, CompactFiltersError> {
|
||||
Ok(deserialize(data).map_err(|_| CompactFiltersError::DataCorruption)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for FilterHeader {
|
||||
fn consensus_encode<W: Write>(
|
||||
&self,
|
||||
mut e: W,
|
||||
) -> Result<usize, bitcoin::consensus::encode::Error> {
|
||||
let mut written = self.prev_header_hash.consensus_encode(&mut e)?;
|
||||
written += self.filter_hash.consensus_encode(&mut e)?;
|
||||
Ok(written)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for FilterHeader {
|
||||
fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
|
||||
let prev_header_hash = FilterHeaderHash::consensus_decode(&mut d)?;
|
||||
let filter_hash = FilterHash::consensus_decode(&mut d)?;
|
||||
|
||||
Ok(FilterHeader {
|
||||
prev_header_hash,
|
||||
filter_hash,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for BundleStatus {
|
||||
fn consensus_encode<W: Write>(
|
||||
&self,
|
||||
mut e: W,
|
||||
) -> Result<usize, bitcoin::consensus::encode::Error> {
|
||||
let mut written = 0;
|
||||
|
||||
match self {
|
||||
BundleStatus::Init => {
|
||||
written += 0x00u8.consensus_encode(&mut e)?;
|
||||
}
|
||||
BundleStatus::CFHeaders { cf_headers } => {
|
||||
written += 0x01u8.consensus_encode(&mut e)?;
|
||||
written += VarInt(cf_headers.len() as u64).consensus_encode(&mut e)?;
|
||||
for header in cf_headers {
|
||||
written += header.consensus_encode(&mut e)?;
|
||||
}
|
||||
}
|
||||
BundleStatus::CFilters { cf_filters } => {
|
||||
written += 0x02u8.consensus_encode(&mut e)?;
|
||||
written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?;
|
||||
for filter in cf_filters {
|
||||
written += filter.consensus_encode(&mut e)?;
|
||||
}
|
||||
}
|
||||
BundleStatus::Processed { cf_filters } => {
|
||||
written += 0x03u8.consensus_encode(&mut e)?;
|
||||
written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?;
|
||||
for filter in cf_filters {
|
||||
written += filter.consensus_encode(&mut e)?;
|
||||
}
|
||||
}
|
||||
BundleStatus::Pruned => {
|
||||
written += 0x04u8.consensus_encode(&mut e)?;
|
||||
}
|
||||
BundleStatus::Tip { cf_filters } => {
|
||||
written += 0x05u8.consensus_encode(&mut e)?;
|
||||
written += VarInt(cf_filters.len() as u64).consensus_encode(&mut e)?;
|
||||
for filter in cf_filters {
|
||||
written += filter.consensus_encode(&mut e)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(written)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for BundleStatus {
|
||||
fn consensus_decode<D: Read>(mut d: D) -> Result<Self, bitcoin::consensus::encode::Error> {
|
||||
let byte_type = u8::consensus_decode(&mut d)?;
|
||||
match byte_type {
|
||||
0x00 => Ok(BundleStatus::Init),
|
||||
0x01 => {
|
||||
let num = VarInt::consensus_decode(&mut d)?;
|
||||
let num = num.0 as usize;
|
||||
|
||||
let mut cf_headers = Vec::with_capacity(num);
|
||||
for _ in 0..num {
|
||||
cf_headers.push(FilterHeader::consensus_decode(&mut d)?);
|
||||
}
|
||||
|
||||
Ok(BundleStatus::CFHeaders { cf_headers })
|
||||
}
|
||||
0x02 => {
|
||||
let num = VarInt::consensus_decode(&mut d)?;
|
||||
let num = num.0 as usize;
|
||||
|
||||
let mut cf_filters = Vec::with_capacity(num);
|
||||
for _ in 0..num {
|
||||
cf_filters.push(Vec::<u8>::consensus_decode(&mut d)?);
|
||||
}
|
||||
|
||||
Ok(BundleStatus::CFilters { cf_filters })
|
||||
}
|
||||
0x03 => {
|
||||
let num = VarInt::consensus_decode(&mut d)?;
|
||||
let num = num.0 as usize;
|
||||
|
||||
let mut cf_filters = Vec::with_capacity(num);
|
||||
for _ in 0..num {
|
||||
cf_filters.push(Vec::<u8>::consensus_decode(&mut d)?);
|
||||
}
|
||||
|
||||
Ok(BundleStatus::Processed { cf_filters })
|
||||
}
|
||||
0x04 => Ok(BundleStatus::Pruned),
|
||||
0x05 => {
|
||||
let num = VarInt::consensus_decode(&mut d)?;
|
||||
let num = num.0 as usize;
|
||||
|
||||
let mut cf_filters = Vec::with_capacity(num);
|
||||
for _ in 0..num {
|
||||
cf_filters.push(Vec::<u8>::consensus_decode(&mut d)?);
|
||||
}
|
||||
|
||||
Ok(BundleStatus::Tip { cf_filters })
|
||||
}
|
||||
_ => Err(bitcoin::consensus::encode::Error::ParseFailed(
|
||||
"Invalid byte type",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HeadersStore<T: StoreType> {
|
||||
store: Arc<RwLock<DB>>,
|
||||
cf_name: String,
|
||||
min_height: usize,
|
||||
network: Network,
|
||||
phantom: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl HeadersStore<Full> {
|
||||
pub fn new(store: DB, network: Network) -> Result<Self, CompactFiltersError> {
|
||||
let genesis = match network {
|
||||
Network::Bitcoin => MAINNET_GENESIS.deref(),
|
||||
Network::Testnet => TESTNET_GENESIS.deref(),
|
||||
Network::Regtest => REGTEST_GENESIS.deref(),
|
||||
};
|
||||
|
||||
let cf_name = "default".to_string();
|
||||
let cf_handle = store.cf_handle(&cf_name).unwrap();
|
||||
|
||||
let genesis_key = StoreEntry::BlockHeader(Some(0)).get_key();
|
||||
|
||||
if store.get_pinned_cf(cf_handle, &genesis_key)?.is_none() {
|
||||
let mut batch = WriteBatch::default();
|
||||
batch.put_cf(
|
||||
cf_handle,
|
||||
genesis_key,
|
||||
(genesis.header, genesis.header.work()).serialize(),
|
||||
);
|
||||
batch.put_cf(
|
||||
cf_handle,
|
||||
StoreEntry::BlockHeaderIndex(Some(genesis.bitcoin_hash())).get_key(),
|
||||
&0usize.to_be_bytes(),
|
||||
);
|
||||
store.write(batch)?;
|
||||
}
|
||||
|
||||
Ok(HeadersStore {
|
||||
store: Arc::new(RwLock::new(store)),
|
||||
cf_name,
|
||||
min_height: 0,
|
||||
network,
|
||||
phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_locators(&self) -> Result<Vec<(BlockHash, usize)>, CompactFiltersError> {
|
||||
let mut step = 1;
|
||||
let mut index = self.get_height()?;
|
||||
let mut answer = Vec::new();
|
||||
|
||||
let store_read = self.store.read().unwrap();
|
||||
let cf_handle = store_read.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
loop {
|
||||
if answer.len() > 10 {
|
||||
step *= 2;
|
||||
}
|
||||
|
||||
let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(
|
||||
&store_read
|
||||
.get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(index)).get_key())?
|
||||
.unwrap(),
|
||||
)?;
|
||||
answer.push((header.bitcoin_hash(), index));
|
||||
|
||||
if let Some(new_index) = index.checked_sub(step) {
|
||||
index = new_index;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(answer)
|
||||
}
|
||||
|
||||
pub fn start_snapshot(
|
||||
&self,
|
||||
from: usize,
|
||||
) -> Result<HeadersStore<Snapshot>, CompactFiltersError> {
|
||||
let new_cf_name: String = thread_rng().sample_iter(&Alphanumeric).take(16).collect();
|
||||
let new_cf_name = format!("_headers:{}", new_cf_name);
|
||||
|
||||
let mut write_store = self.store.write().unwrap();
|
||||
|
||||
write_store.create_cf(&new_cf_name, &Default::default())?;
|
||||
|
||||
let cf_handle = write_store.cf_handle(&self.cf_name).unwrap();
|
||||
let new_cf_handle = write_store.cf_handle(&new_cf_name).unwrap();
|
||||
|
||||
let (header, work): (BlockHeader, Uint256) = SerializeDb::deserialize(
|
||||
&write_store
|
||||
.get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())?
|
||||
.ok_or(CompactFiltersError::DataCorruption)?,
|
||||
)?;
|
||||
|
||||
let mut batch = WriteBatch::default();
|
||||
batch.put_cf(
|
||||
new_cf_handle,
|
||||
StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(),
|
||||
&from.to_be_bytes(),
|
||||
);
|
||||
batch.put_cf(
|
||||
new_cf_handle,
|
||||
StoreEntry::BlockHeader(Some(from)).get_key(),
|
||||
(header, work).serialize(),
|
||||
);
|
||||
write_store.write(batch)?;
|
||||
|
||||
let store = Arc::clone(&self.store);
|
||||
Ok(HeadersStore {
|
||||
store,
|
||||
cf_name: new_cf_name,
|
||||
min_height: from,
|
||||
network: self.network,
|
||||
phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn recover_snapshot(&self, cf_name: &str) -> Result<(), CompactFiltersError> {
|
||||
let mut write_store = self.store.write().unwrap();
|
||||
let snapshot_cf_handle = write_store.cf_handle(cf_name).unwrap();
|
||||
|
||||
let prefix = StoreEntry::BlockHeader(None).get_key();
|
||||
let mut iterator = write_store.prefix_iterator_cf(snapshot_cf_handle, prefix);
|
||||
|
||||
let min_height = match iterator
|
||||
.next()
|
||||
.and_then(|(k, _)| k[1..].try_into().ok())
|
||||
.map(|bytes| usize::from_be_bytes(bytes))
|
||||
{
|
||||
None => {
|
||||
std::mem::drop(iterator);
|
||||
write_store.drop_cf(cf_name).ok();
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
Some(x) => x,
|
||||
};
|
||||
std::mem::drop(iterator);
|
||||
std::mem::drop(write_store);
|
||||
|
||||
let snapshot = HeadersStore {
|
||||
store: Arc::clone(&self.store),
|
||||
cf_name: cf_name.into(),
|
||||
min_height,
|
||||
network: self.network,
|
||||
phantom: PhantomData,
|
||||
};
|
||||
if snapshot.work()? > self.work()? {
|
||||
self.apply_snapshot(snapshot)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn apply_snapshot(
|
||||
&self,
|
||||
snaphost: HeadersStore<Snapshot>,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
let mut batch = WriteBatch::default();
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
let snapshot_cf_handle = read_store.cf_handle(&snaphost.cf_name).unwrap();
|
||||
|
||||
let from_key = StoreEntry::BlockHeader(Some(snaphost.min_height)).get_key();
|
||||
let to_key = StoreEntry::BlockHeader(Some(usize::MAX)).get_key();
|
||||
|
||||
let mut opts = ReadOptions::default();
|
||||
opts.set_iterate_upper_bound(to_key.clone());
|
||||
|
||||
log::debug!("Removing items");
|
||||
batch.delete_range_cf(cf_handle, &from_key, &to_key);
|
||||
for (_, v) in read_store.iterator_cf_opt(
|
||||
cf_handle,
|
||||
opts,
|
||||
IteratorMode::From(&from_key, Direction::Forward),
|
||||
) {
|
||||
let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;
|
||||
|
||||
batch.delete_cf(
|
||||
cf_handle,
|
||||
StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(),
|
||||
);
|
||||
}
|
||||
|
||||
// Delete full blocks overriden by snapshot
|
||||
let from_key = StoreEntry::Block(Some(snaphost.min_height)).get_key();
|
||||
let to_key = StoreEntry::Block(Some(usize::MAX)).get_key();
|
||||
batch.delete_range(&from_key, &to_key);
|
||||
|
||||
log::debug!("Copying over new items");
|
||||
for (k, v) in read_store.iterator_cf(snapshot_cf_handle, IteratorMode::Start) {
|
||||
batch.put_cf(cf_handle, k, v);
|
||||
}
|
||||
|
||||
read_store.write(batch)?;
|
||||
|
||||
std::mem::drop(snapshot_cf_handle);
|
||||
std::mem::drop(cf_handle);
|
||||
std::mem::drop(read_store);
|
||||
|
||||
self.store.write().unwrap().drop_cf(&snaphost.cf_name)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_height_for(
|
||||
&self,
|
||||
block_hash: &BlockHash,
|
||||
) -> Result<Option<usize>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
let key = StoreEntry::BlockHeaderIndex(Some(block_hash.clone())).get_key();
|
||||
let data = read_store.get_pinned_cf(cf_handle, key)?;
|
||||
Ok(data
|
||||
.map(|data| {
|
||||
Ok::<_, CompactFiltersError>(usize::from_be_bytes(
|
||||
data.as_ref()
|
||||
.try_into()
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?,
|
||||
))
|
||||
})
|
||||
.transpose()?)
|
||||
}
|
||||
|
||||
pub fn get_block_hash(&self, height: usize) -> Result<Option<BlockHash>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
let key = StoreEntry::BlockHeader(Some(height)).get_key();
|
||||
let data = read_store.get_pinned_cf(cf_handle, key)?;
|
||||
Ok(data
|
||||
.map(|data| {
|
||||
let (header, _): (BlockHeader, Uint256) =
|
||||
deserialize(&data).map_err(|_| CompactFiltersError::DataCorruption)?;
|
||||
Ok::<_, CompactFiltersError>(header.bitcoin_hash())
|
||||
})
|
||||
.transpose()?)
|
||||
}
|
||||
|
||||
pub fn save_full_block(&self, block: &Block, height: usize) -> Result<(), CompactFiltersError> {
|
||||
let key = StoreEntry::Block(Some(height)).get_key();
|
||||
self.store.read().unwrap().put(key, block.serialize())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_full_block(&self, height: usize) -> Result<Option<Block>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
|
||||
let key = StoreEntry::Block(Some(height)).get_key();
|
||||
let opt_block = read_store.get_pinned(key)?;
|
||||
|
||||
Ok(opt_block
|
||||
.map(|data| deserialize(&data))
|
||||
.transpose()
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?)
|
||||
}
|
||||
|
||||
pub fn delete_blocks_until(&self, height: usize) -> Result<(), CompactFiltersError> {
|
||||
let from_key = StoreEntry::Block(Some(0)).get_key();
|
||||
let to_key = StoreEntry::Block(Some(height)).get_key();
|
||||
|
||||
let mut batch = WriteBatch::default();
|
||||
batch.delete_range(&from_key, &to_key);
|
||||
|
||||
self.store.read().unwrap().write(batch)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn iter_full_blocks(&self) -> Result<Vec<(usize, Block)>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
|
||||
let prefix = StoreEntry::Block(None).get_key();
|
||||
|
||||
let iterator = read_store.prefix_iterator(&prefix);
|
||||
// FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
|
||||
// have the right prefix
|
||||
iterator
|
||||
.filter(|(k, _)| k.starts_with(&prefix))
|
||||
.map(|(k, v)| {
|
||||
let height: usize = usize::from_be_bytes(
|
||||
k[1..]
|
||||
.try_into()
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?,
|
||||
);
|
||||
let block = SerializeDb::deserialize(&v)?;
|
||||
|
||||
Ok((height, block))
|
||||
})
|
||||
.collect::<Result<_, _>>()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: StoreType> HeadersStore<T> {
|
||||
pub fn work(&self) -> Result<Uint256, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
let prefix = StoreEntry::BlockHeader(None).get_key();
|
||||
let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
|
||||
|
||||
Ok(iterator
|
||||
.last()
|
||||
.map(|(_, v)| -> Result<_, CompactFiltersError> {
|
||||
let (_, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;
|
||||
|
||||
Ok(work)
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
pub fn get_height(&self) -> Result<usize, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
let prefix = StoreEntry::BlockHeader(None).get_key();
|
||||
let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
|
||||
|
||||
Ok(iterator
|
||||
.last()
|
||||
.map(|(k, _)| -> Result<_, CompactFiltersError> {
|
||||
let height = usize::from_be_bytes(
|
||||
k[1..]
|
||||
.try_into()
|
||||
.map_err(|_| CompactFiltersError::DataCorruption)?,
|
||||
);
|
||||
|
||||
Ok(height)
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
pub fn get_tip_hash(&self) -> Result<Option<BlockHash>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
let prefix = StoreEntry::BlockHeader(None).get_key();
|
||||
let iterator = read_store.prefix_iterator_cf(cf_handle, prefix);
|
||||
|
||||
Ok(iterator
|
||||
.last()
|
||||
.map(|(_, v)| -> Result<_, CompactFiltersError> {
|
||||
let (header, _): (BlockHeader, Uint256) = SerializeDb::deserialize(&v)?;
|
||||
|
||||
Ok(header.bitcoin_hash())
|
||||
})
|
||||
.transpose()?)
|
||||
}
|
||||
|
||||
pub fn apply(
|
||||
&mut self,
|
||||
from: usize,
|
||||
headers: Vec<BlockHeader>,
|
||||
) -> Result<BlockHash, CompactFiltersError> {
|
||||
let mut batch = WriteBatch::default();
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
let cf_handle = read_store.cf_handle(&self.cf_name).unwrap();
|
||||
|
||||
let (mut last_hash, mut accumulated_work) = read_store
|
||||
.get_pinned_cf(cf_handle, StoreEntry::BlockHeader(Some(from)).get_key())?
|
||||
.map(|result| {
|
||||
let (header, work): (BlockHeader, Uint256) = SerializeDb::deserialize(&result)?;
|
||||
Ok::<_, CompactFiltersError>((header.bitcoin_hash(), work))
|
||||
})
|
||||
.transpose()?
|
||||
.ok_or(CompactFiltersError::DataCorruption)?;
|
||||
|
||||
for (index, header) in headers.into_iter().enumerate() {
|
||||
if header.prev_blockhash != last_hash {
|
||||
return Err(CompactFiltersError::InvalidHeaders);
|
||||
}
|
||||
|
||||
last_hash = header.bitcoin_hash();
|
||||
accumulated_work = accumulated_work + header.work();
|
||||
|
||||
let height = from + index + 1;
|
||||
batch.put_cf(
|
||||
cf_handle,
|
||||
StoreEntry::BlockHeaderIndex(Some(header.bitcoin_hash())).get_key(),
|
||||
&(height).to_be_bytes(),
|
||||
);
|
||||
batch.put_cf(
|
||||
cf_handle,
|
||||
StoreEntry::BlockHeader(Some(height)).get_key(),
|
||||
(header, accumulated_work).serialize(),
|
||||
);
|
||||
}
|
||||
|
||||
std::mem::drop(cf_handle);
|
||||
std::mem::drop(read_store);
|
||||
|
||||
self.store.write().unwrap().write(batch)?;
|
||||
Ok(last_hash)
|
||||
}
|
||||
}
|
||||
|
||||
pub type FilterHeaderHash = FilterHash;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FilterHeader {
|
||||
prev_header_hash: FilterHeaderHash,
|
||||
filter_hash: FilterHash,
|
||||
}
|
||||
|
||||
impl BitcoinHash<FilterHeaderHash> for FilterHeader {
|
||||
fn bitcoin_hash(&self) -> FilterHeaderHash {
|
||||
let mut hash_data = self.filter_hash.into_inner().to_vec();
|
||||
hash_data.extend_from_slice(&self.prev_header_hash);
|
||||
sha256d::Hash::hash(&hash_data).into()
|
||||
}
|
||||
}
|
||||
|
||||
pub enum BundleStatus {
|
||||
Init,
|
||||
CFHeaders { cf_headers: Vec<FilterHeader> },
|
||||
CFilters { cf_filters: Vec<Vec<u8>> },
|
||||
Processed { cf_filters: Vec<Vec<u8>> },
|
||||
Tip { cf_filters: Vec<Vec<u8>> },
|
||||
Pruned,
|
||||
}
|
||||
|
||||
pub struct CFStore {
|
||||
store: Arc<RwLock<DB>>,
|
||||
filter_type: u8,
|
||||
}
|
||||
|
||||
type BundleEntry = (BundleStatus, FilterHeaderHash);
|
||||
|
||||
impl CFStore {
|
||||
pub fn new(
|
||||
headers_store: &HeadersStore<Full>,
|
||||
filter_type: u8,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
let cf_store = CFStore {
|
||||
store: Arc::clone(&headers_store.store),
|
||||
filter_type,
|
||||
};
|
||||
|
||||
let genesis = match headers_store.network {
|
||||
Network::Bitcoin => MAINNET_GENESIS.deref(),
|
||||
Network::Testnet => TESTNET_GENESIS.deref(),
|
||||
Network::Regtest => REGTEST_GENESIS.deref(),
|
||||
};
|
||||
|
||||
let filter = BlockFilter::new_script_filter(genesis, |utxo| {
|
||||
Err(bitcoin::util::bip158::Error::UtxoMissing(*utxo))
|
||||
})?;
|
||||
let first_key = StoreEntry::CFilterTable((filter_type, Some(0))).get_key();
|
||||
|
||||
// Add the genesis' filter
|
||||
{
|
||||
let read_store = cf_store.store.read().unwrap();
|
||||
if read_store.get_pinned(&first_key)?.is_none() {
|
||||
read_store.put(
|
||||
&first_key,
|
||||
(BundleStatus::Init, filter.filter_id(&FilterHash::default())).serialize(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cf_store)
|
||||
}
|
||||
|
||||
pub fn get_filter_type(&self) -> u8 {
|
||||
self.filter_type
|
||||
}
|
||||
|
||||
pub fn get_bundles(&self) -> Result<Vec<BundleEntry>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
|
||||
let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
|
||||
let iterator = read_store.prefix_iterator(&prefix);
|
||||
|
||||
// FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
|
||||
// have the right prefix
|
||||
iterator
|
||||
.filter(|(k, _)| k.starts_with(&prefix))
|
||||
.map(|(_, data)| BundleEntry::deserialize(&data))
|
||||
.collect::<Result<_, _>>()
|
||||
}
|
||||
|
||||
pub fn get_checkpoints(&self) -> Result<Vec<FilterHash>, CompactFiltersError> {
|
||||
let read_store = self.store.read().unwrap();
|
||||
|
||||
let prefix = StoreEntry::CFilterTable((self.filter_type, None)).get_key();
|
||||
let iterator = read_store.prefix_iterator(&prefix);
|
||||
|
||||
// FIXME: we have to filter manually because rocksdb sometimes returns stuff that doesn't
|
||||
// have the right prefix
|
||||
Ok(iterator
|
||||
.filter(|(k, _)| k.starts_with(&prefix))
|
||||
.skip(1)
|
||||
.map(|(_, data)| Ok::<_, CompactFiltersError>(BundleEntry::deserialize(&data)?.1))
|
||||
.collect::<Result<_, _>>()?)
|
||||
}
|
||||
|
||||
pub fn replace_checkpoints(
|
||||
&self,
|
||||
checkpoints: Vec<FilterHash>,
|
||||
) -> Result<(), CompactFiltersError> {
|
||||
let current_checkpoints = self.get_checkpoints()?;
|
||||
|
||||
let mut equal_bundles = 0;
|
||||
for (index, (our, their)) in current_checkpoints
|
||||
.iter()
|
||||
.zip(checkpoints.iter())
|
||||
.enumerate()
|
||||
{
|
||||
equal_bundles = index;
|
||||
|
||||
if our != their {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
let mut batch = WriteBatch::default();
|
||||
|
||||
for (index, filter_hash) in checkpoints.iter().enumerate().skip(equal_bundles) {
|
||||
let key = StoreEntry::CFilterTable((self.filter_type, Some(index + 1))).get_key(); // +1 to skip the genesis' filter
|
||||
|
||||
if let Some((BundleStatus::Tip { .. }, _)) = read_store
|
||||
.get_pinned(&key)?
|
||||
.map(|data| BundleEntry::deserialize(&data))
|
||||
.transpose()?
|
||||
{
|
||||
println!("Keeping bundle #{} as Tip", index);
|
||||
} else {
|
||||
batch.put(&key, (BundleStatus::Init, *filter_hash).serialize());
|
||||
}
|
||||
}
|
||||
|
||||
read_store.write(batch)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn advance_to_cf_headers(
|
||||
&self,
|
||||
bundle: usize,
|
||||
checkpoint_hash: FilterHeaderHash,
|
||||
filter_headers: Vec<FilterHash>,
|
||||
) -> Result<BundleStatus, CompactFiltersError> {
|
||||
let mut last_hash = checkpoint_hash;
|
||||
let cf_headers = filter_headers
|
||||
.into_iter()
|
||||
.map(|filter_hash| {
|
||||
let filter_header = FilterHeader {
|
||||
prev_header_hash: last_hash,
|
||||
filter_hash,
|
||||
};
|
||||
last_hash = filter_header.bitcoin_hash();
|
||||
|
||||
filter_header
|
||||
})
|
||||
.collect();
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
|
||||
let next_key = StoreEntry::CFilterTable((self.filter_type, Some(bundle + 1))).get_key(); // +1 to skip the genesis' filter
|
||||
if let Some((_, next_checkpoint)) = read_store
|
||||
.get_pinned(&next_key)?
|
||||
.map(|data| BundleEntry::deserialize(&data))
|
||||
.transpose()?
|
||||
{
|
||||
// check connection with the next bundle if present
|
||||
if last_hash != next_checkpoint {
|
||||
return Err(CompactFiltersError::InvalidFilterHeader);
|
||||
}
|
||||
}
|
||||
|
||||
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
|
||||
let value = (BundleStatus::CFHeaders { cf_headers }, checkpoint_hash);
|
||||
|
||||
read_store.put(key, value.serialize())?;
|
||||
|
||||
Ok(value.0)
|
||||
}
|
||||
|
||||
pub fn advance_to_cf_filters(
|
||||
&self,
|
||||
bundle: usize,
|
||||
checkpoint_hash: FilterHeaderHash,
|
||||
headers: Vec<FilterHeader>,
|
||||
filters: Vec<(usize, Vec<u8>)>,
|
||||
) -> Result<BundleStatus, CompactFiltersError> {
|
||||
let cf_filters = filters
|
||||
.into_iter()
|
||||
.zip(headers.iter())
|
||||
.map(|((_, filter_content), header)| {
|
||||
if header.filter_hash != sha256d::Hash::hash(&filter_content).into() {
|
||||
return Err(CompactFiltersError::InvalidFilter);
|
||||
}
|
||||
|
||||
Ok::<_, CompactFiltersError>(filter_content)
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
|
||||
let value = (BundleStatus::CFilters { cf_filters }, checkpoint_hash);
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
read_store.put(key, value.serialize())?;
|
||||
|
||||
Ok(value.0)
|
||||
}
|
||||
|
||||
pub fn prune_filters(
|
||||
&self,
|
||||
bundle: usize,
|
||||
checkpoint_hash: FilterHeaderHash,
|
||||
) -> Result<BundleStatus, CompactFiltersError> {
|
||||
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
|
||||
let value = (BundleStatus::Pruned, checkpoint_hash);
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
read_store.put(key, value.serialize())?;
|
||||
|
||||
Ok(value.0)
|
||||
}
|
||||
|
||||
pub fn mark_as_tip(
|
||||
&self,
|
||||
bundle: usize,
|
||||
cf_filters: Vec<Vec<u8>>,
|
||||
checkpoint_hash: FilterHeaderHash,
|
||||
) -> Result<BundleStatus, CompactFiltersError> {
|
||||
let key = StoreEntry::CFilterTable((self.filter_type, Some(bundle))).get_key();
|
||||
let value = (BundleStatus::Tip { cf_filters }, checkpoint_hash);
|
||||
|
||||
let read_store = self.store.read().unwrap();
|
||||
read_store.put(key, value.serialize())?;
|
||||
|
||||
Ok(value.0)
|
||||
}
|
||||
}
|
282
src/blockchain/compact_filters/sync.rs
Normal file
282
src/blockchain/compact_filters/sync.rs
Normal file
@ -0,0 +1,282 @@
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use bitcoin::hash_types::{BlockHash, FilterHash};
|
||||
use bitcoin::network::message::NetworkMessage;
|
||||
use bitcoin::network::message_blockdata::GetHeadersMessage;
|
||||
use bitcoin::util::bip158::BlockFilter;
|
||||
|
||||
use super::peer::*;
|
||||
use super::store::*;
|
||||
use super::CompactFiltersError;
|
||||
|
||||
pub(crate) const BURIED_CONFIRMATIONS: usize = 100;
|
||||
|
||||
pub struct CFSync {
|
||||
headers_store: Arc<HeadersStore<Full>>,
|
||||
cf_store: Arc<CFStore>,
|
||||
skip_blocks: usize,
|
||||
bundles: Mutex<VecDeque<(BundleStatus, FilterHash, usize)>>,
|
||||
}
|
||||
|
||||
impl CFSync {
|
||||
pub fn new(
|
||||
headers_store: Arc<HeadersStore<Full>>,
|
||||
skip_blocks: usize,
|
||||
filter_type: u8,
|
||||
) -> Result<Self, CompactFiltersError> {
|
||||
let cf_store = Arc::new(CFStore::new(&headers_store, filter_type)?);
|
||||
|
||||
Ok(CFSync {
|
||||
headers_store,
|
||||
cf_store,
|
||||
skip_blocks,
|
||||
bundles: Mutex::new(VecDeque::new()),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn pruned_bundles(&self) -> Result<usize, CompactFiltersError> {
|
||||
Ok(self
|
||||
.cf_store
|
||||
.get_bundles()?
|
||||
.into_iter()
|
||||
.skip(self.skip_blocks / 1000)
|
||||
.fold(0, |acc, (status, _)| match status {
|
||||
BundleStatus::Pruned => acc + 1,
|
||||
_ => acc,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn prepare_sync(&self, peer: Arc<Peer>) -> Result<(), CompactFiltersError> {
|
||||
let mut bundles_lock = self.bundles.lock().unwrap();
|
||||
|
||||
let resp = peer.get_cf_checkpt(
|
||||
self.cf_store.get_filter_type(),
|
||||
self.headers_store.get_tip_hash()?.unwrap(),
|
||||
)?;
|
||||
self.cf_store.replace_checkpoints(resp.filter_headers)?;
|
||||
|
||||
bundles_lock.clear();
|
||||
for (index, (status, checkpoint)) in self.cf_store.get_bundles()?.into_iter().enumerate() {
|
||||
bundles_lock.push_back((status, checkpoint, index));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn capture_thread_for_sync<F, Q>(
|
||||
&self,
|
||||
peer: Arc<Peer>,
|
||||
process: F,
|
||||
completed_bundle: Q,
|
||||
) -> Result<(), CompactFiltersError>
|
||||
where
|
||||
F: Fn(&BlockHash, &BlockFilter) -> Result<bool, CompactFiltersError>,
|
||||
Q: Fn(usize) -> Result<(), crate::error::Error>,
|
||||
{
|
||||
let current_height = self.headers_store.get_height()?; // TODO: we should update it in case headers_store is also updated
|
||||
|
||||
loop {
|
||||
let (mut status, checkpoint, index) = match self.bundles.lock().unwrap().pop_front() {
|
||||
None => break,
|
||||
Some(x) => x,
|
||||
};
|
||||
|
||||
log::debug!(
|
||||
"Processing bundle #{} - height {} to {}",
|
||||
index,
|
||||
index * 1000 + 1,
|
||||
(index + 1) * 1000
|
||||
);
|
||||
|
||||
let process_received_filters =
|
||||
|expected_filters| -> Result<BTreeMap<usize, Vec<u8>>, CompactFiltersError> {
|
||||
let mut filters_map = BTreeMap::new();
|
||||
for _ in 0..expected_filters {
|
||||
let filter = peer.pop_cf_filter_resp()?;
|
||||
if filter.filter_type != self.cf_store.get_filter_type() {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
}
|
||||
|
||||
match self.headers_store.get_height_for(&filter.block_hash)? {
|
||||
Some(height) => filters_map.insert(height, filter.filter),
|
||||
None => return Err(CompactFiltersError::InvalidFilter),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(filters_map)
|
||||
};
|
||||
|
||||
let start_height = index * 1000 + 1;
|
||||
let mut already_processed = 0;
|
||||
|
||||
if start_height < self.skip_blocks {
|
||||
status = self.cf_store.prune_filters(index, checkpoint)?;
|
||||
}
|
||||
|
||||
let stop_height = std::cmp::min(current_height, start_height + 999);
|
||||
let stop_hash = self.headers_store.get_block_hash(stop_height)?.unwrap();
|
||||
|
||||
if let BundleStatus::Init = status {
|
||||
log::trace!("status: Init");
|
||||
|
||||
let resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?;
|
||||
|
||||
assert!(resp.previous_filter == checkpoint);
|
||||
status =
|
||||
self.cf_store
|
||||
.advance_to_cf_headers(index, checkpoint, resp.filter_hashes)?;
|
||||
}
|
||||
if let BundleStatus::Tip { cf_filters } = status {
|
||||
log::trace!("status: Tip (beginning) ");
|
||||
|
||||
already_processed = cf_filters.len();
|
||||
let headers_resp = peer.get_cf_headers(0x00, start_height as u32, stop_hash)?;
|
||||
|
||||
let cf_headers = match self.cf_store.advance_to_cf_headers(
|
||||
index,
|
||||
checkpoint,
|
||||
headers_resp.filter_hashes,
|
||||
)? {
|
||||
BundleStatus::CFHeaders { cf_headers } => cf_headers,
|
||||
_ => return Err(CompactFiltersError::InvalidResponse),
|
||||
};
|
||||
|
||||
peer.get_cf_filters(
|
||||
self.cf_store.get_filter_type(),
|
||||
(start_height + cf_filters.len()) as u32,
|
||||
stop_hash,
|
||||
)?;
|
||||
let expected_filters = stop_height - start_height + 1 - cf_filters.len();
|
||||
let filters_map = process_received_filters(expected_filters)?;
|
||||
let filters = cf_filters
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.chain(filters_map.into_iter())
|
||||
.collect();
|
||||
status = self
|
||||
.cf_store
|
||||
.advance_to_cf_filters(index, checkpoint, cf_headers, filters)?;
|
||||
}
|
||||
if let BundleStatus::CFHeaders { cf_headers } = status {
|
||||
log::trace!("status: CFHeaders");
|
||||
|
||||
peer.get_cf_filters(
|
||||
self.cf_store.get_filter_type(),
|
||||
start_height as u32,
|
||||
stop_hash,
|
||||
)?;
|
||||
let expected_filters = stop_height - start_height + 1;
|
||||
let filters_map = process_received_filters(expected_filters)?;
|
||||
status = self.cf_store.advance_to_cf_filters(
|
||||
index,
|
||||
checkpoint,
|
||||
cf_headers,
|
||||
filters_map.into_iter().collect(),
|
||||
)?;
|
||||
}
|
||||
if let BundleStatus::CFilters { cf_filters } = status {
|
||||
log::trace!("status: CFilters");
|
||||
|
||||
let last_sync_buried_height = (start_height + already_processed)
|
||||
.checked_sub(BURIED_CONFIRMATIONS)
|
||||
.unwrap_or(0);
|
||||
|
||||
for (filter_index, filter) in cf_filters.iter().enumerate() {
|
||||
let height = filter_index + start_height;
|
||||
|
||||
// do not download blocks that were already "buried" since the last sync
|
||||
if height < last_sync_buried_height {
|
||||
continue;
|
||||
}
|
||||
|
||||
let block_hash = self.headers_store.get_block_hash(height)?.unwrap();
|
||||
|
||||
// TODO: also download random blocks?
|
||||
if process(&block_hash, &BlockFilter::new(&filter))? {
|
||||
log::debug!("Downloading block {}", block_hash);
|
||||
|
||||
let block = peer
|
||||
.get_block(block_hash)?
|
||||
.ok_or(CompactFiltersError::MissingBlock)?;
|
||||
self.headers_store.save_full_block(&block, height)?;
|
||||
}
|
||||
}
|
||||
|
||||
status = BundleStatus::Processed { cf_filters };
|
||||
}
|
||||
if let BundleStatus::Processed { cf_filters } = status {
|
||||
log::trace!("status: Processed");
|
||||
|
||||
if current_height - stop_height > 1000 {
|
||||
status = self.cf_store.prune_filters(index, checkpoint)?;
|
||||
} else {
|
||||
status = self.cf_store.mark_as_tip(index, cf_filters, checkpoint)?;
|
||||
}
|
||||
|
||||
completed_bundle(index)?;
|
||||
}
|
||||
if let BundleStatus::Pruned = status {
|
||||
log::trace!("status: Pruned");
|
||||
}
|
||||
if let BundleStatus::Tip { .. } = status {
|
||||
log::trace!("status: Tip");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync_headers<F>(
|
||||
peer: Arc<Peer>,
|
||||
store: Arc<HeadersStore<Full>>,
|
||||
sync_fn: F,
|
||||
) -> Result<Option<HeadersStore<Snapshot>>, CompactFiltersError>
|
||||
where
|
||||
F: Fn(usize) -> Result<(), crate::error::Error>,
|
||||
{
|
||||
let locators = store.get_locators()?;
|
||||
let locators_vec = locators.iter().map(|(hash, _)| hash).cloned().collect();
|
||||
let locators_map: HashMap<_, _> = locators.into_iter().collect();
|
||||
|
||||
peer.send(NetworkMessage::GetHeaders(GetHeadersMessage::new(
|
||||
locators_vec,
|
||||
Default::default(),
|
||||
)))?;
|
||||
let (mut snapshot, mut last_hash) =
|
||||
if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
|
||||
if headers.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
match locators_map.get(&headers[0].prev_blockhash) {
|
||||
None => return Err(CompactFiltersError::InvalidHeaders),
|
||||
Some(from) => (
|
||||
store.start_snapshot(*from)?,
|
||||
headers[0].prev_blockhash.clone(),
|
||||
),
|
||||
}
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
};
|
||||
|
||||
let mut sync_height = store.get_height()?;
|
||||
while sync_height < peer.get_version().start_height as usize {
|
||||
peer.send(NetworkMessage::GetHeaders(GetHeadersMessage::new(
|
||||
vec![last_hash],
|
||||
Default::default(),
|
||||
)))?;
|
||||
if let Some(NetworkMessage::Headers(headers)) = peer.recv("headers", None)? {
|
||||
let batch_len = headers.len();
|
||||
last_hash = snapshot.apply(sync_height, headers)?;
|
||||
|
||||
sync_height += batch_len;
|
||||
sync_fn(sync_height)?;
|
||||
} else {
|
||||
return Err(CompactFiltersError::InvalidResponse);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(snapshot))
|
||||
}
|
@ -40,9 +40,13 @@ impl Blockchain for ElectrumBlockchain {
|
||||
|
||||
impl OnlineBlockchain for ElectrumBlockchain {
|
||||
fn get_capabilities(&self) -> HashSet<Capability> {
|
||||
vec![Capability::FullHistory, Capability::GetAnyTx]
|
||||
.into_iter()
|
||||
.collect()
|
||||
vec![
|
||||
Capability::FullHistory,
|
||||
Capability::GetAnyTx,
|
||||
Capability::AccurateFees,
|
||||
]
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
|
||||
|
@ -59,9 +59,13 @@ impl Blockchain for EsploraBlockchain {
|
||||
#[maybe_async]
|
||||
impl OnlineBlockchain for EsploraBlockchain {
|
||||
fn get_capabilities(&self) -> HashSet<Capability> {
|
||||
vec![Capability::FullHistory, Capability::GetAnyTx]
|
||||
.into_iter()
|
||||
.collect()
|
||||
vec![
|
||||
Capability::FullHistory,
|
||||
Capability::GetAnyTx,
|
||||
Capability::AccurateFees,
|
||||
]
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
|
||||
|
@ -19,10 +19,14 @@ pub mod esplora;
|
||||
#[cfg(feature = "esplora")]
|
||||
pub use self::esplora::EsploraBlockchain;
|
||||
|
||||
#[cfg(feature = "compact_filters")]
|
||||
pub mod compact_filters;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum Capability {
|
||||
FullHistory,
|
||||
GetAnyTx,
|
||||
AccurateFees,
|
||||
}
|
||||
|
||||
pub trait Blockchain {
|
||||
@ -46,13 +50,13 @@ impl Blockchain for OfflineBlockchain {
|
||||
pub trait OnlineBlockchain: Blockchain {
|
||||
fn get_capabilities(&self) -> HashSet<Capability>;
|
||||
|
||||
fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>(
|
||||
fn setup<D: BatchDatabase + DatabaseUtils, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
progress_update: P,
|
||||
) -> Result<(), Error>;
|
||||
fn sync<D: BatchDatabase + DatabaseUtils, P: Progress>(
|
||||
fn sync<D: BatchDatabase + DatabaseUtils, P: 'static + Progress>(
|
||||
&self,
|
||||
stop_gap: Option<usize>,
|
||||
database: &mut D,
|
||||
@ -70,7 +74,7 @@ pub trait OnlineBlockchain: Blockchain {
|
||||
|
||||
pub type ProgressData = (f32, Option<String>);
|
||||
|
||||
pub trait Progress {
|
||||
pub trait Progress: Send {
|
||||
fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
@ -89,6 +93,7 @@ impl Progress for Sender<ProgressData> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NoopProgress;
|
||||
|
||||
pub fn noop_progress() -> NoopProgress {
|
||||
@ -100,3 +105,18 @@ impl Progress for NoopProgress {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LogProgress;
|
||||
|
||||
pub fn log_progress() -> LogProgress {
|
||||
LogProgress
|
||||
}
|
||||
|
||||
impl Progress for LogProgress {
|
||||
fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error> {
|
||||
log::info!("Sync {:.3}%: `{}`", progress, message.unwrap_or("".into()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
|
||||
use bitcoin::util::psbt::PartiallySignedTransaction;
|
||||
use bitcoin::{Address, OutPoint, Txid};
|
||||
|
||||
use crate::blockchain::log_progress;
|
||||
use crate::error::Error;
|
||||
use crate::types::ScriptType;
|
||||
use crate::{FeeRate, TxBuilder, Wallet};
|
||||
@ -344,7 +345,7 @@ where
|
||||
if let Some(_sub_matches) = matches.subcommand_matches("get_new_address") {
|
||||
Ok(Some(format!("{}", wallet.get_new_address()?)))
|
||||
} else if let Some(_sub_matches) = matches.subcommand_matches("sync") {
|
||||
maybe_await!(wallet.sync(None))?;
|
||||
maybe_await!(wallet.sync(log_progress(), None))?;
|
||||
Ok(None)
|
||||
} else if let Some(_sub_matches) = matches.subcommand_matches("list_unspent") {
|
||||
let mut res = String::new();
|
||||
|
12
src/error.rs
12
src/error.rs
@ -54,6 +54,8 @@ pub enum Error {
|
||||
Electrum(electrum_client::Error),
|
||||
#[cfg(feature = "esplora")]
|
||||
Esplora(crate::blockchain::esplora::EsploraError),
|
||||
#[cfg(feature = "compact_filters")]
|
||||
CompactFilters(crate::blockchain::compact_filters::CompactFiltersError),
|
||||
#[cfg(feature = "key-value-db")]
|
||||
Sled(sled::Error),
|
||||
}
|
||||
@ -87,3 +89,13 @@ impl_error!(electrum_client::Error, Electrum);
|
||||
impl_error!(crate::blockchain::esplora::EsploraError, Esplora);
|
||||
#[cfg(feature = "key-value-db")]
|
||||
impl_error!(sled::Error, Sled);
|
||||
|
||||
#[cfg(feature = "compact_filters")]
|
||||
impl From<crate::blockchain::compact_filters::CompactFiltersError> for Error {
|
||||
fn from(other: crate::blockchain::compact_filters::CompactFiltersError) -> Self {
|
||||
match other {
|
||||
crate::blockchain::compact_filters::CompactFiltersError::Global(e) => *e,
|
||||
err @ _ => Error::CompactFilters(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ extern crate async_trait;
|
||||
#[macro_use]
|
||||
extern crate magical_macros;
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(any(test, feature = "compact_filters"))]
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
|
@ -27,7 +27,7 @@ pub mod utils;
|
||||
use tx_builder::TxBuilder;
|
||||
use utils::{FeeRate, IsDust};
|
||||
|
||||
use crate::blockchain::{noop_progress, Blockchain, OfflineBlockchain, OnlineBlockchain};
|
||||
use crate::blockchain::{Blockchain, OfflineBlockchain, OnlineBlockchain, Progress};
|
||||
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
|
||||
use crate::descriptor::{get_checksum, DescriptorMeta, ExtendedDescriptor, ExtractPolicy, Policy};
|
||||
use crate::error::Error;
|
||||
@ -1015,7 +1015,11 @@ where
|
||||
}
|
||||
|
||||
#[maybe_async]
|
||||
pub fn sync(&self, max_address_param: Option<u32>) -> Result<(), Error> {
|
||||
pub fn sync<P: 'static + Progress>(
|
||||
&self,
|
||||
progress_update: P,
|
||||
max_address_param: Option<u32>,
|
||||
) -> Result<(), Error> {
|
||||
debug!("Begin sync...");
|
||||
|
||||
let mut run_setup = false;
|
||||
@ -1057,13 +1061,13 @@ where
|
||||
maybe_await!(self.client.setup(
|
||||
None,
|
||||
self.database.borrow_mut().deref_mut(),
|
||||
noop_progress(),
|
||||
progress_update,
|
||||
))
|
||||
} else {
|
||||
maybe_await!(self.client.sync(
|
||||
None,
|
||||
self.database.borrow_mut().deref_mut(),
|
||||
noop_progress(),
|
||||
progress_update,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
|
||||
use testutils::{TestClient, serial};
|
||||
|
||||
use #root_ident::blockchain::OnlineBlockchain;
|
||||
use #root_ident::blockchain::{OnlineBlockchain, noop_progress};
|
||||
use #root_ident::descriptor::ExtendedDescriptor;
|
||||
use #root_ident::database::MemoryDatabase;
|
||||
use #root_ident::types::ScriptType;
|
||||
@ -92,7 +92,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
};
|
||||
let txid = test_client.receive(tx);
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
assert_eq!(wallet.list_unspent().unwrap()[0].is_internal, false);
|
||||
@ -116,7 +116,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 25) => 50_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 100_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 2);
|
||||
@ -127,14 +127,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
fn test_sync_before_and_after_receive() {
|
||||
let (wallet, descriptors, mut test_client) = init_single_sig();
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 0);
|
||||
|
||||
test_client.receive(testutils! {
|
||||
@tx ( (@external descriptors, 0) => 50_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
|
||||
@ -149,7 +149,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000, (@external descriptors, 5) => 30_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 105_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
|
||||
@ -174,7 +174,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 5) => 25_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 75_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 2);
|
||||
@ -190,14 +190,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
test_client.receive(testutils! {
|
||||
@tx ( (@external descriptors, 0) => 25_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 75_000);
|
||||
}
|
||||
|
||||
@ -210,7 +210,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 ) ( @replaceable true )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
|
||||
@ -224,7 +224,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
|
||||
let new_txid = test_client.bump_fee(&txid);
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
|
||||
@ -246,7 +246,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 ) ( @confirmations 1 ) ( @replaceable true )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 1);
|
||||
@ -259,7 +259,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
// Invalidate 1 block
|
||||
test_client.invalidate(1);
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
@ -278,7 +278,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr, 25_000)])).unwrap();
|
||||
@ -286,7 +286,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), details.received);
|
||||
|
||||
assert_eq!(wallet.list_transactions(false).unwrap().len(), 2);
|
||||
@ -303,7 +303,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr, 25_000)])).unwrap();
|
||||
@ -311,12 +311,12 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
let sent_txid = wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), details.received);
|
||||
|
||||
// empty wallet
|
||||
let wallet = get_wallet_from_descriptors(&descriptors);
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
let tx_map = wallet.list_transactions(false).unwrap().into_iter().map(|tx| (tx.txid, tx)).collect::<std::collections::HashMap<_, _>>();
|
||||
|
||||
@ -340,7 +340,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 )
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
let mut total_sent = 0;
|
||||
@ -350,17 +350,17 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
|
||||
total_sent += 5_000 + details.fees;
|
||||
}
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000 - total_sent);
|
||||
|
||||
// empty wallet
|
||||
let wallet = get_wallet_from_descriptors(&descriptors);
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000 - total_sent);
|
||||
}
|
||||
|
||||
@ -374,14 +374,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 ) (@confirmations 1)
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 5_000)]).enable_rbf()).unwrap();
|
||||
let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000 - details.fees - 5_000);
|
||||
assert_eq!(wallet.get_balance().unwrap(), details.received);
|
||||
|
||||
@ -389,7 +389,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(new_psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000 - new_details.fees - 5_000);
|
||||
assert_eq!(wallet.get_balance().unwrap(), new_details.received);
|
||||
|
||||
@ -406,14 +406,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000 ) (@confirmations 1)
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 50_000);
|
||||
|
||||
let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap();
|
||||
let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 1_000 - details.fees);
|
||||
assert_eq!(wallet.get_balance().unwrap(), details.received);
|
||||
|
||||
@ -422,7 +422,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(new_psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 0);
|
||||
assert_eq!(new_details.received, 0);
|
||||
|
||||
@ -439,14 +439,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 75_000);
|
||||
|
||||
let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap();
|
||||
let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 26_000 - details.fees);
|
||||
assert_eq!(details.received, 1_000 - details.fees);
|
||||
|
||||
@ -455,7 +455,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(new_psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(new_details.sent, 75_000);
|
||||
assert_eq!(wallet.get_balance().unwrap(), new_details.received);
|
||||
}
|
||||
@ -470,14 +470,14 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
@tx ( (@external descriptors, 0) => 50_000, (@external descriptors, 1) => 25_000 ) (@confirmations 1)
|
||||
});
|
||||
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 75_000);
|
||||
|
||||
let (psbt, details) = wallet.create_tx(TxBuilder::from_addressees(vec![(node_addr.clone(), 49_000)]).enable_rbf()).unwrap();
|
||||
let (psbt, finalized) = wallet.sign(psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(wallet.get_balance().unwrap(), 26_000 - details.fees);
|
||||
assert_eq!(details.received, 1_000 - details.fees);
|
||||
|
||||
@ -487,7 +487,7 @@ pub fn magical_blockchain_tests(attr: TokenStream, item: TokenStream) -> TokenSt
|
||||
let (new_psbt, finalized) = wallet.sign(new_psbt, None).unwrap();
|
||||
assert!(finalized, "Cannot finalize transaction");
|
||||
wallet.broadcast(new_psbt.extract_tx()).unwrap();
|
||||
wallet.sync(None).unwrap();
|
||||
wallet.sync(noop_progress(), None).unwrap();
|
||||
assert_eq!(new_details.sent, 75_000);
|
||||
assert_eq!(wallet.get_balance().unwrap(), 0);
|
||||
assert_eq!(new_details.received, 0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user