example_bitcoind_rpc: tweaks

* avoid holding mutex lock over io
* document `CHANNEL_BOUND` const
* use the `relevant` variant of `batch_insert_unconfirmed`
* print elapsed time in stdout for various updates
This commit is contained in:
志宇 2023-10-08 02:29:04 +08:00
parent 5f34df8489
commit b69c13ddf6
No known key found for this signature in database
GPG Key ID: F6345C9837C2BDE8

View File

@ -26,12 +26,13 @@ use example_cli::{
const DB_MAGIC: &[u8] = b"bdk_example_rpc"; const DB_MAGIC: &[u8] = b"bdk_example_rpc";
const DB_PATH: &str = ".bdk_example_rpc.db"; const DB_PATH: &str = ".bdk_example_rpc.db";
/// The mpsc channel bound for emissions from [`Emitter`].
const CHANNEL_BOUND: usize = 10; const CHANNEL_BOUND: usize = 10;
/// Delay for printing status to stdout. /// Delay for printing status to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6); const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
/// Delay between mempool emissions. /// Delay between mempool emissions.
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30); const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
/// Delay for commiting to persistance. /// Delay for committing to persistance.
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
type ChangeSet = ( type ChangeSet = (
@ -111,18 +112,30 @@ enum RpcCommands {
} }
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
let start = Instant::now();
let (args, keymap, index, db, init_changeset) = let (args, keymap, index, db, init_changeset) =
example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?; example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
println!(
"[{:>10}s] loaded initial changeset from db",
start.elapsed().as_secs_f32()
);
let graph = Mutex::new({ let graph = Mutex::new({
let mut graph = IndexedTxGraph::new(index); let mut graph = IndexedTxGraph::new(index);
graph.apply_changeset(init_changeset.1); graph.apply_changeset(init_changeset.1);
graph graph
}); });
println!("loaded indexed tx graph from db"); println!(
"[{:>10}s] loaded indexed tx graph from changeset",
start.elapsed().as_secs_f32()
);
let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
println!("loaded local chain from db"); println!(
"[{:>10}s] loaded local chain from changeset",
start.elapsed().as_secs_f32()
);
let rpc_cmd = match args.command { let rpc_cmd = match args.command {
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
@ -153,14 +166,11 @@ fn main() -> anyhow::Result<()> {
.. ..
} = rpc_args; } = rpc_args;
let mut chain = chain.lock().unwrap(); graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
let mut graph = graph.lock().unwrap();
let mut db = db.lock().unwrap();
graph.index.set_lookahead_for_all(lookahead);
let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?; let rpc_client = rpc_args.new_client()?;
let mut emitter = match chain.tip() { let mut emitter = match chain_tip {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp), Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
None => Emitter::from_height(&rpc_client, fallback_height), None => Emitter::from_height(&rpc_client, fallback_height),
}; };
@ -169,6 +179,10 @@ fn main() -> anyhow::Result<()> {
let mut last_print = Instant::now(); let mut last_print = Instant::now();
while let Some((height, block)) = emitter.next_block()? { while let Some((height, block)) = emitter.next_block()? {
let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut db = db.lock().unwrap();
let chain_update = let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false); CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain let chain_changeset = chain
@ -182,7 +196,8 @@ fn main() -> anyhow::Result<()> {
last_db_commit = Instant::now(); last_db_commit = Instant::now();
db.commit()?; db.commit()?;
println!( println!(
"commited to db (took {}s)", "[{:>10}s] commited to db (took {}s)",
start.elapsed().as_secs_f32(),
last_db_commit.elapsed().as_secs_f32() last_db_commit.elapsed().as_secs_f32()
); );
} }
@ -200,7 +215,8 @@ fn main() -> anyhow::Result<()> {
) )
}; };
println!( println!(
"synced to {} @ {} | total: {} sats", "[{:>10}s] synced to {} @ {} | total: {} sats",
start.elapsed().as_secs_f32(),
synced_to.hash(), synced_to.hash(),
synced_to.height(), synced_to.height(),
balance.total() balance.total()
@ -209,13 +225,15 @@ fn main() -> anyhow::Result<()> {
} }
} }
// mempool
let mempool_txs = emitter.mempool()?; let mempool_txs = emitter.mempool()?;
let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs); let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
);
{
let mut db = db.lock().unwrap();
db.stage((local_chain::ChangeSet::default(), graph_changeset)); db.stage((local_chain::ChangeSet::default(), graph_changeset));
db.commit()?; // commit one last time
// commit one last time! }
db.commit()?;
} }
RpcCommands::Live { rpc_args } => { RpcCommands::Live { rpc_args } => {
let RpcArgs { let RpcArgs {
@ -228,10 +246,12 @@ fn main() -> anyhow::Result<()> {
graph.lock().unwrap().index.set_lookahead_for_all(lookahead); graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
let last_cp = chain.lock().unwrap().tip(); let last_cp = chain.lock().unwrap().tip();
println!(
"[{:>10}s] starting emitter thread...",
start.elapsed().as_secs_f32()
);
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND); let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
println!("emitter thread started...");
let rpc_client = rpc_args.new_client()?; let rpc_client = rpc_args.new_client()?;
let mut emitter = match last_cp { let mut emitter = match last_cp {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp), Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
@ -270,15 +290,15 @@ fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
}); });
let mut db = db.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut chain = chain.lock().unwrap();
let mut tip_height = 0_u32; let mut tip_height = 0_u32;
let mut last_db_commit = Instant::now(); let mut last_db_commit = Instant::now();
let mut last_print = Option::<Instant>::None; let mut last_print = Option::<Instant>::None;
for emission in rx { for emission in rx {
let mut db = db.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut chain = chain.lock().unwrap();
let changeset = match emission { let changeset = match emission {
Emission::Block { height, block } => { Emission::Block { height, block } => {
let chain_update = let chain_update =
@ -307,7 +327,8 @@ fn main() -> anyhow::Result<()> {
last_db_commit = Instant::now(); last_db_commit = Instant::now();
db.commit()?; db.commit()?;
println!( println!(
"commited to db (took {}s)", "[{:>10}s] commited to db (took {}s)",
start.elapsed().as_secs_f32(),
last_db_commit.elapsed().as_secs_f32() last_db_commit.elapsed().as_secs_f32()
); );
} }
@ -324,7 +345,8 @@ fn main() -> anyhow::Result<()> {
) )
}; };
println!( println!(
"synced to {} @ {} / {} | total: {} sats", "[{:>10}s] synced to {} @ {} / {} | total: {} sats",
start.elapsed().as_secs_f32(),
synced_to.hash(), synced_to.hash(),
synced_to.height(), synced_to.height(),
tip_height, tip_height,