Add a generalized "Blockchain" interface
This commit is contained in:
		
							parent
							
								
									0988c8b8d5
								
							
						
					
					
						commit
						75a9c30c9a
					
				| @ -18,7 +18,7 @@ electrum-client = { version = "0.1.0-beta.5", optional = true } | ||||
| [features] | ||||
| minimal = [] | ||||
| compiler = ["miniscript/compiler"] | ||||
| default = ["sled", "electrum-client"] | ||||
| default = ["key-value-db", "electrum"] | ||||
| electrum = ["electrum-client"] | ||||
| key-value-db = ["sled"] | ||||
| 
 | ||||
|  | ||||
| @ -23,6 +23,7 @@ use bitcoin::util::psbt::PartiallySignedTransaction; | ||||
| use bitcoin::{Address, Network, OutPoint}; | ||||
| 
 | ||||
| use magical_bitcoin_wallet::bitcoin; | ||||
| use magical_bitcoin_wallet::blockchain::ElectrumBlockchain; | ||||
| use magical_bitcoin_wallet::sled; | ||||
| use magical_bitcoin_wallet::types::ScriptType; | ||||
| use magical_bitcoin_wallet::{Client, Wallet}; | ||||
| @ -255,7 +256,14 @@ fn main() { | ||||
|     debug!("database opened successfully"); | ||||
| 
 | ||||
|     let client = Client::new(matches.value_of("server").unwrap()).unwrap(); | ||||
|     let wallet = Wallet::new(descriptor, change_descriptor, network, tree, client).unwrap(); | ||||
|     let wallet = Wallet::new( | ||||
|         descriptor, | ||||
|         change_descriptor, | ||||
|         network, | ||||
|         tree, | ||||
|         ElectrumBlockchain::from(client), | ||||
|     ) | ||||
|     .unwrap(); | ||||
| 
 | ||||
|     // TODO: print errors in a nice way
 | ||||
|     let handle_matches = |matches: ArgMatches<'_>| { | ||||
|  | ||||
							
								
								
									
										333
									
								
								src/blockchain/electrum.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										333
									
								
								src/blockchain/electrum.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,333 @@ | ||||
| use std::cmp; | ||||
| use std::collections::{HashSet, VecDeque}; | ||||
| use std::convert::TryFrom; | ||||
| use std::io::{Read, Write}; | ||||
| 
 | ||||
| #[allow(unused_imports)] | ||||
| use log::{debug, error, info, trace}; | ||||
| 
 | ||||
| use bitcoin::{Address, Network, OutPoint, Script, Transaction, Txid}; | ||||
| 
 | ||||
| use electrum_client::types::*; | ||||
| use electrum_client::Client; | ||||
| 
 | ||||
| use super::*; | ||||
| use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; | ||||
| use crate::error::Error; | ||||
| use crate::types::{ScriptType, TransactionDetails, UTXO}; | ||||
| use crate::wallet::utils::ChunksIterator; | ||||
| 
 | ||||
| pub struct ElectrumBlockchain<T: Read + Write>(Option<Client<T>>); | ||||
| 
 | ||||
| impl<T: Read + Write> std::convert::From<Client<T>> for ElectrumBlockchain<T> { | ||||
|     fn from(client: Client<T>) -> Self { | ||||
|         ElectrumBlockchain(Some(client)) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<T: Read + Write> Blockchain for ElectrumBlockchain<T> { | ||||
|     fn offline() -> Self { | ||||
|         ElectrumBlockchain(None) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<T: Read + Write> OnlineBlockchain for ElectrumBlockchain<T> { | ||||
|     fn get_capabilities(&self) -> HashSet<Capability> { | ||||
|         vec![Capability::FullHistory, Capability::GetAnyTx] | ||||
|             .into_iter() | ||||
|             .collect() | ||||
|     } | ||||
| 
 | ||||
|     fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>( | ||||
|         &mut self, | ||||
|         stop_gap: Option<usize>, | ||||
|         database: &mut D, | ||||
|         _progress_update: P, | ||||
|     ) -> Result<(), Error> { | ||||
|         // TODO: progress
 | ||||
| 
 | ||||
|         let stop_gap = stop_gap.unwrap_or(20); | ||||
|         let batch_query_size = 20; | ||||
| 
 | ||||
|         // check unconfirmed tx, delete so they are retrieved later
 | ||||
|         let mut del_batch = database.begin_batch(); | ||||
|         for tx in database.iter_txs(false)? { | ||||
|             if tx.height.is_none() { | ||||
|                 del_batch.del_tx(&tx.txid, false)?; | ||||
|             } | ||||
|         } | ||||
|         database.commit_batch(del_batch)?; | ||||
| 
 | ||||
|         // maximum derivation index for a change address that we've seen during sync
 | ||||
|         let mut change_max_deriv = 0; | ||||
| 
 | ||||
|         let mut already_checked: HashSet<Script> = HashSet::new(); | ||||
|         let mut to_check_later = VecDeque::with_capacity(batch_query_size); | ||||
| 
 | ||||
|         // insert the first chunk
 | ||||
|         let mut iter_scriptpubkeys = database | ||||
|             .iter_script_pubkeys(Some(ScriptType::External))? | ||||
|             .into_iter(); | ||||
|         let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|         for item in chunk.into_iter().rev() { | ||||
|             to_check_later.push_front(item); | ||||
|         } | ||||
| 
 | ||||
|         let mut iterating_external = true; | ||||
|         let mut index = 0; | ||||
|         let mut last_found = 0; | ||||
|         while !to_check_later.is_empty() { | ||||
|             trace!("to_check_later size {}", to_check_later.len()); | ||||
| 
 | ||||
|             let until = cmp::min(to_check_later.len(), batch_query_size); | ||||
|             let chunk: Vec<Script> = to_check_later.drain(..until).collect(); | ||||
|             let call_result = self | ||||
|                 .0 | ||||
|                 .as_mut() | ||||
|                 .unwrap() | ||||
|                 .batch_script_get_history(chunk.iter())?; | ||||
| 
 | ||||
|             for (script, history) in chunk.into_iter().zip(call_result.into_iter()) { | ||||
|                 trace!("received history for {:?}, size {}", script, history.len()); | ||||
| 
 | ||||
|                 if !history.is_empty() { | ||||
|                     last_found = index; | ||||
| 
 | ||||
|                     let mut check_later_scripts = self | ||||
|                         .check_history(database, script, history, &mut change_max_deriv)? | ||||
|                         .into_iter() | ||||
|                         .filter(|x| already_checked.insert(x.clone())) | ||||
|                         .collect(); | ||||
|                     to_check_later.append(&mut check_later_scripts); | ||||
|                 } | ||||
| 
 | ||||
|                 index += 1; | ||||
|             } | ||||
| 
 | ||||
|             match iterating_external { | ||||
|                 true if index - last_found >= stop_gap => iterating_external = false, | ||||
|                 true => { | ||||
|                     trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {}, stop_gap = {}", index, last_found, stop_gap); | ||||
| 
 | ||||
|                     let chunk: Vec<Script> = | ||||
|                         iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|                     for item in chunk.into_iter().rev() { | ||||
|                         to_check_later.push_front(item); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => {} | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // check utxo
 | ||||
|         // TODO: try to minimize network requests and re-use scripts if possible
 | ||||
|         let mut batch = database.begin_batch(); | ||||
|         for chunk in ChunksIterator::new(database.iter_utxos()?.into_iter(), batch_query_size) { | ||||
|             let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect(); | ||||
|             let call_result = self | ||||
|                 .0 | ||||
|                 .as_mut() | ||||
|                 .unwrap() | ||||
|                 .batch_script_list_unspent(scripts)?; | ||||
| 
 | ||||
|             // check which utxos are actually still unspent
 | ||||
|             for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) { | ||||
|                 debug!( | ||||
|                     "outpoint {:?} is unspent for me, list unspent is {:?}", | ||||
|                     utxo.outpoint, list_unspent | ||||
|                 ); | ||||
| 
 | ||||
|                 let mut spent = true; | ||||
|                 for unspent in list_unspent { | ||||
|                     let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32); | ||||
|                     if utxo.outpoint == res_outpoint { | ||||
|                         spent = false; | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 if spent { | ||||
|                     info!("{} not anymore unspent, removing", utxo.outpoint); | ||||
|                     batch.del_utxo(&utxo.outpoint)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0); | ||||
|         let first_ext_new = last_found as u32 + 1; | ||||
|         if first_ext_new > current_ext { | ||||
|             info!("Setting external index to {}", first_ext_new); | ||||
|             database.set_last_index(ScriptType::External, first_ext_new)?; | ||||
|         } | ||||
| 
 | ||||
|         let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0); | ||||
|         let first_int_new = change_max_deriv + 1; | ||||
|         if first_int_new > current_int { | ||||
|             info!("Setting internal index to {}", first_int_new); | ||||
|             database.set_last_index(ScriptType::Internal, first_int_new)?; | ||||
|         } | ||||
| 
 | ||||
|         database.commit_batch(batch)?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn get_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error> { | ||||
|         Ok(self | ||||
|             .0 | ||||
|             .as_mut() | ||||
|             .unwrap() | ||||
|             .transaction_get(txid) | ||||
|             .map(Option::Some)?) | ||||
|     } | ||||
| 
 | ||||
|     fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error> { | ||||
|         Ok(self | ||||
|             .0 | ||||
|             .as_mut() | ||||
|             .unwrap() | ||||
|             .transaction_broadcast(tx) | ||||
|             .map(|_| ())?) | ||||
|     } | ||||
| 
 | ||||
|     fn get_height(&mut self) -> Result<usize, Error> { | ||||
|         Ok(self | ||||
|             .0 | ||||
|             .as_mut() | ||||
|             .unwrap() | ||||
|             .block_headers_subscribe() | ||||
|             .map(|data| data.height)?) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<T: Read + Write> ElectrumBlockchain<T> { | ||||
|     fn check_tx_and_descendant<D: DatabaseUtils + BatchDatabase>( | ||||
|         &mut self, | ||||
|         database: &mut D, | ||||
|         txid: &Txid, | ||||
|         height: Option<u32>, | ||||
|         cur_script: &Script, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         debug!( | ||||
|             "check_tx_and_descendant of {}, height: {:?}, script: {}", | ||||
|             txid, height, cur_script | ||||
|         ); | ||||
|         let mut updates = database.begin_batch(); | ||||
|         let tx = match database.get_tx(&txid, true)? { | ||||
|             // TODO: do we need the raw?
 | ||||
|             Some(mut saved_tx) => { | ||||
|                 // update the height if it's different (in case of reorg)
 | ||||
|                 if saved_tx.height != height { | ||||
|                     info!( | ||||
|                         "updating height from {:?} to {:?} for tx {}", | ||||
|                         saved_tx.height, height, txid | ||||
|                     ); | ||||
|                     saved_tx.height = height; | ||||
|                     updates.set_tx(&saved_tx)?; | ||||
|                 } | ||||
| 
 | ||||
|                 debug!("already have {} in db, returning the cached version", txid); | ||||
| 
 | ||||
|                 // unwrap since we explicitly ask for the raw_tx, if it's not present something
 | ||||
|                 // went wrong
 | ||||
|                 saved_tx.transaction.unwrap() | ||||
|             } | ||||
|             None => self.0.as_mut().unwrap().transaction_get(&txid)?, | ||||
|         }; | ||||
| 
 | ||||
|         let mut incoming: u64 = 0; | ||||
|         let mut outgoing: u64 = 0; | ||||
| 
 | ||||
|         // look for our own inputs
 | ||||
|         for (i, input) in tx.input.iter().enumerate() { | ||||
|             // the fact that we visit addresses in a BFS fashion starting from the external addresses
 | ||||
|             // should ensure that this query is always consistent (i.e. when we get to call this all
 | ||||
|             // the transactions at a lower depth have already been indexed, so if an outpoint is ours
 | ||||
|             // we are guaranteed to have it in the db).
 | ||||
|             if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { | ||||
|                 if database.is_mine(&previous_output.script_pubkey)? { | ||||
|                     outgoing += previous_output.value; | ||||
| 
 | ||||
|                     debug!("{} input #{} is mine, removing from utxo", txid, i); | ||||
|                     updates.del_utxo(&input.previous_output)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let mut to_check_later = vec![]; | ||||
|         for (i, output) in tx.output.iter().enumerate() { | ||||
|             // this output is ours, we have a path to derive it
 | ||||
|             if let Some((script_type, path)) = | ||||
|                 database.get_path_from_script_pubkey(&output.script_pubkey)? | ||||
|             { | ||||
|                 debug!("{} output #{} is mine, adding utxo", txid, i); | ||||
|                 updates.set_utxo(&UTXO { | ||||
|                     outpoint: OutPoint::new(tx.txid(), i as u32), | ||||
|                     txout: output.clone(), | ||||
|                 })?; | ||||
|                 incoming += output.value; | ||||
| 
 | ||||
|                 if output.script_pubkey != *cur_script { | ||||
|                     debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey); | ||||
|                     to_check_later.push(output.script_pubkey.clone()) | ||||
|                 } | ||||
| 
 | ||||
|                 // derive as many change addrs as external addresses that we've seen
 | ||||
|                 if script_type == ScriptType::Internal | ||||
|                     && u32::from(path.as_ref()[0]) > *change_max_deriv | ||||
|                 { | ||||
|                     *change_max_deriv = u32::from(path.as_ref()[0]); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let tx = TransactionDetails { | ||||
|             txid: tx.txid(), | ||||
|             transaction: Some(tx), | ||||
|             received: incoming, | ||||
|             sent: outgoing, | ||||
|             height, | ||||
|             timestamp: 0, | ||||
|         }; | ||||
|         info!("Saving tx {}", txid); | ||||
|         updates.set_tx(&tx)?; | ||||
| 
 | ||||
|         database.commit_batch(updates)?; | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     } | ||||
| 
 | ||||
|     fn check_history<D: DatabaseUtils + BatchDatabase>( | ||||
|         &mut self, | ||||
|         database: &mut D, | ||||
|         script_pubkey: Script, | ||||
|         txs: Vec<GetHistoryRes>, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         let mut to_check_later = Vec::new(); | ||||
| 
 | ||||
|         debug!( | ||||
|             "history of {} script {} has {} tx", | ||||
|             Address::from_script(&script_pubkey, Network::Testnet).unwrap(), | ||||
|             script_pubkey, | ||||
|             txs.len() | ||||
|         ); | ||||
| 
 | ||||
|         for tx in txs { | ||||
|             let height: Option<u32> = match tx.height { | ||||
|                 0 | -1 => None, | ||||
|                 x => u32::try_from(x).ok(), | ||||
|             }; | ||||
| 
 | ||||
|             to_check_later.extend_from_slice(&self.check_tx_and_descendant( | ||||
|                 database, | ||||
|                 &tx.tx_hash, | ||||
|                 height, | ||||
|                 &script_pubkey, | ||||
|                 change_max_deriv, | ||||
|             )?); | ||||
|         } | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										86
									
								
								src/blockchain/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										86
									
								
								src/blockchain/mod.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,86 @@ | ||||
| use std::collections::HashSet; | ||||
| use std::sync::mpsc::{channel, Receiver, Sender}; | ||||
| 
 | ||||
| use bitcoin::{Transaction, Txid}; | ||||
| 
 | ||||
| use crate::database::{BatchDatabase, DatabaseUtils}; | ||||
| use crate::error::Error; | ||||
| 
 | ||||
| #[cfg(feature = "electrum")] | ||||
| pub mod electrum; | ||||
| #[cfg(feature = "electrum")] | ||||
| pub use self::electrum::ElectrumBlockchain; | ||||
| 
 | ||||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||||
| pub enum Capability { | ||||
|     FullHistory, | ||||
|     GetAnyTx, | ||||
| } | ||||
| 
 | ||||
| pub trait Blockchain { | ||||
|     fn offline() -> Self; | ||||
| } | ||||
| 
 | ||||
| pub struct OfflineBlockchain; | ||||
| impl Blockchain for OfflineBlockchain { | ||||
|     fn offline() -> Self { | ||||
|         OfflineBlockchain | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub trait OnlineBlockchain: Blockchain { | ||||
|     fn get_capabilities(&self) -> HashSet<Capability>; | ||||
| 
 | ||||
|     fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>( | ||||
|         &mut self, | ||||
|         stop_gap: Option<usize>, | ||||
|         database: &mut D, | ||||
|         progress_update: P, | ||||
|     ) -> Result<(), Error>; | ||||
|     fn sync<D: BatchDatabase + DatabaseUtils, P: Progress>( | ||||
|         &mut self, | ||||
|         stop_gap: Option<usize>, | ||||
|         database: &mut D, | ||||
|         progress_update: P, | ||||
|     ) -> Result<(), Error> { | ||||
|         self.setup(stop_gap, database, progress_update) | ||||
|     } | ||||
| 
 | ||||
|     fn get_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error>; | ||||
|     fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error>; | ||||
| 
 | ||||
|     fn get_height(&mut self) -> Result<usize, Error>; | ||||
| } | ||||
| 
 | ||||
| pub type ProgressData = (f32, Option<String>); | ||||
| 
 | ||||
| pub trait Progress { | ||||
|     fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error>; | ||||
| } | ||||
| 
 | ||||
| pub fn progress() -> (Sender<ProgressData>, Receiver<ProgressData>) { | ||||
|     channel() | ||||
| } | ||||
| 
 | ||||
| impl Progress for Sender<ProgressData> { | ||||
|     fn update(&self, progress: f32, message: Option<String>) -> Result<(), Error> { | ||||
|         if progress < 0.0 || progress > 100.0 { | ||||
|             return Err(Error::InvalidProgressValue(progress)); | ||||
|         } | ||||
| 
 | ||||
|         self.send((progress, message)) | ||||
|             .map_err(|_| Error::ProgressUpdateError) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub struct NoopProgress; | ||||
| 
 | ||||
| pub fn noop_progress() -> NoopProgress { | ||||
|     NoopProgress | ||||
| } | ||||
| 
 | ||||
| impl Progress for NoopProgress { | ||||
|     fn update(&self, _progress: f32, _message: Option<String>) -> Result<(), Error> { | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @ -1,9 +1,6 @@ | ||||
| use std::collections::BTreeMap; | ||||
| use std::convert::{From, TryInto}; | ||||
| use std::ops::Bound::{Excluded, Included}; | ||||
| 
 | ||||
| use serde::Serialize; | ||||
| 
 | ||||
| use bitcoin::consensus::encode::{deserialize, serialize}; | ||||
| use bitcoin::hash_types::Txid; | ||||
| use bitcoin::util::bip32::{ChildNumber, DerivationPath}; | ||||
|  | ||||
| @ -1,6 +1,6 @@ | ||||
| use bitcoin::hash_types::Txid; | ||||
| use bitcoin::util::bip32::{ChildNumber, DerivationPath}; | ||||
| use bitcoin::{OutPoint, Script, Transaction}; | ||||
| use bitcoin::{OutPoint, Script, Transaction, TxOut}; | ||||
| 
 | ||||
| use crate::error::Error; | ||||
| use crate::types::*; | ||||
| @ -76,3 +76,34 @@ pub trait BatchDatabase: Database { | ||||
|     fn begin_batch(&self) -> Self::Batch; | ||||
|     fn commit_batch(&mut self, batch: Self::Batch) -> Result<(), Error>; | ||||
| } | ||||
| 
 | ||||
| pub trait DatabaseUtils: Database { | ||||
|     fn is_mine(&self, script: &Script) -> Result<bool, Error> { | ||||
|         self.get_path_from_script_pubkey(script) | ||||
|             .map(|o| o.is_some()) | ||||
|     } | ||||
| 
 | ||||
|     fn get_raw_tx_or<F>(&self, txid: &Txid, f: F) -> Result<Option<Transaction>, Error> | ||||
|     where | ||||
|         F: FnOnce() -> Result<Option<Transaction>, Error>, | ||||
|     { | ||||
|         self.get_tx(txid, true)? | ||||
|             .map(|t| t.transaction) | ||||
|             .flatten() | ||||
|             .map_or_else(f, |t| Ok(Some(t))) | ||||
|     } | ||||
| 
 | ||||
|     fn get_previous_output(&self, outpoint: &OutPoint) -> Result<Option<TxOut>, Error> { | ||||
|         self.get_raw_tx(&outpoint.txid)? | ||||
|             .and_then(|previous_tx| { | ||||
|                 if outpoint.vout as usize >= previous_tx.output.len() { | ||||
|                     Some(Err(Error::InvalidOutpoint(outpoint.clone()))) | ||||
|                 } else { | ||||
|                     Some(Ok(previous_tx.output[outpoint.vout as usize].clone())) | ||||
|                 } | ||||
|             }) | ||||
|             .transpose() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<T: Database> DatabaseUtils for T {} | ||||
|  | ||||
| @ -27,6 +27,13 @@ pub enum Error { | ||||
|     InputMissingWitnessScript(usize), | ||||
|     MissingUTXO, | ||||
| 
 | ||||
|     // Blockchain interface errors
 | ||||
|     Uncapable(crate::blockchain::Capability), | ||||
|     InvalidProgressValue(f32), | ||||
|     ProgressUpdateError, | ||||
|     MissingCachedAddresses, | ||||
|     InvalidOutpoint(OutPoint), | ||||
| 
 | ||||
|     Descriptor(crate::descriptor::error::Error), | ||||
| 
 | ||||
|     Encode(bitcoin::consensus::encode::Error), | ||||
|  | ||||
| @ -18,6 +18,7 @@ pub extern crate sled; | ||||
| 
 | ||||
| #[macro_use] | ||||
| pub mod error; | ||||
| pub mod blockchain; | ||||
| pub mod database; | ||||
| pub mod descriptor; | ||||
| pub mod psbt; | ||||
|  | ||||
| @ -1,8 +1,6 @@ | ||||
| use std::cell::RefCell; | ||||
| use std::cmp; | ||||
| use std::collections::{BTreeMap, HashSet, VecDeque}; | ||||
| use std::convert::TryFrom; | ||||
| use std::io::{Read, Write}; | ||||
| use std::collections::{BTreeMap, HashSet}; | ||||
| use std::ops::DerefMut; | ||||
| use std::str::FromStr; | ||||
| use std::time::{Instant, SystemTime, UNIX_EPOCH}; | ||||
| 
 | ||||
| @ -20,39 +18,35 @@ use miniscript::BitcoinSig; | ||||
| #[allow(unused_imports)] | ||||
| use log::{debug, error, info, trace}; | ||||
| 
 | ||||
| pub mod offline_stream; | ||||
| pub mod utils; | ||||
| 
 | ||||
| pub type OfflineWallet<D> = Wallet<offline_stream::OfflineStream, D>; | ||||
| 
 | ||||
| use self::utils::{ChunksIterator, IsDust}; | ||||
| use crate::database::{BatchDatabase, BatchOperations}; | ||||
| use self::utils::IsDust; | ||||
| use crate::blockchain::{noop_progress, Blockchain, OfflineBlockchain, OnlineBlockchain}; | ||||
| use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; | ||||
| use crate::descriptor::{get_checksum, DescriptorMeta, ExtendedDescriptor, ExtractPolicy, Policy}; | ||||
| use crate::error::Error; | ||||
| use crate::psbt::{utils::PSBTUtils, PSBTSatisfier, PSBTSigner}; | ||||
| use crate::signer::Signer; | ||||
| use crate::types::*; | ||||
| 
 | ||||
| #[cfg(any(feature = "electrum", feature = "default"))] | ||||
| use electrum_client::types::*; | ||||
| #[cfg(any(feature = "electrum", feature = "default"))] | ||||
| use electrum_client::Client; | ||||
| #[cfg(not(any(feature = "electrum", feature = "default")))] | ||||
| use std::marker::PhantomData as Client; | ||||
| pub type OfflineWallet<D> = Wallet<OfflineBlockchain, D>; | ||||
| 
 | ||||
| pub struct Wallet<S: Read + Write, D: BatchDatabase> { | ||||
| //#[cfg(feature = "electrum")]
 | ||||
| //pub type ElectrumWallet<S, D> = Wallet<crate::blockchain::ElectrumBlockchain<electrum_client::Client<S>>, D>;
 | ||||
| 
 | ||||
| pub struct Wallet<B: Blockchain, D: BatchDatabase> { | ||||
|     descriptor: ExtendedDescriptor, | ||||
|     change_descriptor: Option<ExtendedDescriptor>, | ||||
|     network: Network, | ||||
| 
 | ||||
|     client: Option<RefCell<Client<S>>>, | ||||
|     client: RefCell<B>, | ||||
|     database: RefCell<D>, | ||||
| } | ||||
| 
 | ||||
| // offline actions, always available
 | ||||
| impl<S, D> Wallet<S, D> | ||||
| impl<B, D> Wallet<B, D> | ||||
| where | ||||
|     S: Read + Write, | ||||
|     B: Blockchain, | ||||
|     D: BatchDatabase, | ||||
| { | ||||
|     pub fn new_offline( | ||||
| @ -88,7 +82,7 @@ where | ||||
|             change_descriptor, | ||||
|             network, | ||||
| 
 | ||||
|             client: None, | ||||
|             client: RefCell::new(B::offline()), | ||||
|             database: RefCell::new(database), | ||||
|         }) | ||||
|     } | ||||
| @ -107,7 +101,7 @@ where | ||||
|     } | ||||
| 
 | ||||
|     pub fn is_mine(&self, script: &Script) -> Result<bool, Error> { | ||||
|         self.get_path(script).map(|x| x.is_some()) | ||||
|         self.database.borrow().is_mine(script) | ||||
|     } | ||||
| 
 | ||||
|     pub fn list_unspent(&self) -> Result<Vec<UTXO>, Error> { | ||||
| @ -510,10 +504,6 @@ where | ||||
|             .as_secs() | ||||
|     } | ||||
| 
 | ||||
|     fn get_path(&self, script: &Script) -> Result<Option<(ScriptType, DerivationPath)>, Error> { | ||||
|         self.database.borrow().get_path_from_script_pubkey(script) | ||||
|     } | ||||
| 
 | ||||
|     fn get_descriptor_for(&self, script_type: ScriptType) -> &ExtendedDescriptor { | ||||
|         let desc = match script_type { | ||||
|             ScriptType::External => &self.descriptor, | ||||
| @ -679,10 +669,9 @@ where | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[cfg(any(feature = "electrum", feature = "default"))] | ||||
| impl<S, D> Wallet<S, D> | ||||
| impl<B, D> Wallet<B, D> | ||||
| where | ||||
|     S: Read + Write, | ||||
|     B: OnlineBlockchain, | ||||
|     D: BatchDatabase, | ||||
| { | ||||
|     pub fn new( | ||||
| @ -690,7 +679,7 @@ where | ||||
|         change_descriptor: Option<&str>, | ||||
|         network: Network, | ||||
|         mut database: D, | ||||
|         client: Client<S>, | ||||
|         client: B, | ||||
|     ) -> Result<Self, Error> { | ||||
|         database.check_descriptor_checksum( | ||||
|             ScriptType::External, | ||||
| @ -719,154 +708,15 @@ where | ||||
|             change_descriptor, | ||||
|             network, | ||||
| 
 | ||||
|             client: Some(RefCell::new(client)), | ||||
|             client: RefCell::new(client), | ||||
|             database: RefCell::new(database), | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     fn get_previous_output(&self, outpoint: &OutPoint) -> Option<TxOut> { | ||||
|         // the fact that we visit addresses in a BFS fashion starting from the external addresses
 | ||||
|         // should ensure that this query is always consistent (i.e. when we get to call this all
 | ||||
|         // the transactions at a lower depth have already been indexed, so if an outpoint is ours
 | ||||
|         // we are guaranteed to have it in the db).
 | ||||
|         self.database | ||||
|             .borrow() | ||||
|             .get_raw_tx(&outpoint.txid) | ||||
|             .unwrap() | ||||
|             .map(|previous_tx| previous_tx.output[outpoint.vout as usize].clone()) | ||||
|     } | ||||
| 
 | ||||
|     fn check_tx_and_descendant( | ||||
|         &self, | ||||
|         txid: &Txid, | ||||
|         height: Option<u32>, | ||||
|         cur_script: &Script, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         debug!( | ||||
|             "check_tx_and_descendant of {}, height: {:?}, script: {}", | ||||
|             txid, height, cur_script | ||||
|         ); | ||||
|         let mut updates = self.database.borrow().begin_batch(); | ||||
|         let tx = match self.database.borrow().get_tx(&txid, true)? { | ||||
|             // TODO: do we need the raw?
 | ||||
|             Some(mut saved_tx) => { | ||||
|                 // update the height if it's different (in case of reorg)
 | ||||
|                 if saved_tx.height != height { | ||||
|                     info!( | ||||
|                         "updating height from {:?} to {:?} for tx {}", | ||||
|                         saved_tx.height, height, txid | ||||
|                     ); | ||||
|                     saved_tx.height = height; | ||||
|                     updates.set_tx(&saved_tx)?; | ||||
|                 } | ||||
| 
 | ||||
|                 debug!("already have {} in db, returning the cached version", txid); | ||||
| 
 | ||||
|                 // unwrap since we explicitly ask for the raw_tx, if it's not present something
 | ||||
|                 // went wrong
 | ||||
|                 saved_tx.transaction.unwrap() | ||||
|             } | ||||
|             None => self | ||||
|                 .client | ||||
|                 .as_ref() | ||||
|                 .unwrap() | ||||
|                 .borrow_mut() | ||||
|                 .transaction_get(&txid)?, | ||||
|         }; | ||||
| 
 | ||||
|         let mut incoming: u64 = 0; | ||||
|         let mut outgoing: u64 = 0; | ||||
| 
 | ||||
|         // look for our own inputs
 | ||||
|         for (i, input) in tx.input.iter().enumerate() { | ||||
|             if let Some(previous_output) = self.get_previous_output(&input.previous_output) { | ||||
|                 if self.is_mine(&previous_output.script_pubkey)? { | ||||
|                     outgoing += previous_output.value; | ||||
| 
 | ||||
|                     debug!("{} input #{} is mine, removing from utxo", txid, i); | ||||
|                     updates.del_utxo(&input.previous_output)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let mut to_check_later = vec![]; | ||||
|         for (i, output) in tx.output.iter().enumerate() { | ||||
|             // this output is ours, we have a path to derive it
 | ||||
|             if let Some((script_type, path)) = self.get_path(&output.script_pubkey)? { | ||||
|                 debug!("{} output #{} is mine, adding utxo", txid, i); | ||||
|                 updates.set_utxo(&UTXO { | ||||
|                     outpoint: OutPoint::new(tx.txid(), i as u32), | ||||
|                     txout: output.clone(), | ||||
|                 })?; | ||||
|                 incoming += output.value; | ||||
| 
 | ||||
|                 if output.script_pubkey != *cur_script { | ||||
|                     debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey); | ||||
|                     to_check_later.push(output.script_pubkey.clone()) | ||||
|                 } | ||||
| 
 | ||||
|                 // derive as many change addrs as external addresses that we've seen
 | ||||
|                 if script_type == ScriptType::Internal | ||||
|                     && u32::from(path.as_ref()[0]) > *change_max_deriv | ||||
|                 { | ||||
|                     *change_max_deriv = u32::from(path.as_ref()[0]); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let tx = TransactionDetails { | ||||
|             txid: tx.txid(), | ||||
|             transaction: Some(tx), | ||||
|             received: incoming, | ||||
|             sent: outgoing, | ||||
|             height, | ||||
|             timestamp: 0, | ||||
|         }; | ||||
|         info!("Saving tx {}", txid); | ||||
| 
 | ||||
|         updates.set_tx(&tx)?; | ||||
|         self.database.borrow_mut().commit_batch(updates)?; | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     } | ||||
| 
 | ||||
|     fn check_history( | ||||
|         &self, | ||||
|         script_pubkey: Script, | ||||
|         txs: Vec<GetHistoryRes>, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         let mut to_check_later = Vec::new(); | ||||
| 
 | ||||
|         debug!( | ||||
|             "history of {} script {} has {} tx", | ||||
|             Address::from_script(&script_pubkey, self.network).unwrap(), | ||||
|             script_pubkey, | ||||
|             txs.len() | ||||
|         ); | ||||
| 
 | ||||
|         for tx in txs { | ||||
|             let height: Option<u32> = match tx.height { | ||||
|                 0 | -1 => None, | ||||
|                 x => u32::try_from(x).ok(), | ||||
|             }; | ||||
| 
 | ||||
|             to_check_later.extend_from_slice(&self.check_tx_and_descendant( | ||||
|                 &tx.tx_hash, | ||||
|                 height, | ||||
|                 &script_pubkey, | ||||
|                 change_max_deriv, | ||||
|             )?); | ||||
|         } | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     } | ||||
| 
 | ||||
|     pub fn sync( | ||||
|         &self, | ||||
|         max_address: Option<u32>, | ||||
|         batch_query_size: Option<usize>, | ||||
|         _batch_query_size: Option<usize>, | ||||
|     ) -> Result<(), Error> { | ||||
|         debug!("begin sync..."); | ||||
|         // TODO: consider taking an RwLock as writere here to prevent other "read-only" calls to
 | ||||
| @ -878,8 +728,8 @@ where | ||||
|             max_address.unwrap_or(100) | ||||
|         }; | ||||
| 
 | ||||
|         let batch_query_size = batch_query_size.unwrap_or(20); | ||||
|         let stop_gap = batch_query_size; | ||||
|         // TODO:
 | ||||
|         // let batch_query_size = batch_query_size.unwrap_or(20);
 | ||||
| 
 | ||||
|         let path = DerivationPath::from(vec![ChildNumber::Normal { index: max_address }]); | ||||
|         let last_addr = self | ||||
| @ -923,154 +773,16 @@ where | ||||
|             self.database.borrow_mut().commit_batch(address_batch)?; | ||||
|         } | ||||
| 
 | ||||
|         // check unconfirmed tx, delete so they are retrieved later
 | ||||
|         let mut del_batch = self.database.borrow().begin_batch(); | ||||
|         for tx in self.database.borrow().iter_txs(false)? { | ||||
|             if tx.height.is_none() { | ||||
|                 del_batch.del_tx(&tx.txid, false)?; | ||||
|             } | ||||
|         } | ||||
|         self.database.borrow_mut().commit_batch(del_batch)?; | ||||
| 
 | ||||
|         // maximum derivation index for a change address that we've seen during sync
 | ||||
|         let mut change_max_deriv = 0; | ||||
| 
 | ||||
|         let mut already_checked: HashSet<Script> = HashSet::new(); | ||||
|         let mut to_check_later = VecDeque::with_capacity(batch_query_size); | ||||
| 
 | ||||
|         // insert the first chunk
 | ||||
|         let mut iter_scriptpubkeys = self | ||||
|             .database | ||||
|             .borrow() | ||||
|             .iter_script_pubkeys(Some(ScriptType::External))? | ||||
|             .into_iter(); | ||||
|         let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|         for item in chunk.into_iter().rev() { | ||||
|             to_check_later.push_front(item); | ||||
|         } | ||||
| 
 | ||||
|         let mut iterating_external = true; | ||||
|         let mut index = 0; | ||||
|         let mut last_found = 0; | ||||
|         while !to_check_later.is_empty() { | ||||
|             trace!("to_check_later size {}", to_check_later.len()); | ||||
| 
 | ||||
|             let until = cmp::min(to_check_later.len(), batch_query_size); | ||||
|             let chunk: Vec<Script> = to_check_later.drain(..until).collect(); | ||||
|             let call_result = self | ||||
|                 .client | ||||
|                 .as_ref() | ||||
|                 .unwrap() | ||||
|                 .borrow_mut() | ||||
|                 .batch_script_get_history(chunk.iter())?; | ||||
| 
 | ||||
|             for (script, history) in chunk.into_iter().zip(call_result.into_iter()) { | ||||
|                 trace!("received history for {:?}, size {}", script, history.len()); | ||||
| 
 | ||||
|                 if !history.is_empty() { | ||||
|                     last_found = index; | ||||
| 
 | ||||
|                     let mut check_later_scripts = self | ||||
|                         .check_history(script, history, &mut change_max_deriv)? | ||||
|                         .into_iter() | ||||
|                         .filter(|x| already_checked.insert(x.clone())) | ||||
|                         .collect(); | ||||
|                     to_check_later.append(&mut check_later_scripts); | ||||
|                 } | ||||
| 
 | ||||
|                 index += 1; | ||||
|             } | ||||
| 
 | ||||
|             match iterating_external { | ||||
|                 true if index - last_found >= stop_gap => iterating_external = false, | ||||
|                 true => { | ||||
|                     trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {}, stop_gap = {}", index, last_found, stop_gap); | ||||
| 
 | ||||
|                     let chunk: Vec<Script> = | ||||
|                         iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|                     for item in chunk.into_iter().rev() { | ||||
|                         to_check_later.push_front(item); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => {} | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // check utxo
 | ||||
|         // TODO: try to minimize network requests and re-use scripts if possible
 | ||||
|         let mut batch = self.database.borrow().begin_batch(); | ||||
|         for chunk in ChunksIterator::new( | ||||
|             self.database.borrow().iter_utxos()?.into_iter(), | ||||
|             batch_query_size, | ||||
|         ) { | ||||
|             let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect(); | ||||
|             let call_result = self | ||||
|                 .client | ||||
|                 .as_ref() | ||||
|                 .unwrap() | ||||
|                 .borrow_mut() | ||||
|                 .batch_script_list_unspent(scripts)?; | ||||
| 
 | ||||
|             // check which utxos are actually still unspent
 | ||||
|             for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) { | ||||
|                 debug!( | ||||
|                     "outpoint {:?} is unspent for me, list unspent is {:?}", | ||||
|                     utxo.outpoint, list_unspent | ||||
|                 ); | ||||
| 
 | ||||
|                 let mut spent = true; | ||||
|                 for unspent in list_unspent { | ||||
|                     let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32); | ||||
|                     if utxo.outpoint == res_outpoint { | ||||
|                         spent = false; | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 if spent { | ||||
|                     info!("{} not anymore unspent, removing", utxo.outpoint); | ||||
|                     batch.del_utxo(&utxo.outpoint)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let current_ext = self | ||||
|             .database | ||||
|             .borrow() | ||||
|             .get_last_index(ScriptType::External)? | ||||
|             .unwrap_or(0); | ||||
|         let first_ext_new = last_found as u32 + 1; | ||||
|         if first_ext_new > current_ext { | ||||
|             info!("Setting external index to {}", first_ext_new); | ||||
|             self.database | ||||
|                 .borrow_mut() | ||||
|                 .set_last_index(ScriptType::External, first_ext_new)?; | ||||
|         } | ||||
| 
 | ||||
|         let current_int = self | ||||
|             .database | ||||
|             .borrow() | ||||
|             .get_last_index(ScriptType::Internal)? | ||||
|             .unwrap_or(0); | ||||
|         let first_int_new = change_max_deriv + 1; | ||||
|         if first_int_new > current_int { | ||||
|             info!("Setting internal index to {}", first_int_new); | ||||
|             self.database | ||||
|                 .borrow_mut() | ||||
|                 .set_last_index(ScriptType::Internal, first_int_new)?; | ||||
|         } | ||||
| 
 | ||||
|         self.database.borrow_mut().commit_batch(batch)?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|         self.client.borrow_mut().sync( | ||||
|             None, | ||||
|             self.database.borrow_mut().deref_mut(), | ||||
|             noop_progress(), | ||||
|         ) | ||||
|     } | ||||
| 
 | ||||
|     pub fn broadcast(&self, psbt: PSBT) -> Result<(Txid, Transaction), Error> { | ||||
|         let extracted = psbt.extract_tx(); | ||||
|         self.client | ||||
|             .as_ref() | ||||
|             .unwrap() | ||||
|             .borrow_mut() | ||||
|             .transaction_broadcast(&extracted)?; | ||||
|         self.client.borrow_mut().broadcast(&extracted)?; | ||||
| 
 | ||||
|         Ok((extracted.txid(), extracted)) | ||||
|     } | ||||
|  | ||||
| @ -1,7 +1,7 @@ | ||||
| use std::io::{self, Error, ErrorKind, Read, Write}; | ||||
| 
 | ||||
| #[derive(Clone, Debug)] | ||||
| pub struct OfflineStream {} | ||||
| pub struct OfflineStream; | ||||
| 
 | ||||
| impl Read for OfflineStream { | ||||
|     fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user