Merge bitcoindevkit/bdk#1041: Add bitcoind_rpc chain source module.

85c62532a55cfc94eade4d20ca3075dc4cd4882e docs(bitcoind_rpc): better `Emitter::mempool` explanation (志宇)
b69c13ddf6aa7cfb9be8c841b255e7f5f13ad328 example_bitcoind_rpc: tweaks (志宇)
5f34df8489fedae2aa3fd001036cc9ef6abe9a7a bitcoind_rpc!: bring back `CheckPoint`s to `Emitter` (志宇)
57590e0a1f2dad09a63fadb11f01e9f704cdcffb bitcoind_rpc: rm `BlockHash` from `Emitter::last_mempool_tip` (志宇)
6d4b33ef91a6c3e3443f6321cf3e3d186f77c595 chain: split `IndexedTxGraph::insert_tx` into 3 methods (志宇)
4f5695d43add3eab37ab12e897ac7c49f0d0787e chain: improvements to `IndexedTxGraph` and `TxGraph` APIs (志宇)
150d6f8ab6cd1eb1c9448d61e7bd71db0dd32a01 feat(example_bitcoind_rpc_polling): add example for RPC polling (志宇)
4f10463d9eaad9365b87dd99d49f0ddb8be673ee test(bitcoind_rpc): add no_agreement_point test (志宇)
a73dac2d91b29c4ba05f606f81e511fbf1f9ec7f test(bitcoind_rpc): initial tests for `Emitter` (志宇)
bb7424d11d3a4bd837ddde0f42f9abd93d56aee1 feat(bitcoind_rpc): introduce `bitcoind_rpc` crate (志宇)
240657b1674ad901c77090ee1fa96dc0d71e91d7 chain: add batch-insert methods for `IndexedTxGraph` (志宇)
43bc813c6498b5b021d70a6127ff7e57d7337813 chain: add helper methods on `CheckPoint` (志宇)
b3db5ca9df4302fddcd5474569849371f2be7e2d feat(chain): add `AnchorFromBlockPosition` trait (志宇)
f795a43cc72fdb4ef26ca349c4cb4f4bfd3b90b5 feat(example_cli): allow chain specific args in examples (志宇)

Pull request description:

  ### Description

  This PR builds on top of #1034 and adds the `bitcoind_rpc` chain-src module and example.

  ### Notes to the reviewers

  Don't merge this until #1034 is in!

  ### Changelog notice

  * Add `bitcoind_rpc` chain-source module.
  * Add `example_bitcoind_rpc` example module.
  * Add `AnchorFromBlockPosition` trait which are for anchors that can be constructed from a given block, height and position in block.
  * Add helper methods to `IndexedTxGraph` and `TxGraph` for batch operations and applying blocks directly.
  * Add helper methods to `CheckPoint` for easier construction from a block `Header`.

  ### Checklists

  * [x] Add test: we should detect when an initially-confirmed transaction is "unconfirmed" during a reorg.
  * [x] Improve `example_bitcoind_rpc`: add `live` command.
  * [x] Improve docs.
  * [x] Reintroduce `CheckPoint`.

  #### All Submissions:

  * [x] I've signed all my commits
  * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md)
  * [x] I ran `cargo fmt` and `cargo clippy` before committing

  #### New Features:

  * [x] I've added tests for the new feature
  * [x] I've added docs for the new feature

ACKs for top commit:
  notmandatory:
    Re ACK 85c62532a55cfc94eade4d20ca3075dc4cd4882e

Tree-SHA512: 88dbafbebaf227b18c69f2ea884e3e586bf9c11e5e450eb4872ade1d1ccd5cf1e33ce9930a6f5aa918baa3e92add7503858b039b8c9d553a281ad6d833f08a49
This commit is contained in:
Steve Myers 2023-10-09 15:46:48 -05:00
commit 38d69c947c
No known key found for this signature in database
GPG Key ID: 8105A46B22C2D051
16 changed files with 2097 additions and 350 deletions

View File

@ -6,9 +6,11 @@ members = [
"crates/file_store",
"crates/electrum",
"crates/esplora",
"crates/bitcoind_rpc",
"example-crates/example_cli",
"example-crates/example_electrum",
"example-crates/example_esplora",
"example-crates/example_bitcoind_rpc_polling",
"example-crates/wallet_electrum",
"example-crates/wallet_esplora_blocking",
"example-crates/wallet_esplora_async",

View File

@ -509,7 +509,7 @@ impl<D> Wallet<D> {
where
D: PersistBackend<ChangeSet>,
{
let additions = self.indexed_graph.insert_txout(outpoint, &txout);
let additions = self.indexed_graph.insert_txout(outpoint, txout);
self.persist.stage(ChangeSet::from(additions));
}
@ -738,7 +738,16 @@ impl<D> Wallet<D> {
ConfirmationTime::Unconfirmed { last_seen } => (None, Some(last_seen)),
};
let changeset: ChangeSet = self.indexed_graph.insert_tx(&tx, anchor, last_seen).into();
let mut changeset = ChangeSet::default();
let txid = tx.txid();
changeset.append(self.indexed_graph.insert_tx(tx).into());
if let Some(anchor) = anchor {
changeset.append(self.indexed_graph.insert_anchor(txid, anchor).into());
}
if let Some(last_seen) = last_seen {
changeset.append(self.indexed_graph.insert_seen_at(txid, last_seen).into());
}
let changed = !changeset.is_empty();
self.persist.stage(changeset);
Ok(changed)

View File

@ -0,0 +1,21 @@
[package]
name = "bdk_bitcoind_rpc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# For no-std, remember to enable the bitcoin/no-std feature
bitcoin = { version = "0.30", default-features = false }
bitcoincore-rpc = { version = "0.17" }
bdk_chain = { path = "../chain", version = "0.5", default-features = false }
[dev-dependencies]
bitcoind = { version = "0.33", features = ["25_0"] }
anyhow = { version = "1" }
[features]
default = ["std"]
std = ["bitcoin/std", "bdk_chain/std"]
serde = ["bitcoin/serde", "bdk_chain/serde"]

View File

@ -0,0 +1,301 @@
//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface. It does not
//! use the wallet RPC API, so this crate can be used with wallet-disabled Bitcoin Core nodes.
//!
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
//!
//! To only get block updates (exclude mempool transactions), the caller can use
//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
//! mempool.
#![warn(missing_docs)]
use bdk_chain::{local_chain::CheckPoint, BlockId};
use bitcoin::{block::Header, Block, BlockHash, Transaction};
pub use bitcoincore_rpc;
use bitcoincore_rpc::bitcoincore_rpc_json;
/// A structure that emits data sourced from [`bitcoincore_rpc::Client`].
///
/// Refer to [module-level documentation] for more.
///
/// [module-level documentation]: crate
pub struct Emitter<'c, C> {
client: &'c C,
start_height: u32,
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
/// that the block is no longer in the best chain, it will be popped off from here.
last_cp: Option<CheckPoint>,
/// The block result returned from rpc of the last-emitted block. As this result contains the
/// next block's block hash (which we use to fetch the next block), we set this to `None`
/// whenever there are no more blocks, or the next block is no longer in the best chain. This
/// gives us an opportunity to re-fetch this result.
last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
/// The latest first-seen epoch of emitted mempool transactions. This is used to determine
/// whether a mempool transaction is already emitted.
last_mempool_time: usize,
/// The last emitted block during our last mempool emission. This is used to determine whether
/// there has been a reorg since our last mempool emission.
last_mempool_tip: Option<u32>,
}
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
/// Construct a new [`Emitter`] with the given RPC `client` and `start_height`.
///
/// `start_height` is the block height to start emitting blocks from.
pub fn from_height(client: &'c C, start_height: u32) -> Self {
Self {
client,
start_height,
last_cp: None,
last_block: None,
last_mempool_time: 0,
last_mempool_tip: None,
}
}
/// Construct a new [`Emitter`] with the given RPC `client` and `checkpoint`.
///
/// `checkpoint` is used to find the latest block which is still part of the best chain. The
/// [`Emitter`] will emit blocks starting right above this block.
pub fn from_checkpoint(client: &'c C, checkpoint: CheckPoint) -> Self {
Self {
client,
start_height: 0,
last_cp: Some(checkpoint),
last_block: None,
last_mempool_time: 0,
last_mempool_tip: None,
}
}
/// Emit mempool transactions, alongside their first-seen unix timestamps.
///
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
/// ancestors are already emitted.
///
/// To understand why, consider a receiver which filters transactions based on whether it
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
/// at height `h`.
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
let client = self.client;
// This is the emitted tip height during the last mempool emission.
let prev_mempool_tip = self
.last_mempool_tip
// We use `start_height - 1` as we cannot guarantee that the block at
// `start_height` has been emitted.
.unwrap_or(self.start_height.saturating_sub(1));
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
// be the new latest timestamp.
let prev_mempool_time = self.last_mempool_time;
let mut latest_time = prev_mempool_time;
let txs_to_emit = client
.get_raw_mempool_verbose()?
.into_iter()
.filter_map({
let latest_time = &mut latest_time;
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
let tx_time = tx_entry.time as usize;
if tx_time > *latest_time {
*latest_time = tx_time;
}
// Avoid emitting transactions that are already emitted if we can guarantee
// blocks containing ancestors are already emitted. The bitcoind rpc interface
// provides us with the block height that the tx is introduced to the mempool.
// If we have already emitted the block of height, we can assume that all
// ancestor txs have been processed by the receiver.
let is_already_emitted = tx_time <= prev_mempool_time;
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
if is_already_emitted && is_within_height {
return None;
}
let tx = match client.get_raw_transaction(&txid, None) {
Ok(tx) => tx,
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
Err(err) if err.is_not_found_error() => return None,
Err(err) => return Some(Err(err)),
};
Some(Ok((tx, tx_time as u64)))
}
})
.collect::<Result<Vec<_>, _>>()?;
self.last_mempool_time = latest_time;
self.last_mempool_tip = self.last_cp.as_ref().map(|cp| cp.height());
Ok(txs_to_emit)
}
/// Emit the next block height and header (if any).
pub fn next_header(&mut self) -> Result<Option<(u32, Header)>, bitcoincore_rpc::Error> {
poll(self, |hash| self.client.get_block_header(hash))
}
/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<(u32, Block)>, bitcoincore_rpc::Error> {
poll(self, |hash| self.client.get_block(hash))
}
}
enum PollResponse {
Block(bitcoincore_rpc_json::GetBlockResult),
NoMoreBlocks,
/// Fetched block is not in the best chain.
BlockNotInBestChain,
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint),
AgreementPointNotFound,
}
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
where
C: bitcoincore_rpc::RpcApi,
{
let client = emitter.client;
if let Some(last_res) = &emitter.last_block {
assert!(
emitter.last_cp.is_some(),
"must not have block result without last cp"
);
let next_hash = match last_res.nextblockhash {
None => return Ok(PollResponse::NoMoreBlocks),
Some(next_hash) => next_hash,
};
let res = client.get_block_info(&next_hash)?;
if res.confirmations < 0 {
return Ok(PollResponse::BlockNotInBestChain);
}
return Ok(PollResponse::Block(res));
}
if emitter.last_cp.is_none() {
let hash = client.get_block_hash(emitter.start_height as _)?;
let res = client.get_block_info(&hash)?;
if res.confirmations < 0 {
return Ok(PollResponse::BlockNotInBestChain);
}
return Ok(PollResponse::Block(res));
}
for cp in emitter.last_cp.iter().flat_map(CheckPoint::iter) {
let res = client.get_block_info(&cp.hash())?;
if res.confirmations < 0 {
// block is not in best chain
continue;
}
// agreement point found
return Ok(PollResponse::AgreementFound(res, cp));
}
Ok(PollResponse::AgreementPointNotFound)
}
fn poll<C, V, F>(
emitter: &mut Emitter<C>,
get_item: F,
) -> Result<Option<(u32, V)>, bitcoincore_rpc::Error>
where
C: bitcoincore_rpc::RpcApi,
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
{
loop {
match poll_once(emitter)? {
PollResponse::Block(res) => {
let height = res.height as u32;
let hash = res.hash;
let item = get_item(&hash)?;
let this_id = BlockId { height, hash };
let prev_id = res.previousblockhash.map(|prev_hash| BlockId {
height: height - 1,
hash: prev_hash,
});
match (&mut emitter.last_cp, prev_id) {
(Some(cp), _) => *cp = cp.clone().push(this_id).expect("must push"),
(last_cp, None) => *last_cp = Some(CheckPoint::new(this_id)),
// When the receiver constructs a local_chain update from a block, the previous
// checkpoint is also included in the update. We need to reflect this state in
// `Emitter::last_cp` as well.
(last_cp, Some(prev_id)) => {
*last_cp = Some(CheckPoint::new(prev_id).push(this_id).expect("must push"))
}
}
emitter.last_block = Some(res);
return Ok(Some((height, item)));
}
PollResponse::NoMoreBlocks => {
emitter.last_block = None;
return Ok(None);
}
PollResponse::BlockNotInBestChain => {
emitter.last_block = None;
continue;
}
PollResponse::AgreementFound(res, cp) => {
let agreement_h = res.height as u32;
// get rid of evicted blocks
emitter.last_cp = Some(cp);
// The tip during the last mempool emission needs to in the best chain, we reduce
// it if it is not.
if let Some(h) = emitter.last_mempool_tip.as_mut() {
if *h > agreement_h {
*h = agreement_h;
}
}
emitter.last_block = Some(res);
continue;
}
PollResponse::AgreementPointNotFound => {
// We want to clear `last_cp` and set `start_height` to the first checkpoint's
// height. This way, the first checkpoint in `LocalChain` can be replaced.
if let Some(last_cp) = emitter.last_cp.take() {
emitter.start_height = last_cp.height();
}
emitter.last_block = None;
continue;
}
}
}
}
/// Extends [`bitcoincore_rpc::Error`].
pub trait BitcoindRpcErrorExt {
/// Returns whether the error is a "not found" error.
///
/// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as
/// [`Iterator::Item`].
fn is_not_found_error(&self) -> bool;
}
impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
fn is_not_found_error(&self) -> bool {
if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self
{
rpc_err.code == -5
} else {
false
}
}
}

