Merge bitcoindevkit/bdk#965: Implement persistence with the new structures
4963240599364c6520d0a6ecae97db77cc7b8ba8 Add more `impl`s for `Append` and docs for file store `magic` (志宇) 2aa08a5898545f670df9ed9c4804231f321d811a [persist_redesign] Introduce redesigned `persist` types (志宇) Pull request description: ### Description This is part of #895 and #971 * Introduce a more generic version of the `keychain::persist::*` structures that only needs a single generic for the changeset type. Additional changes: * The `Append` trait has a new method `is_empty`. * Introduce `Store` structure for `bdk_file_store` (which implements `PersistBackend`). ### Changelog notice ### Checklists #### All Submissions: * [x] I've signed all my commits * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md) * [x] I ran `cargo fmt` and `cargo clippy` before committing #### New Features: * [x] I've added tests for the new feature * [x] I've added docs for the new feature Top commit has no ACKs. Tree-SHA512: 0211fbe7d7e27805d3ed3a80b42f184cdff1cebb32fd559aa9838e4a7f7c7e47b6c366b6ef68e299f876bafed549b8d1d8b8cc0366bf5b61db079504a565b9b4
This commit is contained in:
commit
05d353c0ad
@ -301,6 +301,10 @@ impl<A: Anchor, IA: Append> Append for IndexedAdditions<A, IA> {
|
||||
self.graph_additions.append(other.graph_additions);
|
||||
self.index_additions.append(other.index_additions);
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.graph_additions.is_empty() && self.index_additions.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a structure that can index transaction data.
|
||||
|
@ -84,6 +84,10 @@ impl<K: Ord> Append for DerivationAdditions<K> {
|
||||
|
||||
self.0.append(&mut other.0);
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> Default for DerivationAdditions<K> {
|
||||
|
@ -33,6 +33,8 @@ pub mod tx_graph;
|
||||
pub use tx_data_traits::*;
|
||||
mod chain_oracle;
|
||||
pub use chain_oracle::*;
|
||||
mod persist;
|
||||
pub use persist::*;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub mod example_utils;
|
||||
|
89
crates/chain/src/persist.rs
Normal file
89
crates/chain/src/persist.rs
Normal file
@ -0,0 +1,89 @@
|
||||
use core::convert::Infallible;
|
||||
|
||||
use crate::Append;
|
||||
|
||||
/// `Persist` wraps a [`PersistBackend`] (`B`) to create a convenient staging area for changes (`C`)
|
||||
/// before they are persisted.
|
||||
///
|
||||
/// Not all changes to the in-memory representation needs to be written to disk right away, so
|
||||
/// [`Persist::stage`] can be used to *stage* changes first and then [`Persist::commit`] can be used
|
||||
/// to write changes to disk.
|
||||
#[derive(Debug)]
|
||||
pub struct Persist<B, C> {
|
||||
backend: B,
|
||||
stage: C,
|
||||
}
|
||||
|
||||
impl<B, C> Persist<B, C>
|
||||
where
|
||||
B: PersistBackend<C>,
|
||||
C: Default + Append,
|
||||
{
|
||||
/// Create a new [`Persist`] from [`PersistBackend`].
|
||||
pub fn new(backend: B) -> Self {
|
||||
Self {
|
||||
backend,
|
||||
stage: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stage a `changeset` to be commited later with [`commit`].
|
||||
///
|
||||
/// [`commit`]: Self::commit
|
||||
pub fn stage(&mut self, changeset: C) {
|
||||
self.stage.append(changeset)
|
||||
}
|
||||
|
||||
/// Get the changes that have not been commited yet.
|
||||
pub fn staged(&self) -> &C {
|
||||
&self.stage
|
||||
}
|
||||
|
||||
/// Commit the staged changes to the underlying persistance backend.
|
||||
///
|
||||
/// Returns a backend-defined error if this fails.
|
||||
pub fn commit(&mut self) -> Result<(), B::WriteError> {
|
||||
let mut temp = C::default();
|
||||
core::mem::swap(&mut temp, &mut self.stage);
|
||||
self.backend.write_changes(&temp)
|
||||
}
|
||||
}
|
||||
|
||||
/// A persistence backend for [`Persist`].
|
||||
///
|
||||
/// `C` represents the changeset; a datatype that records changes made to in-memory data structures
|
||||
/// that are to be persisted, or retrieved from persistence.
|
||||
pub trait PersistBackend<C> {
|
||||
/// The error the backend returns when it fails to write.
|
||||
type WriteError: core::fmt::Debug;
|
||||
|
||||
/// The error the backend returns when it fails to load changesets `C`.
|
||||
type LoadError: core::fmt::Debug;
|
||||
|
||||
/// Writes a changeset to the persistence backend.
|
||||
///
|
||||
/// It is up to the backend what it does with this. It could store every changeset in a list or
|
||||
/// it inserts the actual changes into a more structured database. All it needs to guarantee is
|
||||
/// that [`load_from_persistence`] restores a keychain tracker to what it should be if all
|
||||
/// changesets had been applied sequentially.
|
||||
///
|
||||
/// [`load_from_persistence`]: Self::load_from_persistence
|
||||
fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;
|
||||
|
||||
/// Return the aggregate changeset `C` from persistence.
|
||||
fn load_from_persistence(&mut self) -> Result<C, Self::LoadError>;
|
||||
}
|
||||
|
||||
impl<C: Default> PersistBackend<C> for () {
|
||||
type WriteError = Infallible;
|
||||
|
||||
type LoadError = Infallible;
|
||||
|
||||
fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
|
||||
Ok(C::default())
|
||||
}
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
use crate::collections::BTreeMap;
|
||||
use crate::collections::BTreeSet;
|
||||
use crate::BlockId;
|
||||
use alloc::vec::Vec;
|
||||
use bitcoin::{Block, OutPoint, Transaction, TxOut};
|
||||
|
||||
/// Trait to do something with every txout contained in a structure.
|
||||
@ -64,20 +65,56 @@ impl<A: Anchor> Anchor for &'static A {
|
||||
pub trait Append {
|
||||
/// Append another object of the same type onto `self`.
|
||||
fn append(&mut self, other: Self);
|
||||
|
||||
/// Returns whether the structure is considered empty.
|
||||
fn is_empty(&self) -> bool;
|
||||
}
|
||||
|
||||
impl Append for () {
|
||||
fn append(&mut self, _other: Self) {}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Ord, V> Append for BTreeMap<K, V> {
|
||||
fn append(&mut self, mut other: Self) {
|
||||
BTreeMap::append(self, &mut other)
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
BTreeMap::is_empty(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Ord> Append for BTreeSet<T> {
|
||||
fn append(&mut self, mut other: Self) {
|
||||
BTreeSet::append(self, &mut other)
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
BTreeSet::is_empty(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Append for Vec<T> {
|
||||
fn append(&mut self, mut other: Self) {
|
||||
Vec::append(self, &mut other)
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
Vec::is_empty(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Append, B: Append> Append for (A, B) {
|
||||
fn append(&mut self, other: Self) {
|
||||
Append::append(&mut self.0, other.0);
|
||||
Append::append(&mut self.1, other.1);
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
Append::is_empty(&self.0) && Append::is_empty(&self.1)
|
||||
}
|
||||
}
|
||||
|
@ -940,6 +940,13 @@ impl<A: Ord> Append for Additions<A> {
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.tx.is_empty()
|
||||
&& self.txout.is_empty()
|
||||
&& self.anchors.is_empty()
|
||||
&& self.last_seen.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<A> AsRef<TxGraph<A>> for TxGraph<A> {
|
||||
|
100
crates/file_store/src/entry_iter.rs
Normal file
100
crates/file_store/src/entry_iter.rs
Normal file
@ -0,0 +1,100 @@
|
||||
use bincode::Options;
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{self, Seek},
|
||||
marker::PhantomData,
|
||||
};
|
||||
|
||||
use crate::bincode_options;
|
||||
|
||||
/// Iterator over entries in a file store.
|
||||
///
|
||||
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
|
||||
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
|
||||
///
|
||||
/// [`next`]: Self::next
|
||||
pub struct EntryIter<'t, T> {
|
||||
db_file: Option<&'t mut File>,
|
||||
|
||||
/// The file position for the first read of `db_file`.
|
||||
start_pos: Option<u64>,
|
||||
types: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'t, T> EntryIter<'t, T> {
|
||||
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
|
||||
Self {
|
||||
db_file: Some(db_file),
|
||||
start_pos: Some(start_pos),
|
||||
types: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t, T> Iterator for EntryIter<'t, T>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
type Item = Result<T, IterError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// closure which reads a single entry starting from `self.pos`
|
||||
let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
|
||||
let pos = match start_pos {
|
||||
Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
|
||||
None => f.stream_position()?,
|
||||
};
|
||||
|
||||
match bincode_options().deserialize_from(&*f) {
|
||||
Ok(changeset) => {
|
||||
f.stream_position()?;
|
||||
Ok(Some(changeset))
|
||||
}
|
||||
Err(e) => {
|
||||
if let bincode::ErrorKind::Io(inner) = &*e {
|
||||
if inner.kind() == io::ErrorKind::UnexpectedEof {
|
||||
let eof = f.seek(io::SeekFrom::End(0))?;
|
||||
if pos == eof {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
f.seek(io::SeekFrom::Start(pos))?;
|
||||
Err(IterError::Bincode(*e))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
|
||||
if result.is_err() {
|
||||
self.db_file = None;
|
||||
}
|
||||
result.transpose()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for IterError {
|
||||
fn from(value: io::Error) -> Self {
|
||||
IterError::Io(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type for [`EntryIter`].
|
||||
#[derive(Debug)]
|
||||
pub enum IterError {
|
||||
/// Failure to read from the file.
|
||||
Io(io::Error),
|
||||
/// Failure to decode data from the file.
|
||||
Bincode(bincode::ErrorKind),
|
||||
}
|
||||
|
||||
impl core::fmt::Display for IterError {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
|
||||
IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for IterError {}
|
@ -6,14 +6,15 @@ use bdk_chain::{
|
||||
keychain::{KeychainChangeSet, KeychainTracker},
|
||||
sparse_chain,
|
||||
};
|
||||
use bincode::{DefaultOptions, Options};
|
||||
use core::marker::PhantomData;
|
||||
use bincode::Options;
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::{self, Read, Seek, Write},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use crate::{bincode_options, EntryIter, IterError};
|
||||
|
||||
/// BDK File Store magic bytes length.
|
||||
const MAGIC_BYTES_LEN: usize = 12;
|
||||
|
||||
@ -28,10 +29,6 @@ pub struct KeychainStore<K, P> {
|
||||
changeset_type_params: core::marker::PhantomData<(K, P)>,
|
||||
}
|
||||
|
||||
fn bincode() -> impl bincode::Options {
|
||||
DefaultOptions::new().with_varint_encoding()
|
||||
}
|
||||
|
||||
impl<K, P> KeychainStore<K, P>
|
||||
where
|
||||
K: Ord + Clone + core::fmt::Debug,
|
||||
@ -85,11 +82,8 @@ where
|
||||
/// **WARNING**: This method changes the write position in the underlying file. You should
|
||||
/// always iterate over all entries until `None` is returned if you want your next write to go
|
||||
/// at the end; otherwise, you will write over existing entries.
|
||||
pub fn iter_changesets(&mut self) -> Result<EntryIter<'_, KeychainChangeSet<K, P>>, io::Error> {
|
||||
self.db_file
|
||||
.seek(io::SeekFrom::Start(MAGIC_BYTES_LEN as _))?;
|
||||
|
||||
Ok(EntryIter::new(&mut self.db_file))
|
||||
pub fn iter_changesets(&mut self) -> Result<EntryIter<KeychainChangeSet<K, P>>, io::Error> {
|
||||
Ok(EntryIter::new(MAGIC_BYTES_LEN as u64, &mut self.db_file))
|
||||
}
|
||||
|
||||
/// Loads all the changesets that have been stored as one giant changeset.
|
||||
@ -144,7 +138,7 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
bincode()
|
||||
bincode_options()
|
||||
.serialize_into(&mut self.db_file, changeset)
|
||||
.map_err(|e| match *e {
|
||||
bincode::ErrorKind::Io(inner) => inner,
|
||||
@ -197,92 +191,6 @@ impl From<io::Error> for FileError {
|
||||
|
||||
impl std::error::Error for FileError {}
|
||||
|
||||
/// Error type for [`EntryIter`].
|
||||
#[derive(Debug)]
|
||||
pub enum IterError {
|
||||
/// Failure to read from the file.
|
||||
Io(io::Error),
|
||||
/// Failure to decode data from the file.
|
||||
Bincode(bincode::ErrorKind),
|
||||
}
|
||||
|
||||
impl core::fmt::Display for IterError {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
|
||||
IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for IterError {}
|
||||
|
||||
/// Iterator over entries in a file store.
|
||||
///
|
||||
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
|
||||
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
|
||||
///
|
||||
/// [`next`]: Self::next
|
||||
pub struct EntryIter<'a, V> {
|
||||
db_file: &'a mut File,
|
||||
types: PhantomData<V>,
|
||||
error_exit: bool,
|
||||
}
|
||||
|
||||
impl<'a, V> EntryIter<'a, V> {
|
||||
pub fn new(db_file: &'a mut File) -> Self {
|
||||
Self {
|
||||
db_file,
|
||||
types: PhantomData,
|
||||
error_exit: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> Iterator for EntryIter<'a, V>
|
||||
where
|
||||
V: serde::de::DeserializeOwned,
|
||||
{
|
||||
type Item = Result<V, IterError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let result = (|| {
|
||||
let pos = self.db_file.stream_position()?;
|
||||
|
||||
match bincode().deserialize_from(&mut self.db_file) {
|
||||
Ok(changeset) => Ok(Some(changeset)),
|
||||
Err(e) => {
|
||||
if let bincode::ErrorKind::Io(inner) = &*e {
|
||||
if inner.kind() == io::ErrorKind::UnexpectedEof {
|
||||
let eof = self.db_file.seek(io::SeekFrom::End(0))?;
|
||||
if pos == eof {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.db_file.seek(io::SeekFrom::Start(pos))?;
|
||||
Err(IterError::Bincode(*e))
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
let result = result.transpose();
|
||||
|
||||
if let Some(Err(_)) = &result {
|
||||
self.error_exit = true;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for IterError {
|
||||
fn from(value: io::Error) -> Self {
|
||||
IterError::Io(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
@ -290,6 +198,7 @@ mod test {
|
||||
keychain::{DerivationAdditions, KeychainChangeSet},
|
||||
TxHeight,
|
||||
};
|
||||
use bincode::DefaultOptions;
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
vec::Vec,
|
@ -1,10 +1,51 @@
|
||||
#![doc = include_str!("../README.md")]
|
||||
mod file_store;
|
||||
mod entry_iter;
|
||||
mod keychain_store;
|
||||
mod store;
|
||||
use std::io;
|
||||
|
||||
use bdk_chain::{
|
||||
keychain::{KeychainChangeSet, KeychainTracker, PersistBackend},
|
||||
sparse_chain::ChainPosition,
|
||||
};
|
||||
pub use file_store::*;
|
||||
use bincode::{DefaultOptions, Options};
|
||||
pub use entry_iter::*;
|
||||
pub use keychain_store::*;
|
||||
pub use store::*;
|
||||
|
||||
pub(crate) fn bincode_options() -> impl bincode::Options {
|
||||
DefaultOptions::new().with_varint_encoding()
|
||||
}
|
||||
|
||||
/// Error that occurs due to problems encountered with the file.
|
||||
#[derive(Debug)]
|
||||
pub enum FileError<'a> {
|
||||
/// IO error, this may mean that the file is too short.
|
||||
Io(io::Error),
|
||||
/// Magic bytes do not match what is expected.
|
||||
InvalidMagicBytes { got: Vec<u8>, expected: &'a [u8] },
|
||||
}
|
||||
|
||||
impl<'a> core::fmt::Display for FileError<'a> {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
Self::Io(e) => write!(f, "io error trying to read file: {}", e),
|
||||
Self::InvalidMagicBytes { got, expected } => write!(
|
||||
f,
|
||||
"file has invalid magic bytes: expected={:?} got={:?}",
|
||||
expected, got,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<io::Error> for FileError<'a> {
|
||||
fn from(value: io::Error) -> Self {
|
||||
Self::Io(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> std::error::Error for FileError<'a> {}
|
||||
|
||||
impl<K, P> PersistBackend<K, P> for KeychainStore<K, P>
|
||||
where
|
||||
|
255
crates/file_store/src/store.rs
Normal file
255
crates/file_store/src/store.rs
Normal file
@ -0,0 +1,255 @@
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
fs::{File, OpenOptions},
|
||||
io::{self, Read, Seek, Write},
|
||||
marker::PhantomData,
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use bdk_chain::{Append, PersistBackend};
|
||||
use bincode::Options;
|
||||
|
||||
use crate::{bincode_options, EntryIter, FileError, IterError};
|
||||
|
||||
/// Persists an append-only list of changesets (`C`) to a single file.
|
||||
///
|
||||
/// The changesets are the results of altering a tracker implementation (`T`).
|
||||
#[derive(Debug)]
|
||||
pub struct Store<'a, C> {
|
||||
magic: &'a [u8],
|
||||
db_file: File,
|
||||
marker: PhantomData<C>,
|
||||
}
|
||||
|
||||
impl<'a, C> PersistBackend<C> for Store<'a, C>
|
||||
where
|
||||
C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
|
||||
{
|
||||
type WriteError = std::io::Error;
|
||||
|
||||
type LoadError = IterError;
|
||||
|
||||
fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
|
||||
self.append_changeset(changeset)
|
||||
}
|
||||
|
||||
fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
|
||||
let (changeset, result) = self.aggregate_changesets();
|
||||
result.map(|_| changeset)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, C> Store<'a, C>
|
||||
where
|
||||
C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
|
||||
{
|
||||
/// Creates a new store from a [`File`].
|
||||
///
|
||||
/// The file must have been opened with read and write permissions.
|
||||
///
|
||||
/// `magic` is the expected prefixed bytes of the file. If this does not match, an error will be
|
||||
/// returned.
|
||||
///
|
||||
/// [`File`]: std::fs::File
|
||||
pub fn new(magic: &'a [u8], mut db_file: File) -> Result<Self, FileError> {
|
||||
db_file.rewind()?;
|
||||
|
||||
let mut magic_buf = vec![0_u8; magic.len()];
|
||||
db_file.read_exact(magic_buf.as_mut())?;
|
||||
|
||||
if magic_buf != magic {
|
||||
return Err(FileError::InvalidMagicBytes {
|
||||
got: magic_buf,
|
||||
expected: magic,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
magic,
|
||||
db_file,
|
||||
marker: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates or loads a store from `db_path`.
|
||||
///
|
||||
/// If no file exists there, it will be created.
|
||||
///
|
||||
/// Refer to [`new`] for documentation on the `magic` input.
|
||||
///
|
||||
/// [`new`]: Self::new
|
||||
pub fn new_from_path<P>(magic: &'a [u8], db_path: P) -> Result<Self, FileError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let already_exists = db_path.as_ref().exists();
|
||||
|
||||
let mut db_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(db_path)?;
|
||||
|
||||
if !already_exists {
|
||||
db_file.write_all(magic)?;
|
||||
}
|
||||
|
||||
Self::new(magic, db_file)
|
||||
}
|
||||
|
||||
/// Iterates over the stored changeset from first to last, changing the seek position at each
|
||||
/// iteration.
|
||||
///
|
||||
/// The iterator may fail to read an entry and therefore return an error. However, the first time
|
||||
/// it returns an error will be the last. After doing so, the iterator will always yield `None`.
|
||||
///
|
||||
/// **WARNING**: This method changes the write position in the underlying file. You should
|
||||
/// always iterate over all entries until `None` is returned if you want your next write to go
|
||||
/// at the end; otherwise, you will write over existing entries.
|
||||
pub fn iter_changesets(&mut self) -> EntryIter<C> {
|
||||
EntryIter::new(self.magic.len() as u64, &mut self.db_file)
|
||||
}
|
||||
|
||||
/// Loads all the changesets that have been stored as one giant changeset.
|
||||
///
|
||||
/// This function returns a tuple of the aggregate changeset and a result that indicates
|
||||
/// whether an error occurred while reading or deserializing one of the entries. If so the
|
||||
/// changeset will consist of all of those it was able to read.
|
||||
///
|
||||
/// You should usually check the error. In many applications, it may make sense to do a full
|
||||
/// wallet scan with a stop-gap after getting an error, since it is likely that one of the
|
||||
/// changesets it was unable to read changed the derivation indices of the tracker.
|
||||
///
|
||||
/// **WARNING**: This method changes the write position of the underlying file. The next
|
||||
/// changeset will be written over the erroring entry (or the end of the file if none existed).
|
||||
pub fn aggregate_changesets(&mut self) -> (C, Result<(), IterError>) {
|
||||
let mut changeset = C::default();
|
||||
let result = (|| {
|
||||
for next_changeset in self.iter_changesets() {
|
||||
changeset.append(next_changeset?);
|
||||
}
|
||||
Ok(())
|
||||
})();
|
||||
|
||||
(changeset, result)
|
||||
}
|
||||
|
||||
/// Append a new changeset to the file and truncate the file to the end of the appended
|
||||
/// changeset.
|
||||
///
|
||||
/// The truncation is to avoid the possibility of having a valid but inconsistent changeset
|
||||
/// directly after the appended changeset.
|
||||
pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> {
|
||||
// no need to write anything if changeset is empty
|
||||
if changeset.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
bincode_options()
|
||||
.serialize_into(&mut self.db_file, changeset)
|
||||
.map_err(|e| match *e {
|
||||
bincode::ErrorKind::Io(inner) => inner,
|
||||
unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
|
||||
})?;
|
||||
|
||||
// truncate file after this changeset addition
|
||||
// if this is not done, data after this changeset may represent valid changesets, however
|
||||
// applying those changesets on top of this one may result in an inconsistent state
|
||||
let pos = self.db_file.stream_position()?;
|
||||
self.db_file.set_len(pos)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
use bincode::DefaultOptions;
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
vec::Vec,
|
||||
};
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
const TEST_MAGIC_BYTES_LEN: usize = 12;
|
||||
const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
|
||||
[98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
|
||||
|
||||
type TestChangeSet = Vec<String>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestTracker;
|
||||
|
||||
#[test]
|
||||
fn new_fails_if_file_is_too_short() {
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1])
|
||||
.expect("should write");
|
||||
|
||||
match Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) {
|
||||
Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
|
||||
unexpected => panic!("unexpected result: {:?}", unexpected),
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_fails_if_magic_bytes_are_invalid() {
|
||||
let invalid_magic_bytes = "ldkfs0000000";
|
||||
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
file.write_all(invalid_magic_bytes.as_bytes())
|
||||
.expect("should write");
|
||||
|
||||
match Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) {
|
||||
Err(FileError::InvalidMagicBytes { got, .. }) => {
|
||||
assert_eq!(got, invalid_magic_bytes.as_bytes())
|
||||
}
|
||||
unexpected => panic!("unexpected result: {:?}", unexpected),
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn append_changeset_truncates_invalid_bytes() {
|
||||
// initial data to write to file (magic bytes + invalid data)
|
||||
let mut data = [255_u8; 2000];
|
||||
data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
|
||||
|
||||
let changeset = vec!["one".into(), "two".into(), "three!".into()];
|
||||
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
file.write_all(&data).expect("should write");
|
||||
|
||||
let mut store = Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap())
|
||||
.expect("should open");
|
||||
match store.iter_changesets().next() {
|
||||
Some(Err(IterError::Bincode(_))) => {}
|
||||
unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
|
||||
}
|
||||
|
||||
store.append_changeset(&changeset).expect("should append");
|
||||
|
||||
drop(store);
|
||||
|
||||
let got_bytes = {
|
||||
let mut buf = Vec::new();
|
||||
file.reopen()
|
||||
.unwrap()
|
||||
.read_to_end(&mut buf)
|
||||
.expect("should read");
|
||||
buf
|
||||
};
|
||||
|
||||
let expected_bytes = {
|
||||
let mut buf = TEST_MAGIC_BYTES.to_vec();
|
||||
DefaultOptions::new()
|
||||
.with_varint_encoding()
|
||||
.serialize_into(&mut buf, &changeset)
|
||||
.expect("should encode");
|
||||
buf
|
||||
};
|
||||
|
||||
assert_eq!(got_bytes, expected_bytes);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user