chain: improvements to IndexedTxGraph and TxGraph APIs

For `IndexedTxGraph`:
- Remove `InsertTxItem` type (this is too complex).
    - `batch_insert_relevant` now uses a simple tuple `(&tx, anchors)`.
    - `batch_insert` is now also removed, as the same functionality can be
      done elsewhere.
- Add internal helper method `index_tx_graph_changeset` so we don't need
  to create a seprate `TxGraph` update in each method.
- `batch_insert_<relevant>_unconfirmed` no longer takes in an option of
  last_seen.
- `batch_insert_unconfirmed` no longer takes a reference of a
  transaction (since we apply all transactions anyway, so there is no
  need to clone).

For `TxGraph`:
- Add `batch_insert_unconfirmed` method.
This commit is contained in:
志宇 2023-10-06 02:05:31 +08:00
parent 150d6f8ab6
commit 4f5695d43a
No known key found for this signature in database
GPG Key ID: F6345C9837C2BDE8
6 changed files with 104 additions and 115 deletions

View File

@ -509,7 +509,7 @@ impl<D> Wallet<D> {
where where
D: PersistBackend<ChangeSet>, D: PersistBackend<ChangeSet>,
{ {
let additions = self.indexed_graph.insert_txout(outpoint, &txout); let additions = self.indexed_graph.insert_txout(outpoint, txout);
self.persist.stage(ChangeSet::from(additions)); self.persist.stage(ChangeSet::from(additions));
} }

View File

@ -3,7 +3,6 @@ use std::collections::{BTreeMap, BTreeSet};
use bdk_bitcoind_rpc::Emitter; use bdk_bitcoind_rpc::Emitter;
use bdk_chain::{ use bdk_chain::{
bitcoin::{Address, Amount, BlockHash, Txid}, bitcoin::{Address, Amount, BlockHash, Txid},
indexed_tx_graph::InsertTxItem,
keychain::Balance, keychain::Balance,
local_chain::{self, CheckPoint, LocalChain}, local_chain::{self, CheckPoint, LocalChain},
Append, BlockId, IndexedTxGraph, SpkTxOutIndex, Append, BlockId, IndexedTxGraph, SpkTxOutIndex,
@ -180,17 +179,6 @@ fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Up
} }
} }
fn block_to_tx_graph_update(
block: &bitcoin::Block,
height: u32,
) -> impl Iterator<Item = InsertTxItem<'_, Option<BlockId>>> {
let anchor = BlockId {
hash: block.block_hash(),
height,
};
block.txdata.iter().map(move |tx| (tx, Some(anchor), None))
}
/// Ensure that blocks are emitted in order even after reorg. /// Ensure that blocks are emitted in order even after reorg.
/// ///
/// 1. Mine 101 blocks. /// 1. Mine 101 blocks.
@ -321,8 +309,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
while let Some((height, block)) = emitter.next_block()? { while let Some((height, block)) = emitter.next_block()? {
let _ = chain.apply_update(block_to_chain_update(&block, height))?; let _ = chain.apply_update(block_to_chain_update(&block, height))?;
let indexed_additions = let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height));
assert!(indexed_additions.is_empty()); assert!(indexed_additions.is_empty());
} }
@ -350,8 +337,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
assert!(emitter.next_block()?.is_none()); assert!(emitter.next_block()?.is_none());
let mempool_txs = emitter.mempool()?; let mempool_txs = emitter.mempool()?;
let indexed_additions = indexed_tx_graph let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
.batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
assert_eq!( assert_eq!(
indexed_additions indexed_additions
.graph .graph
@ -383,8 +369,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
{ {
let (height, block) = emitter.next_block()?.expect("must get mined block"); let (height, block) = emitter.next_block()?.expect("must get mined block");
let _ = chain.apply_update(block_to_chain_update(&block, height))?; let _ = chain.apply_update(block_to_chain_update(&block, height))?;
let indexed_additions = let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
indexed_tx_graph.batch_insert_relevant(block_to_tx_graph_update(&block, height));
assert!(indexed_additions.graph.txs.is_empty()); assert!(indexed_additions.graph.txs.is_empty());
assert!(indexed_additions.graph.txouts.is_empty()); assert!(indexed_additions.graph.txouts.is_empty());
assert_eq!(indexed_additions.graph.anchors, exp_anchors); assert_eq!(indexed_additions.graph.anchors, exp_anchors);

View File

@ -72,32 +72,34 @@ impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
where where
I::ChangeSet: Default + Append, I::ChangeSet: Default + Append,
{ {
fn index_tx_graph_changeset(
&mut self,
tx_graph_changeset: &tx_graph::ChangeSet<A>,
) -> I::ChangeSet {
let mut changeset = I::ChangeSet::default();
for added_tx in &tx_graph_changeset.txs {
changeset.append(self.index.index_tx(added_tx));
}
for (&added_outpoint, added_txout) in &tx_graph_changeset.txouts {
changeset.append(self.index.index_txout(added_outpoint, added_txout));
}
changeset
}
/// Apply an `update` directly. /// Apply an `update` directly.
/// ///
/// `update` is a [`TxGraph<A>`] and the resultant changes is returned as [`ChangeSet`]. /// `update` is a [`TxGraph<A>`] and the resultant changes is returned as [`ChangeSet`].
pub fn apply_update(&mut self, update: TxGraph<A>) -> ChangeSet<A, I::ChangeSet> { pub fn apply_update(&mut self, update: TxGraph<A>) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.apply_update(update); let graph = self.graph.apply_update(update);
let indexer = self.index_tx_graph_changeset(&graph);
let mut indexer = I::ChangeSet::default();
for added_tx in &graph.txs {
indexer.append(self.index.index_tx(added_tx));
}
for (&added_outpoint, added_txout) in &graph.txouts {
indexer.append(self.index.index_txout(added_outpoint, added_txout));
}
ChangeSet { graph, indexer } ChangeSet { graph, indexer }
} }
/// Insert a floating `txout` of given `outpoint`. /// Insert a floating `txout` of given `outpoint`.
pub fn insert_txout( pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
&mut self, let graph = self.graph.insert_txout(outpoint, txout);
outpoint: OutPoint, let indexer = self.index_tx_graph_changeset(&graph);
txout: &TxOut, ChangeSet { graph, indexer }
) -> ChangeSet<A, I::ChangeSet> {
let mut update = TxGraph::<A>::default();
let _ = update.insert_txout(outpoint, txout.clone());
self.apply_update(update)
} }
/// Insert and index a transaction into the graph. /// Insert and index a transaction into the graph.
@ -112,18 +114,19 @@ where
) -> ChangeSet<A, I::ChangeSet> { ) -> ChangeSet<A, I::ChangeSet> {
let txid = tx.txid(); let txid = tx.txid();
let mut update = TxGraph::<A>::default(); let mut graph = tx_graph::ChangeSet::default();
if self.graph.get_tx(txid).is_none() { if self.graph.get_tx(txid).is_none() {
let _ = update.insert_tx(tx.clone()); graph.append(self.graph.insert_tx(tx.clone()));
} }
for anchor in anchors.into_iter() { for anchor in anchors.into_iter() {
let _ = update.insert_anchor(txid, anchor); graph.append(self.graph.insert_anchor(txid, anchor));
} }
if let Some(seen_at) = seen_at { if let Some(seen_at) = seen_at {
let _ = update.insert_seen_at(txid, seen_at); graph.append(self.graph.insert_seen_at(txid, seen_at));
} }
self.apply_update(update) let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
} }
/// Batch insert transactions, filtering out those that are irrelevant. /// Batch insert transactions, filtering out those that are irrelevant.
@ -132,7 +135,7 @@ where
/// transactions in `txs` will be ignored. `txs` do not need to be in topological order. /// transactions in `txs` will be ignored. `txs` do not need to be in topological order.
pub fn batch_insert_relevant<'t>( pub fn batch_insert_relevant<'t>(
&mut self, &mut self,
txs: impl IntoIterator<Item = InsertTxItem<'t, impl IntoIterator<Item = A>>>, txs: impl IntoIterator<Item = (&'t Transaction, impl IntoIterator<Item = A>)>,
) -> ChangeSet<A, I::ChangeSet> { ) -> ChangeSet<A, I::ChangeSet> {
// The algorithm below allows for non-topologically ordered transactions by using two loops. // The algorithm below allows for non-topologically ordered transactions by using two loops.
// This is achieved by: // This is achieved by:
@ -140,38 +143,25 @@ where
// not store anything about them. // not store anything about them.
// 2. decide whether to insert them into the graph depending on whether `is_tx_relevant` // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop). // returns true or not. (in a second loop).
let mut changeset = ChangeSet::<A, I::ChangeSet>::default(); let txs = txs.into_iter().collect::<Vec<_>>();
let txs = txs let mut indexer = I::ChangeSet::default();
.into_iter() for (tx, _) in &txs {
.inspect(|(tx, _, _)| changeset.indexer.append(self.index.index_tx(tx))) indexer.append(self.index.index_tx(tx));
.collect::<Vec<_>>(); }
for (tx, anchors, seen_at) in txs { let mut graph = tx_graph::ChangeSet::default();
for (tx, anchors) in txs {
if self.index.is_tx_relevant(tx) { if self.index.is_tx_relevant(tx) {
changeset.append(self.insert_tx(tx, anchors, seen_at)); let txid = tx.txid();
graph.append(self.graph.insert_tx(tx.clone()));
for anchor in anchors {
graph.append(self.graph.insert_anchor(txid, anchor));
}
} }
} }
changeset ChangeSet { graph, indexer }
}
/// Batch insert transactions.
///
/// All transactions in `txs` will be inserted. To filter out irrelevant transactions, use
/// [`batch_insert_relevant`] instead.
///
/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant
pub fn batch_insert<'t>(
&mut self,
txs: impl IntoIterator<Item = InsertTxItem<'t, impl IntoIterator<Item = A>>>,
) -> ChangeSet<A, I::ChangeSet> {
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
for (tx, anchors, seen_at) in txs {
changeset.indexer.append(self.index.index_tx(tx));
changeset.append(self.insert_tx(tx, anchors, seen_at));
}
changeset
} }
/// Batch insert unconfirmed transactions, filtering out those that are irrelevant. /// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
@ -179,38 +169,51 @@ where
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`. /// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
/// Irrelevant tansactions in `txs` will be ignored. /// Irrelevant tansactions in `txs` will be ignored.
/// ///
/// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// The *last seen* communicates when the transaction is last seen in the mempool which is used /// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_relevant_unconfirmed<'t>( pub fn batch_insert_relevant_unconfirmed<'t>(
&mut self, &mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, Option<u64>)>, unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, u64)>,
) -> ChangeSet<A, I::ChangeSet> { ) -> ChangeSet<A, I::ChangeSet> {
self.batch_insert_relevant( // The algorithm below allows for non-topologically ordered transactions by using two loops.
unconfirmed_txs // This is achieved by:
.into_iter() // 1. insert all txs into the index. If they are irrelevant then that's fine it will just
.map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)), // not store anything about them.
) // 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop).
let txs = unconfirmed_txs.into_iter().collect::<Vec<_>>();
let mut indexer = I::ChangeSet::default();
for (tx, _) in &txs {
indexer.append(self.index.index_tx(tx));
}
let graph = self.graph.batch_insert_unconfirmed(
txs.into_iter()
.filter(|(tx, _)| self.index.is_tx_relevant(tx))
.map(|(tx, seen_at)| (tx.clone(), seen_at)),
);
ChangeSet { graph, indexer }
} }
/// Batch insert unconfirmed transactions. /// Batch insert unconfirmed transactions.
/// ///
/// Items of `txs` are tuples containing the transaction and an optional *last seen* timestamp. /// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// The *last seen* communicates when the transaction is last seen in the mempool which is used /// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// for conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details). /// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
/// ///
/// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead. /// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead.
/// ///
/// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed /// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed
pub fn batch_insert_unconfirmed<'t>( pub fn batch_insert_unconfirmed(
&mut self, &mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, Option<u64>)>, txs: impl IntoIterator<Item = (Transaction, u64)>,
) -> ChangeSet<A, I::ChangeSet> { ) -> ChangeSet<A, I::ChangeSet> {
self.batch_insert( let graph = self.graph.batch_insert_unconfirmed(txs);
unconfirmed_txs let indexer = self.index_tx_graph_changeset(&graph);
.into_iter() ChangeSet { graph, indexer }
.map(|(tx, last_seen)| (tx, core::iter::empty(), last_seen)),
)
} }
} }
@ -241,7 +244,6 @@ where
( (
tx, tx,
core::iter::once(A::from_block_position(&block, block_id, tx_pos)), core::iter::once(A::from_block_position(&block, block_id, tx_pos)),
None,
) )
}); });
self.batch_insert_relevant(txs) self.batch_insert_relevant(txs)
@ -260,30 +262,17 @@ where
hash: block.block_hash(), hash: block.block_hash(),
height, height,
}; };
let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| { let mut graph = tx_graph::ChangeSet::default();
( for (tx_pos, tx) in block.txdata.iter().enumerate() {
tx, let anchor = A::from_block_position(&block, block_id, tx_pos);
core::iter::once(A::from_block_position(&block, block_id, tx_pos)), graph.append(self.graph.insert_anchor(tx.txid(), anchor));
None, graph.append(self.graph.insert_tx(tx.clone()));
) }
}); let indexer = self.index_tx_graph_changeset(&graph);
self.batch_insert(txs) ChangeSet { graph, indexer }
} }
} }
/// A tuple of a transaction, and associated metadata, that are to be inserted into [`IndexedTxGraph`].
///
/// This tuple contains fields in the following order:
/// * A reference to the transaction.
/// * A collection of [`Anchor`]s.
/// * An optional last-seen timestamp.
///
/// This is used as a input item of [`batch_insert_relevant`] and [`batch_insert`].
///
/// [`batch_insert_relevant`]: IndexedTxGraph::batch_insert_relevant
/// [`batch_insert`]: IndexedTxGraph::batch_insert
pub type InsertTxItem<'t, A> = (&'t Transaction, A, Option<u64>);
/// A structure that represents changes to an [`IndexedTxGraph`]. /// A structure that represents changes to an [`IndexedTxGraph`].
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
#[cfg_attr( #[cfg_attr(

View File

@ -451,6 +451,23 @@ impl<A: Clone + Ord> TxGraph<A> {
self.apply_update(update) self.apply_update(update)
} }
/// Batch insert unconfirmed transactions.
///
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_unconfirmed(
&mut self,
txs: impl IntoIterator<Item = (Transaction, u64)>,
) -> ChangeSet<A> {
let mut changeset = ChangeSet::<A>::default();
for (tx, seen_at) in txs {
changeset.append(self.insert_seen_at(tx.txid(), seen_at));
changeset.append(self.insert_tx(tx));
}
changeset
}
/// Inserts the given `anchor` into [`TxGraph`]. /// Inserts the given `anchor` into [`TxGraph`].
/// ///
/// The [`ChangeSet`] returned will be empty if graph already knows that `txid` exists in /// The [`ChangeSet`] returned will be empty if graph already knows that `txid` exists in

View File

@ -74,7 +74,7 @@ fn insert_relevant_txs() {
}; };
assert_eq!( assert_eq!(
graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None, None))), graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None))),
changeset, changeset,
); );
@ -225,11 +225,10 @@ fn test_list_owned_txouts() {
anchor_block, anchor_block,
confirmation_height: anchor_block.height, confirmation_height: anchor_block.height,
}), }),
None,
) )
})); }));
let _ = graph.batch_insert_relevant([&tx4, &tx5].iter().map(|tx| (*tx, None, Some(100)))); let _ = graph.batch_insert_relevant_unconfirmed([&tx4, &tx5].iter().map(|tx| (*tx, 100)));
// A helper lambda to extract and filter data from the graph. // A helper lambda to extract and filter data from the graph.
let fetch = let fetch =

View File

@ -212,8 +212,7 @@ fn main() -> anyhow::Result<()> {
// mempool // mempool
let mempool_txs = emitter.mempool()?; let mempool_txs = emitter.mempool()?;
let graph_changeset = graph let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs);
.batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
db.stage((local_chain::ChangeSet::default(), graph_changeset)); db.stage((local_chain::ChangeSet::default(), graph_changeset));
// commit one last time! // commit one last time!
@ -291,7 +290,7 @@ fn main() -> anyhow::Result<()> {
} }
Emission::Mempool(mempool_txs) => { Emission::Mempool(mempool_txs) => {
let graph_changeset = graph.batch_insert_relevant_unconfirmed( let graph_changeset = graph.batch_insert_relevant_unconfirmed(
mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))), mempool_txs.iter().map(|(tx, time)| (tx, *time)),
); );
(local_chain::ChangeSet::default(), graph_changeset) (local_chain::ChangeSet::default(), graph_changeset)
} }