View File

@ -0,0 +1,826 @@
use std::collections::{BTreeMap, BTreeSet};
use bdk_bitcoind_rpc::Emitter;
use bdk_chain::{
bitcoin::{Address, Amount, BlockHash, Txid},
keychain::Balance,
local_chain::{self, CheckPoint, LocalChain},
Append, BlockId, IndexedTxGraph, SpkTxOutIndex,
};
use bitcoin::{
address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash,
secp256k1::rand::random, Block, CompactTarget, OutPoint, ScriptBuf, ScriptHash, Transaction,
TxIn, TxOut, WScriptHash,
};
use bitcoincore_rpc::{
bitcoincore_rpc_json::{GetBlockTemplateModes, GetBlockTemplateRules},
RpcApi,
};
struct TestEnv {
#[allow(dead_code)]
daemon: bitcoind::BitcoinD,
client: bitcoincore_rpc::Client,
}
impl TestEnv {
fn new() -> anyhow::Result<Self> {
let daemon = match std::env::var_os("TEST_BITCOIND") {
Some(bitcoind_path) => bitcoind::BitcoinD::new(bitcoind_path),
None => bitcoind::BitcoinD::from_downloaded(),
}?;
let client = bitcoincore_rpc::Client::new(
&daemon.rpc_url(),
bitcoincore_rpc::Auth::CookieFile(daemon.params.cookie_file.clone()),
)?;
Ok(Self { daemon, client })
}
fn mine_blocks(
&self,
count: usize,
address: Option<Address>,
) -> anyhow::Result<Vec<BlockHash>> {
let coinbase_address = match address {
Some(address) => address,
None => self.client.get_new_address(None, None)?.assume_checked(),
};
let block_hashes = self
.client
.generate_to_address(count as _, &coinbase_address)?;
Ok(block_hashes)
}
fn mine_empty_block(&self) -> anyhow::Result<(usize, BlockHash)> {
let bt = self.client.get_block_template(
GetBlockTemplateModes::Template,
&[GetBlockTemplateRules::SegWit],
&[],
)?;
let txdata = vec![Transaction {
version: 1,
lock_time: bitcoin::absolute::LockTime::from_height(0)?,
input: vec![TxIn {
previous_output: bitcoin::OutPoint::default(),
script_sig: ScriptBuf::builder()
.push_int(bt.height as _)
// randomn number so that re-mining creates unique block
.push_int(random())
.into_script(),
sequence: bitcoin::Sequence::default(),
witness: bitcoin::Witness::new(),
}],
output: vec![TxOut {
value: 0,
script_pubkey: ScriptBuf::new_p2sh(&ScriptHash::all_zeros()),
}],
}];
let bits: [u8; 4] = bt
.bits
.clone()
.try_into()
.expect("rpc provided us with invalid bits");
let mut block = Block {
header: Header {
version: bitcoin::block::Version::default(),
prev_blockhash: bt.previous_block_hash,
merkle_root: TxMerkleNode::all_zeros(),
time: Ord::max(bt.min_time, std::time::UNIX_EPOCH.elapsed()?.as_secs()) as u32,
bits: CompactTarget::from_consensus(u32::from_be_bytes(bits)),
nonce: 0,
},
txdata,
};
block.header.merkle_root = block.compute_merkle_root().expect("must compute");
for nonce in 0..=u32::MAX {
block.header.nonce = nonce;
if block.header.target().is_met_by(block.block_hash()) {
break;
}
}
self.client.submit_block(&block)?;
Ok((bt.height as usize, block.block_hash()))
}
fn invalidate_blocks(&self, count: usize) -> anyhow::Result<()> {
let mut hash = self.client.get_best_block_hash()?;
for _ in 0..count {
let prev_hash = self.client.get_block_info(&hash)?.previousblockhash;
self.client.invalidate_block(&hash)?;
match prev_hash {
Some(prev_hash) => hash = prev_hash,
None => break,
}
}
Ok(())
}
fn reorg(&self, count: usize) -> anyhow::Result<Vec<BlockHash>> {
let start_height = self.client.get_block_count()?;
self.invalidate_blocks(count)?;
let res = self.mine_blocks(count, None);
assert_eq!(
self.client.get_block_count()?,
start_height,
"reorg should not result in height change"
);
res
}
fn reorg_empty_blocks(&self, count: usize) -> anyhow::Result<Vec<(usize, BlockHash)>> {
let start_height = self.client.get_block_count()?;
self.invalidate_blocks(count)?;
let res = (0..count)
.map(|_| self.mine_empty_block())
.collect::<Result<Vec<_>, _>>()?;
assert_eq!(
self.client.get_block_count()?,
start_height,
"reorg should not result in height change"
);
Ok(res)
}
fn send(&self, address: &Address<NetworkChecked>, amount: Amount) -> anyhow::Result<Txid> {
let txid = self
.client
.send_to_address(address, amount, None, None, None, None, None, None)?;
Ok(txid)
}
}
fn block_to_chain_update(block: &bitcoin::Block, height: u32) -> local_chain::Update {
let this_id = BlockId {
height,
hash: block.block_hash(),
};
let tip = if block.header.prev_blockhash == BlockHash::all_zeros() {
CheckPoint::new(this_id)
} else {
CheckPoint::new(BlockId {
height: height - 1,
hash: block.header.prev_blockhash,
})
.extend(core::iter::once(this_id))
.expect("must construct checkpoint")
};
local_chain::Update {
tip,
introduce_older_blocks: false,
}
}
/// Ensure that blocks are emitted in order even after reorg.
///
/// 1. Mine 101 blocks.
/// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`].
/// 3. Reorg highest 6 blocks.
/// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`].
#[test]
pub fn test_sync_local_chain() -> anyhow::Result<()> {
let env = TestEnv::new()?;
let mut local_chain = LocalChain::default();
let mut emitter = Emitter::from_height(&env.client, 0);
// mine some blocks and returned the actual block hashes
let exp_hashes = {
let mut hashes = vec![env.client.get_block_hash(0)?]; // include genesis block
hashes.extend(env.mine_blocks(101, None)?);
hashes
};
// see if the emitter outputs the right blocks
println!("first sync:");
while let Some((height, block)) = emitter.next_block()? {
assert_eq!(
block.block_hash(),
exp_hashes[height as usize],
"emitted block hash is unexpected"
);
let chain_update = block_to_chain_update(&block, height);
assert_eq!(
local_chain.apply_update(chain_update)?,
BTreeMap::from([(height, Some(block.block_hash()))]),
"chain update changeset is unexpected",
);
}
assert_eq!(
local_chain.blocks(),
&exp_hashes
.iter()
.enumerate()
.map(|(i, hash)| (i as u32, *hash))
.collect(),
"final local_chain state is unexpected",
);
// perform reorg
let reorged_blocks = env.reorg(6)?;
let exp_hashes = exp_hashes
.iter()
.take(exp_hashes.len() - reorged_blocks.len())
.chain(&reorged_blocks)
.cloned()
.collect::<Vec<_>>();
// see if the emitter outputs the right blocks
println!("after reorg:");
let mut exp_height = exp_hashes.len() - reorged_blocks.len();
while let Some((height, block)) = emitter.next_block()? {
assert_eq!(
height, exp_height as u32,
"emitted block has unexpected height"
);
assert_eq!(
block.block_hash(),
exp_hashes[height as usize],
"emitted block is unexpected"
);
let chain_update = block_to_chain_update(&block, height);
assert_eq!(
local_chain.apply_update(chain_update)?,
if exp_height == exp_hashes.len() - reorged_blocks.len() {
core::iter::once((height, Some(block.block_hash())))
.chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None)))
.collect::<bdk_chain::local_chain::ChangeSet>()
} else {
BTreeMap::from([(height, Some(block.block_hash()))])
},
"chain update changeset is unexpected",
);
exp_height += 1;
}
assert_eq!(
local_chain.blocks(),
&exp_hashes
.iter()
.enumerate()
.map(|(i, hash)| (i as u32, *hash))
.collect(),
"final local_chain state is unexpected after reorg",
);
Ok(())
}
/// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and
/// block updates.
///
/// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update
#[test]
fn test_into_tx_graph() -> anyhow::Result<()> {
let env = TestEnv::new()?;
println!("getting new addresses!");
let addr_0 = env.client.get_new_address(None, None)?.assume_checked();
let addr_1 = env.client.get_new_address(None, None)?.assume_checked();
let addr_2 = env.client.get_new_address(None, None)?.assume_checked();
println!("got new addresses!");
println!("mining block!");
env.mine_blocks(101, None)?;
println!("mined blocks!");
let mut chain = LocalChain::default();
let mut indexed_tx_graph = IndexedTxGraph::<BlockId, _>::new({
let mut index = SpkTxOutIndex::<usize>::default();
index.insert_spk(0, addr_0.script_pubkey());
index.insert_spk(1, addr_1.script_pubkey());
index.insert_spk(2, addr_2.script_pubkey());
index
});
let emitter = &mut Emitter::from_height(&env.client, 0);
while let Some((height, block)) = emitter.next_block()? {
let _ = chain.apply_update(block_to_chain_update(&block, height))?;
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
assert!(indexed_additions.is_empty());
}
// send 3 txs to a tracked address, these txs will be in the mempool
let exp_txids = {
let mut txids = BTreeSet::new();
for _ in 0..3 {
txids.insert(env.client.send_to_address(
&addr_0,
Amount::from_sat(10_000),
None,
None,
None,
None,
None,
None,
)?);
}
txids
};
// expect that the next block should be none and we should get 3 txs from mempool
{
// next block should be `None`
assert!(emitter.next_block()?.is_none());
let mempool_txs = emitter.mempool()?;
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
assert_eq!(
indexed_additions
.graph
.txs
.iter()
.map(|tx| tx.txid())
.collect::<BTreeSet<Txid>>(),
exp_txids,
"changeset should have the 3 mempool transactions",
);
assert!(indexed_additions.graph.anchors.is_empty());
}
// mine a block that confirms the 3 txs
let exp_block_hash = env.mine_blocks(1, None)?[0];
let exp_block_height = env.client.get_block_info(&exp_block_hash)?.height as u32;
let exp_anchors = exp_txids
.iter()
.map({
let anchor = BlockId {
height: exp_block_height,
hash: exp_block_hash,
};
move |&txid| (anchor, txid)
})
.collect::<BTreeSet<_>>();
// must receive mined block which will confirm the transactions.
{
let (height, block) = emitter.next_block()?.expect("must get mined block");
let _ = chain
.apply_update(CheckPoint::from_header(&block.header, height).into_update(false))?;
let indexed_additions = indexed_tx_graph.apply_block_relevant(block, height);
assert!(indexed_additions.graph.txs.is_empty());
assert!(indexed_additions.graph.txouts.is_empty());
assert_eq!(indexed_additions.graph.anchors, exp_anchors);
}
Ok(())
}
/// Ensure next block emitted after reorg is at reorg height.
///
/// After a reorg, if the last-emitted block height is equal or greater than the reorg height, and
/// the fallback height is equal to or lower than the reorg height, the next block/header emission
/// should be at the reorg height.
///
/// TODO: If the reorg height is lower than the fallback height, how do we find a block height to
/// emit that can connect with our receiver chain?
#[test]
fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
const EMITTER_START_HEIGHT: usize = 100;
const CHAIN_TIP_HEIGHT: usize = 110;
let env = TestEnv::new()?;
let mut emitter = Emitter::from_height(&env.client, EMITTER_START_HEIGHT as _);
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
while emitter.next_header()?.is_some() {}
for reorg_count in 1..=10 {
let replaced_blocks = env.reorg_empty_blocks(reorg_count)?;
let (height, next_header) = emitter.next_header()?.expect("must emit block after reorg");
assert_eq!(
(height as usize, next_header.block_hash()),
replaced_blocks[0],
"block emitted after reorg should be at the reorg height"
);
while emitter.next_header()?.is_some() {}
}
Ok(())
}
fn process_block(
recv_chain: &mut LocalChain,
recv_graph: &mut IndexedTxGraph<BlockId, SpkTxOutIndex<()>>,
block: Block,
block_height: u32,
) -> anyhow::Result<()> {
recv_chain
.apply_update(CheckPoint::from_header(&block.header, block_height).into_update(false))?;
let _ = recv_graph.apply_block(block, block_height);
Ok(())
}
fn sync_from_emitter<C>(
recv_chain: &mut LocalChain,
recv_graph: &mut IndexedTxGraph<BlockId, SpkTxOutIndex<()>>,
emitter: &mut Emitter<C>,
) -> anyhow::Result<()>
where
C: bitcoincore_rpc::RpcApi,
{
while let Some((height, block)) = emitter.next_block()? {
process_block(recv_chain, recv_graph, block, height)?;
}
Ok(())
}
fn get_balance(
recv_chain: &LocalChain,
recv_graph: &IndexedTxGraph<BlockId, SpkTxOutIndex<()>>,
) -> anyhow::Result<Balance> {
let chain_tip = recv_chain
.tip()
.map_or(BlockId::default(), |cp| cp.block_id());
let outpoints = recv_graph.index.outpoints().clone();
let balance = recv_graph
.graph()
.balance(recv_chain, chain_tip, outpoints, |_, _| true);
Ok(balance)
}
/// If a block is reorged out, ensure that containing transactions that do not exist in the
/// replacement block(s) become unconfirmed.
#[test]
fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
const PREMINE_COUNT: usize = 101;
const ADDITIONAL_COUNT: usize = 11;
const SEND_AMOUNT: Amount = Amount::from_sat(10_000);
let env = TestEnv::new()?;
let mut emitter = Emitter::from_height(&env.client, 0);
// setup addresses
let addr_to_mine = env.client.get_new_address(None, None)?.assume_checked();
let spk_to_track = ScriptBuf::new_v0_p2wsh(&WScriptHash::all_zeros());
let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?;
// setup receiver
let mut recv_chain = LocalChain::default();
let mut recv_graph = IndexedTxGraph::<BlockId, _>::new({
let mut recv_index = SpkTxOutIndex::default();
recv_index.insert_spk((), spk_to_track.clone());
recv_index
});
// mine and sync receiver up to tip
env.mine_blocks(PREMINE_COUNT, Some(addr_to_mine))?;
// create transactions that are tracked by our receiver
for _ in 0..ADDITIONAL_COUNT {
let txid = env.send(&addr_to_track, SEND_AMOUNT)?;
// lock outputs that send to `addr_to_track`
let outpoints_to_lock = env
.client
.get_transaction(&txid, None)?
.transaction()?
.output
.into_iter()
.enumerate()
.filter(|(_, txo)| txo.script_pubkey == spk_to_track)
.map(|(vout, _)| OutPoint::new(txid, vout as _))
.collect::<Vec<_>>();
env.client.lock_unspent(&outpoints_to_lock)?;
let _ = env.mine_blocks(1, None)?;
}
// get emitter up to tip
sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?;
assert_eq!(
get_balance(&recv_chain, &recv_graph)?,
Balance {
confirmed: SEND_AMOUNT.to_sat() * ADDITIONAL_COUNT as u64,
..Balance::default()
},
"initial balance must be correct",
);
// perform reorgs with different depths
for reorg_count in 1..=ADDITIONAL_COUNT {
env.reorg_empty_blocks(reorg_count)?;
sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?;
assert_eq!(
get_balance(&recv_chain, &recv_graph)?,
Balance {
confirmed: SEND_AMOUNT.to_sat() * (ADDITIONAL_COUNT - reorg_count) as u64,
trusted_pending: SEND_AMOUNT.to_sat() * reorg_count as u64,
..Balance::default()
},
"reorg_count: {}",
reorg_count,
);
}
Ok(())
}
/// Ensure avoid-re-emission-logic is sound when [`Emitter`] is synced to tip.
///
/// The receiver (bdk_chain structures) is synced to the chain tip, and there is txs in the mempool.
/// When we call Emitter::mempool multiple times, mempool txs should not be re-emitted, even if the
/// chain tip is extended.
#[test]
fn mempool_avoids_re_emission() -> anyhow::Result<()> {
const BLOCKS_TO_MINE: usize = 101;
const MEMPOOL_TX_COUNT: usize = 2;
let env = TestEnv::new()?;
let mut emitter = Emitter::from_height(&env.client, 0);
// mine blocks and sync up emitter
let addr = env.client.get_new_address(None, None)?.assume_checked();
env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?;
while emitter.next_header()?.is_some() {}
// have some random txs in mempool
let exp_txids = (0..MEMPOOL_TX_COUNT)
.map(|_| env.send(&addr, Amount::from_sat(2100)))
.collect::<Result<BTreeSet<Txid>, _>>()?;
// the first emission should include all transactions
let emitted_txids = emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<Txid>>();
assert_eq!(
emitted_txids, exp_txids,
"all mempool txs should be emitted"
);
// second emission should be empty
assert!(
emitter.mempool()?.is_empty(),
"second emission should be empty"
);
// mine empty blocks + sync up our emitter -> we should still not re-emit
for _ in 0..BLOCKS_TO_MINE {
env.mine_empty_block()?;
}
while emitter.next_header()?.is_some() {}
assert!(
emitter.mempool()?.is_empty(),
"third emission, after chain tip is extended, should also be empty"
);
Ok(())
}
/// Ensure mempool tx is still re-emitted if [`Emitter`] has not reached the tx's introduction
/// height.
///
/// We introduce a mempool tx after each block, where blocks are empty (does not confirm previous
/// mempool txs). Then we emit blocks from [`Emitter`] (intertwining `mempool` calls). We check
/// that `mempool` should always re-emit txs that have introduced at a height greater than the last
/// emitted block height.
#[test]
fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()> {
const PREMINE_COUNT: usize = 101;
const MEMPOOL_TX_COUNT: usize = 21;
let env = TestEnv::new()?;
let mut emitter = Emitter::from_height(&env.client, 0);
// mine blocks to get initial balance, sync emitter up to tip
let addr = env.client.get_new_address(None, None)?.assume_checked();
env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;
while emitter.next_header()?.is_some() {}
// mine blocks to introduce txs to mempool at different heights
let tx_introductions = (0..MEMPOOL_TX_COUNT)
.map(|_| -> anyhow::Result<_> {
let (height, _) = env.mine_empty_block()?;
let txid = env.send(&addr, Amount::from_sat(2100))?;
Ok((height, txid))
})
.collect::<anyhow::Result<BTreeSet<_>>>()?;
assert_eq!(
emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>(),
tx_introductions.iter().map(|&(_, txid)| txid).collect(),
"first mempool emission should include all txs",
);
assert_eq!(
emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>(),
tx_introductions.iter().map(|&(_, txid)| txid).collect(),
"second mempool emission should still include all txs",
);
// At this point, the emitter has seen all mempool transactions. It should only re-emit those
// that have introduction heights less than the emitter's last-emitted block tip.
while let Some((height, _)) = emitter.next_header()? {
// We call `mempool()` twice.
// The second call (at height `h`) should skip the tx introduced at height `h`.
for try_index in 0..2 {
let exp_txids = tx_introductions
.range((height as usize + try_index, Txid::all_zeros())..)
.map(|&(_, txid)| txid)
.collect::<BTreeSet<_>>();
let emitted_txids = emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>();
assert_eq!(
emitted_txids, exp_txids,
"\n emission {} (try {}) must only contain txs introduced at that height or lower: \n\t missing: {:?} \n\t extra: {:?}",
height,
try_index,
exp_txids
.difference(&emitted_txids)
.map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap()))
.collect::<Vec<_>>(),
emitted_txids
.difference(&exp_txids)
.map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap()))
.collect::<Vec<_>>(),
);
}
}
Ok(())
}
/// Ensure we force re-emit all mempool txs after reorg.
#[test]
fn mempool_during_reorg() -> anyhow::Result<()> {
const TIP_DIFF: usize = 10;
const PREMINE_COUNT: usize = 101;
let env = TestEnv::new()?;
let mut emitter = Emitter::from_height(&env.client, 0);
// mine blocks to get initial balance
let addr = env.client.get_new_address(None, None)?.assume_checked();
env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?;
// introduce mempool tx at each block extension
for _ in 0..TIP_DIFF {
env.mine_empty_block()?;
env.send(&addr, Amount::from_sat(2100))?;
}
// sync emitter to tip, first mempool emission should include all txs (as we haven't emitted
// from the mempool yet)
while emitter.next_header()?.is_some() {}
assert_eq!(
emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>(),
env.client
.get_raw_mempool()?
.into_iter()
.collect::<BTreeSet<_>>(),
"first mempool emission should include all txs",
);
// perform reorgs at different heights, these reorgs will not comfirm transactions in the
// mempool
for reorg_count in 1..TIP_DIFF {
println!("REORG COUNT: {}", reorg_count);
env.reorg_empty_blocks(reorg_count)?;
// This is a map of mempool txids to tip height where the tx was introduced to the mempool
// we recalculate this at every loop as reorgs may evict transactions from mempool. We use
// the introduction height to determine whether we expect a tx to appear in a mempool
// emission.
// TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first?
let tx_introductions = dbg!(env
.client
.get_raw_mempool_verbose()?
.into_iter()
.map(|(txid, entry)| (txid, entry.height as usize))
.collect::<BTreeMap<_, _>>());
// `next_header` emits the replacement block of the reorg
if let Some((height, _)) = emitter.next_header()? {
println!("\t- replacement height: {}", height);
// the mempool emission (that follows the first block emission after reorg) should only
// include mempool txs introduced at reorg height or greater
let mempool = emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>();
let exp_mempool = tx_introductions
.iter()
.filter(|(_, &intro_h)| intro_h >= (height as usize))
.map(|(&txid, _)| txid)
.collect::<BTreeSet<_>>();
assert_eq!(
mempool, exp_mempool,
"the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater"
);
let mempool = emitter
.mempool()?
.into_iter()
.map(|(tx, _)| tx.txid())
.collect::<BTreeSet<_>>();
let exp_mempool = tx_introductions
.iter()
.filter(|&(_, &intro_height)| intro_height > (height as usize))
.map(|(&txid, _)| txid)
.collect::<BTreeSet<_>>();
assert_eq!(
mempool, exp_mempool,
"following mempool emissions after reorg should exclude mempool introduction heights <= last emitted block height: \n\t missing: {:?} \n\t extra: {:?}",
exp_mempool
.difference(&mempool)
.map(|txid| (txid, tx_introductions.get(txid).unwrap()))
.collect::<Vec<_>>(),
mempool
.difference(&exp_mempool)
.map(|txid| (txid, tx_introductions.get(txid).unwrap()))
.collect::<Vec<_>>(),
);
}
// sync emitter to tip
while emitter.next_header()?.is_some() {}
}
Ok(())
}
/// If blockchain re-org includes the start height, emit new start height block
///
/// 1. mine 101 blocks
/// 2. emmit blocks 99a, 100a
/// 3. invalidate blocks 99a, 100a, 101a
/// 4. mine new blocks 99b, 100b, 101b
/// 5. emmit block 99b
///
/// The block hash of 99b should be different than 99a, but their previous block hashes should
/// be the same.
#[test]
fn no_agreement_point() -> anyhow::Result<()> {
const PREMINE_COUNT: usize = 101;
let env = TestEnv::new()?;
// start height is 99
let mut emitter = Emitter::from_height(&env.client, (PREMINE_COUNT - 2) as u32);
// mine 101 blocks
env.mine_blocks(PREMINE_COUNT, None)?;
// emit block 99a
let (_, block_header_99a) = emitter.next_header()?.expect("block 99a header");
let block_hash_99a = block_header_99a.block_hash();
let block_hash_98a = block_header_99a.prev_blockhash;
// emit block 100a
let (_, block_header_100a) = emitter.next_header()?.expect("block 100a header");
let block_hash_100a = block_header_100a.block_hash();
// get hash for block 101a
let block_hash_101a = env.client.get_block_hash(101)?;
// invalidate blocks 99a, 100a, 101a
env.client.invalidate_block(&block_hash_99a)?;
env.client.invalidate_block(&block_hash_100a)?;
env.client.invalidate_block(&block_hash_101a)?;
// mine new blocks 99b, 100b, 101b
env.mine_blocks(3, None)?;
// emit block header 99b
let (_, block_header_99b) = emitter.next_header()?.expect("block 99b header");
let block_hash_99b = block_header_99b.block_hash();
let block_hash_98b = block_header_99b.prev_blockhash;
assert_ne!(block_hash_99a, block_hash_99b);
assert_eq!(block_hash_98a, block_hash_98b);
Ok(())
}

