[blockchain] add the Esplora backend
This commit is contained in:
		
							parent
							
								
									de4a6a47e6
								
							
						
					
					
						commit
						95b2cd4c32
					
				| @ -10,6 +10,7 @@ script: | ||||
|   - cargo test --verbose --all | ||||
|   - cargo build --verbose --all | ||||
|   - cargo build --verbose --no-default-features --features=minimal | ||||
|   - cargo build --verbose --no-default-features --features=minimal,esplora | ||||
|   - cargo build --verbose --no-default-features --features=key-value-db | ||||
|   - cargo build --verbose --no-default-features --features=electrum | ||||
| 
 | ||||
|  | ||||
| @ -14,12 +14,14 @@ base64 = "^0.11" | ||||
| # Optional dependencies | ||||
| sled = { version = "0.31.0", optional = true } | ||||
| electrum-client = { version = "0.1.0-beta.5", optional = true } | ||||
| reqwest = { version = "0.10", optional = true, features = ["blocking", "json"] } | ||||
| 
 | ||||
| [features] | ||||
| minimal = [] | ||||
| compiler = ["miniscript/compiler"] | ||||
| default = ["key-value-db", "electrum"] | ||||
| electrum = ["electrum-client"] | ||||
| esplora = ["reqwest"] | ||||
| key-value-db = ["sled"] | ||||
| 
 | ||||
| [dev-dependencies] | ||||
|  | ||||
| @ -1,21 +1,17 @@ | ||||
| use std::cmp; | ||||
| use std::collections::{HashSet, VecDeque}; | ||||
| use std::convert::TryFrom; | ||||
| use std::collections::HashSet; | ||||
| use std::io::{Read, Write}; | ||||
| 
 | ||||
| #[allow(unused_imports)] | ||||
| use log::{debug, error, info, trace}; | ||||
| 
 | ||||
| use bitcoin::{Address, Network, OutPoint, Script, Transaction, Txid}; | ||||
| use bitcoin::{Script, Transaction, Txid}; | ||||
| 
 | ||||
| use electrum_client::types::*; | ||||
| use electrum_client::Client; | ||||
| 
 | ||||
| use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync}; | ||||
| use super::*; | ||||
| use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; | ||||
| use crate::database::{BatchDatabase, DatabaseUtils}; | ||||
| use crate::error::Error; | ||||
| use crate::types::{ScriptType, TransactionDetails, UTXO}; | ||||
| use crate::wallet::utils::ChunksIterator; | ||||
| 
 | ||||
| pub struct ElectrumBlockchain<T: Read + Write>(Option<Client<T>>); | ||||
| 
 | ||||
