From 51bd01b3dd3d68c1628d74be9dc6d0f1cf1f15fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Tue, 23 Jan 2024 12:32:13 +0800 Subject: [PATCH] fix(file_store): recover file offset after read Because we use wrap the file with `BufReader` with the `EntryIter`, we need to sync the `BufReader`'s position with the file's offset when we drop the `EntryIter`. Therefore we have a custom drop impl for `EntryIter`. --- crates/file_store/src/entry_iter.rs | 10 +++++++ crates/file_store/src/store.rs | 46 +++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs index e5e70b3b..6be3fd03 100644 --- a/crates/file_store/src/entry_iter.rs +++ b/crates/file_store/src/entry_iter.rs @@ -71,6 +71,16 @@ where } } +impl<'t, T> Drop for EntryIter<'t, T> { + fn drop(&mut self) { + // This syncs the underlying file's offset with the buffer's position. This way, we + // maintain the correct position to start the next read/write. + if let Ok(pos) = self.db_file.stream_position() { + let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos)); + } + } +} + /// Error type for [`EntryIter`]. #[derive(Debug)] pub enum IterError { diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index 83e5272f..0dc45d28 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -410,4 +410,50 @@ mod test { } } } + + #[test] + fn write_after_short_read() { + let temp_dir = tempfile::tempdir().unwrap(); + + let changesets = (0..20) + .map(|n| TestChangeSet::from([format!("{}", n)])) + .collect::>(); + let last_changeset = TestChangeSet::from(["last".into()]); + + for read_count in 0..changesets.len() { + let file_path = temp_dir.path().join(format!("{}.dat", read_count)); + println!("Test file: {:?}", file_path); + + // First, we create the file with all the changesets! + let mut db = Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); + for changeset in &changesets { + db.append_changeset(changeset).unwrap(); + } + drop(db); + + // We re-open the file and read `read_count` number of changesets. + let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); + let mut exp_aggregation = db + .iter_changesets() + .take(read_count) + .map(|r| r.expect("must read valid changeset")) + .fold(TestChangeSet::default(), |mut acc, v| { + Append::append(&mut acc, v); + acc + }); + // We write after a short read. + db.write_changes(&last_changeset) + .expect("last write must succeed"); + Append::append(&mut exp_aggregation, last_changeset.clone()); + drop(db); + + // We open the file again and check whether aggregate changeset is expected. + let aggregation = Store::::open(&TEST_MAGIC_BYTES, &file_path) + .unwrap() + .aggregate_changesets() + .expect("must aggregate changesets") + .unwrap_or_default(); + assert_eq!(aggregation, exp_aggregation); + } + } }