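//! Example of keeping a `bdk_chain` wallet state in sync with a Bitcoin Core node by polling its
//! RPC interface with `bdk_bitcoind_rpc::Emitter`.
//!
//! Two subcommands are provided: `sync` performs a one-shot catch-up on the current thread, while
//! `live` runs the emitter in a separate thread and applies emissions as they arrive.
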
use std::{
    path::PathBuf,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    time::{Duration, Instant},
};

use bdk_bitcoind_rpc::{
    bitcoincore_rpc::{Auth, Client, RpcApi},
    Emitter,
};
use bdk_chain::{
    bitcoin::{Block, Transaction},
    indexed_tx_graph, keychain,
    local_chain::{self, CheckPoint, LocalChain},
    ConfirmationTimeAnchor, IndexedTxGraph,
};
use example_cli::{
    anyhow,
    clap::{self, Args, Subcommand},
    Keychain,
};

const DB_MAGIC: &[u8] = b"bdk_example_rpc";
const DB_PATH: &str = ".bdk_example_rpc.db";

/// Bound of the channel between the emitter thread and the main thread in `live` mode.
const CHANNEL_BOUND: usize = 10;
/// The block depth which we assume no reorgs can happen at.
const ASSUME_FINAL_DEPTH: u32 = 6;
/// Delay for printing status to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
/// Delay between mempool emissions.
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
/// Delay for committing to persistence.
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);

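/// Changeset persisted to the database: the local chain's changeset together with the indexed
/// tx-graph's changeset (which includes the keychain index changes).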
type ChangeSet = (
    local_chain::ChangeSet,
    indexed_tx_graph::ChangeSet<ConfirmationTimeAnchor, keychain::ChangeSet<Keychain>>,
);

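/// Data emitted by the emitter thread to the main thread in `live` mode.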
#[derive(Debug)]
enum Emission {
    Block { height: u32, block: Block },
    Mempool(Vec<(Transaction, u64)>),
    Tip(u32),
}

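/// Bitcoin Core RPC connection and sync options shared by both subcommands.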
#[derive(Args, Debug, Clone)]
struct RpcArgs {
    /// RPC URL
    #[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
    url: String,
    /// RPC auth cookie file
    #[clap(env = "RPC_COOKIE", long)]
    rpc_cookie: Option<PathBuf>,
    /// RPC auth username
    #[clap(env = "RPC_USER", long)]
    rpc_user: Option<String>,
    /// RPC auth password
    #[clap(env = "RPC_PASS", long)]
    rpc_password: Option<String>,
    /// Starting block height to fall back to if no point of agreement is found
    #[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
    fallback_height: u32,
    /// The unused-scripts lookahead will be kept at this size
    #[clap(long, default_value = "10")]
    lookahead: u32,
}

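/// Resolve RPC authentication from the CLI arguments: a cookie file takes precedence over
/// user/password, and providing only one of user/password is treated as a configuration error.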
impl From<RpcArgs> for Auth {
    fn from(args: RpcArgs) -> Self {
        match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
            (None, None, None) => Self::None,
            (Some(path), _, _) => Self::CookieFile(path),
            (_, Some(user), Some(pass)) => Self::UserPass(user, pass),
            (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
            (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
        }
    }
}

impl RpcArgs {
    fn new_client(&self) -> anyhow::Result<Client> {
        Ok(Client::new(
            &self.url,
            match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) {
                (None, None, None) => Auth::None,
                (Some(path), _, _) => Auth::CookieFile(path.clone()),
                (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
                (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
                (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
            },
        )?)
    }
}

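// Hypothetical invocations (sketches only; the exact binary name and any top-level flags come
// from `example_cli`, so adjust to the actual CLI surface):
//   cargo run -- sync --rpc-cookie ~/.bitcoin/.cookie
//   cargo run -- live --rpc-user <user> --rpc-password <pass>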
#[derive(Subcommand, Debug, Clone)]
enum RpcCommands {
    /// Syncs local state with remote state via RPC (starting from last point of agreement) and
    /// stores/indexes relevant transactions
    Sync {
        #[clap(flatten)]
        rpc_args: RpcArgs,
    },
    /// Sync by having the emitter logic in a separate thread
    Live {
        #[clap(flatten)]
        rpc_args: RpcArgs,
    },
}

fn main() -> anyhow::Result<()> {
    let (args, keymap, index, db, init_changeset) =
        example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;

    let graph = Mutex::new({
        let mut graph = IndexedTxGraph::new(index);
        graph.apply_changeset(init_changeset.1);
        graph
    });
    println!("loaded indexed tx graph from db");

    let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
    println!("loaded local chain from db");

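    // Only chain-specific (RPC) commands are handled in this file; any other command is delegated
    // to `example_cli::handle_commands`, together with a closure that broadcasts raw transactions
    // over RPC.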
    let rpc_cmd = match args.command {
        example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
        general_cmd => {
            let res = example_cli::handle_commands(
                &graph,
                &db,
                &chain,
                &keymap,
                args.network,
                |rpc_args, tx| {
                    let client = rpc_args.new_client()?;
                    client.send_raw_transaction(tx)?;
                    Ok(())
                },
                general_cmd,
            );
            db.lock().unwrap().commit()?;
            return res;
        }
    };

    match rpc_cmd {
        RpcCommands::Sync { rpc_args } => {
            let RpcArgs {
                fallback_height,
                lookahead,
                ..
            } = rpc_args;

            let mut chain = chain.lock().unwrap();
            let mut graph = graph.lock().unwrap();
            let mut db = db.lock().unwrap();

            graph.index.set_lookahead_for_all(lookahead);
            // we start at a height lower than last-seen tip in case of reorgs
            let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
            });

            let rpc_client = rpc_args.new_client()?;
            let mut emitter = Emitter::new(&rpc_client, start_height);

            let mut last_db_commit = Instant::now();
            let mut last_print = Instant::now();

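            // Pull blocks from the node until the emitter has none left to emit. Each block
            // updates the local chain from its header and is scanned for relevant transactions;
            // the resulting changesets are staged and periodically committed below.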
            while let Some((height, block)) = emitter.next_block()? {
                let chain_update =
                    CheckPoint::from_header(&block.header, height).into_update(false);
                let chain_changeset = chain.apply_update(chain_update)?;
                let graph_changeset = graph.apply_block_relevant(block, height);
                db.stage((chain_changeset, graph_changeset));

                // commit staged db changes in intervals
                if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
                    last_db_commit = Instant::now();
                    db.commit()?;
                    println!(
                        "committed to db (took {}s)",
                        last_db_commit.elapsed().as_secs_f32()
                    );
                }

                // print synced-to height and current balance in intervals
                if last_print.elapsed() >= STDOUT_PRINT_DELAY {
                    last_print = Instant::now();
                    if let Some(synced_to) = chain.tip() {
                        let balance = {
                            graph.graph().balance(
                                &*chain,
                                synced_to.block_id(),
                                graph.index.outpoints().iter().cloned(),
                                |(k, _), _| k == &Keychain::Internal,
                            )
                        };
                        println!(
                            "synced to {} @ {} | total: {} sats",
                            synced_to.hash(),
                            synced_to.height(),
                            balance.total()
                        );
                    }
                }
            }

            // mempool
            let mempool_txs = emitter.mempool()?;
            let graph_changeset = graph
                .batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
            db.stage((local_chain::ChangeSet::default(), graph_changeset));

            // commit one last time!
            db.commit()?;
        }
        RpcCommands::Live { rpc_args } => {
            let RpcArgs {
                fallback_height,
                lookahead,
                ..
            } = rpc_args;
            let sigterm_flag = start_ctrlc_handler();

            graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
            // we start at a height lower than last-seen tip in case of reorgs
            let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
                cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
            });

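            // Producer/consumer split: a dedicated thread polls bitcoind and pushes `Emission`s
            // through a bounded channel, while the main thread folds them into the chain, graph
            // and database.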
            let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
            let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
                println!("emitter thread started...");

                let rpc_client = rpc_args.new_client()?;
                let mut emitter = Emitter::new(&rpc_client, start_height);

                let mut block_count = rpc_client.get_block_count()? as u32;
                tx.send(Emission::Tip(block_count))?;

                loop {
                    match emitter.next_block()? {
                        Some((height, block)) => {
                            if sigterm_flag.load(Ordering::Acquire) {
                                break;
                            }
                            if height > block_count {
                                block_count = rpc_client.get_block_count()? as u32;
                                tx.send(Emission::Tip(block_count))?;
                            }
                            tx.send(Emission::Block { height, block })?;
                        }
                        None => {
                            if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
                                break;
                            }
                            println!("preparing mempool emission...");
                            let now = Instant::now();
                            tx.send(Emission::Mempool(emitter.mempool()?))?;
                            println!("mempool emission prepared in {}s", now.elapsed().as_secs());
                            continue;
                        }
                    };
                }

                println!("emitter thread shutting down...");
                Ok(())
            });

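            // Consumer side: take the locks once and keep them for the remainder of the command.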
            let mut db = db.lock().unwrap();
            let mut graph = graph.lock().unwrap();
            let mut chain = chain.lock().unwrap();
            let mut tip_height = 0_u32;

            let mut last_db_commit = Instant::now();
            let mut last_print = Option::<Instant>::None;

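            // Each emission maps to a changeset: blocks update both chain and graph, mempool
            // emissions only touch the graph, and tip notifications merely record the remote
            // height for status printing.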
            for emission in rx {
                let changeset = match emission {
                    Emission::Block { height, block } => {
                        let chain_update =
                            CheckPoint::from_header(&block.header, height).into_update(false);
                        let chain_changeset = chain.apply_update(chain_update)?;
                        let graph_changeset = graph.apply_block_relevant(block, height);
                        (chain_changeset, graph_changeset)
                    }
                    Emission::Mempool(mempool_txs) => {
                        let graph_changeset = graph.batch_insert_relevant_unconfirmed(
                            mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))),
                        );
                        (local_chain::ChangeSet::default(), graph_changeset)
                    }
                    Emission::Tip(h) => {
                        tip_height = h;
                        continue;
                    }
                };

                db.stage(changeset);

                if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
                    last_db_commit = Instant::now();
                    db.commit()?;
                    println!(
                        "committed to db (took {}s)",
                        last_db_commit.elapsed().as_secs_f32()
                    );
                }

                if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY {
                    last_print = Some(Instant::now());
                    if let Some(synced_to) = chain.tip() {
                        let balance = {
                            graph.graph().balance(
                                &*chain,
                                synced_to.block_id(),
                                graph.index.outpoints().iter().cloned(),
                                |(k, _), _| k == &Keychain::Internal,
                            )
                        };
                        println!(
                            "synced to {} @ {} / {} | total: {} sats",
                            synced_to.hash(),
                            synced_to.height(),
                            tip_height,
                            balance.total()
                        );
                    }
                }
            }

            emission_jh.join().expect("must join emitter thread")?;
        }
    }

    Ok(())
}

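/// Install a ctrl-c handler that raises a shared flag so long-running loops can shut down
/// gracefully.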
#[allow(dead_code)]
fn start_ctrlc_handler() -> Arc<AtomicBool> {
    let flag = Arc::new(AtomicBool::new(false));
    let cloned_flag = flag.clone();

    ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release));

    flag
}

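/// Wait up to `duration` for `flag` to be raised, checking once per second. Returns `true` if the
/// flag was raised, `false` if the timeout elapsed first.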
#[allow(dead_code)]
fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
    let start = Instant::now();
    loop {
        if flag.load(Ordering::Acquire) {
            return true;
        }
        if start.elapsed() >= duration {
            return false;
        }
        std::thread::sleep(Duration::from_secs(1));
    }
}