[sync] Improve sync

Make every request in batches to save round-trip times
Fetch the timestamp from the block header to populate the transaction's timestamp field
Remove listunspent requests, since unspent outputs can be computed from our history
Riccardo Casatta 2020-11-16 12:18:34 +01:00
parent 755d76bf54
commit c5dba115a0
4 changed files with 440 additions and 345 deletions
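Two short sketches may help make the commit message concrete before reading the diff. First, the batching idea: instead of one round trip per txid, requests are grouped in chunks and sent as single batched calls, and the headers at the confirmation heights are fetched to recover timestamps. This is illustrative code, not the committed code; fetch_batched and its parameters are hypothetical, but batch_transaction_get and batch_block_header are the same electrum_client::ElectrumApi calls the diff relies on.

use bitcoin::{BlockHeader, Transaction, Txid};
use electrum_client::{Client, ElectrumApi};

fn fetch_batched(
    client: &Client,
    txids: &[Txid],
    heights: Vec<u32>,
    chunk_size: usize,
) -> Result<(Vec<Transaction>, Vec<BlockHeader>), electrum_client::Error> {
    let mut txs = Vec::new();
    for chunk in txids.chunks(chunk_size) {
        // one round trip for the whole chunk instead of one per txid
        txs.extend(client.batch_transaction_get(chunk)?);
    }
    // each header's `time` field is what populates the tx timestamp
    let headers = client.batch_block_header(heights)?;
    Ok((txs, headers))
}

Second, why listunspent becomes redundant: once the full tx history is stored locally, the UTXO set is just our own outputs minus the outpoints that our history spends. Again a hypothetical sketch (is_mine stands in for the wallet's script ownership check), simpler than the committed code, which additionally handles conflicting spends such as RBF replacements.

use std::collections::HashSet;
use bitcoin::{OutPoint, Script, Transaction};

fn utxos_from_history(history: &[Transaction], is_mine: impl Fn(&Script) -> bool) -> Vec<OutPoint> {
    // every outpoint spent by some tx in our history
    let spent: HashSet<OutPoint> = history
        .iter()
        .flat_map(|tx| tx.input.iter().map(|i| i.previous_output))
        .collect();
    // our outputs that nothing in our history spends
    history
        .iter()
        .flat_map(|tx| {
            let txid = tx.txid();
            tx.output
                .iter()
                .enumerate()
                .filter(|(_, out)| is_mine(&out.script_pubkey))
                .map(move |(vout, _)| OutPoint::new(txid, vout as u32))
        })
        .filter(|outpoint| !spent.contains(outpoint))
        .collect()
}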

src/blockchain/electrum.rs

@@ -42,11 +42,11 @@ use std::collections::HashSet;
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use bitcoin::{Script, Transaction, Txid};
use bitcoin::{BlockHeader, Script, Transaction, Txid};
use electrum_client::{Client, ElectrumApi};
use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync};
use self::utils::{ELSGetHistoryRes, ElectrumLikeSync};
use super::*;
use crate::database::BatchDatabase;
use crate::error::Error;
@@ -141,36 +141,18 @@ impl ElectrumLikeSync for Client {
.map_err(Error::Electrum)
}
fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>(
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ELSListUnspentRes>>, Error> {
self.batch_script_list_unspent(scripts)
.map(|v| {
v.into_iter()
.map(|v| {
v.into_iter()
.map(
|electrum_client::ListUnspentRes {
height,
tx_hash,
tx_pos,
..
}| ELSListUnspentRes {
height,
tx_hash,
tx_pos,
},
)
.collect()
})
.collect()
})
.map_err(Error::Electrum)
txids: I,
) -> Result<Vec<Transaction>, Error> {
self.batch_transaction_get(txids).map_err(Error::Electrum)
}
fn els_transaction_get(&self, txid: &Txid) -> Result<Transaction, Error> {
self.transaction_get(txid).map_err(Error::Electrum)
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
self.batch_block_header(heights).map_err(Error::Electrum)
}
}

src/blockchain/esplora.rs

@@ -48,15 +48,16 @@ use serde::Deserialize;
use reqwest::{Client, StatusCode};
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::hashes::hex::ToHex;
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::{sha256, Hash};
use bitcoin::{Script, Transaction, Txid};
use bitcoin::{BlockHash, BlockHeader, Script, Transaction, TxMerkleNode, Txid};
use self::utils::{ELSGetHistoryRes, ELSListUnspentRes, ElectrumLikeSync};
use self::utils::{ELSGetHistoryRes, ElectrumLikeSync};
use super::*;
use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
use std::convert::TryInto;
#[derive(Debug)]
struct UrlClient {
@@ -161,6 +162,39 @@ impl UrlClient {
Ok(Some(deserialize(&resp.error_for_status()?.bytes().await?)?))
}
async fn _get_tx_no_opt(&self, txid: &Txid) -> Result<Transaction, EsploraError> {
match self._get_tx(txid).await {
Ok(Some(tx)) => Ok(tx),
Ok(None) => Err(EsploraError::TransactionNotFound(*txid)),
Err(e) => Err(e),
}
}
async fn _get_header(&self, block_height: u32) -> Result<BlockHeader, EsploraError> {
let resp = self
.client
.get(&format!("{}/block-height/{}", self.url, block_height))
.send()
.await?;
if let StatusCode::NOT_FOUND = resp.status() {
return Err(EsploraError::HeaderHeightNotFound(block_height));
}
let bytes = resp.bytes().await?;
let hash = std::str::from_utf8(&bytes)
.map_err(|_| EsploraError::HeaderHeightNotFound(block_height))?;
let resp = self
.client
.get(&format!("{}/block/{}", self.url, hash))
.send()
.await?;
let esplora_header = resp.json::<EsploraHeader>().await?;
Ok(esplora_header.try_into()?)
}
async fn _broadcast(&self, transaction: &Transaction) -> Result<(), EsploraError> {
self.client
.post(&format!("{}/tx", self.url))
@@ -249,31 +283,6 @@ impl UrlClient {
Ok(result)
}
async fn _script_list_unspent(
&self,
script: &Script,
) -> Result<Vec<ELSListUnspentRes>, EsploraError> {
Ok(self
.client
.get(&format!(
"{}/scripthash/{}/utxo",
self.url,
Self::script_to_scripthash(script)
))
.send()
.await?
.error_for_status()?
.json::<Vec<EsploraListUnspent>>()
.await?
.into_iter()
.map(|x| ELSListUnspentRes {
tx_hash: x.txid,
height: x.status.block_height.unwrap_or(0),
tx_pos: x.vout,
})
.collect())
}
async fn _get_fee_estimates(&self) -> Result<HashMap<String, f64>, EsploraError> {
Ok(self
.client
@@ -302,13 +311,13 @@ impl ElectrumLikeSync for UrlClient {
await_or_block!(future)
}
fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>(
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ELSListUnspentRes>>, Error> {
txids: I,
) -> Result<Vec<Transaction>, Error> {
let future = async {
Ok(stream::iter(scripts)
.then(|script| self._script_list_unspent(&script))
Ok(stream::iter(txids)
.then(|txid| self._get_tx_no_opt(&txid))
.try_collect()
.await?)
};
@@ -316,9 +325,18 @@ impl ElectrumLikeSync for UrlClient {
await_or_block!(future)
}
fn els_transaction_get(&self, txid: &Txid) -> Result<Transaction, Error> {
Ok(await_or_block!(self._get_tx(txid))?
.ok_or_else(|| EsploraError::TransactionNotFound(*txid))?)
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error> {
let future = async {
Ok(stream::iter(heights)
.then(|h| self._get_header(h))
.try_collect()
.await?)
};
await_or_block!(future)
}
}
@@ -333,11 +351,37 @@ struct EsploraGetHistory {
status: EsploraGetHistoryStatus,
}
#[derive(Deserialize)]
struct EsploraListUnspent {
txid: Txid,
vout: usize,
status: EsploraGetHistoryStatus,
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
pub struct EsploraHeader {
pub id: String,
pub height: u32,
pub version: i32,
pub timestamp: u32,
pub tx_count: u32,
pub size: u32,
pub weight: u32,
pub merkle_root: String,
pub previousblockhash: String,
pub nonce: u32,
pub bits: u32,
pub difficulty: u32,
}
impl TryInto<BlockHeader> for EsploraHeader {
type Error = EsploraError;
fn try_into(self) -> Result<BlockHeader, esplora::EsploraError> {
Ok(BlockHeader {
version: self.version,
prev_blockhash: BlockHash::from_hex(&self.previousblockhash)
.map_err(|_| EsploraError::HeaderParseFail)?,
merkle_root: TxMerkleNode::from_hex(&self.merkle_root)
.map_err(|_| EsploraError::HeaderParseFail)?,
time: self.timestamp,
bits: self.bits,
nonce: self.nonce,
})
}
}
/// Configuration for an [`EsploraBlockchain`]
@@ -366,6 +410,12 @@ pub enum EsploraError {
/// Transaction not found
TransactionNotFound(Txid),
/// Header height not found
HeaderHeightNotFound(u32),
/// Header hash not found
HeaderHashNotFound(BlockHash),
/// EsploraHeader cannot be converted into a BlockHeader
HeaderParseFail,
}
impl fmt::Display for EsploraError {
@@ -393,3 +443,23 @@ impl From<bitcoin::consensus::encode::Error> for EsploraError {
EsploraError::BitcoinEncoding(other)
}
}
#[cfg(test)]
mod test {
use crate::blockchain::esplora::EsploraHeader;
use bitcoin::hashes::hex::FromHex;
use bitcoin::{BlockHash, BlockHeader};
use std::convert::TryInto;
#[test]
fn test_esplora_header() {
let json_str = r#"{"id":"00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206","height":1,"version":1,"timestamp":1296688928,"tx_count":1,"size":190,"weight":760,"merkle_root":"f0315ffc38709d70ad5647e22048358dd3745f3ce3874223c80a7c92fab0c8ba","previousblockhash":"000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943","nonce":1924588547,"bits":486604799,"difficulty":1}"#;
let json: EsploraHeader = serde_json::from_str(&json_str).unwrap();
let header: BlockHeader = json.try_into().unwrap();
assert_eq!(
header.block_hash(),
BlockHash::from_hex("00000000b873e79784647a6c82962c70d228557d24a747ea4d1b8bbe878e1206")
.unwrap()
);
}
}

src/blockchain/mod.rs

@@ -40,6 +40,7 @@ use crate::database::BatchDatabase;
use crate::error::Error;
use crate::FeeRate;
#[cfg(any(feature = "electrum", feature = "esplora"))]
pub(crate) mod utils;
#[cfg(any(feature = "electrum", feature = "esplora", feature = "compact_filters"))]

src/blockchain/utils.rs

@@ -22,20 +22,21 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
use std::cmp;
use std::collections::{HashSet, VecDeque};
use std::convert::TryFrom;
use std::collections::{HashMap, HashSet};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use bitcoin::{Address, Network, OutPoint, Script, Transaction, Txid};
use bitcoin::{BlockHeader, OutPoint, Script, Transaction, Txid};
use super::*;
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::error::Error;
use crate::types::{ScriptType, TransactionDetails, UTXO};
use crate::wallet::utils::ChunksIterator;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::time::Instant;
#[derive(Debug)]
pub struct ELSGetHistoryRes {
@@ -43,13 +44,6 @@ pub struct ELSGetHistoryRes {
pub tx_hash: Txid,
}
#[derive(Debug)]
pub struct ELSListUnspentRes {
pub height: usize,
pub tx_hash: Txid,
pub tx_pos: usize,
}
/// Implements the synchronization logic for an Electrum-like client.
#[maybe_async]
pub trait ElectrumLikeSync {
@@ -58,306 +52,354 @@ pub trait ElectrumLikeSync {
scripts: I,
) -> Result<Vec<Vec<ELSGetHistoryRes>>, Error>;
fn els_batch_script_list_unspent<'s, I: IntoIterator<Item = &'s Script>>(
fn els_batch_transaction_get<'s, I: IntoIterator<Item = &'s Txid>>(
&self,
scripts: I,
) -> Result<Vec<Vec<ELSListUnspentRes>>, Error>;
txids: I,
) -> Result<Vec<Transaction>, Error>;
fn els_transaction_get(&self, txid: &Txid) -> Result<Transaction, Error>;
fn els_batch_block_header<I: IntoIterator<Item = u32>>(
&self,
heights: I,
) -> Result<Vec<BlockHeader>, Error>;
// Provided methods down here...
fn electrum_like_setup<D: BatchDatabase, P: Progress>(
&self,
stop_gap: Option<usize>,
database: &mut D,
db: &mut D,
_progress_update: P,
) -> Result<(), Error> {
// TODO: progress
let start = Instant::now();
debug!("start setup");
let stop_gap = stop_gap.unwrap_or(20);
let batch_query_size = 20;
let chunk_size = stop_gap;
// check unconfirmed txs, delete them so they are retrieved later
let mut del_batch = database.begin_batch();
for tx in database.iter_txs(false)? {
if tx.height.is_none() {
del_batch.del_tx(&tx.txid, false)?;
}
}
database.commit_batch(del_batch)?;
let mut history_txs_id = HashSet::new();
let mut txid_height = HashMap::new();
let mut max_indexes = HashMap::new();
// maximum derivation index for a change address that we've seen during sync
let mut change_max_deriv = None;
let mut wallet_chains = vec![ScriptType::Internal, ScriptType::External];
// shuffling improves privacy: the server can't tell whether our first request is for our internal or our external addresses
wallet_chains.shuffle(&mut thread_rng());
// download history of our internal and external script_pubkeys
for script_type in wallet_chains.iter() {
let script_iter = db.iter_script_pubkeys(Some(*script_type))?.into_iter();
let mut already_checked: HashSet<Script> = HashSet::new();
let mut to_check_later = VecDeque::with_capacity(batch_query_size);
// insert the first chunk
let mut iter_scriptpubkeys = database
.iter_script_pubkeys(Some(ScriptType::External))?
.into_iter();
let chunk: Vec<Script> = iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
for item in chunk.into_iter().rev() {
to_check_later.push_front(item);
}
let mut iterating_external = true;
let mut index = 0;
let mut last_found = None;
while !to_check_later.is_empty() {
trace!("to_check_later size {}", to_check_later.len());
let until = cmp::min(to_check_later.len(), batch_query_size);
let chunk: Vec<Script> = to_check_later.drain(..until).collect();
let call_result = maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
for (script, history) in chunk.into_iter().zip(call_result.into_iter()) {
trace!("received history for {:?}, size {}", script, history.len());
if !history.is_empty() {
last_found = Some(index);
let mut check_later_scripts = maybe_await!(self.check_history(
database,
script,
history,
&mut change_max_deriv
))?
.into_iter()
.filter(|x| already_checked.insert(x.clone()))
.collect();
to_check_later.append(&mut check_later_scripts);
for (i, chunk) in ChunksIterator::new(script_iter, stop_gap).enumerate() {
// TODO if i == last, should create another chunk of addresses in db
let call_result: Vec<Vec<ELSGetHistoryRes>> =
maybe_await!(self.els_batch_script_get_history(chunk.iter()))?;
let max_index = call_result
.iter()
.enumerate()
.filter(|(_, v)| !v.is_empty())
.map(|(i, _)| i as u32)
.max();
if let Some(max) = max_index {
max_indexes.insert(script_type, max + (i * chunk_size) as u32);
}
let flattened: Vec<ELSGetHistoryRes> = call_result.into_iter().flatten().collect();
debug!("#{} of {:?} results:{}", i, script_type, flattened.len());
if flattened.is_empty() {
// Didn't find anything in the last `stop_gap` script_pubkeys, breaking
break;
}
index += 1;
}
match iterating_external {
true if index - last_found.unwrap_or(0) >= stop_gap => iterating_external = false,
true => {
trace!("pushing one more batch from `iter_scriptpubkeys`. index = {}, last_found = {:?}, stop_gap = {}", index, last_found, stop_gap);
let chunk: Vec<Script> =
iter_scriptpubkeys.by_ref().take(batch_query_size).collect();
for item in chunk.into_iter().rev() {
to_check_later.push_front(item);
for el in flattened {
// el.height = -1 means unconfirmed with unconfirmed parents
// el.height = 0 means unconfirmed with confirmed parents
// but we treat those txs the same
let height = el.height.max(0);
if height == 0 {
txid_height.insert(el.tx_hash, None);
} else {
txid_height.insert(el.tx_hash, Some(height as u32));
}
}
_ => {}
}
}
// check utxo
// TODO: try to minimize network requests and re-use scripts if possible
let mut batch = database.begin_batch();
for chunk in ChunksIterator::new(database.iter_utxos()?.into_iter(), batch_query_size) {
let scripts: Vec<_> = chunk.iter().map(|u| &u.txout.script_pubkey).collect();
let call_result = maybe_await!(self.els_batch_script_list_unspent(scripts))?;
// check which utxos are actually still unspent
for (utxo, list_unspent) in chunk.into_iter().zip(call_result.iter()) {
debug!(
"outpoint {:?} is unspent for me, list unspent is {:?}",
utxo.outpoint, list_unspent
);
let mut spent = true;
for unspent in list_unspent {
let res_outpoint = OutPoint::new(unspent.tx_hash, unspent.tx_pos as u32);
if utxo.outpoint == res_outpoint {
spent = false;
break;
}
}
if spent {
info!("{} not anymore unspent, removing", utxo.outpoint);
batch.del_utxo(&utxo.outpoint)?;
history_txs_id.insert(el.tx_hash);
}
}
}
let current_ext = database.get_last_index(ScriptType::External)?.unwrap_or(0);
let first_ext_new = last_found.map(|x| x + 1).unwrap_or(0) as u32;
if first_ext_new > current_ext {
info!("Setting external index to {}", first_ext_new);
database.set_last_index(ScriptType::External, first_ext_new)?;
// saving max indexes
info!("max indexes are: {:?}", max_indexes);
for script_type in wallet_chains.iter() {
if let Some(index) = max_indexes.get(script_type) {
db.set_last_index(*script_type, *index)?;
}
}
let current_int = database.get_last_index(ScriptType::Internal)?.unwrap_or(0);
let first_int_new = change_max_deriv.map(|x| x + 1).unwrap_or(0);
if first_int_new > current_int {
info!("Setting internal index to {}", first_int_new);
database.set_last_index(ScriptType::Internal, first_int_new)?;
// get db status
let txs_details_in_db: HashMap<Txid, TransactionDetails> = db
.iter_txs(false)?
.into_iter()
.map(|tx| (tx.txid, tx))
.collect();
let txs_raw_in_db: HashMap<Txid, Transaction> = db
.iter_raw_txs()?
.into_iter()
.map(|tx| (tx.txid(), tx))
.collect();
let utxos_deps = utxos_deps(db, &txs_raw_in_db)?;
// download new txs and headers
let new_txs = maybe_await!(self.download_and_save_needed_raw_txs(
&history_txs_id,
&txs_raw_in_db,
chunk_size,
db
))?;
let new_timestamps = maybe_await!(self.download_needed_headers(
&txid_height,
&txs_details_in_db,
chunk_size
))?;
let mut batch = db.begin_batch();
// save any tx details not in db but in history_txs_id or with different height/timestamp
for txid in history_txs_id.iter() {
let height = *txid_height.get(txid).unwrap_or(&None);
let timestamp = *new_timestamps.get(txid).unwrap_or(&0u64);
if let Some(tx_details) = txs_details_in_db.get(txid) {
// check if the height matches, otherwise update it
if tx_details.height != height {
let mut new_tx_details = tx_details.clone();
new_tx_details.height = height;
new_tx_details.timestamp = timestamp;
batch.set_tx(&new_tx_details)?;
}
} else {
save_transaction_details_and_utxos(
&txid,
db,
timestamp,
height,
&mut batch,
&utxos_deps,
)?;
}
}
database.commit_batch(batch)?;
// remove any tx details in db but not in history_txs_id
for txid in txs_details_in_db.keys() {
if !history_txs_id.contains(txid) {
batch.del_tx(&txid, false)?;
}
}
// remove any spent utxo
for new_tx in new_txs.iter() {
for input in new_tx.input.iter() {
batch.del_utxo(&input.previous_output)?;
}
}
db.commit_batch(batch)?;
info!("finish setup, elapsed {:?}ms", start.elapsed().as_millis());
Ok(())
}
fn check_tx_and_descendant<D: BatchDatabase>(
/// download txs identified by `history_txs_id` and their previous outputs, if not already present in db
fn download_and_save_needed_raw_txs<D: BatchDatabase>(
&self,
database: &mut D,
txid: &Txid,
height: Option<u32>,
cur_script: &Script,
change_max_deriv: &mut Option<u32>,
) -> Result<Vec<Script>, Error> {
debug!(
"check_tx_and_descendant of {}, height: {:?}, script: {}",
txid, height, cur_script
);
let mut updates = database.begin_batch();
let tx = match database.get_tx(&txid, true)? {
Some(mut saved_tx) => {
// update the height if it's different (in case of reorg)
if saved_tx.height != height {
info!(
"updating height from {:?} to {:?} for tx {}",
saved_tx.height, height, txid
);
saved_tx.height = height;
updates.set_tx(&saved_tx)?;
}
debug!("already have {} in db, returning the cached version", txid);
// unwrap since we explicitly ask for the raw_tx, if it's not present something
// went wrong
saved_tx.transaction.unwrap()
}
None => {
let fetched_tx = maybe_await!(self.els_transaction_get(&txid))?;
database.set_raw_tx(&fetched_tx)?;
fetched_tx
}
};
let mut incoming: u64 = 0;
let mut outgoing: u64 = 0;
let mut inputs_sum: u64 = 0;
let mut outputs_sum: u64 = 0;
// look for our own inputs
for (i, input) in tx.input.iter().enumerate() {
// skip coinbase inputs
if input.previous_output.is_null() {
continue;
}
// the fact that we visit addresses in a BFS fashion starting from the external addresses
// should ensure that this query is always consistent (i.e. when we get to call this all
// the transactions at a lower depth have already been indexed, so if an outpoint is ours
// we are guaranteed to have it in the db).
if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
inputs_sum += previous_output.value;
if database.is_mine(&previous_output.script_pubkey)? {
outgoing += previous_output.value;
debug!("{} input #{} is mine, removing from utxo", txid, i);
updates.del_utxo(&input.previous_output)?;
}
} else {
// The input is not ours, but we still need to count it for the fees, so fetch the
// tx (from the database or from network) and check it
let tx = match database.get_tx(&input.previous_output.txid, true)? {
Some(saved_tx) => saved_tx.transaction.unwrap(),
None => {
let fetched_tx =
maybe_await!(self.els_transaction_get(&input.previous_output.txid))?;
database.set_raw_tx(&fetched_tx)?;
fetched_tx
}
};
inputs_sum += tx.output[input.previous_output.vout as usize].value;
}
}
let mut to_check_later = vec![];
for (i, output) in tx.output.iter().enumerate() {
// to compute the fees later
outputs_sum += output.value;
// this output is ours, we have a path to derive it
if let Some((script_type, child)) =
database.get_path_from_script_pubkey(&output.script_pubkey)?
{
debug!("{} output #{} is mine, adding utxo", txid, i);
updates.set_utxo(&UTXO {
outpoint: OutPoint::new(tx.txid(), i as u32),
txout: output.clone(),
is_internal: script_type.is_internal(),
})?;
incoming += output.value;
if output.script_pubkey != *cur_script {
debug!("{} output #{} script {} was not current script, adding script to be checked later", txid, i, output.script_pubkey);
to_check_later.push(output.script_pubkey.clone())
}
// derive as many change addrs as external addresses that we've seen
if script_type == ScriptType::Internal
&& (change_max_deriv.is_none() || child > change_max_deriv.unwrap_or(0))
{
*change_max_deriv = Some(child);
history_txs_id: &HashSet<Txid>,
txs_raw_in_db: &HashMap<Txid, Transaction>,
chunk_size: usize,
db: &mut D,
) -> Result<Vec<Transaction>, Error> {
let mut txs_downloaded = vec![];
let txids_raw_in_db: HashSet<Txid> = txs_raw_in_db.keys().cloned().collect();
let txids_to_download: Vec<&Txid> = history_txs_id.difference(&txids_raw_in_db).collect();
if !txids_to_download.is_empty() {
info!("got {} txs to download", txids_to_download.len());
txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
txids_to_download,
chunk_size,
db,
))?);
let mut prev_txids = HashSet::new();
let mut txids_downloaded = HashSet::new();
for tx in txs_downloaded.iter() {
txids_downloaded.insert(tx.txid());
// add every previous input tx, but skip coinbase
for input in tx.input.iter().filter(|i| !i.previous_output.is_null()) {
prev_txids.insert(input.previous_output.txid);
}
}
}
let tx = TransactionDetails {
txid: tx.txid(),
transaction: Some(tx),
received: incoming,
sent: outgoing,
height,
timestamp: 0,
fees: inputs_sum.saturating_sub(outputs_sum), // if the tx is a coinbase, fees would be negative
};
info!("Saving tx {}", txid);
updates.set_tx(&tx)?;
database.commit_batch(updates)?;
Ok(to_check_later)
}
fn check_history<D: BatchDatabase>(
&self,
database: &mut D,
script_pubkey: Script,
txs: Vec<ELSGetHistoryRes>,
change_max_deriv: &mut Option<u32>,
) -> Result<Vec<Script>, Error> {
let mut to_check_later = Vec::new();
debug!(
"history of {} script {} has {} tx",
Address::from_script(&script_pubkey, Network::Testnet).unwrap(),
script_pubkey,
txs.len()
);
for tx in txs {
let height: Option<u32> = match tx.height {
0 | -1 => None,
x => u32::try_from(x).ok(),
};
to_check_later.extend_from_slice(&maybe_await!(self.check_tx_and_descendant(
database,
&tx.tx_hash,
height,
&script_pubkey,
change_max_deriv,
let already_present: HashSet<Txid> =
txids_downloaded.union(&txids_raw_in_db).cloned().collect();
let prev_txs_to_download: Vec<&Txid> =
prev_txids.difference(&already_present).collect();
info!("{} previous txs to download", prev_txs_to_download.len());
txs_downloaded.extend(maybe_await!(self.download_and_save_in_chunks(
prev_txs_to_download,
chunk_size,
db,
))?);
}
Ok(to_check_later)
Ok(txs_downloaded)
}
/// download the headers at the heights in `txid_height` whose tx details are not already present; returns a map Txid -> timestamp
fn download_needed_headers(
&self,
txid_height: &HashMap<Txid, Option<u32>>,
txs_details_in_db: &HashMap<Txid, TransactionDetails>,
chunk_size: usize,
) -> Result<HashMap<Txid, u64>, Error> {
let mut txid_timestamp = HashMap::new();
let needed_txid_height: HashMap<&Txid, &Option<u32>> = txid_height
.iter()
.filter(|(txid, _)| txs_details_in_db.get(*txid).is_none())
.collect();
let needed_heights: HashSet<u32> =
needed_txid_height.iter().filter_map(|(_, b)| **b).collect();
if !needed_heights.is_empty() {
info!("{} headers to download for timestamp", needed_heights.len());
let mut height_timestamp: HashMap<u32, u64> = HashMap::new();
for chunk in ChunksIterator::new(needed_heights.into_iter(), chunk_size) {
let call_result: Vec<BlockHeader> =
maybe_await!(self.els_batch_block_header(chunk.clone()))?;
let vec: Vec<(u32, u64)> = chunk
.into_iter()
.zip(call_result.iter().map(|h| h.time as u64))
.collect();
height_timestamp.extend(vec);
}
for (txid, height_opt) in needed_txid_height {
if let Some(height) = height_opt {
let timestamp = height_timestamp
.get(height)
.ok_or_else(|| Error::Generic("timestamp missing".to_string()))?;
txid_timestamp.insert(*txid, *timestamp);
}
}
}
Ok(txid_timestamp)
}
fn download_and_save_in_chunks<D: BatchDatabase>(
&self,
to_download: Vec<&Txid>,
chunk_size: usize,
db: &mut D,
) -> Result<Vec<Transaction>, Error> {
let mut txs_downloaded = vec![];
for chunk in ChunksIterator::new(to_download.into_iter(), chunk_size) {
let call_result: Vec<Transaction> =
maybe_await!(self.els_batch_transaction_get(chunk))?;
let mut batch = db.begin_batch();
for new_tx in call_result.iter() {
batch.set_raw_tx(new_tx)?;
}
db.commit_batch(batch)?;
txs_downloaded.extend(call_result);
}
Ok(txs_downloaded)
}
}
fn save_transaction_details_and_utxos<D: BatchDatabase>(
txid: &Txid,
db: &mut D,
timestamp: u64,
height: Option<u32>,
updates: &mut dyn BatchOperations,
utxo_deps: &HashMap<OutPoint, UTXO>,
) -> Result<(), Error> {
let tx = db
.get_raw_tx(txid)?
.ok_or_else(|| Error::TransactionNotFound)?;
let mut incoming: u64 = 0;
let mut outgoing: u64 = 0;
let mut inputs_sum: u64 = 0;
let mut outputs_sum: u64 = 0;
// look for our own inputs
for input in tx.input.iter() {
// skip coinbase inputs
if input.previous_output.is_null() {
continue;
}
// We already downloaded all previous output txs in the previous step
if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
inputs_sum += previous_output.value;
if db.is_mine(&previous_output.script_pubkey)? {
outgoing += previous_output.value;
}
} else {
// The input is not ours, but we still need to count it for the fees
let tx = db
.get_raw_tx(&input.previous_output.txid)?
.ok_or_else(|| Error::TransactionNotFound)?;
inputs_sum += tx.output[input.previous_output.vout as usize].value;
}
// remove any conflicting UTXOs (generated from the same inputs, e.g. by RBF)
if let Some(utxo) = utxo_deps.get(&input.previous_output) {
updates.del_utxo(&utxo.outpoint)?;
}
}
for (i, output) in tx.output.iter().enumerate() {
// to compute the fees later
outputs_sum += output.value;
// this output is ours, we have a path to derive it
if let Some((script_type, _child)) =
db.get_path_from_script_pubkey(&output.script_pubkey)?
{
debug!("{} output #{} is mine, adding utxo", txid, i);
updates.set_utxo(&UTXO {
outpoint: OutPoint::new(tx.txid(), i as u32),
txout: output.clone(),
is_internal: script_type.is_internal(),
})?;
incoming += output.value;
}
}
let tx_details = TransactionDetails {
txid: tx.txid(),
transaction: Some(tx),
received: incoming,
sent: outgoing,
height,
timestamp,
fees: inputs_sum.saturating_sub(outputs_sum), // if the tx is a coinbase, fees would be negative
};
updates.set_tx(&tx_details)?;
Ok(())
}
/// returns utxo dependencies as the inputs needed for the utxos to exist
/// `tx_raw_in_db` must contain the utxos' generating txs, or this errors with [crate::Error::TransactionNotFound]
fn utxos_deps<D: BatchDatabase>(
db: &mut D,
tx_raw_in_db: &HashMap<Txid, Transaction>,
) -> Result<HashMap<OutPoint, UTXO>, Error> {
let utxos = db.iter_utxos()?;
let mut utxos_deps = HashMap::new();
for utxo in utxos {
let from_tx = tx_raw_in_db
.get(&utxo.outpoint.txid)
.ok_or_else(|| Error::TransactionNotFound)?;
for input in from_tx.input.iter() {
utxos_deps.insert(input.previous_output, utxo.clone());
}
}
Ok(utxos_deps)
}