feat(bitcoind_rpc)!: emissions include checkpoint and connected_to data
Previously, emissions are purely blocks + the block height. This means emitted blocks can only connect to previous-adjacent blocks. Hence, sync must start from genesis and include every block.
This commit is contained in:
@@ -14,7 +14,7 @@ use bdk_bitcoind_rpc::{
|
||||
use bdk_chain::{
|
||||
bitcoin::{constants::genesis_block, Block, Transaction},
|
||||
indexed_tx_graph, keychain,
|
||||
local_chain::{self, CheckPoint, LocalChain},
|
||||
local_chain::{self, LocalChain},
|
||||
ConfirmationTimeHeightAnchor, IndexedTxGraph,
|
||||
};
|
||||
use example_cli::{
|
||||
@@ -42,7 +42,7 @@ type ChangeSet = (
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Emission {
|
||||
Block { height: u32, block: Block },
|
||||
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
|
||||
Mempool(Vec<(Transaction, u64)>),
|
||||
Tip(u32),
|
||||
}
|
||||
@@ -178,17 +178,20 @@ fn main() -> anyhow::Result<()> {
|
||||
let mut last_db_commit = Instant::now();
|
||||
let mut last_print = Instant::now();
|
||||
|
||||
while let Some((height, block)) = emitter.next_block()? {
|
||||
while let Some(emission) = emitter.next_block()? {
|
||||
let height = emission.block_height();
|
||||
|
||||
let mut chain = chain.lock().unwrap();
|
||||
let mut graph = graph.lock().unwrap();
|
||||
let mut db = db.lock().unwrap();
|
||||
|
||||
let chain_update =
|
||||
CheckPoint::from_header(&block.header, height).into_update(false);
|
||||
let chain_changeset = chain
|
||||
.apply_update(chain_update)
|
||||
.apply_update(local_chain::Update {
|
||||
tip: emission.checkpoint,
|
||||
introduce_older_blocks: false,
|
||||
})
|
||||
.expect("must always apply as we receive blocks in order from emitter");
|
||||
let graph_changeset = graph.apply_block_relevant(block, height);
|
||||
let graph_changeset = graph.apply_block_relevant(emission.block, height);
|
||||
db.stage((chain_changeset, graph_changeset));
|
||||
|
||||
// commit staged db changes in intervals
|
||||
@@ -256,7 +259,8 @@ fn main() -> anyhow::Result<()> {
|
||||
|
||||
loop {
|
||||
match emitter.next_block()? {
|
||||
Some((height, block)) => {
|
||||
Some(block_emission) => {
|
||||
let height = block_emission.block_height();
|
||||
if sigterm_flag.load(Ordering::Acquire) {
|
||||
break;
|
||||
}
|
||||
@@ -264,7 +268,7 @@ fn main() -> anyhow::Result<()> {
|
||||
block_count = rpc_client.get_block_count()? as u32;
|
||||
tx.send(Emission::Tip(block_count))?;
|
||||
}
|
||||
tx.send(Emission::Block { height, block })?;
|
||||
tx.send(Emission::Block(block_emission))?;
|
||||
}
|
||||
None => {
|
||||
if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
|
||||
@@ -293,13 +297,17 @@ fn main() -> anyhow::Result<()> {
|
||||
let mut chain = chain.lock().unwrap();
|
||||
|
||||
let changeset = match emission {
|
||||
Emission::Block { height, block } => {
|
||||
let chain_update =
|
||||
CheckPoint::from_header(&block.header, height).into_update(false);
|
||||
Emission::Block(block_emission) => {
|
||||
let height = block_emission.block_height();
|
||||
let chain_update = local_chain::Update {
|
||||
tip: block_emission.checkpoint,
|
||||
introduce_older_blocks: false,
|
||||
};
|
||||
let chain_changeset = chain
|
||||
.apply_update(chain_update)
|
||||
.expect("must always apply as we receive blocks in order from emitter");
|
||||
let graph_changeset = graph.apply_block_relevant(block, height);
|
||||
let graph_changeset =
|
||||
graph.apply_block_relevant(block_emission.block, height);
|
||||
(chain_changeset, graph_changeset)
|
||||
}
|
||||
Emission::Mempool(mempool_txs) => {
|
||||
|
||||
Reference in New Issue
Block a user