feat(example): use changeset staging with rpc polling example

This commit is contained in:
志宇 2024-06-13 22:36:12 +08:00 committed by Steve Myers
parent 19328d4999
commit ec36c7ecca
No known key found for this signature in database
GPG Key ID: 8105A46B22C2D051

View File

@ -11,12 +11,12 @@ use bdk_bitcoind_rpc::{
bitcoincore_rpc::{Auth, Client, RpcApi}, bitcoincore_rpc::{Auth, Client, RpcApi},
Emitter, Emitter,
}; };
use bdk_chain::persist::PersistBackend; use bdk_chain::persist::{PersistBackend, StageExt};
use bdk_chain::{ use bdk_chain::{
bitcoin::{constants::genesis_block, Block, Transaction}, bitcoin::{constants::genesis_block, Block, Transaction},
indexed_tx_graph, keychain, indexed_tx_graph, keychain,
local_chain::{self, LocalChain}, local_chain::{self, LocalChain},
ConfirmationTimeHeightAnchor, IndexedTxGraph, Append, ConfirmationTimeHeightAnchor, IndexedTxGraph,
}; };
use example_cli::{ use example_cli::{
anyhow, anyhow,
@ -176,6 +176,7 @@ fn main() -> anyhow::Result<()> {
let chain_tip = chain.lock().unwrap().tip(); let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?; let rpc_client = rpc_args.new_client()?;
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height); let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
let mut db_stage = ChangeSet::default();
let mut last_db_commit = Instant::now(); let mut last_db_commit = Instant::now();
let mut last_print = Instant::now(); let mut last_print = Instant::now();
@ -185,17 +186,18 @@ fn main() -> anyhow::Result<()> {
let mut chain = chain.lock().unwrap(); let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap(); let mut graph = graph.lock().unwrap();
let mut db = db.lock().unwrap();
let chain_changeset = chain let chain_changeset = chain
.apply_update(emission.checkpoint) .apply_update(emission.checkpoint)
.expect("must always apply as we receive blocks in order from emitter"); .expect("must always apply as we receive blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(&emission.block, height); let graph_changeset = graph.apply_block_relevant(&emission.block, height);
db.write_changes(&(chain_changeset, graph_changeset))?; db_stage.append((chain_changeset, graph_changeset));
// commit staged db changes in intervals // commit staged db changes in intervals
if last_db_commit.elapsed() >= DB_COMMIT_DELAY { if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
let db = &mut *db.lock().unwrap();
last_db_commit = Instant::now(); last_db_commit = Instant::now();
db_stage.commit_to(db)?;
println!( println!(
"[{:>10}s] committed to db (took {}s)", "[{:>10}s] committed to db (took {}s)",
start.elapsed().as_secs_f32(), start.elapsed().as_secs_f32(),
@ -230,8 +232,11 @@ fn main() -> anyhow::Result<()> {
mempool_txs.iter().map(|(tx, time)| (tx, *time)), mempool_txs.iter().map(|(tx, time)| (tx, *time)),
); );
{ {
let mut db = db.lock().unwrap(); let db = &mut *db.lock().unwrap();
db.write_changes(&(local_chain::ChangeSet::default(), graph_changeset))?; db_stage.append_and_commit_to(
(local_chain::ChangeSet::default(), graph_changeset),
db,
)?;
} }
} }
RpcCommands::Live { rpc_args } => { RpcCommands::Live { rpc_args } => {
@ -287,9 +292,9 @@ fn main() -> anyhow::Result<()> {
let mut tip_height = 0_u32; let mut tip_height = 0_u32;
let mut last_db_commit = Instant::now(); let mut last_db_commit = Instant::now();
let mut last_print = Option::<Instant>::None; let mut last_print = Option::<Instant>::None;
let mut db_stage = ChangeSet::default();
for emission in rx { for emission in rx {
let mut db = db.lock().unwrap();
let mut graph = graph.lock().unwrap(); let mut graph = graph.lock().unwrap();
let mut chain = chain.lock().unwrap(); let mut chain = chain.lock().unwrap();
@ -314,11 +319,12 @@ fn main() -> anyhow::Result<()> {
continue; continue;
} }
}; };
db_stage.append(changeset);
db.write_changes(&changeset)?;
if last_db_commit.elapsed() >= DB_COMMIT_DELAY { if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
let db = &mut *db.lock().unwrap();
last_db_commit = Instant::now(); last_db_commit = Instant::now();
db_stage.commit_to(db)?;
println!( println!(
"[{:>10}s] committed to db (took {}s)", "[{:>10}s] committed to db (took {}s)",
start.elapsed().as_secs_f32(), start.elapsed().as_secs_f32(),