From 66dc34e75ab1ec2ae533bd540327721f6226eca1 Mon Sep 17 00:00:00 2001 From: LLFourn Date: Mon, 22 Jan 2024 13:48:48 +1100 Subject: [PATCH] refactor(file_store): Use BufReader but simplify --- crates/file_store/src/entry_iter.rs | 121 +++++++--------------------- 1 file changed, 31 insertions(+), 90 deletions(-) diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs index d95a67f8..e5e70b3b 100644 --- a/crates/file_store/src/entry_iter.rs +++ b/crates/file_store/src/entry_iter.rs @@ -7,8 +7,6 @@ use std::{ use crate::bincode_options; -type EntryReader<'t> = CountingReader>; - /// Iterator over entries in a file store. /// /// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the @@ -16,8 +14,9 @@ type EntryReader<'t> = CountingReader>; /// /// [`next`]: Self::next pub struct EntryIter<'t, T> { - db_file: Option>, - + /// Buffered reader around the file + db_file: BufReader<&'t mut File>, + finished: bool, /// The file position for the first read of `db_file`. start_pos: Option, types: PhantomData, @@ -26,8 +25,9 @@ pub struct EntryIter<'t, T> { impl<'t, T> EntryIter<'t, T> { pub fn new(start_pos: u64, db_file: &'t mut File) -> Self { Self { - db_file: Some(CountingReader::new(BufReader::new(db_file))), + db_file: BufReader::new(db_file), start_pos: Some(start_pos), + finished: false, types: PhantomData, } } @@ -40,45 +40,34 @@ where type Item = Result; fn next(&mut self) -> Option { - // closure which reads a single entry starting from `self.pos` - let read_one = - |f: &mut EntryReader, start_pos: Option| -> Result, IterError> { - if let Some(pos) = start_pos { - f.seek(io::SeekFrom::Start(pos))?; - } - match bincode_options().deserialize_from(&mut *f) { - Ok(changeset) => { - f.clear_count(); - Ok(Some(changeset)) - } - Err(e) => { - // allow unexpected EOF if 0 bytes were read - if let bincode::ErrorKind::Io(inner) = &*e { - if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 { - f.clear_count(); - return Ok(None); - } - } - f.rewind()?; - Err(IterError::Bincode(*e)) - } - } - }; - let result = read_one(self.db_file.as_mut()?, self.start_pos.take()); - if result.is_err() { - self.db_file = None; + if self.finished { + return None; } - result.transpose() - } -} + (|| { + if let Some(start) = self.start_pos.take() { + self.db_file.seek(io::SeekFrom::Start(start))?; + } -impl<'t, T> Drop for EntryIter<'t, T> { - fn drop(&mut self) { - if let Some(r) = self.db_file.as_mut() { - // This syncs the underlying file's offset with the buffer's position. This way, no data - // is lost with future reads. - let _ = r.stream_position(); - } + let pos_before_read = self.db_file.stream_position()?; + match bincode_options().deserialize_from(&mut self.db_file) { + Ok(changeset) => Ok(Some(changeset)), + Err(e) => { + self.finished = true; + let pos_after_read = self.db_file.stream_position()?; + // allow unexpected EOF if 0 bytes were read + if let bincode::ErrorKind::Io(inner) = &*e { + if inner.kind() == io::ErrorKind::UnexpectedEof + && pos_after_read == pos_before_read + { + return Ok(None); + } + } + self.db_file.seek(io::SeekFrom::Start(pos_before_read))?; + Err(IterError::Bincode(*e)) + } + } + })() + .transpose() } } @@ -107,51 +96,3 @@ impl From for IterError { } impl std::error::Error for IterError {} - -/// A wrapped [`Reader`] which counts total bytes read. -struct CountingReader { - r: R, - n: u64, -} - -impl CountingReader { - fn new(file: R) -> Self { - Self { r: file, n: 0 } - } - - /// Counted bytes read. - fn count(&self) -> u64 { - self.n - } - - /// Clear read count. - fn clear_count(&mut self) { - self.n = 0; - } -} - -impl CountingReader { - /// Rewind file descriptor offset to before all counted read operations. Then clear the read - /// count. - fn rewind(&mut self) -> io::Result { - let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?; - self.n = 0; - Ok(read) - } -} - -impl io::Read for CountingReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let read = self.r.read(&mut *buf)?; - self.n += read as u64; - Ok(read) - } -} - -impl io::Seek for CountingReader { - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - let res = self.r.seek(pos); - self.n = 0; - res - } -}