View File

@ -1,6 +1,6 @@
use bitcoin::{hashes::Hash, BlockHash, OutPoint, TxOut, Txid};
use crate::{Anchor, COINBASE_MATURITY};
use crate::{Anchor, AnchorFromBlockPosition, COINBASE_MATURITY};
/// Represents the observed position of some chain data.
///
@ -109,6 +109,12 @@ impl Anchor for BlockId {
}
}
impl AnchorFromBlockPosition for BlockId {
fn from_block_position(_block: &bitcoin::Block, block_id: BlockId, _tx_pos: usize) -> Self {
block_id
}
}
impl Default for BlockId {
fn default() -> Self {
Self {
@ -168,6 +174,15 @@ impl Anchor for ConfirmationHeightAnchor {
}
}
impl AnchorFromBlockPosition for ConfirmationHeightAnchor {
fn from_block_position(_block: &bitcoin::Block, block_id: BlockId, _tx_pos: usize) -> Self {
Self {
anchor_block: block_id,
confirmation_height: block_id.height,
}
}
}
/// An [`Anchor`] implementation that also records the exact confirmation time and height of the
/// transaction.
///
@ -196,6 +211,17 @@ impl Anchor for ConfirmationTimeAnchor {
self.confirmation_height
}
}
impl AnchorFromBlockPosition for ConfirmationTimeAnchor {
fn from_block_position(block: &bitcoin::Block, block_id: BlockId, _tx_pos: usize) -> Self {
Self {
anchor_block: block_id,
confirmation_height: block_id.height,
confirmation_time: block.header.time as _,
}
}
}
/// A `TxOut` with as much data as we can retrieve about it
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullTxOut<A> {

View File

@ -3,12 +3,12 @@
//! This is essentially a [`TxGraph`] combined with an indexer.
use alloc::vec::Vec;
use bitcoin::{OutPoint, Transaction, TxOut};
use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid};
use crate::{
keychain,
tx_graph::{self, TxGraph},
Anchor, Append,
Anchor, AnchorFromBlockPosition, Append, BlockId,
};
/// A struct that combines [`TxGraph`] and an [`Indexer`] implementation.
@ -72,71 +72,63 @@ impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
where
I::ChangeSet: Default + Append,
{
fn index_tx_graph_changeset(
&mut self,
tx_graph_changeset: &tx_graph::ChangeSet<A>,
) -> I::ChangeSet {
let mut changeset = I::ChangeSet::default();
for added_tx in &tx_graph_changeset.txs {
changeset.append(self.index.index_tx(added_tx));
}
for (&added_outpoint, added_txout) in &tx_graph_changeset.txouts {
changeset.append(self.index.index_txout(added_outpoint, added_txout));
}
changeset
}
/// Apply an `update` directly.
///
/// `update` is a [`TxGraph<A>`] and the resultant changes is returned as [`ChangeSet`].
pub fn apply_update(&mut self, update: TxGraph<A>) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.apply_update(update);
let mut indexer = I::ChangeSet::default();
for added_tx in &graph.txs {
indexer.append(self.index.index_tx(added_tx));
}
for (&added_outpoint, added_txout) in &graph.txouts {
indexer.append(self.index.index_txout(added_outpoint, added_txout));
}
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
/// Insert a floating `txout` of given `outpoint`.
pub fn insert_txout(
&mut self,
outpoint: OutPoint,
txout: &TxOut,
) -> ChangeSet<A, I::ChangeSet> {
let mut update = TxGraph::<A>::default();
let _ = update.insert_txout(outpoint, txout.clone());
self.apply_update(update)
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.insert_txout(outpoint, txout);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
/// Insert and index a transaction into the graph.
///
/// `anchors` can be provided to anchor the transaction to various blocks. `seen_at` is a
/// unix timestamp of when the transaction is last seen.
pub fn insert_tx(
&mut self,
tx: &Transaction,
anchors: impl IntoIterator<Item = A>,
seen_at: Option<u64>,
) -> ChangeSet<A, I::ChangeSet> {
let txid = tx.txid();
let mut update = TxGraph::<A>::default();
if self.graph.get_tx(txid).is_none() {
let _ = update.insert_tx(tx.clone());
}
for anchor in anchors.into_iter() {
let _ = update.insert_anchor(txid, anchor);
}
if let Some(seen_at) = seen_at {
let _ = update.insert_seen_at(txid, seen_at);
}
self.apply_update(update)
pub fn insert_tx(&mut self, tx: Transaction) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.insert_tx(tx);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
/// Insert relevant transactions from the given `txs` iterator.
/// Insert an `anchor` for a given transaction.
pub fn insert_anchor(&mut self, txid: Txid, anchor: A) -> ChangeSet<A, I::ChangeSet> {
self.graph.insert_anchor(txid, anchor).into()
}
/// Insert a unix timestamp of when a transaction is seen in the mempool.
///
/// This is used for transaction conflict resolution in [`TxGraph`] where the transaction with
/// the later last-seen is prioritized.
pub fn insert_seen_at(&mut self, txid: Txid, seen_at: u64) -> ChangeSet<A, I::ChangeSet> {
self.graph.insert_seen_at(txid, seen_at).into()
}
/// Batch insert transactions, filtering out those that are irrelevant.
///
/// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. Irrelevant
/// transactions in `txs` will be ignored. `txs` do not need to be in topological order.
///
/// `anchors` can be provided to anchor the transactions to blocks. `seen_at` is a unix
/// timestamp of when the transactions are last seen.
pub fn insert_relevant_txs<'t>(
pub fn batch_insert_relevant<'t>(
&mut self,
txs: impl IntoIterator<Item = (&'t Transaction, impl IntoIterator<Item = A>)>,
seen_at: Option<u64>,
) -> ChangeSet<A, I::ChangeSet> {
// The algorithm below allows for non-topologically ordered transactions by using two loops.
// This is achieved by:
@ -144,25 +136,133 @@ where
// not store anything about them.
// 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop).
let mut changeset = ChangeSet::<A, I::ChangeSet>::default();
let mut transactions = Vec::new();
for (tx, anchors) in txs.into_iter() {
changeset.indexer.append(self.index.index_tx(tx));
transactions.push((tx, anchors));
let txs = txs.into_iter().collect::<Vec<_>>();
let mut indexer = I::ChangeSet::default();
for (tx, _) in &txs {
indexer.append(self.index.index_tx(tx));
}
changeset.append(
transactions
.into_iter()
.filter_map(|(tx, anchors)| match self.index.is_tx_relevant(tx) {
true => Some(self.insert_tx(tx, anchors, seen_at)),
false => None,
})
.fold(Default::default(), |mut acc, other| {
acc.append(other);
acc
}),
let mut graph = tx_graph::ChangeSet::default();
for (tx, anchors) in txs {
if self.index.is_tx_relevant(tx) {
let txid = tx.txid();
graph.append(self.graph.insert_tx(tx.clone()));
for anchor in anchors {
graph.append(self.graph.insert_anchor(txid, anchor));
}
}
}
ChangeSet { graph, indexer }
}
/// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
///
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
/// Irrelevant tansactions in `txs` will be ignored.
///
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_relevant_unconfirmed<'t>(
&mut self,
unconfirmed_txs: impl IntoIterator<Item = (&'t Transaction, u64)>,
) -> ChangeSet<A, I::ChangeSet> {
// The algorithm below allows for non-topologically ordered transactions by using two loops.
// This is achieved by:
// 1. insert all txs into the index. If they are irrelevant then that's fine it will just
// not store anything about them.
// 2. decide whether to insert them into the graph depending on whether `is_tx_relevant`
// returns true or not. (in a second loop).
let txs = unconfirmed_txs.into_iter().collect::<Vec<_>>();
let mut indexer = I::ChangeSet::default();
for (tx, _) in &txs {
indexer.append(self.index.index_tx(tx));
}
let graph = self.graph.batch_insert_unconfirmed(
txs.into_iter()
.filter(|(tx, _)| self.index.is_tx_relevant(tx))
.map(|(tx, seen_at)| (tx.clone(), seen_at)),
);
changeset
ChangeSet { graph, indexer }
}
/// Batch insert unconfirmed transactions.
///
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution in [`TxGraph`] (refer to [`TxGraph::insert_seen_at`] for details).
///
/// To filter out irrelevant transactions, use [`batch_insert_relevant_unconfirmed`] instead.
///
/// [`batch_insert_relevant_unconfirmed`]: IndexedTxGraph::batch_insert_relevant_unconfirmed
pub fn batch_insert_unconfirmed(
&mut self,
txs: impl IntoIterator<Item = (Transaction, u64)>,
) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.batch_insert_unconfirmed(txs);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
}
/// Methods are available if the anchor (`A`) implements [`AnchorFromBlockPosition`].
impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I>
where
I::ChangeSet: Default + Append,
A: AnchorFromBlockPosition,
{
/// Batch insert all transactions of the given `block` of `height`, filtering out those that are
/// irrelevant.
///
/// Each inserted transaction's anchor will be constructed from
/// [`AnchorFromBlockPosition::from_block_position`].
///
/// Relevancy is determined by the internal [`Indexer::is_tx_relevant`] implementation of `I`.
/// Irrelevant tansactions in `txs` will be ignored.
pub fn apply_block_relevant(
&mut self,
block: Block,
height: u32,
) -> ChangeSet<A, I::ChangeSet> {
let block_id = BlockId {
hash: block.block_hash(),
height,
};
let txs = block.txdata.iter().enumerate().map(|(tx_pos, tx)| {
(
tx,
core::iter::once(A::from_block_position(&block, block_id, tx_pos)),
)
});
self.batch_insert_relevant(txs)
}
/// Batch insert all transactions of the given `block` of `height`.
///
/// Each inserted transaction's anchor will be constructed from
/// [`AnchorFromBlockPosition::from_block_position`].
///
/// To only insert relevant transactions, use [`apply_block_relevant`] instead.
///
/// [`apply_block_relevant`]: IndexedTxGraph::apply_block_relevant
pub fn apply_block(&mut self, block: Block, height: u32) -> ChangeSet<A, I::ChangeSet> {
let block_id = BlockId {
hash: block.block_hash(),
height,
};
let mut graph = tx_graph::ChangeSet::default();
for (tx_pos, tx) in block.txdata.iter().enumerate() {
let anchor = A::from_block_position(&block, block_id, tx_pos);
graph.append(self.graph.insert_anchor(tx.txid(), anchor));
graph.append(self.graph.insert_tx(tx.clone()));
}
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
}
}

View File

@ -39,6 +39,41 @@ impl CheckPoint {
Self(Arc::new(CPInner { block, prev: None }))
}
/// Construct a checkpoint from the given `header` and block `height`.
///
/// If `header` is of the genesis block, the checkpoint won't have a [`prev`] node. Otherwise,
/// we return a checkpoint linked with the previous block.
///
/// [`prev`]: CheckPoint::prev
pub fn from_header(header: &bitcoin::block::Header, height: u32) -> Self {
let hash = header.block_hash();
let this_block_id = BlockId { height, hash };
let prev_height = match height.checked_sub(1) {
Some(h) => h,
None => return Self::new(this_block_id),
};
let prev_block_id = BlockId {
height: prev_height,
hash: header.prev_blockhash,
};
CheckPoint::new(prev_block_id)
.push(this_block_id)
.expect("must construct checkpoint")
}
/// Convenience method to convert the [`CheckPoint`] into an [`Update`].
///
/// For more information, refer to [`Update`].
pub fn into_update(self, introduce_older_blocks: bool) -> Update {
Update {
tip: self,
introduce_older_blocks,
}
}
/// Puts another checkpoint onto the linked list representing the blockchain.
///
/// Returns an `Err(self)` if the block you are pushing on is not at a greater height that the one you

View File

@ -76,12 +76,19 @@ pub trait Anchor: core::fmt::Debug + Clone + Eq + PartialOrd + Ord + core::hash:
}
}
impl<A: Anchor> Anchor for &'static A {
impl<'a, A: Anchor> Anchor for &'a A {
fn anchor_block(&self) -> BlockId {
<A as Anchor>::anchor_block(self)
}
}
/// An [`Anchor`] that can be constructed from a given block, block height and transaction position
/// within the block.
pub trait AnchorFromBlockPosition: Anchor {
/// Construct the anchor from a given `block`, block height and `tx_pos` within the block.
fn from_block_position(block: &bitcoin::Block, block_id: BlockId, tx_pos: usize) -> Self;
}
/// Trait that makes an object appendable.
pub trait Append {
/// Append another object of the same type onto `self`.

View File

@ -451,6 +451,23 @@ impl<A: Clone + Ord> TxGraph<A> {
self.apply_update(update)
}
/// Batch insert unconfirmed transactions.
///
/// Items of `txs` are tuples containing the transaction and a *last seen* timestamp. The
/// *last seen* communicates when the transaction is last seen in the mempool which is used for
/// conflict-resolution (refer to [`TxGraph::insert_seen_at`] for details).
pub fn batch_insert_unconfirmed(
&mut self,
txs: impl IntoIterator<Item = (Transaction, u64)>,
) -> ChangeSet<A> {
let mut changeset = ChangeSet::<A>::default();
for (tx, seen_at) in txs {
changeset.append(self.insert_seen_at(tx.txid(), seen_at));
changeset.append(self.insert_tx(tx));
}
changeset
}
/// Inserts the given `anchor` into [`TxGraph`].
///
/// The [`ChangeSet`] returned will be empty if graph already knows that `txid` exists in

View File

@ -74,7 +74,7 @@ fn insert_relevant_txs() {
};
assert_eq!(
graph.insert_relevant_txs(txs.iter().map(|tx| (tx, None)), None),
graph.batch_insert_relevant(txs.iter().map(|tx| (tx, None))),
changeset,
);
@ -211,8 +211,8 @@ fn test_list_owned_txouts() {
// Insert transactions into graph with respective anchors
// For unconfirmed txs we pass in `None`.
let _ = graph.insert_relevant_txs(
[&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| {
let _ =
graph.batch_insert_relevant([&tx1, &tx2, &tx3, &tx6].iter().enumerate().map(|(i, tx)| {
let height = i as u32;
(
*tx,
@ -226,11 +226,9 @@ fn test_list_owned_txouts() {
confirmation_height: anchor_block.height,
}),
)
}),
None,
);
}));
let _ = graph.insert_relevant_txs([&tx4, &tx5].iter().map(|tx| (*tx, None)), Some(100));
let _ = graph.batch_insert_relevant_unconfirmed([&tx4, &tx5].iter().map(|tx| (*tx, 100)));
// A helper lambda to extract and filter data from the graph.
let fetch =

View File

@ -0,0 +1,12 @@
[package]
name = "example_bitcoind_rpc_polling"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bdk_chain = { path = "../../crates/chain", features = ["serde"] }
bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" }
example_cli = { path = "../example_cli" }
ctrlc = { version = "^2" }

View File

@ -0,0 +1,388 @@
use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};
use bdk_bitcoind_rpc::{
bitcoincore_rpc::{Auth, Client, RpcApi},
Emitter,
};
use bdk_chain::{
bitcoin::{Block, Transaction},
indexed_tx_graph, keychain,
local_chain::{self, CheckPoint, LocalChain},
ConfirmationTimeAnchor, IndexedTxGraph,
};
use example_cli::{
anyhow,
clap::{self, Args, Subcommand},
Keychain,
};
const DB_MAGIC: &[u8] = b"bdk_example_rpc";
const DB_PATH: &str = ".bdk_example_rpc.db";
/// The mpsc channel bound for emissions from [`Emitter`].
const CHANNEL_BOUND: usize = 10;
/// Delay for printing status to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
/// Delay between mempool emissions.
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
/// Delay for committing to persistance.
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
type ChangeSet = (
local_chain::ChangeSet,
indexed_tx_graph::ChangeSet<ConfirmationTimeAnchor, keychain::ChangeSet<Keychain>>,
);
#[derive(Debug)]
enum Emission {
Block { height: u32, block: Block },
Mempool(Vec<(Transaction, u64)>),
Tip(u32),
}
#[derive(Args, Debug, Clone)]
struct RpcArgs {
/// RPC URL
#[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
url: String,
/// RPC auth cookie file
#[clap(env = "RPC_COOKIE", long)]
rpc_cookie: Option<PathBuf>,
/// RPC auth username
#[clap(env = "RPC_USER", long)]
rpc_user: Option<String>,
/// RPC auth password
#[clap(env = "RPC_PASS", long)]
rpc_password: Option<String>,
/// Starting block height to fallback to if no point of agreement if found
#[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
fallback_height: u32,
/// The unused-scripts lookahead will be kept at this size
#[clap(long, default_value = "10")]
lookahead: u32,
}
impl From<RpcArgs> for Auth {
fn from(args: RpcArgs) -> Self {
match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
(None, None, None) => Self::None,
(Some(path), _, _) => Self::CookieFile(path),
(_, Some(user), Some(pass)) => Self::UserPass(user, pass),
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
}
}
}
impl RpcArgs {
fn new_client(&self) -> anyhow::Result<Client> {
Ok(Client::new(
&self.url,
match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) {
(None, None, None) => Auth::None,
(Some(path), _, _) => Auth::CookieFile(path.clone()),
(_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
},
)?)
}
}
#[derive(Subcommand, Debug, Clone)]
enum RpcCommands {
/// Syncs local state with remote state via RPC (starting from last point of agreement) and
/// stores/indexes relevant transactions
Sync {
#[clap(flatten)]
rpc_args: RpcArgs,
},
/// Sync by having the emitter logic in a separate thread
Live {
#[clap(flatten)]
rpc_args: RpcArgs,
},
}
fn main() -> anyhow::Result<()> {
let start = Instant::now();
let (args, keymap, index, db, init_changeset) =
example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
println!(
"[{:>10}s] loaded initial changeset from db",
start.elapsed().as_secs_f32()
);
let graph = Mutex::new({
let mut graph = IndexedTxGraph::new(index);
graph.apply_changeset(init_changeset.1);
graph
});
println!(
"[{:>10}s] loaded indexed tx graph from changeset",
start.elapsed().as_secs_f32()
);
let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
println!(
"[{:>10}s] loaded local chain from changeset",
start.elapsed().as_secs_f32()
);
let rpc_cmd = match args.command {
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
general_cmd => {
let res = example_cli::handle_commands(
&graph,
&db,
&chain,
&keymap,
args.network,
|rpc_args, tx| {
let client = rpc_args.new_client()?;
client.send_raw_transaction(tx)?;
Ok(())
},
general_cmd,
);
db.lock().unwrap().commit()?;
return res;
}
};
match rpc_cmd {
RpcCommands::Sync { rpc_args } => {
let RpcArgs {
fallback_height,
lookahead,
..
} = rpc_args;
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?;
let mut emitter = match chain_tip {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
None => Emitter::from_height(&rpc_client, fallback_height),
};
let mut last_db_commit = Instant::now();
let mut last_print = Instant::now();
while let Some((height, block)) = emitter.next_block()? {
let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut db = db.lock().unwrap();
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain
.apply_update(chain_update)
.expect("must always apply as we recieve blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(block, height);
db.stage((chain_changeset, graph_changeset));
// commit staged db changes in intervals
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
last_db_commit = Instant::now();
db.commit()?;
println!(
"[{:>10}s] commited to db (took {}s)",
start.elapsed().as_secs_f32(),
last_db_commit.elapsed().as_secs_f32()
);
}
// print synced-to height and current balance in intervals
if last_print.elapsed() >= STDOUT_PRINT_DELAY {
last_print = Instant::now();
if let Some(synced_to) = chain.tip() {
let balance = {
graph.graph().balance(
&*chain,
synced_to.block_id(),
graph.index.outpoints().iter().cloned(),
|(k, _), _| k == &Keychain::Internal,
)
};
println!(
"[{:>10}s] synced to {} @ {} | total: {} sats",
start.elapsed().as_secs_f32(),
synced_to.hash(),
synced_to.height(),
balance.total()
);
}
}
}
let mempool_txs = emitter.mempool()?;
let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
);
{
let mut db = db.lock().unwrap();
db.stage((local_chain::ChangeSet::default(), graph_changeset));
db.commit()?; // commit one last time
}
}
RpcCommands::Live { rpc_args } => {
let RpcArgs {
fallback_height,
lookahead,
..
} = rpc_args;
let sigterm_flag = start_ctrlc_handler();
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
let last_cp = chain.lock().unwrap().tip();
println!(
"[{:>10}s] starting emitter thread...",
start.elapsed().as_secs_f32()
);
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
let rpc_client = rpc_args.new_client()?;
let mut emitter = match last_cp {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
None => Emitter::from_height(&rpc_client, fallback_height),
};
let mut block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;
loop {
match emitter.next_block()? {
Some((height, block)) => {
if sigterm_flag.load(Ordering::Acquire) {
break;
}
if height > block_count {
block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;
}
tx.send(Emission::Block { height, block })?;
}
None => {
if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
break;
}
println!("preparing mempool emission...");
let now = Instant::now();
tx.send(Emission::Mempool(emitter.mempool()?))?;
println!("mempool emission prepared in {}s", now.elapsed().as_secs());
continue;
}
};
}
println!("emitter thread shutting down...");
Ok(())
});
let mut tip_height = 0_u32;
let mut last_db_commit = Instant::now();
let mut last_print = Option::<Instant>::None;
for emission in rx {
let mut db = db.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut chain = chain.lock().unwrap();
let changeset = match emission {
Emission::Block { height, block } => {
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain
.apply_update(chain_update)
.expect("must always apply as we recieve blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(block, height);
(chain_changeset, graph_changeset)
}
Emission::Mempool(mempool_txs) => {
let graph_changeset = graph.batch_insert_relevant_unconfirmed(
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
);
(local_chain::ChangeSet::default(), graph_changeset)
}
Emission::Tip(h) => {
tip_height = h;
continue;
}
};
db.stage(changeset);
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
last_db_commit = Instant::now();
db.commit()?;
println!(
"[{:>10}s] commited to db (took {}s)",
start.elapsed().as_secs_f32(),
last_db_commit.elapsed().as_secs_f32()
);
}
if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY {
last_print = Some(Instant::now());
if let Some(synced_to) = chain.tip() {
let balance = {
graph.graph().balance(
&*chain,
synced_to.block_id(),
graph.index.outpoints().iter().cloned(),
|(k, _), _| k == &Keychain::Internal,
)
};
println!(
"[{:>10}s] synced to {} @ {} / {} | total: {} sats",
start.elapsed().as_secs_f32(),
synced_to.hash(),
synced_to.height(),
tip_height,
balance.total()
);
}
}
}
emission_jh.join().expect("must join emitter thread")?;
}
}
Ok(())
}
#[allow(dead_code)]
fn start_ctrlc_handler() -> Arc<AtomicBool> {
let flag = Arc::new(AtomicBool::new(false));
let cloned_flag = flag.clone();
ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release));
flag
}
#[allow(dead_code)]
fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
let start = Instant::now();
loop {
if flag.load(Ordering::Acquire) {
return true;
}
if start.elapsed() >= duration {
return false;
}
std::thread::sleep(Duration::from_secs(1));
}
}

