Merge bitcoindevkit/bdk#683: Fix wallet sync for RpcBlockchain

5eeba6cced9a6fa0ad8ee4f64d04e1948620eac8 Various `RpcBlockchain` improvements (志宇)
5eb74af41494b7ec4894d7da3015da2981639228 Rpc: Manually add immature coinbase utxos (志宇)
ac19c19f21fce43a99ecf0c4f95ae818b620558c New `RpcBlockchain` implementation with various fixes (志宇)

Pull request description:

  Fixes #677

  ### Description

  Unfortunately, to fix all the problems I had to do a complete re-implementation of `RpcBlockchain`.

  **The new implementation fixes the following:**
  * We can track more than 100 scriptPubKeys
  * We can obtain more than 1000 transactions per sync
  * Transaction "metadata" for already-synced transactions is updated when we introduce new scriptPubKeys
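
  The 1000-transaction cap is lifted by paging through `listtransactions`. Below is a minimal sketch of the paging idea using the `bitcoincore_rpc` client (the PR's `CoreTxIter` does this lazily and also filters out replaced transactions; `all_wallet_txs` is a hypothetical helper, not part of this PR):

  ```rust
  use bitcoincore_rpc::json::ListTransactionResult;
  use bitcoincore_rpc::{Client, Error, RpcApi};

  /// Page through `listtransactions` until an empty page comes back,
  /// so results are no longer capped by a single 1000-entry call.
  fn all_wallet_txs(client: &Client, page_size: usize) -> Result<Vec<ListTransactionResult>, Error> {
      let mut txs = Vec::new();
      for page_index in 0.. {
          let page = client.list_transactions(
              None,                         // any label
              Some(page_size),              // count
              Some(page_size * page_index), // skip
              Some(true),                   // include watch-only
          )?;
          if page.is_empty() {
              break;
          }
          txs.extend(page);
      }
      Ok(txs)
  }
  ```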

  **`RpcConfig` changes:**
  * Introduce `RpcSyncParams`.
  * Remove `RpcConfig::skip_blocks` (this is replaced by `RpcSyncParams::start_time`).
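
  For illustration, a hedged sketch of the new configuration surface (endpoint and values hypothetical; the defaults noted in the comment are `RpcSyncParams::default()` from this PR):

  ```rust
  use bdk::bitcoin::Network;
  use bdk::blockchain::rpc::{Auth, RpcBlockchain, RpcConfig, RpcSyncParams};
  use bdk::blockchain::ConfigurableBlockchain;

  fn build_rpc_blockchain() -> Result<RpcBlockchain, bdk::Error> {
      let config = RpcConfig {
          url: "127.0.0.1:18443".to_string(), // hypothetical regtest node
          auth: Auth::Cookie { file: "/tmp/regtest/.cookie".into() },
          network: Network::Regtest,
          wallet_name: "wallet_name".to_string(),
          // `None` here falls back to the defaults:
          // start_script_count: 100, start_time: 0,
          // force_start_time: false, poll_rate_sec: 3
          sync_params: Some(RpcSyncParams {
              start_time: 1_650_000_000, // scan from this unix timestamp
              ..Default::default()
          }),
      };
      RpcBlockchain::from_config(&config)
  }
  ```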

  ### Notes to the reviewers

  * The `RpcConfig` structure is changed. It will be good to confirm whether this is an okay change.
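
  Since imports are now anchored to a unix timestamp rather than a block height, one way downstream code might migrate off `skip_blocks` is sketched below (hypothetical helper, not part of this PR):

  ```rust
  use bdk::blockchain::rpc::RpcSyncParams;

  /// Hypothetical migration helper: approximate the old `skip_blocks`
  /// semantics by converting a block height into a `start_time` unix
  /// timestamp, assuming ~600 seconds per block since mainnet genesis.
  fn sync_params_from_height(start_height: u32) -> RpcSyncParams {
      const GENESIS_TIME: u64 = 1_231_006_505; // mainnet block 0 timestamp
      RpcSyncParams {
          start_time: GENESIS_TIME + u64::from(start_height) * 600,
          ..Default::default()
      }
  }
  ```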

  ### Checklists

  #### All Submissions:

  * [x] I've signed all my commits
  * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md)
  * [x] I ran `cargo fmt` and `cargo clippy` before committing

  #### New Features:

  * [ ] ~~I've added tests for the new feature~~
  * [x] I've added docs for the new feature
  * [x] I've updated `CHANGELOG.md`

  #### Bugfixes:

  * [x] This pull request breaks the existing API
  * [x] I've added tests to reproduce the issue which are now passing
  * [x] I'm linking the issue being fixed by this PR

ACKs for top commit:
  afilini:
    ACK 5eeba6cced9a6fa0ad8ee4f64d04e1948620eac8

Tree-SHA512: 7e0c9cfc4ef10fb07e4ac7f6fbf30cf28ca6395495c0237fa5bfa9a2fcbbd4d8ff980ffcf71ddd10bc052e4c07bc2c27f093dd3cd1c69cb29141455c693f2386
commit dc7adb7161 by Alekos Filini, 2022-08-04 10:22:50 +02:00
GPG Key ID: 431401E4A4530061
4 changed files with 581 additions and 277 deletions


@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add the ability to specify whether a taproot transaction should be signed using the internal key or not, using `sign_with_tap_internal_key` in `SignOptions`
- Consolidate params `fee_amount` and `amount_needed` in `target_amount` in `CoinSelectionAlgorithm::coin_select` signature.
- Change the meaning of the `fee_amount` field inside `CoinSelectionResult`: from now on the `fee_amount` will represent only the fees associated with the utxos in the `selected` field of `CoinSelectionResult`.
- New `RpcBlockchain` implementation with various fixes.
## [v0.20.0] - [v0.19.0]


@@ -103,7 +103,7 @@ fn main() -> Result<(), Box<dyn Error>> {
auth: bitcoind_auth,
network: Network::Regtest,
wallet_name,
skip_blocks: None,
sync_params: None,
};
// Use the above configuration to create a RPC blockchain backend


@@ -26,30 +26,33 @@
//! },
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! skip_blocks: None,
//! sync_params: None,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```
use crate::bitcoin::consensus::deserialize;
use crate::bitcoin::hashes::hex::ToHex;
use crate::bitcoin::{Address, Network, OutPoint, Transaction, TxOut, Txid};
use crate::bitcoin::{Network, OutPoint, Transaction, TxOut, Txid};
use crate::blockchain::*;
use crate::database::{BatchDatabase, DatabaseUtils};
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::descriptor::get_checksum;
use crate::error::MissingCachedScripts;
use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
use bitcoin::Script;
use bitcoincore_rpc::json::{
GetAddressInfoResultLabel, ImportMultiOptions, ImportMultiRequest,
ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
GetTransactionResultDetailCategory, ImportMultiOptions, ImportMultiRequest,
ImportMultiRequestScriptPubkey, ImportMultiRescanSince, ListTransactionResult,
ListUnspentResultEntry, ScanningDetails,
};
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use log::debug;
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::str::FromStr;
use std::thread;
use std::time::Duration;
/// The main struct for RPC backend implementing the [crate::blockchain::Blockchain] trait
#[derive(Debug)]
@@ -60,11 +63,8 @@ pub struct RpcBlockchain {
is_descriptors: bool,
/// Blockchain capabilities, cached here at startup
capabilities: HashSet<Capability>,
/// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
skip_blocks: Option<u32>,
/// This is a fixed Address used as a hack key to store information on the node
_storage_address: Address,
/// Sync parameters.
sync_params: RpcSyncParams,
}
/// RpcBlockchain configuration options
@@ -78,8 +78,36 @@ pub struct RpcConfig {
pub network: Network,
/// The wallet name in the bitcoin node, consider using [crate::wallet::wallet_name_from_descriptor] for this
pub wallet_name: String,
/// Skip this many blocks of the blockchain at the first rescan, if None the rescan is done from the genesis block
pub skip_blocks: Option<u32>,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
}
/// Sync parameters for Bitcoin Core RPC.
///
/// In general, BDK tries to sync `scriptPubKey`s cached in [`crate::database::Database`] with
/// `scriptPubKey`s imported in the Bitcoin Core wallet. These parameters determine how the
/// `importdescriptors`/`importmulti` RPC calls are made.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct RpcSyncParams {
/// The minimum number of scripts to scan for on initial sync.
pub start_script_count: usize,
/// Time in unix seconds from which the initial sync starts scanning (0 to start from genesis).
pub start_time: u64,
/// Forces every sync to use `start_time` as import timestamp.
pub force_start_time: bool,
/// RPC poll rate (in seconds) to get state updates.
pub poll_rate_sec: u64,
}
impl Default for RpcSyncParams {
fn default() -> Self {
Self {
start_script_count: 100,
start_time: 0,
force_start_time: false,
poll_rate_sec: 3,
}
}
}
/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
@@ -115,27 +143,6 @@ impl From<Auth> for RpcAuth {
}
}
impl RpcBlockchain {
fn get_node_synced_height(&self) -> Result<u32, Error> {
let info = self.client.get_address_info(&self._storage_address)?;
if let Some(GetAddressInfoResultLabel::Simple(label)) = info.labels.first() {
Ok(label
.parse::<u32>()
.unwrap_or_else(|_| self.skip_blocks.unwrap_or(0)))
} else {
Ok(self.skip_blocks.unwrap_or(0))
}
}
/// Set the synced height in the core node by using a label of a fixed address so that
/// another client with the same descriptor doesn't rescan the blockchain
fn set_node_synced_height(&self, height: u32) -> Result<(), Error> {
Ok(self
.client
.set_label(&self._storage_address, &height.to_string())?)
}
}
impl Blockchain for RpcBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
self.capabilities.clone()
@@ -176,226 +183,15 @@ impl GetBlockHash for RpcBlockchain {
}
impl WalletSync for RpcBlockchain {
fn wallet_setup<D: BatchDatabase>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
let mut scripts_pubkeys = database.iter_script_pubkeys(Some(KeychainKind::External))?;
scripts_pubkeys.extend(database.iter_script_pubkeys(Some(KeychainKind::Internal))?);
debug!(
"importing {} script_pubkeys (some maybe already imported)",
scripts_pubkeys.len()
);
fn wallet_setup<D>(&self, db: &mut D, prog: Box<dyn Progress>) -> Result<(), Error>
where
D: BatchDatabase,
{
let batch = DbState::new(db, &self.sync_params, &*prog)?
.sync_with_core(&self.client, self.is_descriptors)?
.as_db_batch()?;
if self.is_descriptors {
// Core still doesn't support complex descriptors like BDK, but when the wallet type is
// "descriptors" we should import individual addresses using `importdescriptors` rather
// than `importmulti`, using the `raw()` descriptor which allows us to specify an
// arbitrary script
let requests = Value::Array(
scripts_pubkeys
.iter()
.map(|s| {
let desc = format!("raw({})", s.to_hex());
json!({
"timestamp": "now",
"desc": format!("{}#{}", desc, get_checksum(&desc).unwrap()),
})
})
.collect(),
);
let res: Vec<Value> = self.client.call("importdescriptors", &[requests])?;
res.into_iter()
.map(|v| match v["success"].as_bool() {
Some(true) => Ok(()),
Some(false) => Err(Error::Generic(
v["error"]["message"]
.as_str()
.unwrap_or("Unknown error")
.to_string(),
)),
_ => Err(Error::Generic("Unexpected response from Core".to_string())),
})
.collect::<Result<Vec<_>, _>>()?;
} else {
let requests: Vec<_> = scripts_pubkeys
.iter()
.map(|s| ImportMultiRequest {
timestamp: ImportMultiRescanSince::Timestamp(0),
script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(s)),
watchonly: Some(true),
..Default::default()
})
.collect();
let options = ImportMultiOptions {
rescan: Some(false),
};
self.client.import_multi(&requests, Some(&options))?;
}
loop {
let current_height = self.get_height()?;
// min because a block invalidation may cause the height to go down
let node_synced = self.get_node_synced_height()?.min(current_height);
let sync_up_to = node_synced.saturating_add(10_000).min(current_height);
debug!("rescan_blockchain from:{} to:{}", node_synced, sync_up_to);
self.client
.rescan_blockchain(Some(node_synced as usize), Some(sync_up_to as usize))?;
progress_update.update((sync_up_to as f32) / (current_height as f32), None)?;
self.set_node_synced_height(sync_up_to)?;
if sync_up_to == current_height {
break;
}
}
self.wallet_sync(database, progress_update)
}
fn wallet_sync<D: BatchDatabase>(
&self,
db: &mut D,
_progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
let mut indexes = HashMap::new();
for keykind in &[KeychainKind::External, KeychainKind::Internal] {
indexes.insert(*keykind, db.get_last_index(*keykind)?.unwrap_or(0));
}
let mut known_txs: HashMap<_, _> = db
.iter_txs(true)?
.into_iter()
.map(|tx| (tx.txid, tx))
.collect();
let known_utxos: HashSet<_> = db.iter_utxos()?.into_iter().collect();
//TODO list_since_blocks would be more efficient
let current_utxo = self
.client
.list_unspent(Some(0), None, None, Some(true), None)?;
debug!("current_utxo len {}", current_utxo.len());
//TODO supported up to 1_000 txs, should use since_blocks or do paging
let list_txs = self
.client
.list_transactions(None, Some(1_000), None, Some(true))?;
let mut list_txs_ids = HashSet::new();
for tx_result in list_txs.iter().filter(|t| {
// list_txs returns all conflicting txs, we want to
// filter out replaced tx => unconfirmed and not in the mempool
t.info.confirmations > 0 || self.client.get_mempool_entry(&t.info.txid).is_ok()
}) {
let txid = tx_result.info.txid;
list_txs_ids.insert(txid);
if let Some(mut known_tx) = known_txs.get_mut(&txid) {
let confirmation_time =
BlockTime::new(tx_result.info.blockheight, tx_result.info.blocktime);
if confirmation_time != known_tx.confirmation_time {
// reorg may change tx height
debug!(
"updating tx({}) confirmation time to: {:?}",
txid, confirmation_time
);
known_tx.confirmation_time = confirmation_time;
db.set_tx(known_tx)?;
}
} else {
//TODO check there is already the raw tx in db?
let tx_result = self.client.get_transaction(&txid, Some(true))?;
let tx: Transaction = deserialize(&tx_result.hex)?;
let mut received = 0u64;
let mut sent = 0u64;
for output in tx.output.iter() {
if let Ok(Some((kind, index))) =
db.get_path_from_script_pubkey(&output.script_pubkey)
{
if index > *indexes.get(&kind).unwrap() {
indexes.insert(kind, index);
}
received += output.value;
}
}
for input in tx.input.iter() {
if let Some(previous_output) = db.get_previous_output(&input.previous_output)? {
if db.is_mine(&previous_output.script_pubkey)? {
sent += previous_output.value;
}
}
}
let td = TransactionDetails {
transaction: Some(tx),
txid: tx_result.info.txid,
confirmation_time: BlockTime::new(
tx_result.info.blockheight,
tx_result.info.blocktime,
),
received,
sent,
fee: tx_result.fee.map(|f| f.as_sat().unsigned_abs()),
};
debug!(
"saving tx: {} tx_result.fee:{:?} td.fees:{:?}",
td.txid, tx_result.fee, td.fee
);
db.set_tx(&td)?;
}
}
for known_txid in known_txs.keys() {
if !list_txs_ids.contains(known_txid) {
debug!("removing tx: {}", known_txid);
db.del_tx(known_txid, false)?;
}
}
// Filter out transactions that are for script pubkeys that aren't in this wallet.
let current_utxos = current_utxo
.into_iter()
.filter_map(
|u| match db.get_path_from_script_pubkey(&u.script_pub_key) {
Err(e) => Some(Err(e)),
Ok(None) => None,
Ok(Some(path)) => Some(Ok(LocalUtxo {
outpoint: OutPoint::new(u.txid, u.vout),
keychain: path.0,
txout: TxOut {
value: u.amount.as_sat(),
script_pubkey: u.script_pub_key,
},
is_spent: false,
})),
},
)
.collect::<Result<HashSet<_>, Error>>()?;
let spent: HashSet<_> = known_utxos.difference(&current_utxos).collect();
for utxo in spent {
debug!("setting as spent utxo: {:?}", utxo);
let mut spent_utxo = utxo.clone();
spent_utxo.is_spent = true;
db.set_utxo(&spent_utxo)?;
}
let received: HashSet<_> = current_utxos.difference(&known_utxos).collect();
for utxo in received {
debug!("adding utxo: {:?}", utxo);
db.set_utxo(utxo)?;
}
for (keykind, index) in indexes {
debug!("{:?} max {}", keykind, index);
db.set_last_index(keykind, index)?;
}
Ok(())
db.commit_batch(batch)
}
}
@@ -464,17 +260,11 @@ impl ConfigurableBlockchain for RpcBlockchain {
}
}
// this is just a fixed address used only to store a label containing the synced height in the node
let mut storage_address =
Address::from_str("bc1qst0rewf0wm4kw6qn6kv0e5tc56nkf9yhcxlhqv").unwrap();
storage_address.network = network;
Ok(RpcBlockchain {
client,
capabilities,
is_descriptors,
_storage_address: storage_address,
skip_blocks: config.skip_blocks,
sync_params: config.sync_params.clone().unwrap_or_default(),
})
}
}
@@ -495,6 +285,519 @@ fn list_wallet_dir(client: &Client) -> Result<Vec<String>, Error> {
Ok(result.wallets.into_iter().map(|n| n.name).collect())
}
/// Represents the state of the [`crate::database::Database`].
struct DbState<'a, D> {
db: &'a D,
params: &'a RpcSyncParams,
prog: &'a dyn Progress,
ext_spks: Vec<Script>,
int_spks: Vec<Script>,
txs: HashMap<Txid, TransactionDetails>,
utxos: HashSet<LocalUtxo>,
last_indexes: HashMap<KeychainKind, u32>,
// "deltas" to apply to database
retained_txs: HashSet<Txid>, // txs to retain (everything else should be deleted)
updated_txs: HashSet<Txid>, // txs to update
updated_utxos: HashSet<LocalUtxo>, // utxos to update
}
impl<'a, D: BatchDatabase> DbState<'a, D> {
/// Obtain [DbState] from [crate::database::Database].
fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result<Self, Error> {
let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?;
let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?;
// This is a hack to see whether at least one of the keychains comes from a derivable
// descriptor. We assume that non-derivable descriptors always have a script count of 1.
let last_count = std::cmp::max(ext_spks.len(), int_spks.len());
let has_derivable = last_count > 1;
// If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently
// cached.
if has_derivable && last_count < params.start_script_count {
let inner_err = MissingCachedScripts {
last_count,
missing_count: params.start_script_count - last_count,
};
debug!("requesting more spks with: {:?}", inner_err);
return Err(Error::MissingCachedScripts(inner_err));
}
let txs = db
.iter_txs(true)?
.into_iter()
.map(|tx| (tx.txid, tx))
.collect::<HashMap<_, _>>();
let utxos = db.iter_utxos()?.into_iter().collect::<HashSet<_>>();
let last_indexes = [KeychainKind::External, KeychainKind::Internal]
.iter()
.filter_map(|keychain| match db.get_last_index(*keychain) {
Ok(li_opt) => li_opt.map(|li| Ok((*keychain, li))),
Err(err) => Some(Err(err)),
})
.collect::<Result<HashMap<_, _>, Error>>()?;
info!("initial db state: txs={} utxos={}", txs.len(), utxos.len());
// "delta" fields
let retained_txs = HashSet::with_capacity(txs.len());
let updated_txs = HashSet::with_capacity(txs.len());
let updated_utxos = HashSet::with_capacity(utxos.len());
Ok(Self {
db,
params,
prog,
ext_spks,
int_spks,
txs,
utxos,
last_indexes,
retained_txs,
updated_txs,
updated_utxos,
})
}
/// Sync states of [BatchDatabase] and Core wallet.
/// First, we import all `scriptPubKey`s from the database into the Core wallet.
fn sync_with_core(&mut self, client: &Client, is_descriptor: bool) -> Result<&mut Self, Error> {
// this tells Core wallet where to sync from for imported scripts
let start_epoch = if self.params.force_start_time {
self.params.start_time
} else {
self.db
.get_sync_time()?
.map_or(self.params.start_time, |st| st.block_time.timestamp)
};
// sync scriptPubKeys from Database to Core wallet
let scripts_iter = self.ext_spks.iter().chain(&self.int_spks);
if is_descriptor {
import_descriptors(client, start_epoch, scripts_iter)?;
} else {
import_multi(client, start_epoch, scripts_iter)?;
}
// wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
// loop through results of Core RPC method `listtransactions`
for tx_res in CoreTxIter::new(client, 100) {
let tx_res = tx_res?;
let mut updated = false;
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
updated = true;
TransactionDetails {
txid: tx_res.info.txid,
..Default::default()
}
});
// update raw tx (if needed)
let raw_tx =
&*match &mut db_tx.transaction {
Some(raw_tx) => raw_tx,
db_tx_opt => {
updated = true;
db_tx_opt.insert(client.get_raw_transaction(
&tx_res.info.txid,
tx_res.info.blockhash.as_ref(),
)?)
}
};
// update fee (if needed)
if let (None, Some(new_fee)) = (db_tx.fee, tx_res.detail.fee) {
updated = true;
db_tx.fee = Some(new_fee.as_sat().unsigned_abs());
}
// update confirmation time (if needed)
let conf_time = BlockTime::new(tx_res.info.blockheight, tx_res.info.blocktime);
if db_tx.confirmation_time != conf_time {
updated = true;
db_tx.confirmation_time = conf_time;
}
// update received (if needed)
let received = Self::received_from_raw_tx(self.db, raw_tx)?;
if db_tx.received != received {
updated = true;
db_tx.received = received;
}
// check if tx has an immature coinbase output (add to updated UTXOs)
// this is required because `listunspent` does not include immature coinbase outputs
if tx_res.detail.category == GetTransactionResultDetailCategory::Immature {
let txout = raw_tx
.output
.get(tx_res.detail.vout as usize)
.cloned()
.ok_or_else(|| {
Error::Generic(format!(
"Core RPC returned detail with invalid vout '{}' for tx '{}'",
tx_res.detail.vout, tx_res.info.txid,
))
})?;
if let Some((keychain, index)) =
self.db.get_path_from_script_pubkey(&txout.script_pubkey)?
{
let utxo = LocalUtxo {
outpoint: OutPoint::new(tx_res.info.txid, tx_res.detail.vout),
txout,
keychain,
is_spent: false,
};
self.updated_utxos.insert(utxo);
self.update_last_index(keychain, index);
}
}
// update tx deltas
self.retained_txs.insert(tx_res.info.txid);
if updated {
self.updated_txs.insert(tx_res.info.txid);
}
}
// obtain vector of `TransactionDetails::sent` changes
let sent_updates = self
.txs
.values()
// only bother to update txs that are retained
.filter(|db_tx| self.retained_txs.contains(&db_tx.txid))
// only bother to update txs where the raw tx is accessible
.filter_map(|db_tx| (db_tx.transaction.as_ref().map(|tx| (tx, db_tx.sent))))
// recalculate sent value, only update txs in which sent value is changed
.filter_map(|(raw_tx, old_sent)| {
self.sent_from_raw_tx(raw_tx)
.map(|sent| {
if sent != old_sent {
Some((raw_tx.txid(), sent))
} else {
None
}
})
.transpose()
})
.collect::<Result<Vec<_>, _>>()?;
// record sent updates
sent_updates.iter().for_each(|&(txid, sent)| {
// apply sent field changes
self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent);
// mark tx as modified
self.updated_txs.insert(txid);
});
// obtain UTXOs from Core wallet
let core_utxos = client
.list_unspent(Some(0), None, None, Some(true), None)?
.into_iter()
.filter_map(|utxo_entry| {
let path_result = self
.db
.get_path_from_script_pubkey(&utxo_entry.script_pub_key)
.transpose()?;
let utxo_result = match path_result {
Ok((keychain, index)) => {
self.update_last_index(keychain, index);
Ok(Self::make_local_utxo(utxo_entry, keychain, false))
}
Err(err) => Err(err),
};
Some(utxo_result)
})
.collect::<Result<HashSet<_>, Error>>()?;
// mark "spent utxos" to be updated in database
let spent_utxos = self.utxos.difference(&core_utxos).cloned().map(|mut utxo| {
utxo.is_spent = true;
utxo
});
// mark new utxos to be added in database
let new_utxos = core_utxos.difference(&self.utxos).cloned();
// add to updated utxos
self.updated_utxos.extend(spent_utxos.chain(new_utxos));
Ok(self)
}
/// Calculates received amount from raw tx.
fn received_from_raw_tx(db: &D, raw_tx: &Transaction) -> Result<u64, Error> {
raw_tx.output.iter().try_fold(0_u64, |recv, txo| {
let v = if db.is_mine(&txo.script_pubkey)? {
txo.value
} else {
0
};
Ok(recv + v)
})
}
/// Calculates sent from raw tx.
fn sent_from_raw_tx(&self, raw_tx: &Transaction) -> Result<u64, Error> {
let get_output = |outpoint: &OutPoint| {
let raw_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?;
raw_tx.output.get(outpoint.vout as usize)
};
raw_tx.input.iter().try_fold(0_u64, |sent, txin| {
let v = match get_output(&txin.previous_output) {
Some(prev_txo) => {
if self.db.is_mine(&prev_txo.script_pubkey)? {
prev_txo.value
} else {
0
}
}
None => 0_u64,
};
Ok(sent + v)
})
}
// updates the db state's last_index for the given keychain (if larger than current last_index)
fn update_last_index(&mut self, keychain: KeychainKind, index: u32) {
self.last_indexes
.entry(keychain)
.and_modify(|last| {
if *last < index {
*last = index;
}
})
.or_insert_with(|| index);
}
fn make_local_utxo(
entry: ListUnspentResultEntry,
keychain: KeychainKind,
is_spent: bool,
) -> LocalUtxo {
LocalUtxo {
outpoint: OutPoint::new(entry.txid, entry.vout),
txout: TxOut {
value: entry.amount.as_sat(),
script_pubkey: entry.script_pub_key,
},
keychain,
is_spent,
}
}
/// Prepare db batch operations.
fn as_db_batch(&self) -> Result<D::Batch, Error> {
let mut batch = self.db.begin_batch();
let mut del_txs = 0_u32;
// delete stale (not retained) txs from db
self.txs
.keys()
.filter(|&txid| !self.retained_txs.contains(txid))
.try_for_each(|txid| -> Result<(), Error> {
batch.del_tx(txid, false)?;
del_txs += 1;
Ok(())
})?;
// update txs
self.updated_txs
.iter()
.inspect(|&txid| debug!("updating tx: {}", txid))
.try_for_each(|txid| batch.set_tx(self.txs.get(txid).unwrap()))?;
// update utxos
self.updated_utxos
.iter()
.inspect(|&utxo| debug!("updating utxo: {}", utxo.outpoint))
.try_for_each(|utxo| batch.set_utxo(utxo))?;
// update last indexes
self.last_indexes
.iter()
.try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
info!(
"db batch updates: del_txs={}, update_txs={}, update_utxos={}",
del_txs,
self.updated_txs.len(),
self.updated_utxos.len()
);
Ok(batch)
}
}
fn import_descriptors<'a, S>(
client: &Client,
start_epoch: u64,
scripts_iter: S,
) -> Result<(), Error>
where
S: Iterator<Item = &'a Script>,
{
let requests = Value::Array(
scripts_iter
.map(|script| {
let desc = descriptor_from_script_pubkey(script);
json!({ "timestamp": start_epoch, "desc": desc })
})
.collect(),
);
for v in client.call::<Vec<Value>>("importdescriptors", &[requests])? {
match v["success"].as_bool() {
Some(true) => continue,
Some(false) => {
return Err(Error::Generic(
v["error"]["message"]
.as_str()
.map_or("unknown error".into(), ToString::to_string),
))
}
_ => return Err(Error::Generic("Unexpected response form Core".to_string())),
}
}
Ok(())
}
fn import_multi<'a, S>(client: &Client, start_epoch: u64, scripts_iter: S) -> Result<(), Error>
where
S: Iterator<Item = &'a Script>,
{
let requests = scripts_iter
.map(|script| ImportMultiRequest {
timestamp: ImportMultiRescanSince::Timestamp(start_epoch),
script_pubkey: Some(ImportMultiRequestScriptPubkey::Script(script)),
watchonly: Some(true),
..Default::default()
})
.collect::<Vec<_>>();
let options = ImportMultiOptions { rescan: Some(true) };
for v in client.import_multi(&requests, Some(&options))? {
if let Some(err) = v.error {
return Err(Error::Generic(format!(
"{} (code: {})",
err.message, err.code
)));
}
}
Ok(())
}
/// Iterates through results of multiple `listtransactions` calls.
struct CoreTxIter<'a> {
client: &'a Client,
page_size: usize,
page_index: usize,
stack: Vec<ListTransactionResult>,
done: bool,
}
impl<'a> CoreTxIter<'a> {
fn new(client: &'a Client, mut page_size: usize) -> Self {
if page_size > 1000 {
page_size = 1000;
}
Self {
client,
page_size,
page_index: 0,
stack: Vec::with_capacity(page_size),
done: false,
}
}
/// We want to filter out conflicting transactions.
/// Only accept transactions that are already confirmed or still in the mempool.
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
}
}
impl<'a> Iterator for CoreTxIter<'a> {
type Item = Result<ListTransactionResult, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.done {
return None;
}
if let Some(item) = self.stack.pop() {
if self.keep_tx(&item) {
return Some(Ok(item));
}
}
let res = self
.client
.list_transactions(
None,
Some(self.page_size),
Some(self.page_size * self.page_index),
Some(true),
)
.map_err(Error::Rpc);
self.page_index += 1;
let list = match res {
Ok(list) => list,
Err(err) => {
self.done = true;
return Some(Err(err));
}
};
if list.is_empty() {
self.done = true;
return None;
}
self.stack = list;
}
}
}
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
#[derive(Deserialize)]
struct CallResult {
scanning: ScanningDetails,
}
let dur = Duration::from_secs(rate_sec);
loop {
match client.call::<CallResult>("getwalletinfo", &[])?.scanning {
ScanningDetails::Scanning {
duration,
progress: pc,
} => {
debug!("scanning: duration={}, progress={}", duration, pc);
progress.update(pc, Some(format!("elapsed for {} seconds", duration)))?;
thread::sleep(dur);
}
ScanningDetails::NotScanning(_) => {
progress.update(1.0, None)?;
info!("scanning: done!");
return Ok(());
}
};
}
}
/// Returns whether a wallet is legacy or descriptors by calling `getwalletinfo`.
///
/// This API is mapped by bitcoincore_rpc, but it doesn't have the fields we need (either
@@ -509,6 +812,11 @@ fn is_wallet_descriptor(client: &Client) -> Result<bool, Error> {
Ok(result.descriptors.unwrap_or(false))
}
fn descriptor_from_script_pubkey(script: &Script) -> String {
let desc = format!("raw({})", script.to_hex());
format!("{}#{}", desc, get_checksum(&desc).unwrap())
}
/// Factory of [`RpcBlockchain`] instances, implements [`BlockchainFactory`]
///
/// Internally caches the node url and authentication params and allows getting many different [`RpcBlockchain`]
@@ -529,6 +837,7 @@ fn is_wallet_descriptor(client: &Client) -> Result<bool, Error> {
/// network: Network::Testnet,
/// wallet_name_prefix: Some("prefix-".to_string()),
/// default_skip_blocks: 100_000,
/// sync_params: None,
/// };
/// let main_wallet_blockchain = factory.build("main_wallet", Some(200_000))?;
/// # Ok(())
@@ -546,6 +855,8 @@ pub struct RpcBlockchainFactory {
pub wallet_name_prefix: Option<String>,
/// Default number of blocks to skip which will be inherited by blockchain unless overridden
pub default_skip_blocks: u32,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
}
impl BlockchainFactory for RpcBlockchainFactory {
@@ -554,7 +865,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
fn build(
&self,
checksum: &str,
override_skip_blocks: Option<u32>,
_override_skip_blocks: Option<u32>,
) -> Result<Self::Inner, Error> {
RpcBlockchain::from_config(&RpcConfig {
url: self.url.clone(),
@@ -565,7 +876,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
self.wallet_name_prefix.as_ref().unwrap_or(&String::new()),
checksum
),
skip_blocks: Some(override_skip_blocks.unwrap_or(self.default_skip_blocks)),
sync_params: self.sync_params.clone(),
})
}
}
@@ -586,7 +897,7 @@ mod test {
auth: Auth::Cookie { file: test_client.bitcoind.params.cookie_file.clone() },
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
skip_blocks: None,
sync_params: None,
};
RpcBlockchain::from_config(&config).unwrap()
}
@@ -603,6 +914,7 @@ mod test {
network: Network::Regtest,
wallet_name_prefix: Some("prefix-".into()),
default_skip_blocks: 0,
sync_params: None,
};
(test_client, factory)
@@ -613,7 +925,6 @@ mod test {
let (_test_client, factory) = get_factory();
let a = factory.build("aaaaaa", None).unwrap();
assert_eq!(a.skip_blocks, Some(0));
assert_eq!(
a.client
.get_wallet_info()
@@ -623,7 +934,6 @@ mod test {
);
let b = factory.build("bbbbbb", Some(100)).unwrap();
assert_eq!(b.skip_blocks, Some(100));
assert_eq!(
b.client
.get_wallet_info()


@@ -1057,20 +1057,13 @@ macro_rules! bdk_blockchain_tests {
let (wallet, blockchain, _, mut test_client) = init_single_sig();
let wallet_addr = wallet.get_address($crate::wallet::AddressIndex::New).unwrap().address;
println!("wallet addr: {}", wallet_addr);
wallet.sync(&blockchain, SyncOptions::default()).unwrap();
assert_eq!(wallet.get_balance().unwrap(), 0, "incorrect balance");
test_client.generate(1, Some(wallet_addr));
#[cfg(feature = "rpc")]
{
// rpc considers coinbase outputs only when mature (100 blocks)
let node_addr = test_client.get_node_address(None);
test_client.generate(100, Some(node_addr));
}
wallet.sync(&blockchain, SyncOptions::default()).unwrap();
assert!(wallet.get_balance().unwrap() > 0, "incorrect balance after receiving coinbase");
}
@@ -1267,7 +1260,7 @@ macro_rules! bdk_blockchain_tests {
wallet.sync(&blockchain, SyncOptions::default()).unwrap();
let _ = test_client.receive(testutils! {
@tx ( (@external descriptors, 0) => 50_000 )
@tx ( (@external descriptors, 0) => 50_000 )
});
wallet.sync(&blockchain, SyncOptions::default()).unwrap();