Various RpcBlockchain
improvements
These are as suggested by @danielabrozzoni and @afilini Also introduced `RpcSyncParams::force_start_time` for users who prioritise reliability above all else. Also improved logging.
This commit is contained in:
parent
5eb74af414
commit
5eeba6cced
@ -40,14 +40,14 @@ use crate::error::MissingCachedScripts;
|
|||||||
use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
|
use crate::{BlockTime, Error, FeeRate, KeychainKind, LocalUtxo, TransactionDetails};
|
||||||
use bitcoin::Script;
|
use bitcoin::Script;
|
||||||
use bitcoincore_rpc::json::{
|
use bitcoincore_rpc::json::{
|
||||||
GetTransactionResult, GetTransactionResultDetailCategory, ImportMultiOptions,
|
GetTransactionResultDetailCategory, ImportMultiOptions, ImportMultiRequest,
|
||||||
ImportMultiRequest, ImportMultiRequestScriptPubkey, ImportMultiRescanSince,
|
ImportMultiRequestScriptPubkey, ImportMultiRescanSince, ListTransactionResult,
|
||||||
ListTransactionResult, ScanningDetails,
|
ListUnspentResultEntry, ScanningDetails,
|
||||||
};
|
};
|
||||||
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
|
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
|
||||||
use bitcoincore_rpc::Auth as RpcAuth;
|
use bitcoincore_rpc::Auth as RpcAuth;
|
||||||
use bitcoincore_rpc::{Client, RpcApi};
|
use bitcoincore_rpc::{Client, RpcApi};
|
||||||
use log::debug;
|
use log::{debug, info};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@ -93,6 +93,8 @@ pub struct RpcSyncParams {
|
|||||||
pub start_script_count: usize,
|
pub start_script_count: usize,
|
||||||
/// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis).
|
/// Time in unix seconds in which initial sync will start scanning from (0 to start from genesis).
|
||||||
pub start_time: u64,
|
pub start_time: u64,
|
||||||
|
/// Forces every sync to use `start_time` as import timestamp.
|
||||||
|
pub force_start_time: bool,
|
||||||
/// RPC poll rate (in seconds) to get state updates.
|
/// RPC poll rate (in seconds) to get state updates.
|
||||||
pub poll_rate_sec: u64,
|
pub poll_rate_sec: u64,
|
||||||
}
|
}
|
||||||
@ -102,6 +104,7 @@ impl Default for RpcSyncParams {
|
|||||||
Self {
|
Self {
|
||||||
start_script_count: 100,
|
start_script_count: 100,
|
||||||
start_time: 0,
|
start_time: 0,
|
||||||
|
force_start_time: false,
|
||||||
poll_rate_sec: 3,
|
poll_rate_sec: 3,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -180,54 +183,15 @@ impl GetBlockHash for RpcBlockchain {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl WalletSync for RpcBlockchain {
|
impl WalletSync for RpcBlockchain {
|
||||||
fn wallet_setup<D: BatchDatabase>(
|
fn wallet_setup<D>(&self, db: &mut D, prog: Box<dyn Progress>) -> Result<(), Error>
|
||||||
&self,
|
where
|
||||||
db: &mut D,
|
D: BatchDatabase,
|
||||||
progress_update: Box<dyn Progress>,
|
{
|
||||||
) -> Result<(), Error> {
|
let batch = DbState::new(db, &self.sync_params, &*prog)?
|
||||||
let db_scripts = db.iter_script_pubkeys(None)?;
|
.sync_with_core(&self.client, self.is_descriptors)?
|
||||||
|
.as_db_batch()?;
|
||||||
|
|
||||||
// this is a hack to check whether the scripts are coming from a derivable descriptor
|
db.commit_batch(batch)
|
||||||
// we assume for non-derivable descriptors, the initial script count is always 1
|
|
||||||
let is_derivable = db_scripts.len() > 1;
|
|
||||||
|
|
||||||
// ensure db scripts meet start script count requirements
|
|
||||||
if is_derivable && db_scripts.len() < self.sync_params.start_script_count {
|
|
||||||
return Err(Error::MissingCachedScripts(MissingCachedScripts {
|
|
||||||
last_count: db_scripts.len(),
|
|
||||||
missing_count: self.sync_params.start_script_count - db_scripts.len(),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// this tells Core wallet where to sync from for imported scripts
|
|
||||||
let start_epoch = db
|
|
||||||
.get_sync_time()?
|
|
||||||
.map_or(self.sync_params.start_time, |st| st.block_time.timestamp);
|
|
||||||
|
|
||||||
// import all scripts from db into Core wallet
|
|
||||||
if self.is_descriptors {
|
|
||||||
import_descriptors(&self.client, start_epoch, db_scripts.iter())?;
|
|
||||||
} else {
|
|
||||||
import_multi(&self.client, start_epoch, db_scripts.iter())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// await sync (TODO: Maybe make this async)
|
|
||||||
await_wallet_scan(
|
|
||||||
&self.client,
|
|
||||||
self.sync_params.poll_rate_sec,
|
|
||||||
&*progress_update,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// begin db batch updates
|
|
||||||
let mut db_batch = db.begin_batch();
|
|
||||||
|
|
||||||
// update batch: obtain db state then update state with core txids
|
|
||||||
DbState::from_db(db)?
|
|
||||||
.update_state(&self.client, db)?
|
|
||||||
.update_batch::<D>(&mut db_batch)?;
|
|
||||||
|
|
||||||
// apply batch updates to db
|
|
||||||
db.commit_batch(db_batch)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -322,7 +286,13 @@ fn list_wallet_dir(client: &Client) -> Result<Vec<String>, Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Represents the state of the [`crate::database::Database`].
|
/// Represents the state of the [`crate::database::Database`].
|
||||||
struct DbState {
|
struct DbState<'a, D> {
|
||||||
|
db: &'a D,
|
||||||
|
params: &'a RpcSyncParams,
|
||||||
|
prog: &'a dyn Progress,
|
||||||
|
|
||||||
|
ext_spks: Vec<Script>,
|
||||||
|
int_spks: Vec<Script>,
|
||||||
txs: HashMap<Txid, TransactionDetails>,
|
txs: HashMap<Txid, TransactionDetails>,
|
||||||
utxos: HashSet<LocalUtxo>,
|
utxos: HashSet<LocalUtxo>,
|
||||||
last_indexes: HashMap<KeychainKind, u32>,
|
last_indexes: HashMap<KeychainKind, u32>,
|
||||||
@ -331,53 +301,94 @@ struct DbState {
|
|||||||
retained_txs: HashSet<Txid>, // txs to retain (everything else should be deleted)
|
retained_txs: HashSet<Txid>, // txs to retain (everything else should be deleted)
|
||||||
updated_txs: HashSet<Txid>, // txs to update
|
updated_txs: HashSet<Txid>, // txs to update
|
||||||
updated_utxos: HashSet<LocalUtxo>, // utxos to update
|
updated_utxos: HashSet<LocalUtxo>, // utxos to update
|
||||||
updated_last_indexes: HashSet<KeychainKind>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DbState {
|
impl<'a, D: BatchDatabase> DbState<'a, D> {
|
||||||
/// Obtain [DbState] from [crate::database::Database].
|
/// Obtain [DbState] from [crate::database::Database].
|
||||||
fn from_db<D: BatchDatabase>(db: &D) -> Result<Self, Error> {
|
fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result<Self, Error> {
|
||||||
|
let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?;
|
||||||
|
let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?;
|
||||||
|
|
||||||
|
// This is a hack to see whether atleast one of the keychains comes from a derivable
|
||||||
|
// descriptor. We assume that non-derivable descriptors always has a script count of 1.
|
||||||
|
let last_count = std::cmp::max(ext_spks.len(), int_spks.len());
|
||||||
|
let has_derivable = last_count > 1;
|
||||||
|
|
||||||
|
// If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently
|
||||||
|
// cached.
|
||||||
|
if has_derivable && last_count < params.start_script_count {
|
||||||
|
let inner_err = MissingCachedScripts {
|
||||||
|
last_count,
|
||||||
|
missing_count: params.start_script_count - last_count,
|
||||||
|
};
|
||||||
|
debug!("requesting more spks with: {:?}", inner_err);
|
||||||
|
return Err(Error::MissingCachedScripts(inner_err));
|
||||||
|
}
|
||||||
|
|
||||||
let txs = db
|
let txs = db
|
||||||
.iter_txs(true)?
|
.iter_txs(true)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|tx| (tx.txid, tx))
|
.map(|tx| (tx.txid, tx))
|
||||||
.collect::<HashMap<_, _>>();
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
let utxos = db.iter_utxos()?.into_iter().collect::<HashSet<_>>();
|
let utxos = db.iter_utxos()?.into_iter().collect::<HashSet<_>>();
|
||||||
|
|
||||||
let last_indexes = [KeychainKind::External, KeychainKind::Internal]
|
let last_indexes = [KeychainKind::External, KeychainKind::Internal]
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|keychain| {
|
.filter_map(|keychain| match db.get_last_index(*keychain) {
|
||||||
db.get_last_index(*keychain)
|
Ok(li_opt) => li_opt.map(|li| Ok((*keychain, li))),
|
||||||
.map(|v| v.map(|i| (*keychain, i)))
|
Err(err) => Some(Err(err)),
|
||||||
.transpose()
|
|
||||||
})
|
})
|
||||||
.collect::<Result<HashMap<_, _>, Error>>()?;
|
.collect::<Result<HashMap<_, _>, Error>>()?;
|
||||||
|
|
||||||
|
info!("initial db state: txs={} utxos={}", txs.len(), utxos.len());
|
||||||
|
|
||||||
|
// "delta" fields
|
||||||
let retained_txs = HashSet::with_capacity(txs.len());
|
let retained_txs = HashSet::with_capacity(txs.len());
|
||||||
let updated_txs = HashSet::with_capacity(txs.len());
|
let updated_txs = HashSet::with_capacity(txs.len());
|
||||||
let updated_utxos = HashSet::with_capacity(utxos.len());
|
let updated_utxos = HashSet::with_capacity(utxos.len());
|
||||||
let updated_last_indexes = HashSet::with_capacity(last_indexes.len());
|
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
db,
|
||||||
|
params,
|
||||||
|
prog,
|
||||||
|
ext_spks,
|
||||||
|
int_spks,
|
||||||
txs,
|
txs,
|
||||||
utxos,
|
utxos,
|
||||||
last_indexes,
|
last_indexes,
|
||||||
retained_txs,
|
retained_txs,
|
||||||
updated_txs,
|
updated_txs,
|
||||||
updated_utxos,
|
updated_utxos,
|
||||||
updated_last_indexes,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update [DbState] with Core wallet state
|
/// Sync states of [BatchDatabase] and Core wallet.
|
||||||
fn update_state<D>(&mut self, client: &Client, db: &D) -> Result<&mut Self, Error>
|
/// First we import all `scriptPubKey`s from database into core wallet
|
||||||
where
|
fn sync_with_core(&mut self, client: &Client, is_descriptor: bool) -> Result<&mut Self, Error> {
|
||||||
D: BatchDatabase,
|
// this tells Core wallet where to sync from for imported scripts
|
||||||
{
|
let start_epoch = if self.params.force_start_time {
|
||||||
let tx_iter = CoreTxIter::new(client, 10);
|
self.params.start_time
|
||||||
|
} else {
|
||||||
|
self.db
|
||||||
|
.get_sync_time()?
|
||||||
|
.map_or(self.params.start_time, |st| st.block_time.timestamp)
|
||||||
|
};
|
||||||
|
|
||||||
for tx_res in tx_iter {
|
// sync scriptPubKeys from Database to Core wallet
|
||||||
|
let scripts_iter = self.ext_spks.iter().chain(&self.int_spks);
|
||||||
|
if is_descriptor {
|
||||||
|
import_descriptors(client, start_epoch, scripts_iter)?;
|
||||||
|
} else {
|
||||||
|
import_multi(client, start_epoch, scripts_iter)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for Core wallet to rescan (TODO: maybe make this async)
|
||||||
|
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
|
||||||
|
|
||||||
|
// loop through results of Core RPC method `listtransactions`
|
||||||
|
for tx_res in CoreTxIter::new(client, 100) {
|
||||||
let tx_res = tx_res?;
|
let tx_res = tx_res?;
|
||||||
|
|
||||||
let mut updated = false;
|
let mut updated = false;
|
||||||
|
|
||||||
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
|
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
|
||||||
@ -390,11 +401,11 @@ impl DbState {
|
|||||||
|
|
||||||
// update raw tx (if needed)
|
// update raw tx (if needed)
|
||||||
let raw_tx =
|
let raw_tx =
|
||||||
match &db_tx.transaction {
|
&*match &mut db_tx.transaction {
|
||||||
Some(raw_tx) => raw_tx,
|
Some(raw_tx) => raw_tx,
|
||||||
None => {
|
db_tx_opt => {
|
||||||
updated = true;
|
updated = true;
|
||||||
db_tx.transaction.insert(client.get_raw_transaction(
|
db_tx_opt.insert(client.get_raw_transaction(
|
||||||
&tx_res.info.txid,
|
&tx_res.info.txid,
|
||||||
tx_res.info.blockhash.as_ref(),
|
tx_res.info.blockhash.as_ref(),
|
||||||
)?)
|
)?)
|
||||||
@ -415,7 +426,7 @@ impl DbState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// update received (if needed)
|
// update received (if needed)
|
||||||
let received = Self::_received_from_raw_tx(db, raw_tx)?;
|
let received = Self::received_from_raw_tx(self.db, raw_tx)?;
|
||||||
if db_tx.received != received {
|
if db_tx.received != received {
|
||||||
updated = true;
|
updated = true;
|
||||||
db_tx.received = received;
|
db_tx.received = received;
|
||||||
@ -436,7 +447,7 @@ impl DbState {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
if let Some((keychain, index)) =
|
if let Some((keychain, index)) =
|
||||||
db.get_path_from_script_pubkey(&txout.script_pubkey)?
|
self.db.get_path_from_script_pubkey(&txout.script_pubkey)?
|
||||||
{
|
{
|
||||||
let utxo = LocalUtxo {
|
let utxo = LocalUtxo {
|
||||||
outpoint: OutPoint::new(tx_res.info.txid, tx_res.detail.vout),
|
outpoint: OutPoint::new(tx_res.info.txid, tx_res.detail.vout),
|
||||||
@ -445,7 +456,7 @@ impl DbState {
|
|||||||
is_spent: false,
|
is_spent: false,
|
||||||
};
|
};
|
||||||
self.updated_utxos.insert(utxo);
|
self.updated_utxos.insert(utxo);
|
||||||
self._update_last_index(keychain, index);
|
self.update_last_index(keychain, index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -456,16 +467,20 @@ impl DbState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// update sent from tx inputs
|
// obtain vector of `TransactionDetails::sent` changes
|
||||||
let sent_updates = self
|
let sent_updates = self
|
||||||
.txs
|
.txs
|
||||||
.values()
|
.values()
|
||||||
.filter_map(|db_tx| {
|
// only bother to update txs that are retained
|
||||||
let txid = self.retained_txs.get(&db_tx.txid)?;
|
.filter(|db_tx| self.retained_txs.contains(&db_tx.txid))
|
||||||
self._sent_from_raw_tx(db, db_tx.transaction.as_ref()?)
|
// only bother to update txs where the raw tx is accessable
|
||||||
|
.filter_map(|db_tx| (db_tx.transaction.as_ref().map(|tx| (tx, db_tx.sent))))
|
||||||
|
// recalcuate sent value, only update txs in which sent value is changed
|
||||||
|
.filter_map(|(raw_tx, old_sent)| {
|
||||||
|
self.sent_from_raw_tx(raw_tx)
|
||||||
.map(|sent| {
|
.map(|sent| {
|
||||||
if db_tx.sent != sent {
|
if sent != old_sent {
|
||||||
Some((*txid, sent))
|
Some((raw_tx.txid(), sent))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@ -475,8 +490,10 @@ impl DbState {
|
|||||||
.collect::<Result<Vec<_>, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
// record send updates
|
// record send updates
|
||||||
sent_updates.into_iter().for_each(|(txid, sent)| {
|
sent_updates.iter().for_each(|&(txid, sent)| {
|
||||||
|
// apply sent field changes
|
||||||
self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent);
|
self.txs.entry(txid).and_modify(|db_tx| db_tx.sent = sent);
|
||||||
|
// mark tx as modified
|
||||||
self.updated_txs.insert(txid);
|
self.updated_txs.insert(txid);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -484,25 +501,21 @@ impl DbState {
|
|||||||
let core_utxos = client
|
let core_utxos = client
|
||||||
.list_unspent(Some(0), None, None, Some(true), None)?
|
.list_unspent(Some(0), None, None, Some(true), None)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|utxo_res| {
|
.filter_map(|utxo_entry| {
|
||||||
db.get_path_from_script_pubkey(&utxo_res.script_pub_key)
|
let path_result = self
|
||||||
.transpose()
|
.db
|
||||||
.map(|v| {
|
.get_path_from_script_pubkey(&utxo_entry.script_pub_key)
|
||||||
v.map(|(keychain, index)| {
|
.transpose()?;
|
||||||
// update last index if needed
|
|
||||||
self._update_last_index(keychain, index);
|
|
||||||
|
|
||||||
LocalUtxo {
|
let utxo_result = match path_result {
|
||||||
outpoint: OutPoint::new(utxo_res.txid, utxo_res.vout),
|
Ok((keychain, index)) => {
|
||||||
keychain,
|
self.update_last_index(keychain, index);
|
||||||
txout: TxOut {
|
Ok(Self::make_local_utxo(utxo_entry, keychain, false))
|
||||||
value: utxo_res.amount.as_sat(),
|
}
|
||||||
script_pubkey: utxo_res.script_pub_key,
|
Err(err) => Err(err),
|
||||||
},
|
};
|
||||||
is_spent: false,
|
|
||||||
}
|
Some(utxo_result)
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
.collect::<Result<HashSet<_>, Error>>()?;
|
.collect::<Result<HashSet<_>, Error>>()?;
|
||||||
|
|
||||||
@ -521,19 +534,8 @@ impl DbState {
|
|||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// We want to filter out conflicting transactions.
|
|
||||||
/// Only accept transactions that are already confirmed, or existing in mempool.
|
|
||||||
fn _filter_tx(client: &Client, res: GetTransactionResult) -> Option<GetTransactionResult> {
|
|
||||||
if res.info.confirmations > 0 || client.get_mempool_entry(&res.info.txid).is_ok() {
|
|
||||||
Some(res)
|
|
||||||
} else {
|
|
||||||
debug!("tx filtered: {}", res.info.txid);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Calculates received amount from raw tx.
|
/// Calculates received amount from raw tx.
|
||||||
fn _received_from_raw_tx<D: BatchDatabase>(db: &D, raw_tx: &Transaction) -> Result<u64, Error> {
|
fn received_from_raw_tx(db: &D, raw_tx: &Transaction) -> Result<u64, Error> {
|
||||||
raw_tx.output.iter().try_fold(0_u64, |recv, txo| {
|
raw_tx.output.iter().try_fold(0_u64, |recv, txo| {
|
||||||
let v = if db.is_mine(&txo.script_pubkey)? {
|
let v = if db.is_mine(&txo.script_pubkey)? {
|
||||||
txo.value
|
txo.value
|
||||||
@ -545,15 +547,16 @@ impl DbState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Calculates sent from raw tx.
|
/// Calculates sent from raw tx.
|
||||||
fn _sent_from_raw_tx<D: BatchDatabase>(
|
fn sent_from_raw_tx(&self, raw_tx: &Transaction) -> Result<u64, Error> {
|
||||||
&self,
|
let get_output = |outpoint: &OutPoint| {
|
||||||
db: &D,
|
let raw_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?;
|
||||||
raw_tx: &Transaction,
|
raw_tx.output.get(outpoint.vout as usize)
|
||||||
) -> Result<u64, Error> {
|
};
|
||||||
|
|
||||||
raw_tx.input.iter().try_fold(0_u64, |sent, txin| {
|
raw_tx.input.iter().try_fold(0_u64, |sent, txin| {
|
||||||
let v = match self._previous_output(&txin.previous_output) {
|
let v = match get_output(&txin.previous_output) {
|
||||||
Some(prev_txo) => {
|
Some(prev_txo) => {
|
||||||
if db.is_mine(&prev_txo.script_pubkey)? {
|
if self.db.is_mine(&prev_txo.script_pubkey)? {
|
||||||
prev_txo.value
|
prev_txo.value
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
@ -565,60 +568,74 @@ impl DbState {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _previous_output(&self, outpoint: &OutPoint) -> Option<&TxOut> {
|
// updates the db state's last_index for the given keychain (if larger than current last_index)
|
||||||
let prev_tx = self.txs.get(&outpoint.txid)?.transaction.as_ref()?;
|
fn update_last_index(&mut self, keychain: KeychainKind, index: u32) {
|
||||||
prev_tx.output.get(outpoint.vout as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn _update_last_index(&mut self, keychain: KeychainKind, index: u32) {
|
|
||||||
let mut updated = false;
|
|
||||||
|
|
||||||
self.last_indexes
|
self.last_indexes
|
||||||
.entry(keychain)
|
.entry(keychain)
|
||||||
.and_modify(|last| {
|
.and_modify(|last| {
|
||||||
if *last < index {
|
if *last < index {
|
||||||
updated = true;
|
|
||||||
*last = index;
|
*last = index;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| index);
|
||||||
updated = true;
|
}
|
||||||
index
|
|
||||||
});
|
|
||||||
|
|
||||||
if updated {
|
fn make_local_utxo(
|
||||||
self.updated_last_indexes.insert(keychain);
|
entry: ListUnspentResultEntry,
|
||||||
|
keychain: KeychainKind,
|
||||||
|
is_spent: bool,
|
||||||
|
) -> LocalUtxo {
|
||||||
|
LocalUtxo {
|
||||||
|
outpoint: OutPoint::new(entry.txid, entry.vout),
|
||||||
|
txout: TxOut {
|
||||||
|
value: entry.amount.as_sat(),
|
||||||
|
script_pubkey: entry.script_pub_key,
|
||||||
|
},
|
||||||
|
keychain,
|
||||||
|
is_spent,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Prepare db batch operations.
|
/// Prepare db batch operations.
|
||||||
fn update_batch<D: BatchDatabase>(&self, batch: &mut D::Batch) -> Result<(), Error> {
|
fn as_db_batch(&self) -> Result<D::Batch, Error> {
|
||||||
// delete stale txs from db
|
let mut batch = self.db.begin_batch();
|
||||||
// stale = not retained
|
let mut del_txs = 0_u32;
|
||||||
|
|
||||||
|
// delete stale (not retained) txs from db
|
||||||
self.txs
|
self.txs
|
||||||
.keys()
|
.keys()
|
||||||
.filter(|&txid| !self.retained_txs.contains(txid))
|
.filter(|&txid| !self.retained_txs.contains(txid))
|
||||||
.try_for_each(|txid| batch.del_tx(txid, false).map(|_| ()))?;
|
.try_for_each(|txid| -> Result<(), Error> {
|
||||||
|
batch.del_tx(txid, false)?;
|
||||||
|
del_txs += 1;
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
// update txs
|
// update txs
|
||||||
self.updated_txs
|
self.updated_txs
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|txid| self.txs.get(txid))
|
.inspect(|&txid| debug!("updating tx: {}", txid))
|
||||||
.try_for_each(|txd| batch.set_tx(txd))?;
|
.try_for_each(|txid| batch.set_tx(self.txs.get(txid).unwrap()))?;
|
||||||
|
|
||||||
// update utxos
|
// update utxos
|
||||||
self.updated_utxos
|
self.updated_utxos
|
||||||
.iter()
|
.iter()
|
||||||
.inspect(|&utxo| println!("updating: {:?}", utxo.txout))
|
.inspect(|&utxo| debug!("updating utxo: {}", utxo.outpoint))
|
||||||
.try_for_each(|utxo| batch.set_utxo(utxo))?;
|
.try_for_each(|utxo| batch.set_utxo(utxo))?;
|
||||||
|
|
||||||
// update last indexes
|
// update last indexes
|
||||||
self.updated_last_indexes
|
self.last_indexes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|keychain| self.last_indexes.get_key_value(keychain).unwrap())
|
|
||||||
.try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
|
.try_for_each(|(&keychain, &index)| batch.set_last_index(keychain, index))?;
|
||||||
|
|
||||||
Ok(())
|
info!(
|
||||||
|
"db batch updates: del_txs={}, update_txs={}, update_utxos={}",
|
||||||
|
del_txs,
|
||||||
|
self.updated_txs.len(),
|
||||||
|
self.updated_utxos.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(batch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -678,6 +695,7 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Iterates through results of multiple `listtransactions` calls.
|
||||||
struct CoreTxIter<'a> {
|
struct CoreTxIter<'a> {
|
||||||
client: &'a Client,
|
client: &'a Client,
|
||||||
page_size: usize,
|
page_size: usize,
|
||||||
@ -688,7 +706,11 @@ struct CoreTxIter<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> CoreTxIter<'a> {
|
impl<'a> CoreTxIter<'a> {
|
||||||
fn new(client: &'a Client, page_size: usize) -> Self {
|
fn new(client: &'a Client, mut page_size: usize) -> Self {
|
||||||
|
if page_size > 1000 {
|
||||||
|
page_size = 1000;
|
||||||
|
}
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
client,
|
client,
|
||||||
page_size,
|
page_size,
|
||||||
@ -700,7 +722,7 @@ impl<'a> CoreTxIter<'a> {
|
|||||||
|
|
||||||
/// We want to filter out conflicting transactions.
|
/// We want to filter out conflicting transactions.
|
||||||
/// Only accept transactions that are already confirmed, or existing in mempool.
|
/// Only accept transactions that are already confirmed, or existing in mempool.
|
||||||
fn tx_ok(&self, item: &ListTransactionResult) -> bool {
|
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
|
||||||
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
|
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -715,7 +737,7 @@ impl<'a> Iterator for CoreTxIter<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(item) = self.stack.pop() {
|
if let Some(item) = self.stack.pop() {
|
||||||
if self.tx_ok(&item) {
|
if self.keep_tx(&item) {
|
||||||
return Some(Ok(item));
|
return Some(Ok(item));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -750,32 +772,26 @@ impl<'a> Iterator for CoreTxIter<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_scanning_details(client: &Client) -> Result<ScanningDetails, Error> {
|
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct CallResult {
|
struct CallResult {
|
||||||
scanning: ScanningDetails,
|
scanning: ScanningDetails,
|
||||||
}
|
}
|
||||||
let result: CallResult = client.call("getwalletinfo", &[])?;
|
|
||||||
Ok(result.scanning)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn await_wallet_scan(
|
let dur = Duration::from_secs(rate_sec);
|
||||||
client: &Client,
|
|
||||||
poll_rate_sec: u64,
|
|
||||||
progress_update: &dyn Progress,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let dur = Duration::from_secs(poll_rate_sec);
|
|
||||||
loop {
|
loop {
|
||||||
match get_scanning_details(client)? {
|
match client.call::<CallResult>("getwalletinfo", &[])?.scanning {
|
||||||
ScanningDetails::Scanning { duration, progress } => {
|
ScanningDetails::Scanning {
|
||||||
println!("scanning: duration={}, progress={}", duration, progress);
|
duration,
|
||||||
progress_update
|
progress: pc,
|
||||||
.update(progress, Some(format!("elapsed for {} seconds", duration)))?;
|
} => {
|
||||||
|
debug!("scanning: duration={}, progress={}", duration, pc);
|
||||||
|
progress.update(pc, Some(format!("elapsed for {} seconds", duration)))?;
|
||||||
thread::sleep(dur);
|
thread::sleep(dur);
|
||||||
}
|
}
|
||||||
ScanningDetails::NotScanning(_) => {
|
ScanningDetails::NotScanning(_) => {
|
||||||
progress_update.update(1.0, None)?;
|
progress.update(1.0, None)?;
|
||||||
println!("scanning: done!");
|
info!("scanning: done!");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user