2023-10-04 18:22:03 +08:00
|
|
|
use std::{
|
|
|
|
path::PathBuf,
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
|
Arc, Mutex,
|
|
|
|
},
|
|
|
|
time::{Duration, Instant},
|
|
|
|
};
|
|
|
|
|
|
|
|
use bdk_bitcoind_rpc::{
|
|
|
|
bitcoincore_rpc::{Auth, Client, RpcApi},
|
|
|
|
Emitter,
|
|
|
|
};
|
2024-06-01 00:06:20 -05:00
|
|
|
use bdk_chain::persist::PersistBackend;
|
2023-10-04 18:22:03 +08:00
|
|
|
use bdk_chain::{
|
2023-12-28 12:49:04 +08:00
|
|
|
bitcoin::{constants::genesis_block, Block, Transaction},
|
2023-10-04 18:22:03 +08:00
|
|
|
indexed_tx_graph, keychain,
|
2023-12-30 20:48:20 +08:00
|
|
|
local_chain::{self, LocalChain},
|
2023-11-12 21:31:44 +08:00
|
|
|
ConfirmationTimeHeightAnchor, IndexedTxGraph,
|
2023-10-04 18:22:03 +08:00
|
|
|
};
|
|
|
|
use example_cli::{
|
|
|
|
anyhow,
|
|
|
|
clap::{self, Args, Subcommand},
|
|
|
|
Keychain,
|
|
|
|
};
|
|
|
|
|
|
|
|
/// Magic bytes passed to `example_cli::init` as its first argument.
// NOTE(review): presumably used to tag/validate the database file — confirm in `example_cli`.
const DB_MAGIC: &[u8] = b"bdk_example_rpc";

/// Database path passed to `example_cli::init` as its second argument.
const DB_PATH: &str = ".bdk_example_rpc.db";

/// Capacity of the bounded mpsc channel carrying [`Emission`]s from the [`Emitter`] thread.
const CHANNEL_BOUND: usize = 10;

/// Minimum time between two status lines printed to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);

/// How long the emitter thread waits, once no further block is available, before it emits
/// the mempool.
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);

