Review notes (text before the first "diff --git" line is not part of the patch):
- Line breaks and indentation were rebuilt from a whitespace-collapsed copy; the
  context-line indentation (src/blob/core.rs in particular) is inferred and must
  be checked against the target files.
- Generic arguments (`Vec<u8>`, `AsRef<Path>`, `size_of::<...>()`) were missing
  from that copy and are reconstructed here; confirm against the real sources.
- The `index` hashes predate the review changes made to the added lines.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 40eb35b69c..072d1feec1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@ Pearl changelog
 
 ## [Unreleased]
 #### Added
+- Check blob validity on index regeneration (#289)
 - Checksum validation in `Entry::load_data` (#274)
 - Add fsync to header writing in blob (#243)
 - Add periodic fsync (#234)
diff --git a/src/blob/core.rs b/src/blob/core.rs
index d074211122..2c0d26f5e6 100644
--- a/src/blob/core.rs
+++ b/src/blob/core.rs
@@ -568,6 +568,7 @@ impl RawRecords {
                 buf.len()
             )
         })?;
+        header.validate()?;
         self.current_offset += self.record_header_size;
         self.current_offset += header.meta_size();
         let data = if read_data {
diff --git a/tests/common.rs b/tests/common.rs
index 60edbbddae..a97468a7ff 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -201,6 +201,8 @@ pub fn generate_records(count: usize, avg_size: usize) -> Vec<(u32, Vec<u8>)> {
 
 pub enum CorruptionType {
     ZeroedAtBegin(u64),
+    /// `ZeroedAt(start, len)`: zero-fill `len` bytes beginning at byte offset `start`.
+    ZeroedAt(u64, u64),
 }
 
 pub fn corrupt_file(path: impl AsRef<Path>, corruption_type: CorruptionType) ->
@@ -216,6 +218,14 @@ pub fn corrupt_file(path: impl AsRef<Path>, corruption_type: CorruptionType) ->
             file.seek(SeekFrom::Start(0))?;
             file.write_all(&vec![0u8; write_size as usize])?;
         }
+        CorruptionType::ZeroedAt(start, len) => {
+            // Clamp to what remains of `size` after `start`, so the write only
+            // zeroes existing bytes instead of growing the file.
+            // NOTE(review): assumes `size` (bound above this hunk) is the file length.
+            let write_size = len.min(size.saturating_sub(start));
+            file.seek(SeekFrom::Start(start))?;
+            file.write_all(&vec![0u8; write_size as usize])?;
+        }
     }
     file.sync_all()?;
     Ok(())
diff --git a/tests/tests.rs b/tests/tests.rs
index 825dcfc0a5..e5ee548aa3 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -429,6 +429,47 @@ async fn test_corrupted_index_regeneration() {
     warn!("elapsed: {:.3}", now.elapsed().as_secs_f64());
 }
 
+/// Reopens storage after zeroing one byte of `test.0.blob` and deleting its index,
+/// then expects one corrupted blob to be reported and no index file to exist.
+#[tokio::test]
+async fn test_corrupted_blob_index_regeneration() {
+    use std::mem::size_of;
+    use common::CorruptionType;
+
+    let now = Instant::now();
+    let path = common::init("corrupted_blob_index");
+    let storage = common::create_test_storage(&path, 70_000).await.unwrap();
+    let records = common::generate_records(100, 10_000);
+    for (i, data) in &records {
+        write_one(&storage, *i, data, None).await.unwrap();
+    }
+    storage.close().await.unwrap();
+
+    // Target a single byte located after the blob header and, judging by the
+    // names, after the first record's magic and length fields (TODO confirm layout).
+    let blob_header_size = 2 * size_of::<u64>() + size_of::<u32>();
+    let magic_len_size = size_of::<u64>() + size_of::<usize>();
+    let corruption = CorruptionType::ZeroedAt((blob_header_size + magic_len_size) as u64, 1);
+
+    let blob_file_path = path.join("test.0.blob");
+    let index_file_path = path.join("test.0.index");
+    assert!(blob_file_path.exists());
+    assert!(index_file_path.exists());
+
+    common::corrupt_file(blob_file_path, corruption).expect("blob corruption failed");
+    // Remove the index so that it is absent when the storage is opened again.
+    std::fs::remove_file(&index_file_path).expect("index removal");
+    let new_storage = common::create_test_storage(&path, 1_000_000)
+        .await
+        .expect("storage should be loaded successfully");
+
+    assert_eq!(new_storage.corrupted_blobs_count(), 1);
+    assert!(!index_file_path.exists());
+
+    common::clean(new_storage, path).await;
+    warn!("elapsed: {:.3}", now.elapsed().as_secs_f64());
+}
+
 #[tokio::test]
 async fn test_index_from_blob() {
     let now = Instant::now();