View File

@ -34,7 +34,7 @@ pub type Database<'m, C> = Persist<Store<'m, C>, C>;
#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
#[clap(propagate_version = true)]
pub struct Args<S: clap::Subcommand> {
pub struct Args<CS: clap::Subcommand, S: clap::Args> {
#[clap(env = "DESCRIPTOR")]
pub descriptor: String,
#[clap(env = "CHANGE_DESCRIPTOR")]
@ -50,14 +50,14 @@ pub struct Args<S: clap::Subcommand> {
pub cp_limit: usize,
#[clap(subcommand)]
pub command: Commands<S>,
pub command: Commands<CS, S>,
}
#[allow(clippy::almost_swapped)]
#[derive(Subcommand, Debug, Clone)]
pub enum Commands<S: clap::Subcommand> {
pub enum Commands<CS: clap::Subcommand, S: clap::Args> {
#[clap(flatten)]
ChainSpecific(S),
ChainSpecific(CS),
/// Address generation and inspection.
Address {
#[clap(subcommand)]
@ -77,6 +77,8 @@ pub enum Commands<S: clap::Subcommand> {
address: Address<address::NetworkUnchecked>,
#[clap(short, default_value = "bnb")]
coin_select: CoinSelectionAlgo,
#[clap(flatten)]
chain_specfic: S,
},
}
@ -183,225 +185,6 @@ impl core::fmt::Display for Keychain {
}
}
pub fn run_address_cmd<A, C>(
graph: &mut KeychainTxGraph<A>,
db: &Mutex<Database<C>>,
network: Network,
cmd: AddressCmd,
) -> anyhow::Result<()>
where
C: Default + Append + DeserializeOwned + Serialize + From<KeychainChangeSet<A>>,
{
let index = &mut graph.index;
match cmd {
AddressCmd::Next | AddressCmd::New => {
let spk_chooser = match cmd {
AddressCmd::Next => KeychainTxOutIndex::next_unused_spk,
AddressCmd::New => KeychainTxOutIndex::reveal_next_spk,
_ => unreachable!("only these two variants exist in match arm"),
};
let ((spk_i, spk), index_changeset) = spk_chooser(index, &Keychain::External);
let db = &mut *db.lock().unwrap();
db.stage(C::from((
local_chain::ChangeSet::default(),
indexed_tx_graph::ChangeSet::from(index_changeset),
)));
db.commit()?;
let addr = Address::from_script(spk, network).context("failed to derive address")?;
println!("[address @ {}] {}", spk_i, addr);
Ok(())
}
AddressCmd::Index => {
for (keychain, derivation_index) in index.last_revealed_indices() {
println!("{:?}: {}", keychain, derivation_index);
}
Ok(())
}
AddressCmd::List { change } => {
let target_keychain = match change {
true => Keychain::Internal,
false => Keychain::External,
};
for (spk_i, spk) in index.revealed_spks_of_keychain(&target_keychain) {
let address = Address::from_script(spk, network)
.expect("should always be able to derive address");
println!(
"{:?} {} used:{}",
spk_i,
address,
index.is_used(&(target_keychain, spk_i))
);
}
Ok(())
}
}
}
pub fn run_balance_cmd<A: Anchor, O: ChainOracle>(
graph: &KeychainTxGraph<A>,
chain: &O,
) -> Result<(), O::Error> {
fn print_balances<'a>(title_str: &'a str, items: impl IntoIterator<Item = (&'a str, u64)>) {
println!("{}:", title_str);
for (name, amount) in items.into_iter() {
println!(" {:<10} {:>12} sats", name, amount)
}
}
let balance = graph.graph().try_balance(
chain,
chain.get_chain_tip()?.unwrap_or_default(),
graph.index.outpoints().iter().cloned(),
|(k, _), _| k == &Keychain::Internal,
)?;
let confirmed_total = balance.confirmed + balance.immature;
let unconfirmed_total = balance.untrusted_pending + balance.trusted_pending;
print_balances(
"confirmed",
[
("total", confirmed_total),
("spendable", balance.confirmed),
("immature", balance.immature),
],
);
print_balances(
"unconfirmed",
[
("total", unconfirmed_total),
("trusted", balance.trusted_pending),
("untrusted", balance.untrusted_pending),
],
);
Ok(())
}
pub fn run_txo_cmd<A: Anchor, O: ChainOracle>(
graph: &KeychainTxGraph<A>,
chain: &O,
network: Network,
cmd: TxOutCmd,
) -> anyhow::Result<()>
where
O::Error: std::error::Error + Send + Sync + 'static,
{
let chain_tip = chain.get_chain_tip()?.unwrap_or_default();
let outpoints = graph.index.outpoints().iter().cloned();
match cmd {
TxOutCmd::List {
spent,
unspent,
confirmed,
unconfirmed,
} => {
let txouts = graph
.graph()
.try_filter_chain_txouts(chain, chain_tip, outpoints)
.filter(|r| match r {
Ok((_, full_txo)) => match (spent, unspent) {
(true, false) => full_txo.spent_by.is_some(),
(false, true) => full_txo.spent_by.is_none(),
_ => true,
},
// always keep errored items
Err(_) => true,
})
.filter(|r| match r {
Ok((_, full_txo)) => match (confirmed, unconfirmed) {
(true, false) => full_txo.chain_position.is_confirmed(),
(false, true) => !full_txo.chain_position.is_confirmed(),
_ => true,
},
// always keep errored items
Err(_) => true,
})
.collect::<Result<Vec<_>, _>>()?;
for (spk_i, full_txo) in txouts {
let addr = Address::from_script(&full_txo.txout.script_pubkey, network)?;
println!(
"{:?} {} {} {} spent:{:?}",
spk_i, full_txo.txout.value, full_txo.outpoint, addr, full_txo.spent_by
)
}
Ok(())
}
}
}
#[allow(clippy::too_many_arguments)]
pub fn run_send_cmd<A: Anchor, O: ChainOracle, C>(
graph: &Mutex<KeychainTxGraph<A>>,
db: &Mutex<Database<'_, C>>,
chain: &O,
keymap: &HashMap<DescriptorPublicKey, DescriptorSecretKey>,
cs_algorithm: CoinSelectionAlgo,
address: Address,
value: u64,
broadcast: impl FnOnce(&Transaction) -> anyhow::Result<()>,
) -> anyhow::Result<()>
where
O::Error: std::error::Error + Send + Sync + 'static,
C: Default + Append + DeserializeOwned + Serialize + From<KeychainChangeSet<A>>,
{
let (transaction, change_index) = {
let graph = &mut *graph.lock().unwrap();
// take mutable ref to construct tx -- it is only open for a short time while building it.
let (tx, change_info) = create_tx(graph, chain, keymap, cs_algorithm, address, value)?;
if let Some((index_changeset, (change_keychain, index))) = change_info {
// We must first persist to disk the fact that we've got a new address from the
// change keychain so future scans will find the tx we're about to broadcast.
// If we're unable to persist this, then we don't want to broadcast.
{
let db = &mut *db.lock().unwrap();
db.stage(C::from((
local_chain::ChangeSet::default(),
indexed_tx_graph::ChangeSet::from(index_changeset),
)));
db.commit()?;
}
// We don't want other callers/threads to use this address while we're using it
// but we also don't want to scan the tx we just created because it's not
// technically in the blockchain yet.
graph.index.mark_used(&change_keychain, index);
(tx, Some((change_keychain, index)))
} else {
(tx, None)
}
};
match (broadcast)(&transaction) {
Ok(_) => {
println!("Broadcasted Tx : {}", transaction.txid());
let keychain_changeset = graph.lock().unwrap().insert_tx(&transaction, None, None);
// We know the tx is at least unconfirmed now. Note if persisting here fails,
// it's not a big deal since we can always find it again form
// blockchain.
db.lock().unwrap().stage(C::from((
local_chain::ChangeSet::default(),
keychain_changeset,
)));
Ok(())
}
Err(e) => {
if let Some((keychain, index)) = change_index {
// We failed to broadcast, so allow our change address to be used in the future
graph.lock().unwrap().index.unmark_used(&keychain, index);
}
Err(e)
}
}
}
#[allow(clippy::type_complexity)]
pub fn create_tx<A: Anchor, O: ChainOracle>(
graph: &mut KeychainTxGraph<A>,
@ -647,14 +430,14 @@ pub fn planned_utxos<A: Anchor, O: ChainOracle, K: Clone + bdk_tmp_plan::CanDeri
.collect()
}
pub fn handle_commands<S: clap::Subcommand, A: Anchor, O: ChainOracle, C>(
pub fn handle_commands<CS: clap::Subcommand, S: clap::Args, A: Anchor, O: ChainOracle, C>(
graph: &Mutex<KeychainTxGraph<A>>,
db: &Mutex<Database<C>>,
chain: &Mutex<O>,
keymap: &HashMap<DescriptorPublicKey, DescriptorSecretKey>,
network: Network,
broadcast: impl FnOnce(&Transaction) -> anyhow::Result<()>,
cmd: Commands<S>,
broadcast: impl FnOnce(S, &Transaction) -> anyhow::Result<()>,
cmd: Commands<CS, S>,
) -> anyhow::Result<()>
where
O::Error: std::error::Error + Send + Sync + 'static,
@ -664,45 +447,212 @@ where
Commands::ChainSpecific(_) => unreachable!("example code should handle this!"),
Commands::Address { addr_cmd } => {
let graph = &mut *graph.lock().unwrap();
run_address_cmd(graph, db, network, addr_cmd)
let index = &mut graph.index;
match addr_cmd {
AddressCmd::Next | AddressCmd::New => {
let spk_chooser = match addr_cmd {
AddressCmd::Next => KeychainTxOutIndex::next_unused_spk,
AddressCmd::New => KeychainTxOutIndex::reveal_next_spk,
_ => unreachable!("only these two variants exist in match arm"),
};
let ((spk_i, spk), index_changeset) = spk_chooser(index, &Keychain::External);
let db = &mut *db.lock().unwrap();
db.stage(C::from((
local_chain::ChangeSet::default(),
indexed_tx_graph::ChangeSet::from(index_changeset),
)));
db.commit()?;
let addr =
Address::from_script(spk, network).context("failed to derive address")?;
println!("[address @ {}] {}", spk_i, addr);
Ok(())
}
AddressCmd::Index => {
for (keychain, derivation_index) in index.last_revealed_indices() {
println!("{:?}: {}", keychain, derivation_index);
}
Ok(())
}
AddressCmd::List { change } => {
let target_keychain = match change {
true => Keychain::Internal,
false => Keychain::External,
};
for (spk_i, spk) in index.revealed_spks_of_keychain(&target_keychain) {
let address = Address::from_script(spk, network)
.expect("should always be able to derive address");
println!(
"{:?} {} used:{}",
spk_i,
address,
index.is_used(&(target_keychain, spk_i))
);
}
Ok(())
}
}
}
Commands::Balance => {
let graph = &*graph.lock().unwrap();
let chain = &*chain.lock().unwrap();
run_balance_cmd(graph, chain).map_err(anyhow::Error::from)
fn print_balances<'a>(
title_str: &'a str,
items: impl IntoIterator<Item = (&'a str, u64)>,
) {
println!("{}:", title_str);
for (name, amount) in items.into_iter() {
println!(" {:<10} {:>12} sats", name, amount)
}
}
let balance = graph.graph().try_balance(
chain,
chain.get_chain_tip()?.unwrap_or_default(),
graph.index.outpoints().iter().cloned(),
|(k, _), _| k == &Keychain::Internal,
)?;
let confirmed_total = balance.confirmed + balance.immature;
let unconfirmed_total = balance.untrusted_pending + balance.trusted_pending;
print_balances(
"confirmed",
[
("total", confirmed_total),
("spendable", balance.confirmed),
("immature", balance.immature),
],
);
print_balances(
"unconfirmed",
[
("total", unconfirmed_total),
("trusted", balance.trusted_pending),
("untrusted", balance.untrusted_pending),
],
);
Ok(())
}
Commands::TxOut { txout_cmd } => {
let graph = &*graph.lock().unwrap();
let chain = &*chain.lock().unwrap();
run_txo_cmd(graph, chain, network, txout_cmd)
let chain_tip = chain.get_chain_tip()?.unwrap_or_default();
let outpoints = graph.index.outpoints().iter().cloned();
match txout_cmd {
TxOutCmd::List {
spent,
unspent,
confirmed,
unconfirmed,
} => {
let txouts = graph
.graph()
.try_filter_chain_txouts(chain, chain_tip, outpoints)
.filter(|r| match r {
Ok((_, full_txo)) => match (spent, unspent) {
(true, false) => full_txo.spent_by.is_some(),
(false, true) => full_txo.spent_by.is_none(),
_ => true,
},
// always keep errored items
Err(_) => true,
})
.filter(|r| match r {
Ok((_, full_txo)) => match (confirmed, unconfirmed) {
(true, false) => full_txo.chain_position.is_confirmed(),
(false, true) => !full_txo.chain_position.is_confirmed(),
_ => true,
},
// always keep errored items
Err(_) => true,
})
.collect::<Result<Vec<_>, _>>()?;
for (spk_i, full_txo) in txouts {
let addr = Address::from_script(&full_txo.txout.script_pubkey, network)?;
println!(
"{:?} {} {} {} spent:{:?}",
spk_i, full_txo.txout.value, full_txo.outpoint, addr, full_txo.spent_by
)
}
Ok(())
}
}
}
Commands::Send {
value,
address,
coin_select,
chain_specfic,
} => {
let chain = &*chain.lock().unwrap();
let address = address.require_network(network)?;
run_send_cmd(
graph,
db,
chain,
keymap,
coin_select,
address,
value,
broadcast,
)
let (transaction, change_index) = {
let graph = &mut *graph.lock().unwrap();
// take mutable ref to construct tx -- it is only open for a short time while building it.
let (tx, change_info) =
create_tx(graph, chain, keymap, coin_select, address, value)?;
if let Some((index_changeset, (change_keychain, index))) = change_info {
// We must first persist to disk the fact that we've got a new address from the
// change keychain so future scans will find the tx we're about to broadcast.
// If we're unable to persist this, then we don't want to broadcast.
{
let db = &mut *db.lock().unwrap();
db.stage(C::from((
local_chain::ChangeSet::default(),
indexed_tx_graph::ChangeSet::from(index_changeset),
)));
db.commit()?;
}
// We don't want other callers/threads to use this address while we're using it
// but we also don't want to scan the tx we just created because it's not
// technically in the blockchain yet.
graph.index.mark_used(&change_keychain, index);
(tx, Some((change_keychain, index)))
} else {
(tx, None)
}
};
match (broadcast)(chain_specfic, &transaction) {
Ok(_) => {
println!("Broadcasted Tx : {}", transaction.txid());
let keychain_changeset = graph.lock().unwrap().insert_tx(transaction);
// We know the tx is at least unconfirmed now. Note if persisting here fails,
// it's not a big deal since we can always find it again form
// blockchain.
db.lock().unwrap().stage(C::from((
local_chain::ChangeSet::default(),
keychain_changeset,
)));
Ok(())
}
Err(e) => {
if let Some((keychain, index)) = change_index {
// We failed to broadcast, so allow our change address to be used in the future
graph.lock().unwrap().index.unmark_used(&keychain, index);
}
Err(e)
}
}
}
}
}
#[allow(clippy::type_complexity)]
pub fn init<'m, S: clap::Subcommand, C>(
pub fn init<'m, CS: clap::Subcommand, S: clap::Args, C>(
db_magic: &'m [u8],
db_default_path: &str,
) -> anyhow::Result<(
Args<S>,
Args<CS, S>,
KeyMap,
KeychainTxOutIndex<Keychain>,
Mutex<Database<'m, C>>,
@ -714,7 +664,7 @@ where
if std::env::var("BDK_DB_PATH").is_err() {
std::env::set_var("BDK_DB_PATH", db_default_path);
}
let args = Args::<S>::parse();
let args = Args::<CS, S>::parse();
let secp = Secp256k1::default();
let mut index = KeychainTxOutIndex::<Keychain>::default();

View File

@ -12,7 +12,7 @@ use bdk_chain::{
Append, ConfirmationHeightAnchor,
};
use bdk_electrum::{
electrum_client::{self, ElectrumApi},
electrum_client::{self, Client, ElectrumApi},
ElectrumExt, ElectrumUpdate,
};
use example_cli::{
@ -33,6 +33,8 @@ enum ElectrumCommands {
stop_gap: usize,
#[clap(flatten)]
scan_options: ScanOptions,
#[clap(flatten)]
electrum_args: ElectrumArgs,
},
/// Scans particular addresses using the electrum API.
Sync {
@ -50,9 +52,44 @@ enum ElectrumCommands {
unconfirmed: bool,
#[clap(flatten)]
scan_options: ScanOptions,
#[clap(flatten)]
electrum_args: ElectrumArgs,
},
}
impl ElectrumCommands {
fn electrum_args(&self) -> ElectrumArgs {
match self {
ElectrumCommands::Scan { electrum_args, .. } => electrum_args.clone(),
ElectrumCommands::Sync { electrum_args, .. } => electrum_args.clone(),
}
}
}
#[derive(clap::Args, Debug, Clone)]
pub struct ElectrumArgs {
/// The electrum url to use to connect to. If not provided it will use a default electrum server
/// for your chosen network.
electrum_url: Option<String>,
}
impl ElectrumArgs {
pub fn client(&self, network: Network) -> anyhow::Result<Client> {
let electrum_url = self.electrum_url.as_deref().unwrap_or(match network {
Network::Bitcoin => "ssl://electrum.blockstream.info:50002",
Network::Testnet => "ssl://electrum.blockstream.info:60002",
Network::Regtest => "tcp://localhost:60401",
Network::Signet => "tcp://signet-electrumx.wakiyamap.dev:50001",
_ => panic!("Unknown network"),
});
let config = electrum_client::Config::builder()
.validate_domain(matches!(network, Network::Bitcoin))
.build();
Ok(electrum_client::Client::from_config(electrum_url, config)?)
}
}
#[derive(Parser, Debug, Clone, PartialEq)]
pub struct ScanOptions {
/// Set batch size for each script_history call to electrum client.
@ -67,7 +104,7 @@ type ChangeSet = (
fn main() -> anyhow::Result<()> {
let (args, keymap, index, db, (disk_local_chain, disk_tx_graph)) =
example_cli::init::<ElectrumCommands, ChangeSet>(DB_MAGIC, DB_PATH)?;
example_cli::init::<ElectrumCommands, ElectrumArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
let graph = Mutex::new({
let mut graph = IndexedTxGraph::new(index);
@ -77,19 +114,6 @@ fn main() -> anyhow::Result<()> {
let chain = Mutex::new(LocalChain::from_changeset(disk_local_chain));
let electrum_url = match args.network {
Network::Bitcoin => "ssl://electrum.blockstream.info:50002",
Network::Testnet => "ssl://electrum.blockstream.info:60002",
Network::Regtest => "tcp://localhost:60401",
Network::Signet => "tcp://signet-electrumx.wakiyamap.dev:50001",
_ => panic!("Unknown network"),
};
let config = electrum_client::Config::builder()
.validate_domain(matches!(args.network, Network::Bitcoin))
.build();
let client = electrum_client::Client::from_config(electrum_url, config)?;
let electrum_cmd = match &args.command {
example_cli::Commands::ChainSpecific(electrum_cmd) => electrum_cmd,
general_cmd => {
@ -99,11 +123,10 @@ fn main() -> anyhow::Result<()> {
&chain,
&keymap,
args.network,
|tx| {
client
.transaction_broadcast(tx)
.map(|_| ())
.map_err(anyhow::Error::from)
|electrum_args, tx| {
let client = electrum_args.client(args.network)?;
client.transaction_broadcast(tx)?;
Ok(())
},
general_cmd.clone(),
);
@ -113,10 +136,13 @@ fn main() -> anyhow::Result<()> {
}
};
let client = electrum_cmd.electrum_args().client(args.network)?;
let response = match electrum_cmd.clone() {
ElectrumCommands::Scan {
stop_gap,
scan_options,
..
} => {
let (keychain_spks, tip) = {
let graph = &*graph.lock().unwrap();
@ -162,6 +188,7 @@ fn main() -> anyhow::Result<()> {
mut utxos,
mut unconfirmed,
scan_options,
..
} => {
// Get a short lock on the tracker to get the spks we're interested in
let graph = graph.lock().unwrap();

View File

@ -37,6 +37,8 @@ enum EsploraCommands {
stop_gap: usize,
#[clap(flatten)]
scan_options: ScanOptions,
#[clap(flatten)]
esplora_args: EsploraArgs,
},
/// Scan for particular addresses and unconfirmed transactions using the esplora API.
Sync {
@ -54,8 +56,40 @@ enum EsploraCommands {
unconfirmed: bool,
#[clap(flatten)]
scan_options: ScanOptions,
#[clap(flatten)]
esplora_args: EsploraArgs,
},
}
impl EsploraCommands {
fn esplora_args(&self) -> EsploraArgs {
match self {
EsploraCommands::Scan { esplora_args, .. } => esplora_args.clone(),
EsploraCommands::Sync { esplora_args, .. } => esplora_args.clone(),
}
}
}
#[derive(clap::Args, Debug, Clone)]
pub struct EsploraArgs {
/// The esplora url endpoint to connect to e.g. `<https://blockstream.info/api>`
/// If not provided it'll be set to a default for the network provided
esplora_url: Option<String>,
}
impl EsploraArgs {
pub fn client(&self, network: Network) -> anyhow::Result<esplora_client::BlockingClient> {
let esplora_url = self.esplora_url.as_deref().unwrap_or(match network {
Network::Bitcoin => "https://blockstream.info/api",
Network::Testnet => "https://blockstream.info/testnet/api",
Network::Regtest => "http://localhost:3002",
Network::Signet => "https://mempool.space/signet/api",
_ => panic!("unsupported network"),
});
let client = esplora_client::Builder::new(esplora_url).build_blocking()?;
Ok(client)
}
}
#[derive(Parser, Debug, Clone, PartialEq)]
pub struct ScanOptions {
@ -66,7 +100,7 @@ pub struct ScanOptions {
fn main() -> anyhow::Result<()> {
let (args, keymap, index, db, init_changeset) =
example_cli::init::<EsploraCommands, ChangeSet>(DB_MAGIC, DB_PATH)?;
example_cli::init::<EsploraCommands, EsploraArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
let (init_chain_changeset, init_indexed_tx_graph_changeset) = init_changeset;
@ -84,16 +118,6 @@ fn main() -> anyhow::Result<()> {
chain
});
let esplora_url = match args.network {
Network::Bitcoin => "https://blockstream.info/api",
Network::Testnet => "https://blockstream.info/testnet/api",
Network::Regtest => "http://localhost:3002",
Network::Signet => "https://mempool.space/signet/api",
_ => panic!("unsupported network"),
};
let client = esplora_client::Builder::new(esplora_url).build_blocking()?;
let esplora_cmd = match &args.command {
// These are commands that are handled by this example (sync, scan).
example_cli::Commands::ChainSpecific(esplora_cmd) => esplora_cmd,
@ -105,7 +129,8 @@ fn main() -> anyhow::Result<()> {
&chain,
&keymap,
args.network,
|tx| {
|esplora_args, tx| {
let client = esplora_args.client(args.network)?;
client
.broadcast(tx)
.map(|_| ())
@ -119,6 +144,7 @@ fn main() -> anyhow::Result<()> {
}
};
let client = esplora_cmd.esplora_args().client(args.network)?;
// Prepare the `IndexedTxGraph` update based on whether we are scanning or syncing.
// Scanning: We are iterating through spks of all keychains and scanning for transactions for
// each spk. We start with the lowest derivation index spk and stop scanning after `stop_gap`
@ -131,6 +157,7 @@ fn main() -> anyhow::Result<()> {
EsploraCommands::Scan {
stop_gap,
scan_options,
..
} => {
let keychain_spks = graph
.lock()
@ -184,6 +211,7 @@ fn main() -> anyhow::Result<()> {
mut utxos,
mut unconfirmed,
scan_options,
..
} => {
if !(*all_spks || unused_spks || utxos || unconfirmed) {
// If nothing is specifically selected, we select everything (except all spks).