From 54b0c11cbe1d08eb955e50f0ac719a0b19e3032a Mon Sep 17 00:00:00 2001 From: Steve Myers Date: Sat, 1 Jun 2024 22:45:11 -0500 Subject: [PATCH] feat(persist): add PersistAsync trait and StagedPersistAsync struct --- crates/chain/Cargo.toml | 2 + crates/chain/src/persist.rs | 137 ++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/crates/chain/Cargo.toml b/crates/chain/Cargo.toml index a2f355df..e6adfe5e 100644 --- a/crates/chain/Cargo.toml +++ b/crates/chain/Cargo.toml @@ -19,6 +19,7 @@ serde_crate = { package = "serde", version = "1", optional = true, features = [" # Use hashbrown as a feature flag to have HashSet and HashMap from it. hashbrown = { version = "0.9.1", optional = true, features = ["serde"] } miniscript = { version = "12.0.0", optional = true, default-features = false } +async-trait = { version = "0.1.80", optional = true } [dev-dependencies] rand = "0.8" @@ -29,3 +30,4 @@ default = ["std", "miniscript", "persist"] std = ["bitcoin/std", "miniscript?/std"] serde = ["serde_crate", "bitcoin/serde", "miniscript?/serde"] persist = ["miniscript"] +async = ["async-trait"] diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs index d52ebdf5..a953e724 100644 --- a/crates/chain/src/persist.rs +++ b/crates/chain/src/persist.rs @@ -8,6 +8,10 @@ //! typically persisted together. use crate::{indexed_tx_graph, keychain, local_chain, Anchor, Append}; +#[cfg(feature = "async")] +use alloc::boxed::Box; +#[cfg(feature = "async")] +use async_trait::async_trait; use bitcoin::Network; use core::convert::Infallible; use core::default::Default; @@ -122,6 +126,48 @@ impl Persist for () { } } +#[cfg(feature = "async")] +/// An async persistence backend for writing and loading changesets. +/// +/// `C` represents the changeset; a datatype that records changes made to in-memory data structures +/// that are to be persisted, or retrieved from persistence. +#[async_trait] +pub trait PersistAsync { + /// The error the backend returns when it fails to write. + type WriteError: Debug + Display; + + /// The error the backend returns when it fails to load changesets `C`. + type LoadError: Debug + Display; + + /// Writes a changeset to the persistence backend. + /// + /// It is up to the backend what it does with this. It could store every changeset in a list or + /// it inserts the actual changes into a more structured database. All it needs to guarantee is + /// that [`load_from_persistence`] restores a keychain tracker to what it should be if all + /// changesets had been applied sequentially. + /// + /// [`load_from_persistence`]: Self::load_changes + async fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>; + + /// Return the aggregate changeset `C` from persistence. + async fn load_changes(&mut self) -> Result, Self::LoadError>; +} + +#[cfg(feature = "async")] +#[async_trait] +impl PersistAsync for () { + type WriteError = Infallible; + type LoadError = Infallible; + + async fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> { + Ok(()) + } + + async fn load_changes(&mut self) -> Result, Self::LoadError> { + Ok(None) + } +} + /// `StagedPersist` adds a convenient staging area for changesets before they are persisted. /// /// Not all changes to the in-memory representation needs to be written to disk right away, so @@ -208,6 +254,97 @@ where } } +#[cfg(feature = "async")] +/// `StagedPersistAsync` adds a convenient async staging area for changesets before they are persisted. +/// +/// Not all changes to the in-memory representation needs to be written to disk right away, so +/// [`StagedPersistAsync::stage`] can be used to *stage* changes first and then +/// [`StagedPersistAsync::commit`] can be used to write changes to disk. +pub struct StagedPersistAsync> { + inner: P, + staged: C, +} + +#[cfg(feature = "async")] +#[async_trait] +impl + Send> PersistAsync for StagedPersistAsync { + type WriteError = P::WriteError; + type LoadError = P::LoadError; + + async fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> { + self.inner.write_changes(changeset).await + } + + async fn load_changes(&mut self) -> Result, Self::LoadError> { + self.inner.load_changes().await + } +} + +#[cfg(feature = "async")] +impl StagedPersistAsync +where + C: Default + Append + Send + Sync, + P: PersistAsync + Send, +{ + /// Create a new [`StagedPersistAsync`] adding staging to an inner data store that implements + /// [`PersistAsync`]. + pub fn new(persist: P) -> Self { + Self { + inner: persist, + staged: Default::default(), + } + } + + /// Stage a `changeset` to be committed later with [`commit`]. + /// + /// [`commit`]: Self::commit + pub fn stage(&mut self, changeset: C) { + self.staged.append(changeset) + } + + /// Get the changes that have not been committed yet. + pub fn staged(&self) -> &C { + &self.staged + } + + /// Take the changes that have not been committed yet. + /// + /// New staged is set to default; + pub fn take_staged(&mut self) -> C { + mem::take(&mut self.staged) + } + + /// Commit the staged changes to the underlying persistence backend. + /// + /// Changes that are committed (if any) are returned. + /// + /// # Error + /// + /// Returns a backend-defined error if this fails. + pub async fn commit(&mut self) -> Result, P::WriteError> { + if self.staged().is_empty() { + return Ok(None); + } + let staged = self.take_staged(); + self.write_changes(&staged) + .await + // if written successfully, take and return `self.stage` + .map(|_| Some(staged)) + } + + /// Stages a new changeset and commits it (along with any other previously staged changes) to + /// the persistence backend + /// + /// Convenience method for calling [`stage`] and then [`commit`]. + /// + /// [`stage`]: Self::stage + /// [`commit`]: Self::commit + pub async fn stage_and_commit(&mut self, changeset: C) -> Result, P::WriteError> { + self.stage(changeset); + self.commit().await + } +} + #[cfg(test)] mod test { extern crate core;