feat(persist): add PersistAsync trait and StagedPersistAsync struct
This commit is contained in:
parent
aa640ab277
commit
54b0c11cbe
@ -19,6 +19,7 @@ serde_crate = { package = "serde", version = "1", optional = true, features = ["
|
|||||||
# Use hashbrown as a feature flag to have HashSet and HashMap from it.
|
# Use hashbrown as a feature flag to have HashSet and HashMap from it.
|
||||||
hashbrown = { version = "0.9.1", optional = true, features = ["serde"] }
|
hashbrown = { version = "0.9.1", optional = true, features = ["serde"] }
|
||||||
miniscript = { version = "12.0.0", optional = true, default-features = false }
|
miniscript = { version = "12.0.0", optional = true, default-features = false }
|
||||||
|
async-trait = { version = "0.1.80", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
@ -29,3 +30,4 @@ default = ["std", "miniscript", "persist"]
|
|||||||
std = ["bitcoin/std", "miniscript?/std"]
|
std = ["bitcoin/std", "miniscript?/std"]
|
||||||
serde = ["serde_crate", "bitcoin/serde", "miniscript?/serde"]
|
serde = ["serde_crate", "bitcoin/serde", "miniscript?/serde"]
|
||||||
persist = ["miniscript"]
|
persist = ["miniscript"]
|
||||||
|
async = ["async-trait"]
|
||||||
|
@ -8,6 +8,10 @@
|
|||||||
//! typically persisted together.
|
//! typically persisted together.
|
||||||
|
|
||||||
use crate::{indexed_tx_graph, keychain, local_chain, Anchor, Append};
|
use crate::{indexed_tx_graph, keychain, local_chain, Anchor, Append};
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
use alloc::boxed::Box;
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
use async_trait::async_trait;
|
||||||
use bitcoin::Network;
|
use bitcoin::Network;
|
||||||
use core::convert::Infallible;
|
use core::convert::Infallible;
|
||||||
use core::default::Default;
|
use core::default::Default;
|
||||||
@ -122,6 +126,48 @@ impl<C> Persist<C> for () {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
/// An async persistence backend for writing and loading changesets.
|
||||||
|
///
|
||||||
|
/// `C` represents the changeset; a datatype that records changes made to in-memory data structures
|
||||||
|
/// that are to be persisted, or retrieved from persistence.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait PersistAsync<C> {
|
||||||
|
/// The error the backend returns when it fails to write.
|
||||||
|
type WriteError: Debug + Display;
|
||||||
|
|
||||||
|
/// The error the backend returns when it fails to load changesets `C`.
|
||||||
|
type LoadError: Debug + Display;
|
||||||
|
|
||||||
|
/// Writes a changeset to the persistence backend.
|
||||||
|
///
|
||||||
|
/// It is up to the backend what it does with this. It could store every changeset in a list or
|
||||||
|
/// it inserts the actual changes into a more structured database. All it needs to guarantee is
|
||||||
|
/// that [`load_from_persistence`] restores a keychain tracker to what it should be if all
|
||||||
|
/// changesets had been applied sequentially.
|
||||||
|
///
|
||||||
|
/// [`load_from_persistence`]: Self::load_changes
|
||||||
|
async fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;
|
||||||
|
|
||||||
|
/// Return the aggregate changeset `C` from persistence.
|
||||||
|
async fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
#[async_trait]
|
||||||
|
impl<C> PersistAsync<C> for () {
|
||||||
|
type WriteError = Infallible;
|
||||||
|
type LoadError = Infallible;
|
||||||
|
|
||||||
|
async fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// `StagedPersist` adds a convenient staging area for changesets before they are persisted.
|
/// `StagedPersist` adds a convenient staging area for changesets before they are persisted.
|
||||||
///
|
///
|
||||||
/// Not all changes to the in-memory representation needs to be written to disk right away, so
|
/// Not all changes to the in-memory representation needs to be written to disk right away, so
|
||||||
@ -208,6 +254,97 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
/// `StagedPersistAsync` adds a convenient async staging area for changesets before they are persisted.
|
||||||
|
///
|
||||||
|
/// Not all changes to the in-memory representation needs to be written to disk right away, so
|
||||||
|
/// [`StagedPersistAsync::stage`] can be used to *stage* changes first and then
|
||||||
|
/// [`StagedPersistAsync::commit`] can be used to write changes to disk.
|
||||||
|
pub struct StagedPersistAsync<C, P: PersistAsync<C>> {
|
||||||
|
inner: P,
|
||||||
|
staged: C,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
#[async_trait]
|
||||||
|
impl<C: Send + Sync, P: PersistAsync<C> + Send> PersistAsync<C> for StagedPersistAsync<C, P> {
|
||||||
|
type WriteError = P::WriteError;
|
||||||
|
type LoadError = P::LoadError;
|
||||||
|
|
||||||
|
async fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
|
||||||
|
self.inner.write_changes(changeset).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_changes(&mut self) -> Result<Option<C>, Self::LoadError> {
|
||||||
|
self.inner.load_changes().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async")]
|
||||||
|
impl<C, P> StagedPersistAsync<C, P>
|
||||||
|
where
|
||||||
|
C: Default + Append + Send + Sync,
|
||||||
|
P: PersistAsync<C> + Send,
|
||||||
|
{
|
||||||
|
/// Create a new [`StagedPersistAsync`] adding staging to an inner data store that implements
|
||||||
|
/// [`PersistAsync`].
|
||||||
|
pub fn new(persist: P) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: persist,
|
||||||
|
staged: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stage a `changeset` to be committed later with [`commit`].
|
||||||
|
///
|
||||||
|
/// [`commit`]: Self::commit
|
||||||
|
pub fn stage(&mut self, changeset: C) {
|
||||||
|
self.staged.append(changeset)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the changes that have not been committed yet.
|
||||||
|
pub fn staged(&self) -> &C {
|
||||||
|
&self.staged
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Take the changes that have not been committed yet.
|
||||||
|
///
|
||||||
|
/// New staged is set to default;
|
||||||
|
pub fn take_staged(&mut self) -> C {
|
||||||
|
mem::take(&mut self.staged)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Commit the staged changes to the underlying persistence backend.
|
||||||
|
///
|
||||||
|
/// Changes that are committed (if any) are returned.
|
||||||
|
///
|
||||||
|
/// # Error
|
||||||
|
///
|
||||||
|
/// Returns a backend-defined error if this fails.
|
||||||
|
pub async fn commit(&mut self) -> Result<Option<C>, P::WriteError> {
|
||||||
|
if self.staged().is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
let staged = self.take_staged();
|
||||||
|
self.write_changes(&staged)
|
||||||
|
.await
|
||||||
|
// if written successfully, take and return `self.stage`
|
||||||
|
.map(|_| Some(staged))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stages a new changeset and commits it (along with any other previously staged changes) to
|
||||||
|
/// the persistence backend
|
||||||
|
///
|
||||||
|
/// Convenience method for calling [`stage`] and then [`commit`].
|
||||||
|
///
|
||||||
|
/// [`stage`]: Self::stage
|
||||||
|
/// [`commit`]: Self::commit
|
||||||
|
pub async fn stage_and_commit(&mut self, changeset: C) -> Result<Option<C>, P::WriteError> {
|
||||||
|
self.stage(changeset);
|
||||||
|
self.commit().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
extern crate core;
|
extern crate core;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user