| @ -46,133 +42,12 @@ impl<T: Read + Write> OnlineBlockchain for ElectrumBlockchain<T> { | ||||
|         &mut self, | ||||
|         stop_gap: Option<usize>, | ||||
|         database: &mut D, | ||||
|         _progress_update: P, | ||||
|         progress_update: P, | ||||
|     ) -> Result<(), Error> { | ||||
|         // TODO: progress
 | ||||
| 
 | ||||
|         let stop_gap = stop_gap.unwrap_or(20); | ||||
|         let batch_query_size = 20; | ||||
| 
 | ||||
|         // check unconfirmed tx, delete so they are retrieved later
 | ||||
|         let mut del_batch = database.begin_batch(); | ||||
|         for tx in database.iter_txs(false)? { | ||||
|             if tx.height.is_none() { | ||||
|                 del_batch.del_tx(&tx.txid, false)?; | ||||
|             } | ||||
|         } | ||||
|         database.commit_batch(del_batch)?; | ||||
| 
 | ||||
|         // maximum derivation index for a change address that we've seen during sync
 | ||||
|         let mut change_max_deriv = 0; | ||||
| 
 | ||||
|         let mut already_checked: HashSet<Script> = HashSet::new(); | ||||
|         let mut to_check_later = VecDeque::with_capacity(batch_query_size); | ||||
| 
 | ||||
|         // insert the first chunk
 | ||||
|         let mut iter_scriptpubkeys = database | ||||
|             .iter_script_pubkeys(Some(ScriptType::External))? | ||||
|             .into_iter(); | ||||
|         let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|         for item in chunk.into_iter().rev() { | ||||
|             to_check_later.push_front(item); | ||||
|         } | ||||
| 
 | ||||
|         let mut iterating_external = true; | ||||
|         let mut index = 0; | ||||
|         let mut last_found = 0; | ||||
|         while !to_check_later.is_empty() { | ||||
|             trace!("to_check_later size {}", to_check_later.len()); | ||||
| 
 | ||||
|             let until = cmp::min(to_check_later.len(), batch_query_size); | ||||
|             let chunk: Vec<Script> = to_check_later.drain(..until).collect(); | ||||
|             let call_result = self | ||||
|                 .0 | ||||
|                 .as_mut() | ||||
|                 .ok_or(Error::OfflineClient)? | ||||
|                 .batch_script_get_history(chunk.iter())?; | ||||
| 
 | ||||
|             for (script, history) in chunk.into_iter().zip(call_result.into_iter()) { | ||||
|                 trace!("received history for {:?}, size {}", script, history.len()); | ||||
| 
 | ||||
|                 if !history.is_empty() { | ||||
|                     last_found = index; | ||||
| 
 | ||||
|                     let mut check_later_scripts = self | ||||
|                         .check_history(database, script, history, &mut change_max_deriv)? | ||||
|                         .into_iter() | ||||
|                         .filter(|x| already_checked.insert(x.clone())) | ||||
|                         .collect(); | ||||
|                     to_check_later.append(&mut check_later_scripts); | ||||
|                 } | ||||
| 
 | ||||
|                 index += 1; | ||||
|             } | ||||
| 
 | ||||
|             match iterating_external { | ||||
|                 true if index - last_found >= stop_gap => iterating_external = false, | ||||
|                 true => { | ||||
|                     trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {}, stop_gap = {}", index, last_found, stop_gap); | ||||
| 
 | ||||
|                     let chunk: Vec<Script> = | ||||
|                         iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|                     for item in chunk.into_iter().rev() { | ||||
|                         to_check_later.push_front(item); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => {} | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // check utxo
 | ||||
|         // TODO: try to minimize network requests and re-use scripts if possible
 | ||||
|         let mut batch = database.begin_batch(); | ||||
|         for chunk in ChunksIterator::new(database.iter_utxos()?.into_iter(), batch_query_size) { | ||||
|             let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect(); | ||||
|             let call_result = self | ||||
|                 .0 | ||||
|                 .as_mut() | ||||
|                 .ok_or(Error::OfflineClient)? | ||||
|                 .batch_script_list_unspent(scripts)?; | ||||
| 
 | ||||
|             // check which utxos are actually still unspent
 | ||||
|             for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) { | ||||
|                 debug!( | ||||
|                     "outpoint {:?} is unspent for me, list unspent is {:?}", | ||||
|                     utxo.outpoint, list_unspent | ||||
|                 ); | ||||
| 
 | ||||
|                 let mut spent = true; | ||||
|                 for unspent in list_unspent { | ||||
|                     let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32); | ||||
|                     if utxo.outpoint == res_outpoint { | ||||
|                         spent = false; | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 if spent { | ||||
|                     info!("{} not anymore unspent, removing", utxo.outpoint); | ||||
|                     batch.del_utxo(&utxo.outpoint)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0); | ||||
|         let first_ext_new = last_found as u32 + 1; | ||||
|         if first_ext_new > current_ext { | ||||
|             info!("Setting external index to {}", first_ext_new); | ||||
|             database.set_last_index(ScriptType::External, first_ext_new)?; | ||||
|         } | ||||
| 
 | ||||
|         let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0); | ||||
|         let first_int_new = change_max_deriv + 1; | ||||
|         if first_int_new > current_int { | ||||
|             info!("Setting internal index to {}", first_int_new); | ||||
|             database.set_last_index(ScriptType::Internal, first_int_new)?; | ||||
|         } | ||||
| 
 | ||||
|         database.commit_batch(batch)?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|         self.0 | ||||
|             .as_mut() | ||||
|             .ok_or(Error::OfflineClient)? | ||||
|             .electrum_like_setup(stop_gap, database, progress_update) | ||||
|     } | ||||
| 
 | ||||
|     fn get_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error> { | ||||
| @ -205,139 +80,60 @@ impl<T: Read + Write> OnlineBlockchain for ElectrumBlockchain<T> { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<T: Read + Write> ElectrumBlockchain<T> { | ||||
|     fn check_tx_and_descendant<D: DatabaseUtils + BatchDatabase>( | ||||
| impl<T: Read + Write> ElectrumLikeSync for Client<T> { | ||||
|     fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>( | ||||
|         &mut self, | ||||
|         database: &mut D, | ||||
|         txid: &Txid, | ||||
|         height: Option<u32>, | ||||
|         cur_script: &Script, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         debug!( | ||||
|             "check_tx_and_descendant of {}, height: {:?}, script: {}", | ||||
|             txid, height, cur_script | ||||
|         ); | ||||
|         let mut updates = database.begin_batch(); | ||||
|         let tx = match database.get_tx(&txid, true)? { | ||||
|             // TODO: do we need the raw?
 | ||||
|             Some(mut saved_tx) => { | ||||
|                 // update the height if it's different (in case of reorg)
 | ||||
|                 if saved_tx.height != height { | ||||
|                     info!( | ||||
|                         "updating height from {:?} to {:?} for tx {}", | ||||
|                         saved_tx.height, height, txid | ||||
|                     ); | ||||
|                     saved_tx.height = height; | ||||
|                     updates.set_tx(&saved_tx)?; | ||||
|                 } | ||||
| 
 | ||||
|                 debug!("already have {} in db, returning the cached version", txid); | ||||
| 
 | ||||
|                 // unwrap since we explicitly ask for the raw_tx, if it's not present something
 | ||||
|                 // went wrong
 | ||||
|                 saved_tx.transaction.unwrap() | ||||
|             } | ||||
|             None => self | ||||
|                 .0 | ||||
|                 .as_mut() | ||||
|                 .ok_or(Error::OfflineClient)? | ||||
|                 .transaction_get(&txid)?, | ||||
|         }; | ||||
| 
 | ||||
|         let mut incoming: u64 = 0; | ||||
|         let mut outgoing: u64 = 0; | ||||
| 
 | ||||
|         // look for our own inputs
 | ||||
|         for (i, input) in tx.input.iter().enumerate() { | ||||
|             // the fact that we visit addresses in a BFS fashion starting from the external addresses
 | ||||
|             // should ensure that this query is always consistent (i.e. when we get to call this all
 | ||||
|             // the transactions at a lower depth have already been indexed, so if an outpoint is ours
 | ||||
|             // we are guaranteed to have it in the db).
 | ||||
|             if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { | ||||
|                 if database.is_mine(&previous_output.script_pubkey)? { | ||||
|                     outgoing += previous_output.value; | ||||
| 
 | ||||
|                     debug!("{} input #{} is mine, removing from utxo", txid, i); | ||||
|                     updates.del_utxo(&input.previous_output)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let mut to_check_later = vec![]; | ||||
|         for (i, output) in tx.output.iter().enumerate() { | ||||
|             // this output is ours, we have a path to derive it
 | ||||
|             if let Some((script_type, path)) = | ||||
|                 database.get_path_from_script_pubkey(&output.script_pubkey)? | ||||
|             { | ||||
|                 debug!("{} output #{} is mine, adding utxo", txid, i); | ||||
|                 updates.set_utxo(&UTXO { | ||||
|                     outpoint: OutPoint::new(tx.txid(), i as u32), | ||||
|                     txout: output.clone(), | ||||
|                 })?; | ||||
|                 incoming += output.value; | ||||
| 
 | ||||
|                 if output.script_pubkey != *cur_script { | ||||
|                     debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey); | ||||
|                     to_check_later.push(output.script_pubkey.clone()) | ||||
|                 } | ||||
| 
 | ||||
|                 // derive as many change addrs as external addresses that we've seen
 | ||||
|                 if script_type == ScriptType::Internal | ||||
|                     && u32::from(path.as_ref()[0]) > *change_max_deriv | ||||
|                 { | ||||
|                     *change_max_deriv = u32::from(path.as_ref()[0]); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let tx = TransactionDetails { | ||||
|             txid: tx.txid(), | ||||
|             transaction: Some(tx), | ||||
|             received: incoming, | ||||
|             sent: outgoing, | ||||
|             height, | ||||
|             timestamp: 0, | ||||
|         }; | ||||
|         info!("Saving tx {}", txid); | ||||
|         updates.set_tx(&tx)?; | ||||
| 
 | ||||
|         database.commit_batch(updates)?; | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|         scripts: I, | ||||
|     ) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error> { | ||||
|         self.batch_script_get_history(scripts) | ||||
|             .map(|v| { | ||||
|                 v.into_iter() | ||||
|                     .map(|v| { | ||||
|                         v.into_iter() | ||||
|                             .map( | ||||
|                                 |electrum_client::GetHistoryRes { | ||||
|                                      height, tx_hash, .. | ||||
|                                  }| ELSGetHistoryRes { | ||||
|                                     height, | ||||
|                                     tx_hash, | ||||
|                                 }, | ||||
|                             ) | ||||
|                             .collect() | ||||
|                     }) | ||||
|                     .collect() | ||||
|             }) | ||||
|             .map_err(Error::Electrum) | ||||
|     } | ||||
| 
 | ||||
|     fn check_history<D: DatabaseUtils + BatchDatabase>( | ||||
|     fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>( | ||||
|         &mut self, | ||||
|         database: &mut D, | ||||
|         script_pubkey: Script, | ||||
|         txs: Vec<GetHistoryRes>, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         let mut to_check_later = Vec::new(); | ||||
|         scripts: I, | ||||
|     ) -> Result<Vec<Vec<ELSListUnspentRes>>, Error> { | ||||
|         self.batch_script_list_unspent(scripts) | ||||
|             .map(|v| { | ||||
|                 v.into_iter() | ||||
|                     .map(|v| { | ||||
|                         v.into_iter() | ||||
|                             .map( | ||||
|                                 |electrum_client::ListUnspentRes { | ||||
|                                      height, | ||||
|                                      tx_hash, | ||||
|                                      tx_pos, | ||||
|                                      .. | ||||
|                                  }| ELSListUnspentRes { | ||||
|                                     height, | ||||
|                                     tx_hash, | ||||
|                                     tx_pos, | ||||
|                                 }, | ||||
|                             ) | ||||
|                             .collect() | ||||
|                     }) | ||||
|                     .collect() | ||||
|             }) | ||||
|             .map_err(Error::Electrum) | ||||
|     } | ||||
| 
 | ||||
|         debug!( | ||||
|             "history of {} script {} has {} tx", | ||||
|             Address::from_script(&script_pubkey, Network::Testnet).unwrap(), | ||||
|             script_pubkey, | ||||
|             txs.len() | ||||
|         ); | ||||
| 
 | ||||
|         for tx in txs { | ||||
|             let height: Option<u32> = match tx.height { | ||||
|                 0 | -1 => None, | ||||
|                 x => u32::try_from(x).ok(), | ||||
|             }; | ||||
| 
 | ||||
|             to_check_later.extend_from_slice(&self.check_tx_and_descendant( | ||||
|                 database, | ||||
|                 &tx.tx_hash, | ||||
|                 height, | ||||
|                 &script_pubkey, | ||||
|                 change_max_deriv, | ||||
|             )?); | ||||
|         } | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     fn els_transaction_get(&mut self, txid: &Txid) -> Result<Transaction, Error> { | ||||
|         self.transaction_get(txid).map_err(Error::Electrum) | ||||
|     } | ||||
| } | ||||
|  | ||||
							
								
								
									
										284
									
								
								src/blockchain/esplora.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										284
									
								
								src/blockchain/esplora.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,284 @@ | ||||
| use std::collections::HashSet; | ||||
| 
 | ||||
| #[allow(unused_imports)] | ||||
| use log::{debug, error, info, trace}; | ||||
| 
 | ||||
| use serde::Deserialize; | ||||
| 
 | ||||
| use reqwest::blocking::Client; | ||||
| use reqwest::StatusCode; | ||||
| 
 | ||||
| use bitcoin::consensus::{deserialize, serialize}; | ||||
| use bitcoin::hashes::hex::ToHex; | ||||
| use bitcoin::hashes::{sha256, Hash}; | ||||
| use bitcoin::{Script, Transaction, Txid}; | ||||
| 
 | ||||
| use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync}; | ||||
| use super::*; | ||||
| use crate::database::{BatchDatabase, DatabaseUtils}; | ||||
| use crate::error::Error; | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| pub struct UrlClient { | ||||
|     url: String, | ||||
|     client: Client, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| pub struct EsploraBlockchain(Option<UrlClient>); | ||||
| 
 | ||||
| impl std::convert::From<UrlClient> for EsploraBlockchain { | ||||
|     fn from(url_client: UrlClient) -> Self { | ||||
|         EsploraBlockchain(Some(url_client)) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl EsploraBlockchain { | ||||
|     pub fn new(base_url: &str) -> Self { | ||||
|         EsploraBlockchain(Some(UrlClient { | ||||
|             url: base_url.to_string(), | ||||
|             client: Client::new(), | ||||
|         })) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Blockchain for EsploraBlockchain { | ||||
|     fn offline() -> Self { | ||||
|         EsploraBlockchain(None) | ||||
|     } | ||||
| 
 | ||||
|     fn is_online(&self) -> bool { | ||||
|         self.0.is_some() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl OnlineBlockchain for EsploraBlockchain { | ||||
|     fn get_capabilities(&self) -> HashSet<Capability> { | ||||
|         vec![Capability::FullHistory, Capability::GetAnyTx] | ||||
|             .into_iter() | ||||
|             .collect() | ||||
|     } | ||||
| 
 | ||||
|     fn setup<D: BatchDatabase + DatabaseUtils, P: Progress>( | ||||
|         &mut self, | ||||
|         stop_gap: Option<usize>, | ||||
|         database: &mut D, | ||||
|         progress_update: P, | ||||
|     ) -> Result<(), Error> { | ||||
|         self.0 | ||||
|             .as_mut() | ||||
|             .ok_or(Error::OfflineClient)? | ||||
|             .electrum_like_setup(stop_gap, database, progress_update) | ||||
|     } | ||||
| 
 | ||||
|     fn get_tx(&mut self, txid: &Txid) -> Result<Option<Transaction>, Error> { | ||||
|         Ok(self.0.as_mut().ok_or(Error::OfflineClient)?._get_tx(txid)?) | ||||
|     } | ||||
| 
 | ||||
|     fn broadcast(&mut self, tx: &Transaction) -> Result<(), Error> { | ||||
|         Ok(self | ||||
|             .0 | ||||
|             .as_mut() | ||||
|             .ok_or(Error::OfflineClient)? | ||||
|             ._broadcast(tx)?) | ||||
|     } | ||||
| 
 | ||||
|     fn get_height(&mut self) -> Result<usize, Error> { | ||||
|         Ok(self.0.as_mut().ok_or(Error::OfflineClient)?._get_height()?) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl UrlClient { | ||||
|     fn script_to_scripthash(script: &Script) -> String { | ||||
|         sha256::Hash::hash(script.as_bytes()).into_inner().to_hex() | ||||
|     } | ||||
| 
 | ||||
|     fn _get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, EsploraError> { | ||||
|         let resp = self | ||||
|             .client | ||||
|             .get(&format!("{}/api/tx/{}/raw", self.url, txid)) | ||||
|             .send()?; | ||||
| 
 | ||||
|         if let StatusCode::NOT_FOUND = resp.status() { | ||||
|             return Ok(None); | ||||
|         } | ||||
| 
 | ||||
|         Ok(Some(deserialize(&resp.error_for_status()?.bytes()?)?)) | ||||
|     } | ||||
| 
 | ||||
|     fn _broadcast(&self, transaction: &Transaction) -> Result<(), EsploraError> { | ||||
|         self.client | ||||
|             .post(&format!("{}/api/tx", self.url)) | ||||
|             .body(serialize(transaction).to_hex()) | ||||
|             .send()? | ||||
|             .error_for_status()?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn _get_height(&self) -> Result<usize, EsploraError> { | ||||
|         Ok(self | ||||
|             .client | ||||
|             .get(&format!("{}/api/blocks/tip/height", self.url)) | ||||
|             .send()? | ||||
|             .error_for_status()? | ||||
|             .text()? | ||||
|             .parse()?) | ||||
|     } | ||||
| 
 | ||||
|     fn _script_get_history(&self, script: &Script) -> Result<Vec<ELSGetHistoryRes>, EsploraError> { | ||||
|         let mut result = Vec::new(); | ||||
|         let scripthash = Self::script_to_scripthash(script); | ||||
| 
 | ||||
|         // Add the unconfirmed transactions first
 | ||||
|         result.extend( | ||||
|             self.client | ||||
|                 .get(&format!( | ||||
|                     "{}/api/scripthash/{}/txs/mempool", | ||||
|                     self.url, scripthash | ||||
|                 )) | ||||
|                 .send()? | ||||
|                 .error_for_status()? | ||||
|                 .json::<Vec<EsploraGetHistory>>()? | ||||
|                 .into_iter() | ||||
|                 .map(|x| ELSGetHistoryRes { | ||||
|                     tx_hash: x.txid, | ||||
|                     height: x.status.block_height.unwrap_or(0) as i32, | ||||
|                 }), | ||||
|         ); | ||||
| 
 | ||||
|         debug!( | ||||
|             "Found {} mempool txs for {} - {:?}", | ||||
|             result.len(), | ||||
|             scripthash, | ||||
|             script | ||||
|         ); | ||||
| 
 | ||||
|         // Then go through all the pages of confirmed transactions
 | ||||
|         let mut last_txid = String::new(); | ||||
|         loop { | ||||
|             let response = self | ||||
|                 .client | ||||
|                 .get(&format!( | ||||
|                     "{}/api/scripthash/{}/txs/chain/{}", | ||||
|                     self.url, scripthash, last_txid | ||||
|                 )) | ||||
|                 .send()? | ||||
|                 .error_for_status()? | ||||
|                 .json::<Vec<EsploraGetHistory>>()?; | ||||
|             let len = response.len(); | ||||
|             if let Some(elem) = response.last() { | ||||
|                 last_txid = elem.txid.to_hex(); | ||||
|             } | ||||
| 
 | ||||
|             debug!("... adding {} confirmed transactions", len); | ||||
| 
 | ||||
|             result.extend(response.into_iter().map(|x| ELSGetHistoryRes { | ||||
|                 tx_hash: x.txid, | ||||
|                 height: x.status.block_height.unwrap_or(0) as i32, | ||||
|             })); | ||||
| 
 | ||||
|             if len < 25 { | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Ok(result) | ||||
|     } | ||||
| 
 | ||||
|     fn _script_list_unspent( | ||||
|         &self, | ||||
|         script: &Script, | ||||
|     ) -> Result<Vec<ELSListUnspentRes>, EsploraError> { | ||||
|         Ok(self | ||||
|             .client | ||||
|             .get(&format!( | ||||
|                 "{}/api/scripthash/{}/utxo", | ||||
|                 self.url, | ||||
|                 Self::script_to_scripthash(script) | ||||
|             )) | ||||
|             .send()? | ||||
|             .error_for_status()? | ||||
|             .json::<Vec<EsploraListUnspent>>()? | ||||
|             .into_iter() | ||||
|             .map(|x| ELSListUnspentRes { | ||||
|                 tx_hash: x.txid, | ||||
|                 height: x.status.block_height.unwrap_or(0), | ||||
|                 tx_pos: x.vout, | ||||
|             }) | ||||
|             .collect()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl ElectrumLikeSync for UrlClient { | ||||
|     fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>( | ||||
|         &mut self, | ||||
|         scripts: I, | ||||
|     ) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error> { | ||||
|         Ok(scripts | ||||
|             .into_iter() | ||||
|             .map(|script| self._script_get_history(script)) | ||||
|             .collect::<Result<Vec<_>, _>>()?) | ||||
|     } | ||||
| 
 | ||||
|     fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>( | ||||
|         &mut self, | ||||
|         scripts: I, | ||||
|     ) -> Result<Vec<Vec<ELSListUnspentRes>>, Error> { | ||||
|         Ok(scripts | ||||
|             .into_iter() | ||||
|             .map(|script| self._script_list_unspent(script)) | ||||
|             .collect::<Result<Vec<_>, _>>()?) | ||||
|     } | ||||
| 
 | ||||
|     fn els_transaction_get(&mut self, txid: &Txid) -> Result<Transaction, Error> { | ||||
|         Ok(self | ||||
|             ._get_tx(txid)? | ||||
|             .ok_or_else(|| EsploraError::TransactionNotFound(*txid))?) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Deserialize)] | ||||
| struct EsploraGetHistoryStatus { | ||||
|     block_height: Option<usize>, | ||||
| } | ||||
| 
 | ||||
| #[derive(Deserialize)] | ||||
| struct EsploraGetHistory { | ||||
|     txid: Txid, | ||||
|     status: EsploraGetHistoryStatus, | ||||
| } | ||||
| 
 | ||||
| #[derive(Deserialize)] | ||||
| struct EsploraListUnspent { | ||||
|     txid: Txid, | ||||
|     vout: usize, | ||||
|     status: EsploraGetHistoryStatus, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| pub enum EsploraError { | ||||
|     Reqwest(reqwest::Error), | ||||
|     Parsing(std::num::ParseIntError), | ||||
|     BitcoinEncoding(bitcoin::consensus::encode::Error), | ||||
| 
 | ||||
|     TransactionNotFound(Txid), | ||||
| } | ||||
| 
 | ||||
| impl From<reqwest::Error> for EsploraError { | ||||
|     fn from(other: reqwest::Error) -> Self { | ||||
|         EsploraError::Reqwest(other) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<std::num::ParseIntError> for EsploraError { | ||||
|     fn from(other: std::num::ParseIntError) -> Self { | ||||
|         EsploraError::Parsing(other) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<bitcoin::consensus::encode::Error> for EsploraError { | ||||
|     fn from(other: bitcoin::consensus::encode::Error) -> Self { | ||||
|         EsploraError::BitcoinEncoding(other) | ||||
|     } | ||||
| } | ||||
| @ -6,11 +6,18 @@ use bitcoin::{Transaction, Txid}; | ||||
| use crate::database::{BatchDatabase, DatabaseUtils}; | ||||
| use crate::error::Error; | ||||
| 
 | ||||
| pub mod utils; | ||||
| 
 | ||||
| #[cfg(feature = "electrum")] | ||||
| pub mod electrum; | ||||
| #[cfg(feature = "electrum")] | ||||
| pub use self::electrum::ElectrumBlockchain; | ||||
| 
 | ||||
| #[cfg(feature = "esplora")] | ||||
| pub mod esplora; | ||||
| #[cfg(feature = "esplora")] | ||||
| pub use self::esplora::EsploraBlockchain; | ||||
| 
 | ||||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||||
| pub enum Capability { | ||||
|     FullHistory, | ||||
|  | ||||
							
								
								
									
										300
									
								
								src/blockchain/utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										300
									
								
								src/blockchain/utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,300 @@ | ||||
| use std::cmp; | ||||
| use std::collections::{HashSet, VecDeque}; | ||||
| use std::convert::TryFrom; | ||||
| 
 | ||||
| #[allow(unused_imports)] | ||||
| use log::{debug, error, info, trace}; | ||||
| 
 | ||||
| use bitcoin::{Address, Network, OutPoint, Script, Transaction, Txid}; | ||||
| 
 | ||||
| use super::*; | ||||
| use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils}; | ||||
| use crate::error::Error; | ||||
| use crate::types::{ScriptType, TransactionDetails, UTXO}; | ||||
| use crate::wallet::utils::ChunksIterator; | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| pub struct ELSGetHistoryRes { | ||||
|     pub height: i32, | ||||
|     pub tx_hash: Txid, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| pub struct ELSListUnspentRes { | ||||
|     pub height: usize, | ||||
|     pub tx_hash: Txid, | ||||
|     pub tx_pos: usize, | ||||
| } | ||||
| 
 | ||||
| /// Implements the synchronization logic for an Electrum-like client.
 | ||||
| pub trait ElectrumLikeSync { | ||||
|     fn els_batch_script_get_history<'s, I: IntoIterator<Item = &'s Script>>( | ||||
|         &mut self, | ||||
|         scripts: I, | ||||
|     ) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error>; | ||||
| 
 | ||||
|     fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>( | ||||
|         &mut self, | ||||
|         scripts: I, | ||||
|     ) -> Result<Vec<Vec<ELSListUnspentRes>>, Error>; | ||||
| 
 | ||||
|     fn els_transaction_get(&mut self, txid: &Txid) -> Result<Transaction, Error>; | ||||
| 
 | ||||
|     // Provided methods down here...
 | ||||
| 
 | ||||
|     fn electrum_like_setup<D: BatchDatabase + DatabaseUtils, P: Progress>( | ||||
|         &mut self, | ||||
|         stop_gap: Option<usize>, | ||||
|         database: &mut D, | ||||
|         _progress_update: P, | ||||
|     ) -> Result<(), Error> { | ||||
|         // TODO: progress
 | ||||
| 
 | ||||
|         let stop_gap = stop_gap.unwrap_or(20); | ||||
|         let batch_query_size = 20; | ||||
| 
 | ||||
|         // check unconfirmed tx, delete so they are retrieved later
 | ||||
|         let mut del_batch = database.begin_batch(); | ||||
|         for tx in database.iter_txs(false)? { | ||||
|             if tx.height.is_none() { | ||||
|                 del_batch.del_tx(&tx.txid, false)?; | ||||
|             } | ||||
|         } | ||||
|         database.commit_batch(del_batch)?; | ||||
| 
 | ||||
|         // maximum derivation index for a change address that we've seen during sync
 | ||||
|         let mut change_max_deriv = 0; | ||||
| 
 | ||||
|         let mut already_checked: HashSet<Script> = HashSet::new(); | ||||
|         let mut to_check_later = VecDeque::with_capacity(batch_query_size); | ||||
| 
 | ||||
|         // insert the first chunk
 | ||||
|         let mut iter_scriptpubkeys = database | ||||
|             .iter_script_pubkeys(Some(ScriptType::External))? | ||||
|             .into_iter(); | ||||
|         let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|         for item in chunk.into_iter().rev() { | ||||
|             to_check_later.push_front(item); | ||||
|         } | ||||
| 
 | ||||
|         let mut iterating_external = true; | ||||
|         let mut index = 0; | ||||
|         let mut last_found = 0; | ||||
|         while !to_check_later.is_empty() { | ||||
|             trace!("to_check_later size {}", to_check_later.len()); | ||||
| 
 | ||||
|             let until = cmp::min(to_check_later.len(), batch_query_size); | ||||
|             let chunk: Vec<Script> = to_check_later.drain(..until).collect(); | ||||
|             let call_result = self.els_batch_script_get_history(chunk.iter())?; | ||||
| 
 | ||||
|             for (script, history) in chunk.into_iter().zip(call_result.into_iter()) { | ||||
|                 trace!("received history for {:?}, size {}", script, history.len()); | ||||
| 
 | ||||
|                 if !history.is_empty() { | ||||
|                     last_found = index; | ||||
| 
 | ||||
|                     let mut check_later_scripts = self | ||||
|                         .check_history(database, script, history, &mut change_max_deriv)? | ||||
|                         .into_iter() | ||||
|                         .filter(|x| already_checked.insert(x.clone())) | ||||
|                         .collect(); | ||||
|                     to_check_later.append(&mut check_later_scripts); | ||||
|                 } | ||||
| 
 | ||||
|                 index += 1; | ||||
|             } | ||||
| 
 | ||||
|             match iterating_external { | ||||
|                 true if index - last_found >= stop_gap => iterating_external = false, | ||||
|                 true => { | ||||
|                     trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {}, stop_gap = {}", index, last_found, stop_gap); | ||||
| 
 | ||||
|                     let chunk: Vec<Script> = | ||||
|                         iter_scriptpubkeys.by_ref().take(batch_query_size).collect(); | ||||
|                     for item in chunk.into_iter().rev() { | ||||
|                         to_check_later.push_front(item); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => {} | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // check utxo
 | ||||
|         // TODO: try to minimize network requests and re-use scripts if possible
 | ||||
|         let mut batch = database.begin_batch(); | ||||
|         for chunk in ChunksIterator::new(database.iter_utxos()?.into_iter(), batch_query_size) { | ||||
|             let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect(); | ||||
|             let call_result = self.els_batch_script_list_unspent(scripts)?; | ||||
| 
 | ||||
|             // check which utxos are actually still unspent
 | ||||
|             for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) { | ||||
|                 debug!( | ||||
|                     "outpoint {:?} is unspent for me, list unspent is {:?}", | ||||
|                     utxo.outpoint, list_unspent | ||||
|                 ); | ||||
| 
 | ||||
|                 let mut spent = true; | ||||
|                 for unspent in list_unspent { | ||||
|                     let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32); | ||||
|                     if utxo.outpoint == res_outpoint { | ||||
|                         spent = false; | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 if spent { | ||||
|                     info!("{} not anymore unspent, removing", utxo.outpoint); | ||||
|                     batch.del_utxo(&utxo.outpoint)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0); | ||||
|         let first_ext_new = last_found as u32 + 1; | ||||
|         if first_ext_new > current_ext { | ||||
|             info!("Setting external index to {}", first_ext_new); | ||||
|             database.set_last_index(ScriptType::External, first_ext_new)?; | ||||
|         } | ||||
| 
 | ||||
|         let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0); | ||||
|         let first_int_new = change_max_deriv + 1; | ||||
|         if first_int_new > current_int { | ||||
|             info!("Setting internal index to {}", first_int_new); | ||||
|             database.set_last_index(ScriptType::Internal, first_int_new)?; | ||||
|         } | ||||
| 
 | ||||
|         database.commit_batch(batch)?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn check_tx_and_descendant<D: DatabaseUtils + BatchDatabase>( | ||||
|         &mut self, | ||||
|         database: &mut D, | ||||
|         txid: &Txid, | ||||
|         height: Option<u32>, | ||||
|         cur_script: &Script, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         debug!( | ||||
|             "check_tx_and_descendant of {}, height: {:?}, script: {}", | ||||
|             txid, height, cur_script | ||||
|         ); | ||||
|         let mut updates = database.begin_batch(); | ||||
|         let tx = match database.get_tx(&txid, true)? { | ||||
|             // TODO: do we need the raw?
 | ||||
|             Some(mut saved_tx) => { | ||||
|                 // update the height if it's different (in case of reorg)
 | ||||
|                 if saved_tx.height != height { | ||||
|                     info!( | ||||
|                         "updating height from {:?} to {:?} for tx {}", | ||||
|                         saved_tx.height, height, txid | ||||
|                     ); | ||||
|                     saved_tx.height = height; | ||||
|                     updates.set_tx(&saved_tx)?; | ||||
|                 } | ||||
| 
 | ||||
|                 debug!("already have {} in db, returning the cached version", txid); | ||||
| 
 | ||||
|                 // unwrap since we explicitly ask for the raw_tx, if it's not present something
 | ||||
|                 // went wrong
 | ||||
|                 saved_tx.transaction.unwrap() | ||||
|             } | ||||
|             None => self.els_transaction_get(&txid)?, | ||||
|         }; | ||||
| 
 | ||||
|         let mut incoming: u64 = 0; | ||||
|         let mut outgoing: u64 = 0; | ||||
| 
 | ||||
|         // look for our own inputs
 | ||||
|         for (i, input) in tx.input.iter().enumerate() { | ||||
|             // the fact that we visit addresses in a BFS fashion starting from the external addresses
 | ||||
|             // should ensure that this query is always consistent (i.e. when we get to call this all
 | ||||
|             // the transactions at a lower depth have already been indexed, so if an outpoint is ours
 | ||||
|             // we are guaranteed to have it in the db).
 | ||||
|             if let Some(previous_output) = database.get_previous_output(&input.previous_output)? { | ||||
|                 if database.is_mine(&previous_output.script_pubkey)? { | ||||
|                     outgoing += previous_output.value; | ||||
| 
 | ||||
|                     debug!("{} input #{} is mine, removing from utxo", txid, i); | ||||
|                     updates.del_utxo(&input.previous_output)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let mut to_check_later = vec![]; | ||||
|         for (i, output) in tx.output.iter().enumerate() { | ||||
|             // this output is ours, we have a path to derive it
 | ||||
|             if let Some((script_type, path)) = | ||||
|                 database.get_path_from_script_pubkey(&output.script_pubkey)? | ||||
|             { | ||||
|                 debug!("{} output #{} is mine, adding utxo", txid, i); | ||||
|                 updates.set_utxo(&UTXO { | ||||
|                     outpoint: OutPoint::new(tx.txid(), i as u32), | ||||
|                     txout: output.clone(), | ||||
|                 })?; | ||||
|                 incoming += output.value; | ||||
| 
 | ||||
|                 if output.script_pubkey != *cur_script { | ||||
|                     debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey); | ||||
|                     to_check_later.push(output.script_pubkey.clone()) | ||||
|                 } | ||||
| 
 | ||||
|                 // derive as many change addrs as external addresses that we've seen
 | ||||
|                 if script_type == ScriptType::Internal | ||||
|                     && u32::from(path.as_ref()[0]) > *change_max_deriv | ||||
|                 { | ||||
|                     *change_max_deriv = u32::from(path.as_ref()[0]); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let tx = TransactionDetails { | ||||
|             txid: tx.txid(), | ||||
|             transaction: Some(tx), | ||||
|             received: incoming, | ||||
|             sent: outgoing, | ||||
|             height, | ||||
|             timestamp: 0, | ||||
|         }; | ||||
|         info!("Saving tx {}", txid); | ||||
|         updates.set_tx(&tx)?; | ||||
| 
 | ||||
|         database.commit_batch(updates)?; | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     } | ||||
| 
 | ||||
|     fn check_history<D: DatabaseUtils + BatchDatabase>( | ||||
|         &mut self, | ||||
|         database: &mut D, | ||||
|         script_pubkey: Script, | ||||
|         txs: Vec<ELSGetHistoryRes>, | ||||
|         change_max_deriv: &mut u32, | ||||
|     ) -> Result<Vec<Script>, Error> { | ||||
|         let mut to_check_later = Vec::new(); | ||||
| 
 | ||||
|         debug!( | ||||
|             "history of {} script {} has {} tx", | ||||
|             Address::from_script(&script_pubkey, Network::Testnet).unwrap(), | ||||
|             script_pubkey, | ||||
|             txs.len() | ||||
|         ); | ||||
| 
 | ||||
|         for tx in txs { | ||||
|             let height: Option<u32> = match tx.height { | ||||
|                 0 | -1 => None, | ||||
|                 x => u32::try_from(x).ok(), | ||||
|             }; | ||||
| 
 | ||||
|             to_check_later.extend_from_slice(&self.check_tx_and_descendant( | ||||
|                 database, | ||||
|                 &tx.tx_hash, | ||||
|                 height, | ||||
|                 &script_pubkey, | ||||
|                 change_max_deriv, | ||||
|             )?); | ||||
|         } | ||||
| 
 | ||||
|         Ok(to_check_later) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										12
									
								
								src/error.rs
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								src/error.rs
									
									
									
									
									
								
							| @ -44,9 +44,11 @@ pub enum Error { | ||||
|     Hex(bitcoin::hashes::hex::Error), | ||||
|     PSBT(bitcoin::util::psbt::Error), | ||||
| 
 | ||||
|     #[cfg(any(feature = "electrum", feature = "default"))] | ||||
|     #[cfg(feature = "electrum")] | ||||
|     Electrum(electrum_client::Error), | ||||
|     #[cfg(any(feature = "key-value-db", feature = "default"))] | ||||
|     #[cfg(feature = "esplora")] | ||||
|     Esplora(crate::blockchain::esplora::EsploraError), | ||||
|     #[cfg(feature = "key-value-db")] | ||||
|     Sled(sled::Error), | ||||
| } | ||||
| 
 | ||||
| @ -73,7 +75,9 @@ impl_error!(serde_json::Error, JSON); | ||||
| impl_error!(bitcoin::hashes::hex::Error, Hex); | ||||
| impl_error!(bitcoin::util::psbt::Error, PSBT); | ||||
| 
 | ||||
| #[cfg(any(feature = "electrum", feature = "default"))] | ||||
| #[cfg(feature = "electrum")] | ||||
| impl_error!(electrum_client::Error, Electrum); | ||||
| #[cfg(any(feature = "key-value-db", feature = "default"))] | ||||
| #[cfg(feature = "esplora")] | ||||
| impl_error!(crate::blockchain::esplora::EsploraError, Esplora); | ||||
| #[cfg(feature = "key-value-db")] | ||||
| impl_error!(sled::Error, Sled); | ||||
|  | ||||
							
								
								
									
										12
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								src/lib.rs
									
									
									
									
									
								
							| @ -9,11 +9,17 @@ extern crate serde_json; | ||||
| #[macro_use] | ||||
| extern crate lazy_static; | ||||
| 
 | ||||
| #[cfg(any(feature = "electrum", feature = "default"))] | ||||
| #[cfg(feature = "electrum")] | ||||
| pub extern crate electrum_client; | ||||
| #[cfg(any(feature = "electrum", feature = "default"))] | ||||
| #[cfg(feature = "electrum")] | ||||
| pub use electrum_client::client::Client; | ||||
| #[cfg(any(feature = "key-value-db", feature = "default"))] | ||||
| 
 | ||||
| #[cfg(feature = "esplora")] | ||||
| pub extern crate reqwest; | ||||
| #[cfg(feature = "esplora")] | ||||
| pub use blockchain::esplora::EsploraBlockchain; | ||||
| 
 | ||||
| #[cfg(feature = "key-value-db")] | ||||
| pub extern crate sled; | ||||
| 
 | ||||
| #[macro_use] | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user