bitcoind_rpc!: bring back CheckPoints to Emitter

* `bdk_chain` dependency is added. In the future, we will introduce a
  separate `bdk_core` crate to contain shared types.
* replace `Emitter::new` with `from_height` and `from_checkpoint`
  * `from_height` emits from the given start height
  * `from_checkpoint` uses the provided cp to find agreement point
* introduce logic that ensures emitted blocks can connect with
  receiver's `LocalChain`
* in our rpc example, we can now `expect()` chain updates to always
  since we are using checkpoints and receiving blocks in order
This commit is contained in:
志宇
2023-10-07 00:56:01 +08:00
parent 57590e0a1f
commit 5f34df8489
4 changed files with 84 additions and 43 deletions

View File

@@ -27,8 +27,6 @@ const DB_MAGIC: &[u8] = b"bdk_example_rpc";
const DB_PATH: &str = ".bdk_example_rpc.db";
const CHANNEL_BOUND: usize = 10;
/// The block depth which we assume no reorgs can happen at.
const ASSUME_FINAL_DEPTH: u32 = 6;
/// Delay for printing status to stdout.
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
/// Delay between mempool emissions.
@@ -160,13 +158,12 @@ fn main() -> anyhow::Result<()> {
let mut db = db.lock().unwrap();
graph.index.set_lookahead_for_all(lookahead);
// we start at a height lower than last-seen tip in case of reorgs
let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
});
let rpc_client = rpc_args.new_client()?;
let mut emitter = Emitter::new(&rpc_client, start_height);
let mut emitter = match chain.tip() {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
None => Emitter::from_height(&rpc_client, fallback_height),
};
let mut last_db_commit = Instant::now();
let mut last_print = Instant::now();
@@ -174,7 +171,9 @@ fn main() -> anyhow::Result<()> {
while let Some((height, block)) = emitter.next_block()? {
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain.apply_update(chain_update)?;
let chain_changeset = chain
.apply_update(chain_update)
.expect("must always apply as we recieve blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(block, height);
db.stage((chain_changeset, graph_changeset));
@@ -227,17 +226,17 @@ fn main() -> anyhow::Result<()> {
let sigterm_flag = start_ctrlc_handler();
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
// we start at a height lower than last-seen tip in case of reorgs
let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
});
let last_cp = chain.lock().unwrap().tip();
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
println!("emitter thread started...");
let rpc_client = rpc_args.new_client()?;
let mut emitter = Emitter::new(&rpc_client, start_height);
let mut emitter = match last_cp {
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
None => Emitter::from_height(&rpc_client, fallback_height),
};
let mut block_count = rpc_client.get_block_count()? as u32;
tx.send(Emission::Tip(block_count))?;
@@ -284,7 +283,9 @@ fn main() -> anyhow::Result<()> {
Emission::Block { height, block } => {
let chain_update =
CheckPoint::from_header(&block.header, height).into_update(false);
let chain_changeset = chain.apply_update(chain_update)?;
let chain_changeset = chain
.apply_update(chain_update)
.expect("must always apply as we recieve blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(block, height);
(chain_changeset, graph_changeset)
}