refactor(file_store): Use BufReader but simplify
This commit is contained in:
parent
c871764670
commit
66dc34e75a
@ -7,8 +7,6 @@ use std::{
|
|||||||
|
|
||||||
use crate::bincode_options;
|
use crate::bincode_options;
|
||||||
|
|
||||||
type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
|
|
||||||
|
|
||||||
/// Iterator over entries in a file store.
|
/// Iterator over entries in a file store.
|
||||||
///
|
///
|
||||||
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
|
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
|
||||||
@ -16,8 +14,9 @@ type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
|
|||||||
///
|
///
|
||||||
/// [`next`]: Self::next
|
/// [`next`]: Self::next
|
||||||
pub struct EntryIter<'t, T> {
|
pub struct EntryIter<'t, T> {
|
||||||
db_file: Option<EntryReader<'t>>,
|
/// Buffered reader around the file
|
||||||
|
db_file: BufReader<&'t mut File>,
|
||||||
|
finished: bool,
|
||||||
/// The file position for the first read of `db_file`.
|
/// The file position for the first read of `db_file`.
|
||||||
start_pos: Option<u64>,
|
start_pos: Option<u64>,
|
||||||
types: PhantomData<T>,
|
types: PhantomData<T>,
|
||||||
@ -26,8 +25,9 @@ pub struct EntryIter<'t, T> {
|
|||||||
impl<'t, T> EntryIter<'t, T> {
|
impl<'t, T> EntryIter<'t, T> {
|
||||||
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
|
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
|
||||||
Self {
|
Self {
|
||||||
db_file: Some(CountingReader::new(BufReader::new(db_file))),
|
db_file: BufReader::new(db_file),
|
||||||
start_pos: Some(start_pos),
|
start_pos: Some(start_pos),
|
||||||
|
finished: false,
|
||||||
types: PhantomData,
|
types: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -40,45 +40,34 @@ where
|
|||||||
type Item = Result<T, IterError>;
|
type Item = Result<T, IterError>;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
// closure which reads a single entry starting from `self.pos`
|
if self.finished {
|
||||||
let read_one =
|
return None;
|
||||||
|f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
|
|
||||||
if let Some(pos) = start_pos {
|
|
||||||
f.seek(io::SeekFrom::Start(pos))?;
|
|
||||||
}
|
|
||||||
match bincode_options().deserialize_from(&mut *f) {
|
|
||||||
Ok(changeset) => {
|
|
||||||
f.clear_count();
|
|
||||||
Ok(Some(changeset))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// allow unexpected EOF if 0 bytes were read
|
|
||||||
if let bincode::ErrorKind::Io(inner) = &*e {
|
|
||||||
if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
|
|
||||||
f.clear_count();
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
f.rewind()?;
|
|
||||||
Err(IterError::Bincode(*e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
|
|
||||||
if result.is_err() {
|
|
||||||
self.db_file = None;
|
|
||||||
}
|
}
|
||||||
result.transpose()
|
(|| {
|
||||||
}
|
if let Some(start) = self.start_pos.take() {
|
||||||
}
|
self.db_file.seek(io::SeekFrom::Start(start))?;
|
||||||
|
}
|
||||||
|
|
||||||
impl<'t, T> Drop for EntryIter<'t, T> {
|
let pos_before_read = self.db_file.stream_position()?;
|
||||||
fn drop(&mut self) {
|
match bincode_options().deserialize_from(&mut self.db_file) {
|
||||||
if let Some(r) = self.db_file.as_mut() {
|
Ok(changeset) => Ok(Some(changeset)),
|
||||||
// This syncs the underlying file's offset with the buffer's position. This way, no data
|
Err(e) => {
|
||||||
// is lost with future reads.
|
self.finished = true;
|
||||||
let _ = r.stream_position();
|
let pos_after_read = self.db_file.stream_position()?;
|
||||||
}
|
// allow unexpected EOF if 0 bytes were read
|
||||||
|
if let bincode::ErrorKind::Io(inner) = &*e {
|
||||||
|
if inner.kind() == io::ErrorKind::UnexpectedEof
|
||||||
|
&& pos_after_read == pos_before_read
|
||||||
|
{
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
|
||||||
|
Err(IterError::Bincode(*e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})()
|
||||||
|
.transpose()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,51 +96,3 @@ impl From<io::Error> for IterError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl std::error::Error for IterError {}
|
impl std::error::Error for IterError {}
|
||||||
|
|
||||||
/// A wrapped [`Reader`] which counts total bytes read.
|
|
||||||
struct CountingReader<R> {
|
|
||||||
r: R,
|
|
||||||
n: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R> CountingReader<R> {
|
|
||||||
fn new(file: R) -> Self {
|
|
||||||
Self { r: file, n: 0 }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Counted bytes read.
|
|
||||||
fn count(&self) -> u64 {
|
|
||||||
self.n
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clear read count.
|
|
||||||
fn clear_count(&mut self) {
|
|
||||||
self.n = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: io::Seek> CountingReader<R> {
|
|
||||||
/// Rewind file descriptor offset to before all counted read operations. Then clear the read
|
|
||||||
/// count.
|
|
||||||
fn rewind(&mut self) -> io::Result<u64> {
|
|
||||||
let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
|
|
||||||
self.n = 0;
|
|
||||||
Ok(read)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: io::Read> io::Read for CountingReader<R> {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
let read = self.r.read(&mut *buf)?;
|
|
||||||
self.n += read as u64;
|
|
||||||
Ok(read)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: io::Seek> io::Seek for CountingReader<R> {
|
|
||||||
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
|
|
||||||
let res = self.r.seek(pos);
|
|
||||||
self.n = 0;
|
|
||||||
res
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user