Less intermediary data states in sync

Use BTrees to store ordered sets rather than HashSets -> VecDequeue
This commit is contained in:
LLFourn 2021-11-05 12:34:17 +11:00
parent dfb63d389b
commit d39401162f
No known key found for this signature in database
GPG Key ID: A27093B54DA11F65

View File

@ -10,7 +10,7 @@ use crate::{
}; };
use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid}; use bitcoin::{OutPoint, Script, Transaction, TxOut, Txid};
use log::*; use log::*;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
/// A request for on-chain information /// A request for on-chain information
pub enum Request<'a, D: BatchDatabase> { pub enum Request<'a, D: BatchDatabase> {
@ -44,8 +44,6 @@ pub fn start<D: BatchDatabase>(db: &D, stop_gap: usize) -> Result<Request<'_, D>
stop_gap, stop_gap,
keychain, keychain,
next_keychains: keychains, next_keychains: keychains,
tx_interested: HashSet::default(),
tx_conftime_interested: HashSet::default(),
})) }))
} }
@ -56,8 +54,6 @@ pub struct ScriptReq<'a, D: BatchDatabase> {
stop_gap: usize, stop_gap: usize,
keychain: KeychainKind, keychain: KeychainKind,
next_keychains: Vec<KeychainKind>, next_keychains: Vec<KeychainKind>,
tx_interested: HashSet<Txid>,
tx_conftime_interested: HashSet<Txid>,
} }
/// The sync starts by returning script pubkeys we are interested in. /// The sync starts by returning script pubkeys we are interested in.
@ -93,11 +89,11 @@ impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
(None, Some(_)) => { (None, Some(_)) => {
// It looks like the tx has confirmed since we last saw it -- we // It looks like the tx has confirmed since we last saw it -- we
// need to know the confirmation time. // need to know the confirmation time.
self.tx_conftime_interested.insert(*txid); self.state.tx_missing_conftime.insert(*txid, details);
} }
(Some(old_height), Some(new_height)) if old_height != *new_height => { (Some(old_height), Some(new_height)) if old_height != *new_height => {
// The height of the tx has changed !? -- get the confirmation time. // The height of the tx has changed !? -- get the confirmation time.
self.tx_conftime_interested.insert(*txid); self.state.tx_missing_conftime.insert(*txid, details);
} }
(Some(_), None) => { (Some(_), None) => {
details.confirmation_time = None; details.confirmation_time = None;
@ -108,7 +104,7 @@ impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
} }
None => { None => {
// we've never seen it let's get the whole thing // we've never seen it let's get the whole thing
self.tx_interested.insert(*txid); self.state.tx_needed.insert(*txid);
} }
}; };
} }
@ -147,8 +143,6 @@ impl<'a, D: BatchDatabase> ScriptReq<'a, D> {
.collect(); .collect();
Request::Script(self) Request::Script(self)
} else { } else {
self.state.conftime_needed = self.tx_conftime_interested.into_iter().collect();
self.state.tx_needed = self.tx_interested.into_iter().collect();
Request::Tx(TxReq { state: self.state }) Request::Tx(TxReq { state: self.state })
} }
} else { } else {
@ -225,11 +219,10 @@ impl<'a, D: BatchDatabase> TxReq<'a, D> {
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
for tx_detail in tx_details { for tx_detail in tx_details {
self.state.conftime_needed.push_back(tx_detail.txid); self.state.tx_needed.remove(&tx_detail.txid);
self.state self.state
.tx_missing_conftime .tx_missing_conftime
.insert(tx_detail.txid, tx_detail); .insert(tx_detail.txid, tx_detail);
self.state.tx_needed.pop_front();
} }
if !self.state.tx_needed.is_empty() { if !self.state.tx_needed.is_empty() {
@ -247,34 +240,28 @@ pub struct ConftimeReq<'a, D> {
impl<'a, D: BatchDatabase> ConftimeReq<'a, D> { impl<'a, D: BatchDatabase> ConftimeReq<'a, D> {
pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone { pub fn request(&self) -> impl Iterator<Item = &Txid> + Clone {
self.state.conftime_needed.iter() self.state.tx_missing_conftime.keys()
} }
pub fn satisfy( pub fn satisfy(
mut self, mut self,
confirmation_times: Vec<Option<ConfirmationTime>>, confirmation_times: Vec<Option<ConfirmationTime>>,
) -> Result<Request<'a, D>, Error> { ) -> Result<Request<'a, D>, Error> {
let n = confirmation_times.len(); let conftime_needed = self
let conftime_needed = self.state.conftime_needed.iter(); .request()
for (confirmation_time, txid) in confirmation_times.into_iter().zip(conftime_needed) { .cloned()
.take(confirmation_times.len())
.collect::<Vec<_>>();
for (confirmation_time, txid) in confirmation_times.into_iter().zip(conftime_needed.iter())
{
debug!("confirmation time for {} was {:?}", txid, confirmation_time); debug!("confirmation time for {} was {:?}", txid, confirmation_time);
// this is written awkwardly to avoid lifetime issues with using cleaner .or_else if let Some(mut tx_details) = self.state.tx_missing_conftime.remove(txid) {
let mut tx_details = self.state.tx_missing_conftime.remove(txid);
if tx_details.is_none() {
tx_details = self.state.db.get_tx(txid, true)?
}
if let Some(mut tx_details) = tx_details {
tx_details.confirmation_time = confirmation_time; tx_details.confirmation_time = confirmation_time;
self.state.finished_txs.push(tx_details); self.state.finished_txs.push(tx_details);
} }
} }
for _ in 0..n { if self.state.tx_missing_conftime.is_empty() {
self.state.conftime_needed.pop_front();
}
if self.state.conftime_needed.is_empty() {
Ok(Request::Finish(self.state.into_db_update()?)) Ok(Request::Finish(self.state.into_db_update()?))
} else { } else {
Ok(Request::Conftime(self)) Ok(Request::Conftime(self))
@ -286,13 +273,11 @@ struct State<'a, D> {
db: &'a D, db: &'a D,
last_active_index: HashMap<KeychainKind, usize>, last_active_index: HashMap<KeychainKind, usize>,
/// Transactions where we need to get the full details /// Transactions where we need to get the full details
tx_needed: VecDeque<Txid>, tx_needed: BTreeSet<Txid>,
/// Transactions where we need to get the confirmation time
conftime_needed: VecDeque<Txid>,
/// Transacitions that we know everything about /// Transacitions that we know everything about
finished_txs: Vec<TransactionDetails>, finished_txs: Vec<TransactionDetails>,
/// Transactions that discovered conftimes should be inserted into /// Transactions that discovered conftimes should be inserted into
tx_missing_conftime: HashMap<Txid, TransactionDetails>, tx_missing_conftime: BTreeMap<Txid, TransactionDetails>,
/// The start of the sync /// The start of the sync
start_time: Instant, start_time: Instant,
} }
@ -302,19 +287,14 @@ impl<'a, D: BatchDatabase> State<'a, D> {
State { State {
db, db,
last_active_index: HashMap::default(), last_active_index: HashMap::default(),
conftime_needed: VecDeque::default(),
finished_txs: vec![], finished_txs: vec![],
tx_needed: VecDeque::default(), tx_needed: BTreeSet::default(),
tx_missing_conftime: HashMap::default(), tx_missing_conftime: BTreeMap::default(),
start_time: Instant::new(), start_time: Instant::new(),
} }
} }
fn into_db_update(self) -> Result<D::Batch, Error> { fn into_db_update(self) -> Result<D::Batch, Error> {
debug_assert!( debug_assert!(self.tx_needed.is_empty() && self.tx_missing_conftime.is_empty());
self.tx_needed.is_empty()
&& self.tx_missing_conftime.is_empty()
&& self.conftime_needed.is_empty()
);
let existing_txs = self.db.iter_txs(false)?; let existing_txs = self.db.iter_txs(false)?;
let existing_txids: HashSet<Txid> = existing_txs.iter().map(|tx| tx.txid).collect(); let existing_txids: HashSet<Txid> = existing_txs.iter().map(|tx| tx.txid).collect();
let finished_txs = make_txs_consistent(&self.finished_txs); let finished_txs = make_txs_consistent(&self.finished_txs);