fix(file_store): recover file offset after read
Because we use wrap the file with `BufReader` with the `EntryIter`, we need to sync the `BufReader`'s position with the file's offset when we drop the `EntryIter`. Therefore we have a custom drop impl for `EntryIter`.
This commit is contained in:
parent
66dc34e75a
commit
51bd01b3dd
@ -71,6 +71,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'t, T> Drop for EntryIter<'t, T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// This syncs the underlying file's offset with the buffer's position. This way, we
|
||||||
|
// maintain the correct position to start the next read/write.
|
||||||
|
if let Ok(pos) = self.db_file.stream_position() {
|
||||||
|
let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Error type for [`EntryIter`].
|
/// Error type for [`EntryIter`].
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum IterError {
|
pub enum IterError {
|
||||||
|
@ -410,4 +410,50 @@ mod test {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn write_after_short_read() {
|
||||||
|
let temp_dir = tempfile::tempdir().unwrap();
|
||||||
|
|
||||||
|
let changesets = (0..20)
|
||||||
|
.map(|n| TestChangeSet::from([format!("{}", n)]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let last_changeset = TestChangeSet::from(["last".into()]);
|
||||||
|
|
||||||
|
for read_count in 0..changesets.len() {
|
||||||
|
let file_path = temp_dir.path().join(format!("{}.dat", read_count));
|
||||||
|
println!("Test file: {:?}", file_path);
|
||||||
|
|
||||||
|
// First, we create the file with all the changesets!
|
||||||
|
let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
|
||||||
|
for changeset in &changesets {
|
||||||
|
db.append_changeset(changeset).unwrap();
|
||||||
|
}
|
||||||
|
drop(db);
|
||||||
|
|
||||||
|
// We re-open the file and read `read_count` number of changesets.
|
||||||
|
let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
|
||||||
|
let mut exp_aggregation = db
|
||||||
|
.iter_changesets()
|
||||||
|
.take(read_count)
|
||||||
|
.map(|r| r.expect("must read valid changeset"))
|
||||||
|
.fold(TestChangeSet::default(), |mut acc, v| {
|
||||||
|
Append::append(&mut acc, v);
|
||||||
|
acc
|
||||||
|
});
|
||||||
|
// We write after a short read.
|
||||||
|
db.write_changes(&last_changeset)
|
||||||
|
.expect("last write must succeed");
|
||||||
|
Append::append(&mut exp_aggregation, last_changeset.clone());
|
||||||
|
drop(db);
|
||||||
|
|
||||||
|
// We open the file again and check whether aggregate changeset is expected.
|
||||||
|
let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
|
||||||
|
.unwrap()
|
||||||
|
.aggregate_changesets()
|
||||||
|
.expect("must aggregate changesets")
|
||||||
|
.unwrap_or_default();
|
||||||
|
assert_eq!(aggregation, exp_aggregation);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user