508 lines
17 KiB
Rust
508 lines
17 KiB
Rust
|
use bdk_chain::{
|
||
|
bitcoin::{hashes::hex::FromHex, BlockHash, OutPoint, Script, Transaction, Txid},
|
||
|
keychain::LocalUpdate,
|
||
|
local_chain::LocalChain,
|
||
|
tx_graph::{self, TxGraph},
|
||
|
Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor,
|
||
|
};
|
||
|
use electrum_client::{Client, ElectrumApi, Error};
|
||
|
use std::{
|
||
|
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
|
||
|
fmt::Debug,
|
||
|
};
|
||
|
|
||
|
use crate::InternalError;
|
||
|
|
||
|
#[derive(Debug, Clone)]
|
||
|
pub struct ElectrumUpdate<K, A> {
|
||
|
pub graph_update: HashMap<Txid, BTreeSet<A>>,
|
||
|
pub chain_update: LocalChain,
|
||
|
pub keychain_update: BTreeMap<K, u32>,
|
||
|
}
|
||
|
|
||
|
impl<K, A> Default for ElectrumUpdate<K, A> {
|
||
|
fn default() -> Self {
|
||
|
Self {
|
||
|
graph_update: Default::default(),
|
||
|
chain_update: Default::default(),
|
||
|
keychain_update: Default::default(),
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl<'a, K, A: Anchor> ElectrumUpdate<K, A> {
|
||
|
pub fn missing_full_txs<A2>(
|
||
|
&'a self,
|
||
|
graph: &'a TxGraph<A2>,
|
||
|
) -> impl Iterator<Item = &'a Txid> + 'a {
|
||
|
self.graph_update
|
||
|
.keys()
|
||
|
.filter(move |&&txid| graph.as_ref().get_tx(txid).is_none())
|
||
|
}
|
||
|
|
||
|
pub fn finalize<T>(self, seen_at: Option<u64>, new_txs: T) -> LocalUpdate<K, A>
|
||
|
where
|
||
|
T: IntoIterator<Item = Transaction>,
|
||
|
{
|
||
|
let mut graph_update = TxGraph::<A>::new(new_txs);
|
||
|
for (txid, anchors) in self.graph_update {
|
||
|
if let Some(seen_at) = seen_at {
|
||
|
let _ = graph_update.insert_seen_at(txid, seen_at);
|
||
|
}
|
||
|
for anchor in anchors {
|
||
|
let _ = graph_update.insert_anchor(txid, anchor);
|
||
|
}
|
||
|
}
|
||
|
dbg!(graph_update.full_txs().count());
|
||
|
LocalUpdate {
|
||
|
keychain: self.keychain_update,
|
||
|
graph: graph_update,
|
||
|
chain: self.chain_update,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl<K> ElectrumUpdate<K, ConfirmationHeightAnchor> {
|
||
|
pub fn finalize_as_confirmation_time<T>(
|
||
|
self,
|
||
|
client: &Client,
|
||
|
seen_at: Option<u64>,
|
||
|
new_txs: T,
|
||
|
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>
|
||
|
where
|
||
|
T: IntoIterator<Item = Transaction>,
|
||
|
{
|
||
|
let update = self.finalize(seen_at, new_txs);
|
||
|
let update_tip = update.chain.tip().expect("must have tip");
|
||
|
|
||
|
let relevant_heights = {
|
||
|
let mut visited_heights = HashSet::new();
|
||
|
update
|
||
|
.graph
|
||
|
.all_anchors()
|
||
|
.iter()
|
||
|
.map(|(a, _)| a.confirmation_height_upper_bound())
|
||
|
.filter(move |&h| visited_heights.insert(h))
|
||
|
.collect::<Vec<_>>()
|
||
|
};
|
||
|
|
||
|
let height_to_time = relevant_heights
|
||
|
.clone()
|
||
|
.into_iter()
|
||
|
.zip(
|
||
|
client
|
||
|
.batch_block_header(relevant_heights)?
|
||
|
.into_iter()
|
||
|
.map(|bh| bh.time as u64),
|
||
|
)
|
||
|
.collect::<HashMap<u32, u64>>();
|
||
|
|
||
|
if update_tip.hash != client.block_header(update_tip.height as _)?.block_hash() {
|
||
|
// [TODO] We should alter the logic so we won't have to return an error. This is to
|
||
|
// [TODO] ensure obtained block times are "anchored" to our tip. If we exclude this, it
|
||
|
// [TODO] should be "safe" as well. Tx confirmation times would just slightly vary.
|
||
|
return Err(Error::Message(format!(
|
||
|
"tip changed during update: update_tip={:?}",
|
||
|
update_tip
|
||
|
)));
|
||
|
}
|
||
|
|
||
|
let graph_additions = {
|
||
|
let old_additions = TxGraph::default().determine_additions(&update.graph);
|
||
|
tx_graph::Additions {
|
||
|
tx: old_additions.tx,
|
||
|
txout: old_additions.txout,
|
||
|
last_seen: old_additions.last_seen,
|
||
|
anchors: old_additions
|
||
|
.anchors
|
||
|
.into_iter()
|
||
|
.map(|(height_anchor, txid)| {
|
||
|
let confirmation_height = dbg!(height_anchor.confirmation_height);
|
||
|
let confirmation_time = height_to_time[&confirmation_height];
|
||
|
let time_anchor = ConfirmationTimeAnchor {
|
||
|
anchor_block: height_anchor.anchor_block,
|
||
|
confirmation_height,
|
||
|
confirmation_time,
|
||
|
};
|
||
|
(time_anchor, txid)
|
||
|
})
|
||
|
.collect(),
|
||
|
}
|
||
|
};
|
||
|
|
||
|
Ok(LocalUpdate {
|
||
|
keychain: update.keychain,
|
||
|
graph: {
|
||
|
let mut graph = TxGraph::default();
|
||
|
graph.apply_additions(graph_additions);
|
||
|
graph
|
||
|
},
|
||
|
chain: update.chain,
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub trait ElectrumExt<A> {
|
||
|
fn get_tip(&self) -> Result<(u32, BlockHash), Error>;
|
||
|
|
||
|
fn scan<K: Ord + Clone>(
|
||
|
&self,
|
||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||
|
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||
|
txids: impl IntoIterator<Item = Txid>,
|
||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||
|
stop_gap: usize,
|
||
|
batch_size: usize,
|
||
|
) -> Result<ElectrumUpdate<K, A>, Error>;
|
||
|
|
||
|
fn scan_without_keychain(
|
||
|
&self,
|
||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||
|
misc_spks: impl IntoIterator<Item = Script>,
|
||
|
txids: impl IntoIterator<Item = Txid>,
|
||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||
|
batch_size: usize,
|
||
|
) -> Result<ElectrumUpdate<(), A>, Error> {
|
||
|
let spk_iter = misc_spks
|
||
|
.into_iter()
|
||
|
.enumerate()
|
||
|
.map(|(i, spk)| (i as u32, spk));
|
||
|
|
||
|
self.scan(
|
||
|
local_chain,
|
||
|
[((), spk_iter)].into(),
|
||
|
txids,
|
||
|
outpoints,
|
||
|
usize::MAX,
|
||
|
batch_size,
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
impl ElectrumExt<ConfirmationHeightAnchor> for Client {
|
||
|
fn get_tip(&self) -> Result<(u32, BlockHash), Error> {
|
||
|
// TODO: unsubscribe when added to the client, or is there a better call to use here?
|
||
|
self.block_headers_subscribe()
|
||
|
.map(|data| (data.height as u32, data.header.block_hash()))
|
||
|
}
|
||
|
|
||
|
fn scan<K: Ord + Clone>(
|
||
|
&self,
|
||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||
|
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||
|
txids: impl IntoIterator<Item = Txid>,
|
||
|
outpoints: impl IntoIterator<Item = OutPoint>,
|
||
|
stop_gap: usize,
|
||
|
batch_size: usize,
|
||
|
) -> Result<ElectrumUpdate<K, ConfirmationHeightAnchor>, Error> {
|
||
|
let mut request_spks = keychain_spks
|
||
|
.into_iter()
|
||
|
.map(|(k, s)| (k, s.into_iter()))
|
||
|
.collect::<BTreeMap<K, _>>();
|
||
|
let mut scanned_spks = BTreeMap::<(K, u32), (Script, bool)>::new();
|
||
|
|
||
|
let txids = txids.into_iter().collect::<Vec<_>>();
|
||
|
let outpoints = outpoints.into_iter().collect::<Vec<_>>();
|
||
|
|
||
|
let update = loop {
|
||
|
let mut update = ElectrumUpdate::<K, ConfirmationHeightAnchor> {
|
||
|
chain_update: prepare_chain_update(self, local_chain)?,
|
||
|
..Default::default()
|
||
|
};
|
||
|
let anchor_block = update
|
||
|
.chain_update
|
||
|
.tip()
|
||
|
.expect("must have atleast one block");
|
||
|
|
||
|
if !request_spks.is_empty() {
|
||
|
if !scanned_spks.is_empty() {
|
||
|
let mut scanned_spk_iter = scanned_spks
|
||
|
.iter()
|
||
|
.map(|(i, (spk, _))| (i.clone(), spk.clone()));
|
||
|
match populate_with_spks(
|
||
|
self,
|
||
|
anchor_block,
|
||
|
&mut update,
|
||
|
&mut scanned_spk_iter,
|
||
|
stop_gap,
|
||
|
batch_size,
|
||
|
) {
|
||
|
Err(InternalError::Reorg) => continue,
|
||
|
Err(InternalError::ElectrumError(e)) => return Err(e),
|
||
|
Ok(mut spks) => scanned_spks.append(&mut spks),
|
||
|
};
|
||
|
}
|
||
|
for (keychain, keychain_spks) in &mut request_spks {
|
||
|
match populate_with_spks(
|
||
|
self,
|
||
|
anchor_block,
|
||
|
&mut update,
|
||
|
keychain_spks,
|
||
|
stop_gap,
|
||
|
batch_size,
|
||
|
) {
|
||
|
Err(InternalError::Reorg) => continue,
|
||
|
Err(InternalError::ElectrumError(e)) => return Err(e),
|
||
|
Ok(spks) => scanned_spks.extend(
|
||
|
spks.into_iter()
|
||
|
.map(|(spk_i, spk)| ((keychain.clone(), spk_i), spk)),
|
||
|
),
|
||
|
};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
match populate_with_txids(self, anchor_block, &mut update, &mut txids.iter().cloned()) {
|
||
|
Err(InternalError::Reorg) => continue,
|
||
|
Err(InternalError::ElectrumError(e)) => return Err(e),
|
||
|
Ok(_) => {}
|
||
|
}
|
||
|
|
||
|
match populate_with_outpoints(
|
||
|
self,
|
||
|
anchor_block,
|
||
|
&mut update,
|
||
|
&mut outpoints.iter().cloned(),
|
||
|
) {
|
||
|
Err(InternalError::Reorg) => continue,
|
||
|
Err(InternalError::ElectrumError(e)) => return Err(e),
|
||
|
Ok(_txs) => { /* [TODO] cache full txs to reduce bandwidth */ }
|
||
|
}
|
||
|
|
||
|
// check for reorgs during scan process
|
||
|
let server_blockhash = self
|
||
|
.block_header(anchor_block.height as usize)?
|
||
|
.block_hash();
|
||
|
if anchor_block.hash != server_blockhash {
|
||
|
continue; // reorg
|
||
|
}
|
||
|
|
||
|
update.keychain_update = request_spks
|
||
|
.into_keys()
|
||
|
.filter_map(|k| {
|
||
|
scanned_spks
|
||
|
.range((k.clone(), u32::MIN)..=(k.clone(), u32::MAX))
|
||
|
.rev()
|
||
|
.find(|(_, (_, active))| *active)
|
||
|
.map(|((_, i), _)| (k, *i))
|
||
|
})
|
||
|
.collect::<BTreeMap<_, _>>();
|
||
|
break update;
|
||
|
};
|
||
|
|
||
|
Ok(update)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Prepare an update "template" based on the checkpoints of the `local_chain`.
|
||
|
fn prepare_chain_update(
|
||
|
client: &Client,
|
||
|
local_chain: &BTreeMap<u32, BlockHash>,
|
||
|
) -> Result<LocalChain, Error> {
|
||
|
let mut update = LocalChain::default();
|
||
|
|
||
|
// Find the local chain block that is still there so our update can connect to the local chain.
|
||
|
for (&existing_height, &existing_hash) in local_chain.iter().rev() {
|
||
|
// TODO: a batch request may be safer, as a reorg that happens when we are obtaining
|
||
|
// `block_header`s will result in inconsistencies
|
||
|
let current_hash = client.block_header(existing_height as usize)?.block_hash();
|
||
|
let _ = update
|
||
|
.insert_block(BlockId {
|
||
|
height: existing_height,
|
||
|
hash: current_hash,
|
||
|
})
|
||
|
.expect("This never errors because we are working with a fresh chain");
|
||
|
|
||
|
if current_hash == existing_hash {
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Insert the new tip so new transactions will be accepted into the sparsechain.
|
||
|
let tip = {
|
||
|
let (height, hash) = crate::get_tip(client)?;
|
||
|
BlockId { height, hash }
|
||
|
};
|
||
|
if update.insert_block(tip).is_err() {
|
||
|
// There has been a re-org before we even begin scanning addresses.
|
||
|
// Just recursively call (this should never happen).
|
||
|
return prepare_chain_update(client, local_chain);
|
||
|
}
|
||
|
|
||
|
Ok(update)
|
||
|
}
|
||
|
|
||
|
fn determine_tx_anchor(
|
||
|
anchor_block: BlockId,
|
||
|
raw_height: i32,
|
||
|
txid: Txid,
|
||
|
) -> Option<ConfirmationHeightAnchor> {
|
||
|
if txid
|
||
|
== Txid::from_hex("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b")
|
||
|
.expect("must deserialize genesis coinbase txid")
|
||
|
{
|
||
|
return Some(ConfirmationHeightAnchor {
|
||
|
anchor_block,
|
||
|
confirmation_height: 0,
|
||
|
});
|
||
|
}
|
||
|
match raw_height {
|
||
|
h if h <= 0 => {
|
||
|
debug_assert!(h == 0 || h == -1, "unexpected height ({}) from electrum", h);
|
||
|
None
|
||
|
}
|
||
|
h => {
|
||
|
let h = h as u32;
|
||
|
if h > anchor_block.height {
|
||
|
None
|
||
|
} else {
|
||
|
Some(ConfirmationHeightAnchor {
|
||
|
anchor_block,
|
||
|
confirmation_height: h,
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
fn populate_with_outpoints<K>(
|
||
|
client: &Client,
|
||
|
anchor_block: BlockId,
|
||
|
update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
|
||
|
outpoints: &mut impl Iterator<Item = OutPoint>,
|
||
|
) -> Result<HashMap<Txid, Transaction>, InternalError> {
|
||
|
let mut full_txs = HashMap::new();
|
||
|
for outpoint in outpoints {
|
||
|
let txid = outpoint.txid;
|
||
|
let tx = client.transaction_get(&txid)?;
|
||
|
debug_assert_eq!(tx.txid(), txid);
|
||
|
let txout = match tx.output.get(outpoint.vout as usize) {
|
||
|
Some(txout) => txout,
|
||
|
None => continue,
|
||
|
};
|
||
|
// attempt to find the following transactions (alongside their chain positions), and
|
||
|
// add to our sparsechain `update`:
|
||
|
let mut has_residing = false; // tx in which the outpoint resides
|
||
|
let mut has_spending = false; // tx that spends the outpoint
|
||
|
for res in client.script_get_history(&txout.script_pubkey)? {
|
||
|
if has_residing && has_spending {
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
if res.tx_hash == txid {
|
||
|
if has_residing {
|
||
|
continue;
|
||
|
}
|
||
|
has_residing = true;
|
||
|
full_txs.insert(res.tx_hash, tx.clone());
|
||
|
} else {
|
||
|
if has_spending {
|
||
|
continue;
|
||
|
}
|
||
|
let res_tx = match full_txs.get(&res.tx_hash) {
|
||
|
Some(tx) => tx,
|
||
|
None => {
|
||
|
let res_tx = client.transaction_get(&res.tx_hash)?;
|
||
|
full_txs.insert(res.tx_hash, res_tx);
|
||
|
full_txs.get(&res.tx_hash).expect("just inserted")
|
||
|
}
|
||
|
};
|
||
|
has_spending = res_tx
|
||
|
.input
|
||
|
.iter()
|
||
|
.any(|txin| txin.previous_output == outpoint);
|
||
|
if !has_spending {
|
||
|
continue;
|
||
|
}
|
||
|
};
|
||
|
|
||
|
let anchor = determine_tx_anchor(anchor_block, res.height, res.tx_hash);
|
||
|
|
||
|
let tx_entry = update.graph_update.entry(res.tx_hash).or_default();
|
||
|
if let Some(anchor) = anchor {
|
||
|
tx_entry.insert(anchor);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
Ok(full_txs)
|
||
|
}
|
||
|
|
||
|
fn populate_with_txids<K>(
|
||
|
client: &Client,
|
||
|
anchor_block: BlockId,
|
||
|
update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
|
||
|
txids: &mut impl Iterator<Item = Txid>,
|
||
|
) -> Result<(), InternalError> {
|
||
|
for txid in txids {
|
||
|
let tx = match client.transaction_get(&txid) {
|
||
|
Ok(tx) => tx,
|
||
|
Err(electrum_client::Error::Protocol(_)) => continue,
|
||
|
Err(other_err) => return Err(other_err.into()),
|
||
|
};
|
||
|
|
||
|
let spk = tx
|
||
|
.output
|
||
|
.get(0)
|
||
|
.map(|txo| &txo.script_pubkey)
|
||
|
.expect("tx must have an output");
|
||
|
|
||
|
let anchor = match client
|
||
|
.script_get_history(spk)?
|
||
|
.into_iter()
|
||
|
.find(|r| r.tx_hash == txid)
|
||
|
{
|
||
|
Some(r) => determine_tx_anchor(anchor_block, r.height, txid),
|
||
|
None => continue,
|
||
|
};
|
||
|
|
||
|
let tx_entry = update.graph_update.entry(txid).or_default();
|
||
|
if let Some(anchor) = anchor {
|
||
|
tx_entry.insert(anchor);
|
||
|
}
|
||
|
}
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
fn populate_with_spks<K, I: Ord + Clone>(
|
||
|
client: &Client,
|
||
|
anchor_block: BlockId,
|
||
|
update: &mut ElectrumUpdate<K, ConfirmationHeightAnchor>,
|
||
|
spks: &mut impl Iterator<Item = (I, Script)>,
|
||
|
stop_gap: usize,
|
||
|
batch_size: usize,
|
||
|
) -> Result<BTreeMap<I, (Script, bool)>, InternalError> {
|
||
|
let mut unused_spk_count = 0_usize;
|
||
|
let mut scanned_spks = BTreeMap::new();
|
||
|
|
||
|
loop {
|
||
|
let spks = (0..batch_size)
|
||
|
.map_while(|_| spks.next())
|
||
|
.collect::<Vec<_>>();
|
||
|
if spks.is_empty() {
|
||
|
return Ok(scanned_spks);
|
||
|
}
|
||
|
|
||
|
let spk_histories = client.batch_script_get_history(spks.iter().map(|(_, s)| s))?;
|
||
|
|
||
|
for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
|
||
|
if spk_history.is_empty() {
|
||
|
scanned_spks.insert(spk_index, (spk, false));
|
||
|
unused_spk_count += 1;
|
||
|
if unused_spk_count > stop_gap {
|
||
|
return Ok(scanned_spks);
|
||
|
}
|
||
|
continue;
|
||
|
} else {
|
||
|
scanned_spks.insert(spk_index, (spk, true));
|
||
|
unused_spk_count = 0;
|
||
|
}
|
||
|
|
||
|
for tx in spk_history {
|
||
|
let tx_entry = update.graph_update.entry(tx.tx_hash).or_default();
|
||
|
if let Some(anchor) = determine_tx_anchor(anchor_block, tx.height, tx.tx_hash) {
|
||
|
tx_entry.insert(anchor);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|