From 150d6f8ab6cd1eb1c9448d61e7bd71db0dd32a01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 4 Oct 2023 18:22:03 +0800 Subject: [PATCH] feat(example_bitcoind_rpc_polling): add example for RPC polling --- Cargo.toml | 1 + .../example_bitcoind_rpc_polling/Cargo.toml | 12 + .../example_bitcoind_rpc_polling/src/main.rs | 366 ++++++++++++++++++ 3 files changed, 379 insertions(+) create mode 100644 example-crates/example_bitcoind_rpc_polling/Cargo.toml create mode 100644 example-crates/example_bitcoind_rpc_polling/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index a5058ebc..0e1efc90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "example-crates/example_cli", "example-crates/example_electrum", "example-crates/example_esplora", + "example-crates/example_bitcoind_rpc_polling", "example-crates/wallet_electrum", "example-crates/wallet_esplora_blocking", "example-crates/wallet_esplora_async", diff --git a/example-crates/example_bitcoind_rpc_polling/Cargo.toml b/example-crates/example_bitcoind_rpc_polling/Cargo.toml new file mode 100644 index 00000000..6728bb13 --- /dev/null +++ b/example-crates/example_bitcoind_rpc_polling/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "example_bitcoind_rpc_polling" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bdk_chain = { path = "../../crates/chain", features = ["serde"] } +bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" } +example_cli = { path = "../example_cli" } +ctrlc = { version = "^2" } diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs new file mode 100644 index 00000000..6fb557f7 --- /dev/null +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -0,0 +1,366 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::{Duration, Instant}, +}; + +use bdk_bitcoind_rpc::{ + bitcoincore_rpc::{Auth, Client, RpcApi}, + Emitter, +}; +use bdk_chain::{ + bitcoin::{Block, Transaction}, + indexed_tx_graph, keychain, + local_chain::{self, CheckPoint, LocalChain}, + ConfirmationTimeAnchor, IndexedTxGraph, +}; +use example_cli::{ + anyhow, + clap::{self, Args, Subcommand}, + Keychain, +}; + +const DB_MAGIC: &[u8] = b"bdk_example_rpc"; +const DB_PATH: &str = ".bdk_example_rpc.db"; + +const CHANNEL_BOUND: usize = 10; +/// The block depth which we assume no reorgs can happen at. +const ASSUME_FINAL_DEPTH: u32 = 6; +/// Delay for printing status to stdout. +const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6); +/// Delay between mempool emissions. +const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30); +/// Delay for commiting to persistance. +const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); + +type ChangeSet = ( + local_chain::ChangeSet, + indexed_tx_graph::ChangeSet>, +); + +#[derive(Debug)] +enum Emission { + Block { height: u32, block: Block }, + Mempool(Vec<(Transaction, u64)>), + Tip(u32), +} + +#[derive(Args, Debug, Clone)] +struct RpcArgs { + /// RPC URL + #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")] + url: String, + /// RPC auth cookie file + #[clap(env = "RPC_COOKIE", long)] + rpc_cookie: Option, + /// RPC auth username + #[clap(env = "RPC_USER", long)] + rpc_user: Option, + /// RPC auth password + #[clap(env = "RPC_PASS", long)] + rpc_password: Option, + /// Starting block height to fallback to if no point of agreement if found + #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")] + fallback_height: u32, + /// The unused-scripts lookahead will be kept at this size + #[clap(long, default_value = "10")] + lookahead: u32, +} + +impl From for Auth { + fn from(args: RpcArgs) -> Self { + match (args.rpc_cookie, args.rpc_user, args.rpc_password) { + (None, None, None) => Self::None, + (Some(path), _, _) => Self::CookieFile(path), + (_, Some(user), Some(pass)) => Self::UserPass(user, pass), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + } + } +} + +impl RpcArgs { + fn new_client(&self) -> anyhow::Result { + Ok(Client::new( + &self.url, + match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) { + (None, None, None) => Auth::None, + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + }, + )?) + } +} + +#[derive(Subcommand, Debug, Clone)] +enum RpcCommands { + /// Syncs local state with remote state via RPC (starting from last point of agreement) and + /// stores/indexes relevant transactions + Sync { + #[clap(flatten)] + rpc_args: RpcArgs, + }, + /// Sync by having the emitter logic in a separate thread + Live { + #[clap(flatten)] + rpc_args: RpcArgs, + }, +} + +fn main() -> anyhow::Result<()> { + let (args, keymap, index, db, init_changeset) = + example_cli::init::(DB_MAGIC, DB_PATH)?; + + let graph = Mutex::new({ + let mut graph = IndexedTxGraph::new(index); + graph.apply_changeset(init_changeset.1); + graph + }); + println!("loaded indexed tx graph from db"); + + let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); + println!("loaded local chain from db"); + + let rpc_cmd = match args.command { + example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, + general_cmd => { + let res = example_cli::handle_commands( + &graph, + &db, + &chain, + &keymap, + args.network, + |rpc_args, tx| { + let client = rpc_args.new_client()?; + client.send_raw_transaction(tx)?; + Ok(()) + }, + general_cmd, + ); + db.lock().unwrap().commit()?; + return res; + } + }; + + match rpc_cmd { + RpcCommands::Sync { rpc_args } => { + let RpcArgs { + fallback_height, + lookahead, + .. + } = rpc_args; + + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + let mut db = db.lock().unwrap(); + + graph.index.set_lookahead_for_all(lookahead); + // we start at a height lower than last-seen tip in case of reorgs + let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| { + cp.height().saturating_sub(ASSUME_FINAL_DEPTH) + }); + + let rpc_client = rpc_args.new_client()?; + let mut emitter = Emitter::new(&rpc_client, start_height); + + let mut last_db_commit = Instant::now(); + let mut last_print = Instant::now(); + + while let Some((height, block)) = emitter.next_block()? { + let chain_update = + CheckPoint::from_header(&block.header, height).into_update(false); + let chain_changeset = chain.apply_update(chain_update)?; + let graph_changeset = graph.apply_block_relevant(block, height); + db.stage((chain_changeset, graph_changeset)); + + // commit staged db changes in intervals + if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + last_db_commit = Instant::now(); + db.commit()?; + println!( + "commited to db (took {}s)", + last_db_commit.elapsed().as_secs_f32() + ); + } + + // print synced-to height and current balance in intervals + if last_print.elapsed() >= STDOUT_PRINT_DELAY { + last_print = Instant::now(); + if let Some(synced_to) = chain.tip() { + let balance = { + graph.graph().balance( + &*chain, + synced_to.block_id(), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "synced to {} @ {} | total: {} sats", + synced_to.hash(), + synced_to.height(), + balance.total() + ); + } + } + } + + // mempool + let mempool_txs = emitter.mempool()?; + let graph_changeset = graph + .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time)))); + db.stage((local_chain::ChangeSet::default(), graph_changeset)); + + // commit one last time! + db.commit()?; + } + RpcCommands::Live { rpc_args } => { + let RpcArgs { + fallback_height, + lookahead, + .. + } = rpc_args; + let sigterm_flag = start_ctrlc_handler(); + + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + // we start at a height lower than last-seen tip in case of reorgs + let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| { + cp.height().saturating_sub(ASSUME_FINAL_DEPTH) + }); + + let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); + let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { + println!("emitter thread started..."); + + let rpc_client = rpc_args.new_client()?; + let mut emitter = Emitter::new(&rpc_client, start_height); + + let mut block_count = rpc_client.get_block_count()? as u32; + tx.send(Emission::Tip(block_count))?; + + loop { + match emitter.next_block()? { + Some((height, block)) => { + if sigterm_flag.load(Ordering::Acquire) { + break; + } + if height > block_count { + block_count = rpc_client.get_block_count()? as u32; + tx.send(Emission::Tip(block_count))?; + } + tx.send(Emission::Block { height, block })?; + } + None => { + if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) { + break; + } + println!("preparing mempool emission..."); + let now = Instant::now(); + tx.send(Emission::Mempool(emitter.mempool()?))?; + println!("mempool emission prepared in {}s", now.elapsed().as_secs()); + continue; + } + }; + } + + println!("emitter thread shutting down..."); + Ok(()) + }); + + let mut db = db.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + let mut chain = chain.lock().unwrap(); + let mut tip_height = 0_u32; + + let mut last_db_commit = Instant::now(); + let mut last_print = Option::::None; + + for emission in rx { + let changeset = match emission { + Emission::Block { height, block } => { + let chain_update = + CheckPoint::from_header(&block.header, height).into_update(false); + let chain_changeset = chain.apply_update(chain_update)?; + let graph_changeset = graph.apply_block_relevant(block, height); + (chain_changeset, graph_changeset) + } + Emission::Mempool(mempool_txs) => { + let graph_changeset = graph.batch_insert_relevant_unconfirmed( + mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))), + ); + (local_chain::ChangeSet::default(), graph_changeset) + } + Emission::Tip(h) => { + tip_height = h; + continue; + } + }; + + db.stage(changeset); + + if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + last_db_commit = Instant::now(); + db.commit()?; + println!( + "commited to db (took {}s)", + last_db_commit.elapsed().as_secs_f32() + ); + } + + if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY { + last_print = Some(Instant::now()); + if let Some(synced_to) = chain.tip() { + let balance = { + graph.graph().balance( + &*chain, + synced_to.block_id(), + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + ) + }; + println!( + "synced to {} @ {} / {} | total: {} sats", + synced_to.hash(), + synced_to.height(), + tip_height, + balance.total() + ); + } + } + } + + emission_jh.join().expect("must join emitter thread")?; + } + } + + Ok(()) +} + +#[allow(dead_code)] +fn start_ctrlc_handler() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let cloned_flag = flag.clone(); + + ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release)); + + flag +} + +#[allow(dead_code)] +fn await_flag(flag: &AtomicBool, duration: Duration) -> bool { + let start = Instant::now(); + loop { + if flag.load(Ordering::Acquire) { + return true; + } + if start.elapsed() >= duration { + return false; + } + std::thread::sleep(Duration::from_secs(1)); + } +}