Merge bitcoindevkit/bdk#1270: Improve performance of bdk_file_store::EntryIter
51bd01b3dd3d68c1628d74be9dc6d0f1cf1f15fe fix(file_store): recover file offset after read (志宇)
66dc34e75ab1ec2ae533bd540327721f6226eca1 refactor(file_store): Use BufReader but simplify (LLFourn)
c8717646700bdac0f1e13ec499481bd1fee30ffd test(file_store): `last_write_is_short` (志宇)
a3aa8b6682a3a13958fd5fbadc4074a1907a78db feat(file_store)!: optimize `EntryIter` by reducing syscalls (志宇)

Pull request description:

### Description

`EntryIter` performance is improved by reducing syscalls. The underlying file reader is wrapped with `BufReader` (to reduce calls to `read` and `seek`).

Two new tests are introduced. One ensures correct behavior when the last changeset write is too short. The other ensures the next write position is correct after a short read.

### Notes to the reviewers

This is extracted from #1172 as suggested by https://github.com/bitcoindevkit/bdk/pull/1172#pullrequestreview-1817465627.

### Changelog notice

Changed

* `EntryIter` performance is improved by reducing syscalls.

### Checklists

#### All Submissions:

* [x] I've signed all my commits
* [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md)
* [x] I ran `cargo fmt` and `cargo clippy` before committing

#### New Features:

* [x] I've added tests for the new feature
* [x] I've added docs for the new feature

ACKs for top commit:
  LLFourn:
    ACK 51bd01b3dd3d68c1628d74be9dc6d0f1cf1f15fe

Tree-SHA512: 9c25f9f2032cb2d551f3fe4ac62b856ceeb69a388f037b34674af366c55629a2eaa2b90b1ae4fbd425415ea8d02f44493a6c643b4b1a57f4507e87aa7ade3736
commit 07116df541
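A note on what the optimization buys: `bincode`'s `deserialize_from` pulls data through many small `Read::read` calls, and on a bare `File` each one is a separate `read(2)` syscall; the old `next()` also issued extra `stream_position`/`seek` calls around every entry. Wrapping the file in `std::io::BufReader` answers those small reads from an in-memory buffer (8 KiB by default). A minimal standalone sketch of the difference, using a hypothetical `db_file` on disk (not crate code):

```rust
use std::fs::File;
use std::io::{BufReader, Read, Seek};

fn main() -> std::io::Result<()> {
    // Unbuffered: every small read on a bare File is its own read(2) syscall.
    let mut raw = File::open("db_file")?; // hypothetical example file
    let mut byte = [0u8; 1];
    raw.read_exact(&mut byte)?; // a full syscall for a single byte

    // Buffered: the first read fills the 8 KiB buffer in one read(2);
    // subsequent small reads are served from memory.
    let mut buffered = BufReader::new(File::open("db_file")?);
    buffered.read_exact(&mut byte)?; // fills the buffer
    let mut payload = [0u8; 32];
    buffered.read_exact(&mut payload)?; // no syscall, still within the buffer
    // stream_position() on BufReader is buffer-aware: it reports the logical
    // position (bytes actually consumed) without discarding the buffer.
    println!("logical position: {}", buffered.stream_position()?);
    Ok(())
}
```

The trade-off, visible in the diff below, is that the inner `File`'s offset no longer tracks the logical read position; the new `Drop` impl and the `write_after_short_read` test deal with exactly that.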
@@ -1,7 +1,7 @@
 use bincode::Options;
 use std::{
     fs::File,
-    io::{self, Seek},
+    io::{self, BufReader, Seek},
     marker::PhantomData,
 };
 
@@ -14,8 +14,9 @@ use crate::bincode_options;
 ///
 /// [`next`]: Self::next
 pub struct EntryIter<'t, T> {
-    db_file: Option<&'t mut File>,
-
+    /// Buffered reader around the file
+    db_file: BufReader<&'t mut File>,
+    finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
     types: PhantomData<T>,
@@ -24,8 +25,9 @@ pub struct EntryIter<'t, T> {
 impl<'t, T> EntryIter<'t, T> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
-            db_file: Some(db_file),
+            db_file: BufReader::new(db_file),
             start_pos: Some(start_pos),
+            finished: false,
             types: PhantomData,
         }
     }
@@ -38,44 +40,44 @@ where
     type Item = Result<T, IterError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // closure which reads a single entry starting from `self.pos`
-        let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
-            let pos = match start_pos {
-                Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
-                None => f.stream_position()?,
-            };
-
-            match bincode_options().deserialize_from(&*f) {
-                Ok(changeset) => {
-                    f.stream_position()?;
-                    Ok(Some(changeset))
-                }
+        if self.finished {
+            return None;
+        }
+        (|| {
+            if let Some(start) = self.start_pos.take() {
+                self.db_file.seek(io::SeekFrom::Start(start))?;
+            }
+
+            let pos_before_read = self.db_file.stream_position()?;
+            match bincode_options().deserialize_from(&mut self.db_file) {
+                Ok(changeset) => Ok(Some(changeset)),
                 Err(e) => {
+                    self.finished = true;
+                    let pos_after_read = self.db_file.stream_position()?;
+                    // allow unexpected EOF if 0 bytes were read
                     if let bincode::ErrorKind::Io(inner) = &*e {
-                        if inner.kind() == io::ErrorKind::UnexpectedEof {
-                            let eof = f.seek(io::SeekFrom::End(0))?;
-                            if pos == eof {
-                                return Ok(None);
-                            }
-                        }
+                        if inner.kind() == io::ErrorKind::UnexpectedEof
+                            && pos_after_read == pos_before_read
+                        {
+                            return Ok(None);
+                        }
                     }
-                    f.seek(io::SeekFrom::Start(pos))?;
+                    self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
                     Err(IterError::Bincode(*e))
                 }
             }
-        };
-
-        let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
-        if result.is_err() {
-            self.db_file = None;
-        }
-        result.transpose()
+        })()
+        .transpose()
     }
 }
 
-impl From<io::Error> for IterError {
-    fn from(value: io::Error) -> Self {
-        IterError::Io(value)
+impl<'t, T> Drop for EntryIter<'t, T> {
+    fn drop(&mut self) {
+        // This syncs the underlying file's offset with the buffer's position. This way, we
+        // maintain the correct position to start the next read/write.
+        if let Ok(pos) = self.db_file.stream_position() {
+            let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
+        }
     }
 }
 
@@ -97,4 +99,10 @@ impl core::fmt::Display for IterError {
     }
 }
 
+impl From<io::Error> for IterError {
+    fn from(value: io::Error) -> Self {
+        IterError::Io(value)
+    }
+}
+
 impl std::error::Error for IterError {}
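Why `EntryIter` now needs a `Drop` impl: `BufReader` reads ahead, so after decoding an entry the inner `File`'s kernel offset sits wherever the last buffer fill stopped, not at the end of the last decoded entry. `BufReader::stream_position` subtracts the unread buffer, so seeking the inner file to it on drop restores the invariant that the file offset equals the logical position, which the store relies on for the next append. A standalone demonstration of the mismatch (assumes the `tempfile` crate for a scratch file; not crate code):

```rust
use std::io::{BufReader, Read, Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    let mut file = tempfile::tempfile()?; // scratch file via the tempfile crate
    file.write_all(&[0u8; 100])?;
    file.seek(SeekFrom::Start(0))?;

    let mut reader = BufReader::new(&mut file);
    let mut entry = [0u8; 10];
    reader.read_exact(&mut entry)?; // BufReader actually slurps all 100 bytes

    let logical = reader.stream_position()?; // 10: accounts for unread buffer
    let raw = reader.get_mut().stream_position()?; // 100: raw kernel offset
    assert_eq!((logical, raw), (10, 100));

    // What EntryIter's Drop does: move the kernel offset back to the logical
    // position, so the next write lands right after the last consumed entry.
    reader.get_mut().seek(SeekFrom::Start(logical))?;
    drop(reader);
    assert_eq!(file.stream_position()?, 10);
    Ok(())
}
```

The same buffer-awareness explains the new EOF check above: if `stream_position()` is unchanged after a failed deserialization, zero bytes were consumed and the `UnexpectedEof` is a clean end of file; if it moved, the file ends mid-entry and the error is surfaced with the reader rewound to the start of the bad entry.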
@@ -219,6 +219,7 @@ mod test {
 
     use bincode::DefaultOptions;
     use std::{
+        collections::BTreeSet,
         io::{Read, Write},
         vec::Vec,
     };
@@ -228,7 +229,7 @@ mod test {
     const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
         [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
 
-    type TestChangeSet = Vec<String>;
+    type TestChangeSet = BTreeSet<String>;
 
     #[derive(Debug)]
     struct TestTracker;
@@ -253,7 +254,7 @@
     fn open_or_create_new() {
         let temp_dir = tempfile::tempdir().unwrap();
         let file_path = temp_dir.path().join("db_file");
-        let changeset = vec!["hello".to_string(), "world".to_string()];
+        let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
 
         {
             let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
@@ -304,7 +305,7 @@
         let mut data = [255_u8; 2000];
         data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
 
-        let changeset = vec!["one".into(), "two".into(), "three!".into()];
+        let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
 
         let mut file = NamedTempFile::new().unwrap();
         file.write_all(&data).expect("should write");
@@ -340,4 +341,119 @@
 
         assert_eq!(got_bytes, expected_bytes);
     }
+
+    #[test]
+    fn last_write_is_short() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = [
+            TestChangeSet::from(["1".into()]),
+            TestChangeSet::from(["2".into(), "3".into()]),
+            TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
+        ];
+        let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
+        let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap();
+
+        for short_write_len in 1..last_changeset_bytes.len() - 1 {
+            let file_path = temp_dir.path().join(format!("{}.dat", short_write_len));
+            println!("Test file: {:?}", file_path);
+
+            // simulate creating a file, writing data where the last write is incomplete
+            {
+                let mut db =
+                    Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                for changeset in &changesets {
+                    db.append_changeset(changeset).unwrap();
+                }
+                // this is the incomplete write
+                db.db_file
+                    .write_all(&last_changeset_bytes[..short_write_len])
+                    .unwrap();
+            }
+
+            // load file again and aggregate changesets
+            // write the last changeset again (this time it succeeds)
+            {
+                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let err = db
+                    .aggregate_changesets()
+                    .expect_err("should return error as last read is short");
+                assert_eq!(
+                    err.changeset,
+                    changesets.iter().cloned().reduce(|mut acc, cs| {
+                        Append::append(&mut acc, cs);
+                        acc
+                    }),
+                    "should recover all changesets that are written in full",
+                );
+                db.db_file.write_all(&last_changeset_bytes).unwrap();
+            }
+
+            // load file again - this time we should successfully aggregate all changesets
+            {
+                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let aggregated_changesets = db
+                    .aggregate_changesets()
+                    .expect("aggregating all changesets should succeed");
+                assert_eq!(
+                    aggregated_changesets,
+                    changesets
+                        .iter()
+                        .cloned()
+                        .chain(core::iter::once(last_changeset.clone()))
+                        .reduce(|mut acc, cs| {
+                            Append::append(&mut acc, cs);
+                            acc
+                        }),
+                    "should recover all changesets",
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn write_after_short_read() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = (0..20)
+            .map(|n| TestChangeSet::from([format!("{}", n)]))
+            .collect::<Vec<_>>();
+        let last_changeset = TestChangeSet::from(["last".into()]);
+
+        for read_count in 0..changesets.len() {
+            let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+            println!("Test file: {:?}", file_path);
+
+            // First, we create the file with all the changesets!
+            let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            for changeset in &changesets {
+                db.append_changeset(changeset).unwrap();
+            }
+            drop(db);
+
+            // We re-open the file and read `read_count` number of changesets.
+            let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            let mut exp_aggregation = db
+                .iter_changesets()
+                .take(read_count)
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSet::default(), |mut acc, v| {
+                    Append::append(&mut acc, v);
+                    acc
+                });
+            // We write after a short read.
+            db.write_changes(&last_changeset)
+                .expect("last write must succeed");
+            Append::append(&mut exp_aggregation, last_changeset.clone());
+            drop(db);
+
+            // We open the file again and check whether aggregate changeset is expected.
+            let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+                .unwrap()
+                .aggregate_changesets()
+                .expect("must aggregate changesets")
+                .unwrap_or_default();
+            assert_eq!(aggregation, exp_aggregation);
+        }
+    }
 }
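For orientation, here is the recovery flow that `last_write_is_short` exercises, distilled into a sketch. It assumes the `Store` API surface used by the tests above (`open`, `aggregate_changesets`, the error's `changeset` field) and a `BTreeSet<String>` changeset type; exact module paths and signatures may differ between `bdk_file_store` versions:

```rust
use std::collections::BTreeSet;
use std::path::Path;

use bdk_file_store::Store; // assumed import path

type ChangeSet = BTreeSet<String>;
// The same 12 magic bytes the tests use ("bdkfs1111111").
const MAGIC: &[u8; 12] = b"bdkfs1111111";

/// Open the store and recover whatever decoded cleanly. On a truncated
/// trailing entry, `err.changeset` still holds the aggregation of every
/// complete entry, and the file offset is left at the start of the bad
/// tail, so the caller can simply re-append the lost changeset.
fn recover(path: &Path) -> Option<ChangeSet> {
    let mut db = Store::<ChangeSet>::open(MAGIC, path).expect("store should open");
    db.aggregate_changesets()
        .unwrap_or_else(|err| err.changeset)
}
```

This is the property the `Drop` impl preserves: because the underlying offset is re-synced after a short read, the overwrite of the corrupt tail in the second block of `last_write_is_short` lands exactly where the truncated entry began.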