/// Minimum time between two `committed to db` log lines.
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
|
|
|
|
|
|
|
|
type ChangeSet = (
|
|
|
|
local_chain::ChangeSet,
|
2023-11-12 21:31:44 +08:00
|
|
|
indexed_tx_graph::ChangeSet<ConfirmationTimeHeightAnchor, keychain::ChangeSet<Keychain>>,
|
2023-10-04 18:22:03 +08:00
|
|
|
);
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
enum Emission {
|
2023-12-30 20:48:20 +08:00
|
|
|
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
|
2023-10-04 18:22:03 +08:00
|
|
|
Mempool(Vec<(Transaction, u64)>),
|
|
|
|
Tip(u32),
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Args, Debug, Clone)]
|
|
|
|
struct RpcArgs {
|
|
|
|
/// RPC URL
|
|
|
|
#[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
|
|
|
|
url: String,
|
|
|
|
/// RPC auth cookie file
|
|
|
|
#[clap(env = "RPC_COOKIE", long)]
|
|
|
|
rpc_cookie: Option<PathBuf>,
|
|
|
|
/// RPC auth username
|
|
|
|
#[clap(env = "RPC_USER", long)]
|
|
|
|
rpc_user: Option<String>,
|
|
|
|
/// RPC auth password
|
|
|
|
#[clap(env = "RPC_PASS", long)]
|
|
|
|
rpc_password: Option<String>,
|
|
|
|
/// Starting block height to fallback to if no point of agreement if found
|
|
|
|
#[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
|
|
|
|
fallback_height: u32,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<RpcArgs> for Auth {
|
|
|
|
fn from(args: RpcArgs) -> Self {
|
|
|
|
match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
|
|
|
|
(None, None, None) => Self::None,
|
|
|
|
(Some(path), _, _) => Self::CookieFile(path),
|
|
|
|
(_, Some(user), Some(pass)) => Self::UserPass(user, pass),
|
|
|
|
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
|
|
|
|
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl RpcArgs {
|
|
|
|
fn new_client(&self) -> anyhow::Result<Client> {
|
|
|
|
Ok(Client::new(
|
|
|
|
&self.url,
|
|
|
|
match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) {
|
|
|
|
(None, None, None) => Auth::None,
|
|
|
|
(Some(path), _, _) => Auth::CookieFile(path.clone()),
|
|
|
|
(_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
|
|
|
|
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
|
|
|
|
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
|
|
|
|
},
|
|
|
|
)?)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Subcommand, Debug, Clone)]
|
|
|
|
enum RpcCommands {
|
|
|
|
/// Syncs local state with remote state via RPC (starting from last point of agreement) and
|
|
|
|
/// stores/indexes relevant transactions
|
|
|
|
Sync {
|
|
|
|
#[clap(flatten)]
|
|
|
|
rpc_args: RpcArgs,
|
|
|
|
},
|
|
|
|
/// Sync by having the emitter logic in a separate thread
|
|
|
|
Live {
|
|
|
|
#[clap(flatten)]
|
|
|
|
rpc_args: RpcArgs,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
fn main() -> anyhow::Result<()> {
|
2023-10-08 02:29:04 +08:00
|
|
|
let start = Instant::now();
|
2024-01-30 17:56:51 -05:00
|
|
|
let example_cli::Init {
|
|
|
|
args,
|
|
|
|
keymap,
|
|
|
|
index,
|
|
|
|
db,
|
|
|
|
init_changeset,
|
|
|
|
} = example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
|
2023-10-08 02:29:04 +08:00
|
|
|
println!(
|
|
|
|
"[{:>10}s] loaded initial changeset from db",
|
|
|
|
start.elapsed().as_secs_f32()
|
|
|
|
);
|
2023-12-28 12:49:04 +08:00
|
|
|
let (init_chain_changeset, init_graph_changeset) = init_changeset;
|
2023-10-04 18:22:03 +08:00
|
|
|
|
|
|
|
let graph = Mutex::new({
|
|
|
|
let mut graph = IndexedTxGraph::new(index);
|
2023-12-28 12:49:04 +08:00
|
|
|
graph.apply_changeset(init_graph_changeset);
|
2023-10-04 18:22:03 +08:00
|
|
|
graph
|
|
|
|
});
|
2023-10-08 02:29:04 +08:00
|
|
|
println!(
|
|
|
|
"[{:>10}s] loaded indexed tx graph from changeset",
|
|
|
|
start.elapsed().as_secs_f32()
|
|
|
|
);
|
2023-10-04 18:22:03 +08:00
|
|
|
|
2023-12-28 12:49:04 +08:00
|
|
|
let chain = Mutex::new(if init_chain_changeset.is_empty() {
|
|
|
|
let genesis_hash = genesis_block(args.network).block_hash();
|
|
|
|
let (chain, chain_changeset) = LocalChain::from_genesis_hash(genesis_hash);
|
|
|
|
let mut db = db.lock().unwrap();
|
2024-06-01 00:06:20 -05:00
|
|
|
db.write_changes(&(chain_changeset, Default::default()))?;
|
2023-12-28 12:49:04 +08:00
|
|
|
chain
|
|
|
|
} else {
|
|
|
|
LocalChain::from_changeset(init_chain_changeset)?
|
|
|
|
});
|
2023-10-08 02:29:04 +08:00
|
|
|
println!(
|
|
|
|
"[{:>10}s] loaded local chain from changeset",
|
|
|
|
start.elapsed().as_secs_f32()
|
|
|
|
);
|
2023-10-04 18:22:03 +08:00
|
|
|
|
|
|
|
let rpc_cmd = match args.command {
|
|
|
|
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
|
|
|
|
general_cmd => {
|
2024-01-19 11:23:46 +11:00
|
|
|
return example_cli::handle_commands(
|
2023-10-04 18:22:03 +08:00
|
|
|
&graph,
|
|
|
|
&db,
|
|
|
|
&chain,
|
|
|
|
&keymap,
|
|
|
|
args.network,
|
|
|
|
|rpc_args, tx| {
|
|
|
|
let client = rpc_args.new_client()?;
|
|
|
|
client.send_raw_transaction(tx)?;
|
|
|
|
Ok(())
|
|
|
|
},
|
|
|
|
general_cmd,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
match rpc_cmd {
|
|
|
|
RpcCommands::Sync { rpc_args } => {
|
|
|
|
let RpcArgs {
|
2023-11-28 18:08:49 +01:00
|
|
|
fallback_height, ..
|
2023-10-04 18:22:03 +08:00
|
|
|
} = rpc_args;
|
|
|
|
|
2023-10-08 02:29:04 +08:00
|
|
|
let chain_tip = chain.lock().unwrap().tip();
|
2023-10-04 18:22:03 +08:00
|
|
|
let rpc_client = rpc_args.new_client()?;
|
2023-10-12 16:55:32 +08:00
|
|
|
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
|
2023-10-04 18:22:03 +08:00
|
|
|
|
|
|
|
let mut last_db_commit = Instant::now();
|
|
|
|
let mut last_print = Instant::now();
|
|
|
|
|
2023-12-30 20:48:20 +08:00
|
|
|
while let Some(emission) = emitter.next_block()? {
|
|
|
|
let height = emission.block_height();
|
|
|
|
|
2023-10-08 02:29:04 +08:00
|
|
|
let mut chain = chain.lock().unwrap();
|
|
|
|
let mut graph = graph.lock().unwrap();
|
|
|
|
let mut db = db.lock().unwrap();
|
|
|
|
|
2023-10-07 00:56:01 +08:00
|
|
|
let chain_changeset = chain
|
2024-04-17 10:02:12 +08:00
|
|
|
.apply_update(emission.checkpoint)
|
2023-10-20 17:37:28 -03:00
|
|
|
.expect("must always apply as we receive blocks in order from emitter");
|
2023-10-11 11:16:38 +03:00
|
|
|
let graph_changeset = graph.apply_block_relevant(&emission.block, height);
|
2024-06-01 00:06:20 -05:00
|
|
|
db.write_changes(&(chain_changeset, graph_changeset))?;
|
2023-10-04 18:22:03 +08:00
|
|
|
|
|
|
|
// commit staged db changes in intervals
|
|
|
|
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
|
|
|
|
last_db_commit = Instant::now();
|
|
|
|
println!(
|
2023-10-20 17:37:28 -03:00
|
|
|
"[{:>10}s] committed to db (took {}s)",
|
2023-10-08 02:29:04 +08:00
|
|
|
start.elapsed().as_secs_f32(),
|
2023-10-04 18:22:03 +08:00
|
|
|
last_db_commit.elapsed().as_secs_f32()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
// print synced-to height and current balance in intervals
|
|
|
|
if last_print.elapsed() >= STDOUT_PRINT_DELAY {
|
|
|
|
last_print = Instant::now();
|
2023-10-12 16:55:32 +08:00
|
|
|
let synced_to = chain.tip();
|
|
|
|
let balance = {
|
|
|
|
graph.graph().balance(
|
|
|
|
&*chain,
|
|
|
|
synced_to.block_id(),
|
2024-06-06 10:17:55 +10:00
|
|
|
graph.index.outpoints().iter().cloned(),
|
2023-10-12 16:55:32 +08:00
|
|
|
|(k, _), _| k == &Keychain::Internal,
|
|
|
|
)
|
|
|
|
};
|
|
|
|
println!(
|
|
|
|
"[{:>10}s] synced to {} @ {} | total: {} sats",
|
|
|
|
start.elapsed().as_secs_f32(),
|
|
|
|
synced_to.hash(),
|
|
|
|
synced_to.height(),
|
|
|
|
balance.total()
|
|
|
|
);
|
2023-10-04 18:22:03 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let mempool_txs = emitter.mempool()?;
|
2023-10-08 02:29:04 +08:00
|
|
|
let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
|
|
|
|
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
|
|
|
|
);
|
|
|
|
{
|
|
|
|
let mut db = db.lock().unwrap();
|
2024-06-01 00:06:20 -05:00
|
|
|
db.write_changes(&(local_chain::ChangeSet::default(), graph_changeset))?;
|
2023-10-08 02:29:04 +08:00
|
|
|
}
|
2023-10-04 18:22:03 +08:00
|
|
|
}
|
|
|
|
RpcCommands::Live { rpc_args } => {
|
|
|
|
let RpcArgs {
|
2023-11-28 18:08:49 +01:00
|
|
|
fallback_height, ..
|
2023-10-04 18:22:03 +08:00
|
|
|
} = rpc_args;
|
|
|
|
let sigterm_flag = start_ctrlc_handler();
|
|
|
|
|
2023-10-07 00:56:01 +08:00
|
|
|
let last_cp = chain.lock().unwrap().tip();
|
2023-10-04 18:22:03 +08:00
|
|
|
|
2023-10-08 02:29:04 +08:00
|
|
|
println!(
|
|
|
|
"[{:>10}s] starting emitter thread...",
|
|
|
|
start.elapsed().as_secs_f32()
|
|
|
|
);
|
2023-10-04 18:22:03 +08:00
|
|
|
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
|
|
|
|
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
|
|
|
|
let rpc_client = rpc_args.new_client()?;
|
2023-10-12 16:55:32 +08:00
|
|
|
let mut emitter = Emitter::new(&rpc_client, last_cp, fallback_height);
|
2023-10-04 18:22:03 +08:00
|
|
|
|
|
|
|
let mut block_count = rpc_client.get_block_count()? as u32;
|
|
|
|
tx.send(Emission::Tip(block_count))?;
|
|
|
|
|
|
|
|
loop {
|
|
|
|
match emitter.next_block()? {
|
2023-12-30 20:48:20 +08:00
|
|
|
Some(block_emission) => {
|
|
|
|
let height = block_emission.block_height();
|
2023-10-04 18:22:03 +08:00
|
|
|
if sigterm_flag.load(Ordering::Acquire) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
if height > block_count {
|
|
|
|
block_count = rpc_client.get_block_count()? as u32;
|
|
|
|
tx.send(Emission::Tip(block_count))?;
|
|
|
|
}
|
2023-12-30 20:48:20 +08:00
|
|
|
tx.send(Emission::Block(block_emission))?;
|
2023-10-04 18:22:03 +08:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
println!("preparing mempool emission...");
|
|
|
|
let now = Instant::now();
|
|
|
|
tx.send(Emission::Mempool(emitter.mempool()?))?;
|
|
|
|
println!("mempool emission prepared in {}s", now.elapsed().as_secs());
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
println!("emitter thread shutting down...");
|
|
|
|
Ok(())
|
|
|
|
});
|
|
|
|
|
|
|
|
let mut tip_height = 0_u32;
|
|
|
|
let mut last_db_commit = Instant::now();
|
|
|
|
let mut last_print = Option::<Instant>::None;
|
|
|
|
|
|
|
|
for emission in rx {
|
2023-10-08 02:29:04 +08:00
|
|
|
let mut db = db.lock().unwrap();
|
|
|
|
let mut graph = graph.lock().unwrap();
|
|
|
|
let mut chain = chain.lock().unwrap();
|
|
|
|
|
2023-10-04 18:22:03 +08:00
|
|
|
let changeset = match emission {
|
2023-12-30 20:48:20 +08:00
|
|
|
Emission::Block(block_emission) => {
|
|
|
|
let height = block_emission.block_height();
|
2023-10-07 00:56:01 +08:00
|
|
|
let chain_changeset = chain
|
2024-04-17 10:02:12 +08:00
|
|
|
.apply_update(block_emission.checkpoint)
|
2023-10-20 17:37:28 -03:00
|
|
|
.expect("must always apply as we receive blocks in order from emitter");
|
2023-12-30 20:48:20 +08:00
|
|
|
let graph_changeset =
|
2023-10-11 11:16:38 +03:00
|
|
|
graph.apply_block_relevant(&block_emission.block, height);
|
2023-10-04 18:22:03 +08:00
|
|
|
(chain_changeset, graph_changeset)
|
|
|
|
}
|
|
|
|
Emission::Mempool(mempool_txs) => {
|
|
|
|
let graph_changeset = graph.batch_insert_relevant_unconfirmed(
|
2023-10-06 02:05:31 +08:00
|
|
|
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
|
2023-10-04 18:22:03 +08:00
|
|
|
);
|
|
|
|
(local_chain::ChangeSet::default(), graph_changeset)
|
|
|
|
}
|
|
|
|
Emission::Tip(h) => {
|
|
|
|
tip_height = h;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2024-06-01 00:06:20 -05:00
|
|
|
db.write_changes(&changeset)?;
|
2023-10-04 18:22:03 +08:00
|
|
|
|
|
|
|
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
|
|
|
|
last_db_commit = Instant::now();
|
|
|
|
println!(
|
2023-10-20 17:37:28 -03:00
|
|
|
"[{:>10}s] committed to db (took {}s)",
|
2023-10-08 02:29:04 +08:00
|
|
|
start.elapsed().as_secs_f32(),
|
2023-10-04 18:22:03 +08:00
|
|
|
last_db_commit.elapsed().as_secs_f32()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY {
|
|
|
|
last_print = Some(Instant::now());
|
2023-10-12 16:55:32 +08:00
|
|
|
let synced_to = chain.tip();
|
|
|
|
let balance = {
|
|
|
|
graph.graph().balance(
|
|
|
|
&*chain,
|
|
|
|
synced_to.block_id(),
|
2024-06-06 10:17:55 +10:00
|
|
|
graph.index.outpoints().iter().cloned(),
|
2023-10-12 16:55:32 +08:00
|
|
|
|(k, _), _| k == &Keychain::Internal,
|
|
|
|
)
|
|
|
|
};
|
|
|
|
println!(
|
|
|
|
"[{:>10}s] synced to {} @ {} / {} | total: {} sats",
|
|
|
|
start.elapsed().as_secs_f32(),
|
|
|
|
synced_to.hash(),
|
|
|
|
synced_to.height(),
|
|
|
|
tip_height,
|
|
|
|
balance.total()
|
|
|
|
);
|
2023-10-04 18:22:03 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
emission_jh.join().expect("must join emitter thread")?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
fn start_ctrlc_handler() -> Arc<AtomicBool> {
|
|
|
|
let flag = Arc::new(AtomicBool::new(false));
|
|
|
|
let cloned_flag = flag.clone();
|
|
|
|
|
|
|
|
ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release));
|
|
|
|
|
|
|
|
flag
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Waits until `flag` becomes `true` or `duration` has elapsed, whichever happens first.
///
/// The flag is polled at most once per second. Each sleep is capped at the time remaining, so
/// the call does not overshoot `duration` by up to a full polling interval.
///
/// Returns `true` if the flag was observed set, `false` if the wait timed out.
#[allow(dead_code)]
fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
    /// Upper bound on the time between two checks of the flag.
    const POLL_INTERVAL: Duration = Duration::from_secs(1);

    let start = Instant::now();
    loop {
        if flag.load(Ordering::Acquire) {
            return true;
        }
        let remaining = duration.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return false;
        }
        // Never sleep past the deadline.
        std::thread::sleep(remaining.min(POLL_INTERVAL));
    }
}
|