Write the docs for blockchain::*

This commit is contained in:
Alekos Filini 2020-09-03 11:36:07 +02:00
parent c0867a6adc
commit 6b9c363937
No known key found for this signature in database
GPG Key ID: 5E8AFC3034FDFA4F
6 changed files with 231 additions and 11 deletions

View File

@ -22,6 +22,42 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//! Compact Filters
//!
//! This module contains a multithreaded implementation of an [`OnlineBlockchain`] backend that
//! uses BIP157 (aka "Neutrino") to populate the wallet's [database](crate::database::Database)
//! by downloading compact filters from the P2P network.
//!
//! Since there are currently very few peers "in the wild" that advertise the required service
//! flag, this implementation requires that one or more known peers are provided by the user.
//! No dns or other kinds of peer discovery are done internally.
//!
//! Moreover, this module doesn't currently support detecting and resolving conflicts between
//! messages received by different peers. Thus, it's recommended to use this module by only
//! connecting to a single peer at a time, optionally by opening multiple connections if it's
//! desirable to use multiple threads at once to sync in parallel.
//!
//! ## Example
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use bitcoin::*;
//! # use magical_bitcoin_wallet::*;
//! # use magical_bitcoin_wallet::blockchain::compact_filters::*;
//! let num_threads = 4;
//!
//! let mempool = Arc::new(Mempool::default());
//! let peers = (0..num_threads)
//! .map(|_| Peer::connect(
//! "btcd-mainnet.lightning.computer:8333",
//! Arc::clone(&mempool),
//! Network::Bitcoin,
//! ))
//! .collect::<Result<_, _>>()?;
//! let blockchain = CompactFiltersBlockchain::new(peers, "./wallet-filters", Some(500_000))?;
//! # Ok::<(), magical_bitcoin_wallet::error::Error>(())
//! ```
use std::collections::HashSet;
use std::fmt;
use std::path::Path;
@ -56,10 +92,22 @@ const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;
/// Structure implementing the required blockchain traits
///
/// ## Example
/// See the [`blockchain::compact_filters`](crate::blockchain::compact_filters) module for a usage example.
#[derive(Debug)]
pub struct CompactFiltersBlockchain(Option<CompactFilters>);
impl CompactFiltersBlockchain {
/// Construct a new instance given a list of peers, a path to store headers and block
/// filters downloaded during the sync and optionally a number of blocks to ignore starting
/// from the genesis while scanning for the wallet's outputs.
///
/// For each [`Peer`] specified a new thread will be spawned to download and verify the filters
/// in parallel. It's currently recommended to only connect to a single peer to avoid
/// inconsistencies in the data returned, optionally with multiple connections in parallel to
/// speed-up the sync process.
pub fn new<P: AsRef<Path>>(
peers: Vec<Peer>,
storage_dir: P,
@ -73,6 +121,7 @@ impl CompactFiltersBlockchain {
}
}
/// Internal struct that contains the state of a [`CompactFiltersBlockchain`]
#[derive(Debug)]
struct CompactFilters {
peers: Vec<Arc<Peer>>,
@ -81,6 +130,7 @@ struct CompactFilters {
}
impl CompactFilters {
/// Constructor, see [`CompactFiltersBlockchain::new`] for the documentation
pub fn new<P: AsRef<Path>>(
peers: Vec<Peer>,
storage_dir: P,
@ -117,6 +167,8 @@ impl CompactFilters {
})
}
/// Process a transaction by looking for inputs that spend from a UTXO in the database or
/// outputs that send funds to a know script_pubkey.
fn process_tx<D: BatchDatabase>(
&self,
database: &mut D,
@ -439,25 +491,40 @@ impl OnlineBlockchain for CompactFiltersBlockchain {
}
}
/// An error that can occur during sync with a [`CompactFiltersBlockchain`]
#[derive(Debug)]
pub enum CompactFiltersError {
/// A peer sent an invalid or unexpected response
InvalidResponse,
/// The headers returned are invalid
InvalidHeaders,
/// The compact filter headers returned are invalid
InvalidFilterHeader,
/// The compact filter returned is invalid
InvalidFilter,
/// The peer is missing a block in the valid chain
MissingBlock,
/// The data stored in the block filters storage are corrupted
DataCorruption,
/// A peer is not connected
NotConnected,
/// A peer took too long to reply to one of our messages
Timeout,
/// No peers have been specified
NoPeers,
/// Internal database error
DB(rocksdb::Error),
/// Internal I/O error
IO(std::io::Error),
/// Invalid BIP158 filter
BIP158(bitcoin::util::bip158::Error),
/// Internal system time error
Time(std::time::SystemTimeError),
/// Wrapper for [`crate::error::Error`]
Global(Box<crate::error::Error>),
}

View File

@ -48,18 +48,27 @@ use super::CompactFiltersError;
type ResponsesMap = HashMap<&'static str, Arc<(Mutex<Vec<NetworkMessage>>, Condvar)>>;
pub(crate) const TIMEOUT_SECS: u64 = 10;
pub(crate) const TIMEOUT_SECS: u64 = 30;
/// Container for unconfirmed, but valid Bitcoin transactions
///
/// It is normally shared between [`Peer`]s with the use of [`Arc`], so that transactions are not
/// duplicated in memory.
#[derive(Debug, Default)]
pub struct Mempool {
txs: RwLock<HashMap<Txid, Transaction>>,
}
impl Mempool {
/// Add a transaction to the mempool
///
/// Note that this doesn't propagate the transaction to other
/// peers. To do that, [`broadcast`](crate::blockchain::OnlineBlockchain::broadcast) should be used.
pub fn add_tx(&self, tx: Transaction) {
self.txs.write().unwrap().insert(tx.txid(), tx);
}
/// Look-up a transaction in the mempool given an [`Inventory`] request
pub fn get_tx(&self, inventory: &Inventory) -> Option<Transaction> {
let txid = match inventory {
Inventory::Error | Inventory::Block(_) | Inventory::WitnessBlock(_) => return None,
@ -69,15 +78,18 @@ impl Mempool {
self.txs.read().unwrap().get(&txid).cloned()
}
/// Return whether or not the mempool contains a transaction with a given txid
pub fn has_tx(&self, txid: &Txid) -> bool {
self.txs.read().unwrap().contains_key(txid)
}
/// Return the list of transactions contained in the mempool
pub fn iter_txs(&self) -> Vec<Transaction> {
self.txs.read().unwrap().values().cloned().collect()
}
}
/// A Bitcoin peer
#[derive(Debug)]
pub struct Peer {
writer: Arc<Mutex<TcpStream>>,
@ -93,7 +105,11 @@ pub struct Peer {
}
impl Peer {
pub fn new<A: ToSocketAddrs>(
/// Connect to a peer over a plaintext TCP connection
///
/// This function internally spawns a new thread that will monitor incoming messages from the
/// peer, and optionally reply to some of them transparently, like [pings](NetworkMessage::Ping)
pub fn connect<A: ToSocketAddrs>(
address: A,
mempool: Arc<Mempool>,
network: Network,
@ -103,7 +119,12 @@ impl Peer {
Peer::from_stream(stream, mempool, network)
}
pub fn new_proxy<T: ToTargetAddr, P: ToSocketAddrs>(
/// Connect to a peer through a SOCKS5 proxy, optionally by using some credentials, specified
/// as a tuple of `(username, password)`
///
/// This function internally spawns a new thread that will monitor incoming messages from the
/// peer, and optionally reply to some of them transparently, like [pings](NetworkMessage::Ping)
pub fn connect_proxy<T: ToTargetAddr, P: ToSocketAddrs>(
target: T,
proxy: P,
credentials: Option<(&str, &str)>,
@ -119,6 +140,7 @@ impl Peer {
Peer::from_stream(socks_stream.into_inner(), mempool, network)
}
/// Create a [`Peer`] from an already connected TcpStream
fn from_stream(
stream: TcpStream,
mempool: Arc<Mempool>,
@ -194,6 +216,7 @@ impl Peer {
})
}
/// Send a Bitcoin network message
fn _send(
writer: &mut TcpStream,
magic: u32,
@ -210,6 +233,7 @@ impl Peer {
Ok(())
}
/// Wait for a specific incoming Bitcoin message, optionally with a timeout
fn _recv(
responses: &Arc<RwLock<ResponsesMap>>,
wait_for: &'static str,
@ -241,22 +265,27 @@ impl Peer {
Ok(messages.pop())
}
/// Return the [`VersionMessage`] sent by the peer
pub fn get_version(&self) -> &VersionMessage {
&self.version
}
/// Return the Bitcoin [`Network`] in use
pub fn get_network(&self) -> Network {
self.network
}
/// Return the mempool used by this peer
pub fn get_mempool(&self) -> Arc<Mempool> {
Arc::clone(&self.mempool)
}
/// Return whether or not the peer is still connected
pub fn is_connected(&self) -> bool {
*self.connected.read().unwrap()
}
/// Internal function called once the `reader_thread` is spawned
fn reader_thread(
network: Network,
connection: TcpStream,
@ -341,11 +370,13 @@ impl Peer {
}
}
/// Send a raw Bitcoin message to the peer
pub fn send(&self, payload: NetworkMessage) -> Result<(), CompactFiltersError> {
let mut writer = self.writer.lock().unwrap();
Self::_send(&mut writer, self.network.magic(), payload)
}
/// Waits for a specific incoming Bitcoin message, optionally with a timeout
pub fn recv(
&self,
wait_for: &'static str,

View File

@ -22,6 +22,21 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//! Electrum
//!
//! This module defines an [`OnlineBlockchain`] struct that wraps an [`electrum_client::Client`]
//! and implements the logic required to populate the wallet's [database](crate::database::Database) by
//! querying the inner client.
//!
//! ## Example
//!
//! ```no_run
//! # use magical_bitcoin_wallet::blockchain::electrum::ElectrumBlockchain;
//! let client = electrum_client::Client::new("ssl://electrum.blockstream.info:50002", None)?;
//! let blockchain = ElectrumBlockchain::from(client);
//! # Ok::<(), magical_bitcoin_wallet::error::Error>(())
//! ```
use std::collections::HashSet;
#[allow(unused_imports)]
@ -37,6 +52,10 @@ use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
/// Wrapper over an Electrum Client that implements the required blockchain traits
///
/// ## Example
/// See the [`blockchain::electrum`](crate::blockchain::electrum) module for a usage example.
pub struct ElectrumBlockchain(Option<Client>);
#[cfg(test)]

View File

@ -22,6 +22,19 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//! Esplora
//!
//! This module defines an [`OnlineBlockchain`] struct that can query an Esplora backend
//! populate the wallet's [database](crate::database::Database) by
//!
//! ## Example
//!
//! ```no_run
//! # use magical_bitcoin_wallet::blockchain::esplora::EsploraBlockchain;
//! let blockchain = EsploraBlockchain::new("https://blockstream.info/testnet/");
//! # Ok::<(), magical_bitcoin_wallet::error::Error>(())
//! ```
use std::collections::{HashMap, HashSet};
use std::fmt;
@ -41,18 +54,22 @@ use bitcoin::{Script, Transaction, Txid};
use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync};
use super::*;
use crate::database::{BatchDatabase, DatabaseUtils};
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
#[derive(Debug)]
pub struct UrlClient {
struct UrlClient {
url: String,
// We use the async client instead of the blocking one because it automatically uses `fetch`
// when the target platform is wasm32.
client: Client,
}
/// Structure that implements the logic to sync with Esplora
///
/// ## Example
/// See the [`blockchain::esplora`](crate::blockchain::esplora) module for a usage example.
#[derive(Debug)]
pub struct EsploraBlockchain(Option<UrlClient>);
@ -63,6 +80,9 @@ impl std::convert::From<UrlClient> for EsploraBlockchain {
}
impl EsploraBlockchain {
/// Create a new instance of the client from a base URL
///
/// The client internally adds the `/api` prefix to `base_url` before making any requests
pub fn new(base_url: &str) -> Self {
EsploraBlockchain(Some(UrlClient {
url: base_url.to_string(),
@ -350,12 +370,17 @@ struct EsploraListUnspent {
status: EsploraGetHistoryStatus,
}
/// Errors that can happen during a sync with [`EsploraBlockchain`]
#[derive(Debug)]
pub enum EsploraError {
/// Error with the HTTP call
Reqwest(reqwest::Error),
/// Invalid number returned
Parsing(std::num::ParseIntError),
/// Invalid Bitcoin data returned
BitcoinEncoding(bitcoin::consensus::encode::Error),
/// Transaction not found
TransactionNotFound(Txid),
}

View File

@ -22,6 +22,22 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//! Blockchain backends
//!
//! This module provides the implementation of a few commonly-used backends like
//! [Electrum](crate::blockchain::electrum), [Esplora](crate::blockchain::esplora) and
//! [Compact Filters/Neutrino](crate::blockchain::compact_filters), along with two generalized
//! traits [`Blockchain`] and [`OnlineBlockchain`] that can be implemented to build customized
//! backends.
//!
//! Types that only implement the [`Blockchain`] trait can be used as backends for [`Wallet`](crate::wallet::Wallet)s, but any
//! action that requires interacting with the blockchain won't be available ([`Wallet::sync`](crate::wallet::Wallet::sync) and
//! [`Wallet::broadcast`](crate::wallet::Wallet::broadcast)). This allows the creation of physically air-gapped wallets, that have no
//! ability to contact the outside world. An example of an offline-only client is [`OfflineBlockchain`].
//!
//! Types that also implement [`OnlineBlockchain`] will make the two aforementioned actions
//! available.
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::mpsc::{channel, Receiver, Sender};
@ -53,19 +69,40 @@ pub mod compact_filters;
#[cfg(feature = "compact_filters")]
pub use self::compact_filters::CompactFiltersBlockchain;
/// Capabilities that can be supported by an [`OnlineBlockchain`] backend
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Capability {
/// Can recover the full history of a wallet and not only the set of currently spendable UTXOs
FullHistory,
/// Can fetch any historical transaction given its txid
GetAnyTx,
/// Can compute accurate fees for the transactions found during sync
AccurateFees,
}
/// Base trait for a blockchain backend
///
/// This trait is always required, even for "air-gapped" backends that don't actually make any
/// external call. Clients that have the ability to make external calls must also implement `OnlineBlockchain`.
pub trait Blockchain {
/// Return whether or not the client has the ability to fullfill requests
///
/// This should always be `false` for offline-only types, and can be true for types that also
/// implement [`OnlineBlockchain`], if they have the ability to fullfill requests.
fn is_online(&self) -> bool;
/// Create a new instance of the client that is offline-only
///
/// For types that also implement [`OnlineBlockchain`], this means creating an instance that
/// returns [`Error::OfflineClient`](crate::error::Error::OfflineClient) if any of the "online"
/// methods are called.
///
/// This is generally implemented by wrapping the client in an [`Option`] that has [`Option::None`] value
/// when created with this method, and is [`Option::Some`] if properly instantiated.
fn offline() -> Self;
}
/// Type that only implements [`Blockchain`] and is always offline
pub struct OfflineBlockchain;
impl Blockchain for OfflineBlockchain {
fn offline() -> Self {
@ -77,16 +114,47 @@ impl Blockchain for OfflineBlockchain {
}
}
/// Trait that defines the actions that must be supported by an online [`Blockchain`]
#[maybe_async]
pub trait OnlineBlockchain: Blockchain {
/// Return the set of [`Capability`] supported by this backend
fn get_capabilities(&self) -> HashSet<Capability>;
/// Setup the backend and populate the internal database for the first time
///
/// This method is the equivalent of [`OnlineBlockchain::sync`], but it's guaranteed to only be
/// called once, at the first [`Wallet::sync`](crate::wallet::Wallet::sync).
///
/// The rationale behind the distinction between `sync` and `setup` is that some custom backends
/// might need to perform specific actions only the first time they are synced.
///
/// For types that do not have that distinction, only this method can be implemented, since
/// [`OnlineBlockchain::sync`] defaults to calling this internally if not overridden.
fn setup<D: BatchDatabase, P: 'static + Progress>(
&self,
stop_gap: Option<usize>,
database: &mut D,
progress_update: P,
) -> Result<(), Error>;
/// Populate the internal database with transactions and UTXOs
///
/// If not overridden, it defaults to calling [`OnlineBlockchain::setup`] internally.
///
/// This method should implement the logic required to iterate over the list of the wallet's
/// script_pubkeys using [`Database::iter_script_pubkeys`] and look for relevant transactions
/// in the blockchain to populate the database with [`BatchOperations::set_tx`] and
/// [`BatchOperations::set_utxo`].
///
/// This method should also take care of removing UTXOs that are seen as spent in the
/// blockchain, using [`BatchOperations::del_utxo`].
///
/// The `progress_update` object can be used to give the caller updates about the progress by using
/// [`Progress::update`].
///
/// [`Database::iter_script_pubkeys`]: crate::database::Database::iter_script_pubkeys
/// [`BatchOperations::set_tx`]: crate::database::BatchOperations::set_tx
/// [`BatchOperations::set_utxo`]: crate::database::BatchOperations::set_utxo
/// [`BatchOperations::del_utxo`]: crate::database::BatchOperations::del_utxo
fn sync<D: BatchDatabase, P: 'static + Progress>(
&self,
stop_gap: Option<usize>,
@ -96,19 +164,31 @@ pub trait OnlineBlockchain: Blockchain {
maybe_await!(self.setup(stop_gap, database, progress_update))
}
/// Fetch a transaction from the blockchain given its txid
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error>;
/// Broadcast a transaction
fn broadcast(&self, tx: &Transaction) -> Result<(), Error>;
/// Return the current height
fn get_height(&self) -> Result<u32, Error>;
/// Estimate the fee rate required to confirm a transaction in a given `target` of blocks
fn estimate_fee(&self, target: usize) -> Result<FeeRate, Error>;
}
/// Data sent with a progress update over a [`channel`]
pub type ProgressData = (f32, Option<String>);
/// Trait for types that can receive and process progress updates during [`OnlineBlockchain::sync`] and
/// [`OnlineBlockchain::setup`]
pub trait Progress: Send {
/// Send a new progress update
///
/// The `progress` value should be in the range 0.0 - 100.0, and the `message` value is an
/// optional text message that can be displayed to the user.
fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error>;
}
/// Shortcut to create a [`channel`] (pair of [`Sender`] and [`Receiver`]) that can transport [`ProgressData`]
pub fn progress() -> (Sender<ProgressData>, Receiver<ProgressData>) {
channel()
}
@ -124,9 +204,11 @@ impl Progress for Sender<ProgressData> {
}
}
/// Type that implements [`Progress`] and drops every update received
#[derive(Clone)]
pub struct NoopProgress;
/// Create a new instance of [`NoopProgress`]
pub fn noop_progress() -> NoopProgress {
NoopProgress
}
@ -137,9 +219,11 @@ impl Progress for NoopProgress {
}
}
/// Type that implements [`Progress`] and logs at level `INFO` every update received
#[derive(Clone)]
pub struct LogProgress;
/// Create a nwe instance of [`LogProgress`]
pub fn log_progress() -> LogProgress {
LogProgress
}

View File

@ -80,9 +80,3 @@ pub use wallet::address_validator;
pub use wallet::signer;
pub use wallet::tx_builder::TxBuilder;
pub use wallet::{OfflineWallet, Wallet};
#[cfg(feature = "esplora")]
pub use blockchain::esplora::EsploraBlockchain;
#[cfg(feature = "electrum")]
pub use blockchain::electrum::ElectrumBlockchain;