From 4eb9c44972734ba5f81287bf0818918ab0557a53 Mon Sep 17 00:00:00 2001 From: Kould Date: Thu, 18 Jul 2024 17:22:38 +0800 Subject: [PATCH 1/7] feat: impl `Compaction` --- src/arrows/mod.rs | 29 +- src/compaction/mod.rs | 735 ++++++++++++++++++++++++++++++++++++++++ src/fs/tokio.rs | 8 +- src/inmem/immutable.rs | 29 +- src/lib.rs | 11 +- src/ondisk/scan.rs | 2 + src/oracle/timestamp.rs | 18 +- src/record/mod.rs | 6 +- src/record/str.rs | 2 +- src/scope.rs | 4 + src/serdes/string.rs | 17 +- src/stream/level.rs | 42 +++ src/stream/merge.rs | 4 +- src/stream/mod.rs | 25 +- src/version/mod.rs | 32 +- 15 files changed, 924 insertions(+), 40 deletions(-) create mode 100644 src/compaction/mod.rs diff --git a/src/arrows/mod.rs b/src/arrows/mod.rs index 82876964..c1428193 100644 --- a/src/arrows/mod.rs +++ b/src/arrows/mod.rs @@ -61,31 +61,34 @@ where R: Record, { let (lower_key, lower_cmp) = get_range_bound_fn::(range.0); - let (upper_key, upper_cmp) = get_range_bound_fn::(range.0); + let (upper_key, upper_cmp) = get_range_bound_fn::(range.1); - let predictions: Vec> = vec![ - Box::new(ArrowPredicateFn::new( + let mut predictions: Vec> = vec![Box::new(ArrowPredicateFn::new( + ProjectionMask::roots(schema_descriptor, [1]), + move |record_batch| lt_eq(record_batch.column(0), &ts.to_arrow_scalar() as &dyn Datum), + ))]; + if let Some(lower_key) = lower_key { + predictions.push(Box::new(ArrowPredicateFn::new( ProjectionMask::roots(schema_descriptor, [2]), move |record_batch| { lower_cmp( record_batch.column(0), - &lower_key.unwrap().to_arrow_datum() as &dyn Datum, + &lower_key.to_arrow_datum() as &dyn Datum, ) }, - )), - Box::new(ArrowPredicateFn::new( + ))); + } + if let Some(upper_key) = upper_key { + predictions.push(Box::new(ArrowPredicateFn::new( ProjectionMask::roots(schema_descriptor, [2]), move |record_batch| { upper_cmp( record_batch.column(0), - &upper_key.unwrap().to_arrow_datum() as &dyn Datum, + &upper_key.to_arrow_datum() as &dyn Datum, ) }, - )), - Box::new(ArrowPredicateFn::new( - ProjectionMask::roots(schema_descriptor, [1]), - move |record_batch| lt_eq(record_batch.column(0), &ts.to_arrow_scalar() as &dyn Datum), - )), - ]; + ))); + } + RowFilter::new(predictions) } diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs new file mode 100644 index 00000000..5fc4d437 --- /dev/null +++ b/src/compaction/mod.rs @@ -0,0 +1,735 @@ +use std::{ + cmp, + collections::{Bound, VecDeque}, + mem, + pin::Pin, + sync::Arc, +}; + +use async_lock::RwLock; +use futures_util::StreamExt; +use parquet::arrow::AsyncArrowWriter; +use thiserror::Error; +use tokio_util::compat::FuturesAsyncReadCompatExt; +use ulid::Ulid; + +use crate::{ + executor::Executor, + fs::FileId, + inmem::immutable::{ArrowArrays, Builder, Immutable}, + ondisk::sstable::SsTable, + record::{KeyRef, Record}, + scope::Scope, + stream::{level::LevelStream, merge::MergeStream, ScanStream}, + version::{edit::VersionEdit, set::VersionSet, Version, VersionError, MAX_LEVEL}, + DbOption, Schema, +}; + +pub(crate) struct Compactor +where + R: Record, + E: Executor, +{ + pub(crate) option: Arc, + pub(crate) schema: Arc>>, + pub(crate) version_set: VersionSet, +} + +impl Compactor +where + R: Record, + E: Executor, +{ + pub(crate) fn new( + schema: Arc>>, + option: Arc, + version_set: VersionSet, + ) -> Self { + Compactor:: { + option, + schema, + version_set, + } + } + + pub(crate) async fn check_then_compaction( + &mut self, + // TODO + // option_tx: Option>, + ) -> Result<(), CompactionError> { + let mut guard = self.schema.write().await; + + if guard.immutables.len() > self.option.immutable_chunk_num { + let excess = guard.immutables.split_off(self.option.immutable_chunk_num); + + if let Some(scope) = + Self::minor_compaction(&self.option, mem::replace(&mut guard.immutables, excess)) + .await? + { + let version_ref = self.version_set.current().await; + let mut version_edits = vec![]; + let mut delete_gens = vec![]; + + if self.option.is_threshold_exceeded_major(&version_ref, 0) { + Self::major_compaction( + &version_ref, + &self.option, + &scope.min, + &scope.max, + &mut version_edits, + &mut delete_gens, + ) + .await?; + } + version_edits.insert(0, VersionEdit::Add { level: 0, scope }); + + self.version_set + .apply_edits(version_edits, Some(delete_gens), false) + .await + .map_err(CompactionError::Version)?; + } + } + // TODO + // if let Some(tx) = option_tx { + // let _ = tx.send(()); + // } + Ok(()) + } + + pub(crate) async fn minor_compaction( + option: &DbOption, + batches: VecDeque>, + ) -> Result>, CompactionError> { + if !batches.is_empty() { + let mut min = None; + let mut max = None; + + let gen = FileId::new(); + // TODO: WAL CLEAN + // let mut wal_ids = Vec::with_capacity(batches.len()); + + let mut writer = AsyncArrowWriter::try_new( + E::open(option.table_path(&gen)) + .await + .map_err(CompactionError::Io)? + .compat(), + R::arrow_schema().clone(), + option.write_parquet_option.clone(), + ) + .map_err(CompactionError::Parquet)?; + + for batch in batches { + if let (Some(batch_min), Some(batch_max)) = batch.scope() { + if matches!(min.as_ref().map(|min| min > batch_min), Some(true) | None) { + min = Some(batch_min.clone()) + } + if matches!(max.as_ref().map(|max| max < batch_max), Some(true) | None) { + max = Some(batch_max.clone()) + } + } + writer + .write(batch.as_record_batch()) + .await + .map_err(CompactionError::Parquet)?; + // TODO: WAL CLEAN + // wal_ids.push(wal_id); + } + writer.close().await.map_err(CompactionError::Parquet)?; + return Ok(Some(Scope { + min: min.ok_or(CompactionError::EmptyLevel)?, + max: max.ok_or(CompactionError::EmptyLevel)?, + gen, + // TODO: WAL CLEAN + wal_ids: None, + // wal_ids: Some(wal_ids), + })); + } + Ok(None) + } + + pub(crate) async fn major_compaction( + version: &Version, + option: &DbOption, + mut min: &R::Key, + mut max: &R::Key, + version_edits: &mut Vec>, + delete_gens: &mut Vec, + ) -> Result<(), CompactionError> { + let mut level = 0; + + while level < MAX_LEVEL - 2 { + if !option.is_threshold_exceeded_major(version, level) { + break; + } + + let mut meet_scopes_l = Vec::new(); + let start_l = Version::::scope_search(min, &version.level_slice[level]); + let mut end_l = start_l; + { + for scope in version.level_slice[level][start_l..].iter() { + if scope.contains(min) || scope.contains(max) { + meet_scopes_l.push(scope); + end_l += 1; + } else { + break; + } + } + if meet_scopes_l.is_empty() { + return Ok(()); + } + } + let mut meet_scopes_ll = Vec::new(); + let mut start_ll = 0; + let mut end_ll = 0; + { + if !version.level_slice[level + 1].is_empty() { + let min_key = &meet_scopes_l + .first() + .ok_or(CompactionError::EmptyLevel)? + .min; + let max_key = &meet_scopes_l.last().ok_or(CompactionError::EmptyLevel)?.max; + min = min_key; + max = max_key; + + start_ll = + Version::::scope_search(min_key, &version.level_slice[level + 1]); + end_ll = + Version::::scope_search(max_key, &version.level_slice[level + 1]); + + let next_level_len = version.level_slice[level + 1].len(); + for scope in version.level_slice[level + 1] + [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] + .iter() + { + if scope.contains(min) || scope.contains(max) { + meet_scopes_ll.push(scope); + } + } + } + } + let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len()); + + // This Level + if level == 0 { + for scope in meet_scopes_l.iter() { + let file = E::open(option.table_path(&scope.gen)) + .await + .map_err(CompactionError::Io)?; + + streams.push(ScanStream::SsTable { + inner: SsTable::open(file) + .scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into()) + .await + .map_err(CompactionError::Parquet)?, + }); + } + } else { + let (lower, upper) = Self::full_scope(&mut meet_scopes_l)?; + let level_scan_l = LevelStream::new( + version, + level, + start_l, + end_l, + (Bound::Included(lower), Bound::Included(upper)), + u32::MAX.into(), + ) + .ok_or(CompactionError::EmptyLevel)?; + + streams.push(ScanStream::Level { + inner: level_scan_l, + }); + } + // Next Level + let (lower, upper) = Self::full_scope(&mut meet_scopes_ll)?; + let level_scan_ll = LevelStream::new( + version, + level + 1, + start_ll, + end_ll, + (Bound::Included(lower), Bound::Included(upper)), + u32::MAX.into(), + ) + .ok_or(CompactionError::EmptyLevel)?; + + streams.push(ScanStream::Level { + inner: level_scan_ll, + }); + let mut stream = MergeStream::::from_vec(streams) + .await + .map_err(CompactionError::Parquet)?; + + // Kould: is the capacity parameter necessary? + let mut builder = R::Columns::builder(8192); + let mut written_size = 0; + let mut min = None; + let mut max = None; + + while let Some(result) = Pin::new(&mut stream).next().await { + let entry = result.map_err(CompactionError::Parquet)?; + let key = entry.key(); + + if min.is_none() { + min = Some(key.value.to_key()) + } + max = Some(key.value.to_key()); + + written_size += key.size(); + builder.push(key, Some(entry.value())); + + if written_size >= option.max_sst_file_size { + Self::build_table( + option, + version_edits, + level, + &mut builder, + &mut min, + &mut max, + ) + .await?; + written_size = 0; + } + } + if written_size > 0 { + Self::build_table( + option, + version_edits, + level, + &mut builder, + &mut min, + &mut max, + ) + .await?; + } + for scope in meet_scopes_l { + version_edits.push(VersionEdit::Remove { + level: level as u8, + gen: scope.gen, + }); + delete_gens.push(scope.gen); + } + for scope in meet_scopes_ll { + version_edits.push(VersionEdit::Remove { + level: (level + 1) as u8, + gen: scope.gen, + }); + delete_gens.push(scope.gen); + } + level += 1; + } + + Ok(()) + } + + fn full_scope<'a>( + meet_scopes: &[&'a Scope<::Key>], + ) -> Result<(&'a ::Key, &'a ::Key), CompactionError> { + let lower = &meet_scopes.first().ok_or(CompactionError::EmptyLevel)?.min; + let upper = &meet_scopes.last().ok_or(CompactionError::EmptyLevel)?.max; + Ok((lower, upper)) + } + + async fn build_table( + option: &DbOption, + version_edits: &mut Vec>, + level: usize, + builder: &mut ::Builder, + min: &mut Option, + max: &mut Option, + ) -> Result<(), CompactionError> { + assert!(min.is_some()); + assert!(max.is_some()); + + let gen = Ulid::new(); + let columns = builder.finish(); + let mut writer = AsyncArrowWriter::try_new( + E::open(option.table_path(&gen)) + .await + .map_err(CompactionError::Io)? + .compat(), + R::arrow_schema().clone(), + option.write_parquet_option.clone(), + ) + .map_err(CompactionError::Parquet)?; + writer + .write(columns.as_record_batch()) + .await + .map_err(CompactionError::Parquet)?; + writer.close().await.map_err(CompactionError::Parquet)?; + version_edits.push(VersionEdit::Add { + level: (level + 1) as u8, + scope: Scope { + min: min.take().ok_or(CompactionError::EmptyLevel)?, + max: max.take().ok_or(CompactionError::EmptyLevel)?, + gen, + wal_ids: None, + }, + }); + Ok(()) + } +} + +#[derive(Debug, Error)] +pub enum CompactionError +where + R: Record, +{ + #[error("compaction io error: {0}")] + Io(#[source] std::io::Error), + #[error("compaction parquet error: {0}")] + Parquet(#[source] parquet::errors::ParquetError), + #[error("compaction version error: {0}")] + Version(#[source] VersionError), + #[error("the level being compacted does not have a table")] + EmptyLevel, +} + +#[cfg(test)] +mod tests { + use std::{collections::VecDeque, sync::Arc}; + + use flume::bounded; + use parquet::{arrow::AsyncArrowWriter, errors::ParquetError}; + use tempfile::TempDir; + use tokio_util::compat::FuturesAsyncReadCompatExt; + + use crate::{ + compaction::Compactor, + executor::{tokio::TokioExecutor, Executor}, + fs::FileId, + inmem::{immutable::Immutable, mutable::Mutable}, + record::Record, + scope::Scope, + tests::Test, + version::{edit::VersionEdit, Version}, + DbOption, + }; + + fn build_immutable( + fn_mutable: impl FnOnce(&mut Mutable), + ) -> Immutable { + let mut mutable = Mutable::new(); + + fn_mutable(&mut mutable); + Immutable::from(mutable) + } + + async fn build_parquet_table( + option: &DbOption, + gen: FileId, + fn_mutable: impl FnOnce(&mut Mutable), + ) -> Result<(), ParquetError> { + let immutable = build_immutable(fn_mutable); + let mut writer = AsyncArrowWriter::try_new( + E::open(option.table_path(&gen)) + .await + .map_err(ParquetError::from)? + .compat(), + R::arrow_schema().clone(), + None, + )?; + writer.write(immutable.as_record_batch()).await?; + writer.close().await?; + + Ok(()) + } + + #[tokio::test] + async fn minor_compaction() { + let temp_dir = tempfile::tempdir().unwrap(); + + let batch_1 = build_immutable(|mutable| { + mutable.insert( + Test { + vstring: 3.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 5.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 6.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }); + let batch_2 = build_immutable(|mutable| { + mutable.insert( + Test { + vstring: 4.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 2.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 1.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }); + + let scope = Compactor::::minor_compaction( + &DbOption::new(temp_dir.path()), + VecDeque::from(vec![batch_2, batch_1]), + ) + .await + .unwrap() + .unwrap(); + assert_eq!(scope.min, 1.to_string()); + assert_eq!(scope.max, 6.to_string()); + } + + #[tokio::test] + async fn major_compaction() { + let temp_dir = TempDir::new().unwrap(); + + let mut option = DbOption::new(temp_dir.path()); + option.major_threshold_with_sst_size = 2; + let option = Arc::new(option); + + // level 0 + let table_gen_1 = FileId::new(); + let table_gen_2 = FileId::new(); + build_parquet_table::(&option, table_gen_1, |mutable| { + mutable.insert( + Test { + vstring: 1.to_string(), + vu32: 0, + vobool: None, + }, + 1.into(), + ); + mutable.insert( + Test { + vstring: 2.to_string(), + vu32: 0, + vobool: None, + }, + 1.into(), + ); + mutable.insert( + Test { + vstring: 3.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }) + .await + .unwrap(); + build_parquet_table::(&option, table_gen_2, |mutable| { + mutable.insert( + Test { + vstring: 4.to_string(), + vu32: 0, + vobool: None, + }, + 1.into(), + ); + mutable.insert( + Test { + vstring: 5.to_string(), + vu32: 0, + vobool: None, + }, + 1.into(), + ); + mutable.insert( + Test { + vstring: 6.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }) + .await + .unwrap(); + + // level 1 + let table_gen_3 = FileId::new(); + let table_gen_4 = FileId::new(); + let table_gen_5 = FileId::new(); + build_parquet_table::(&option, table_gen_3, |mutable| { + mutable.insert( + Test { + vstring: 1.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 2.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 3.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }) + .await + .unwrap(); + build_parquet_table::(&option, table_gen_4, |mutable| { + mutable.insert( + Test { + vstring: 4.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 5.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 6.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }) + .await + .unwrap(); + build_parquet_table::(&option, table_gen_5, |mutable| { + mutable.insert( + Test { + vstring: 7.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 8.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + mutable.insert( + Test { + vstring: 9.to_string(), + vu32: 0, + vobool: None, + }, + 0.into(), + ); + }) + .await + .unwrap(); + + let (sender, _) = bounded(1); + let mut version = Version::::new(option.clone(), sender); + version.level_slice[0].push(Scope { + min: 1.to_string(), + max: 3.to_string(), + gen: table_gen_1, + wal_ids: None, + }); + version.level_slice[0].push(Scope { + min: 4.to_string(), + max: 6.to_string(), + gen: table_gen_2, + wal_ids: None, + }); + version.level_slice[1].push(Scope { + min: 1.to_string(), + max: 3.to_string(), + gen: table_gen_3, + wal_ids: None, + }); + version.level_slice[1].push(Scope { + min: 4.to_string(), + max: 6.to_string(), + gen: table_gen_4, + wal_ids: None, + }); + version.level_slice[1].push(Scope { + min: 7.to_string(), + max: 9.to_string(), + gen: table_gen_5, + wal_ids: None, + }); + + let min = 2.to_string(); + let max = 5.to_string(); + let mut version_edits = Vec::new(); + + Compactor::::major_compaction( + &version, + &option, + &min, + &max, + &mut version_edits, + &mut vec![], + ) + .await + .unwrap(); + if let VersionEdit::Add { level, scope } = &version_edits[0] { + assert_eq!(*level, 1); + assert_eq!(scope.min, 1.to_string()); + assert_eq!(scope.max, 6.to_string()); + } + assert_eq!( + version_edits[1..5].to_vec(), + vec![ + VersionEdit::Remove { + level: 0, + gen: table_gen_1, + }, + VersionEdit::Remove { + level: 0, + gen: table_gen_2, + }, + VersionEdit::Remove { + level: 1, + gen: table_gen_3, + }, + VersionEdit::Remove { + level: 1, + gen: table_gen_4, + }, + ] + ); + } +} diff --git a/src/fs/tokio.rs b/src/fs/tokio.rs index ca177926..338db1c3 100644 --- a/src/fs/tokio.rs +++ b/src/fs/tokio.rs @@ -1,6 +1,6 @@ use std::{io, path::Path}; -use tokio::fs::{remove_file, File}; +use tokio::fs::{remove_file, File, OpenOptions}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use super::FileProvider; @@ -10,7 +10,11 @@ impl FileProvider for TokioExecutor { type File = Compat; async fn open(path: impl AsRef) -> io::Result { - File::create_new(path) + OpenOptions::new() + .create(true) + .write(true) + .read(true) + .open(path) .await .map(TokioAsyncReadCompatExt::compat) } diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 659ef9f1..a2206a9f 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -12,7 +12,7 @@ use crate::{ timestamp::{Timestamped, TimestampedRef}, Timestamp, EPOCH, }, - record::{internal::InternalRecordRef, Record, RecordRef}, + record::{internal::InternalRecordRef, Key, Record, RecordRef}, stream::record_batch::RecordBatchEntry, }; @@ -32,7 +32,11 @@ pub trait Builder where S: ArrowArrays, { - fn push(&mut self, key: &Timestamped<::Key>, row: &Option); + fn push( + &mut self, + key: Timestamped<<::Key as Key>::Ref<'_>>, + row: Option<::Ref<'_>>, + ); fn finish(&mut self) -> S; } @@ -55,7 +59,10 @@ where let mut builder = A::builder(mutable.len()); for (offset, (key, value)) in mutable.into_iter().enumerate() { - builder.push(&key, &value); + builder.push( + Timestamped::new(key.value.as_key_ref(), key.ts), + value.as_ref().map(Record::as_record_ref), + ); index.insert(key, offset as u32); } @@ -69,6 +76,18 @@ impl Immutable where A: ArrowArrays, { + pub(crate) fn scope( + &self, + ) -> ( + Option<&::Key>, + Option<&::Key>, + ) { + ( + self.index.first_key_value().map(|(key, _)| key.value()), + self.index.last_key_value().map(|(key, _)| key.value()), + ) + } + pub(crate) fn as_record_batch(&self) -> &RecordBatch { self.data.as_record_batch() } @@ -220,12 +239,12 @@ pub(crate) mod tests { } impl Builder for TestBuilder { - fn push(&mut self, key: &Timestamped, row: &Option) { + fn push(&mut self, key: Timestamped<&str>, row: Option) { self.vstring.append_value(&key.value); match row { Some(row) => { self.vu32.append_value(row.vu32); - match row.vobool { + match row.vbool { Some(vobool) => self.vobool.append_value(vobool), None => self.vobool.append_null(), } diff --git a/src/lib.rs b/src/lib.rs index e40e5877..2b004907 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] pub(crate) mod arrows; +mod compaction; pub mod executor; pub mod fs; mod inmem; @@ -10,7 +11,7 @@ mod scope; pub mod serdes; mod stream; mod transaction; -mod version; +pub(crate) mod version; use std::{ collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc, @@ -21,7 +22,7 @@ use futures_core::Stream; use futures_util::StreamExt; use inmem::{immutable::Immutable, mutable::Mutable}; use oracle::Timestamp; -use parquet::errors::ParquetError; +use parquet::{errors::ParquetError, file::properties::WriterProperties}; use record::Record; use crate::{ @@ -40,6 +41,7 @@ pub struct DbOption { pub level_sst_magnification: usize, pub max_sst_file_size: usize, pub clean_channel_buffer: usize, + pub write_parquet_option: Option, } pub struct DB @@ -61,6 +63,7 @@ impl DbOption { level_sst_magnification: 10, max_sst_file_size: 24 * 1024 * 1024, clean_channel_buffer: 10, + write_parquet_option: None, } } @@ -172,7 +175,7 @@ where ts: Timestamp, ) -> Result>, ParquetError> where - E: Executor, + E: Executor + 'get, { self.scan::(Bound::Included(key), Bound::Unbounded, ts) .await? @@ -188,7 +191,7 @@ where ts: Timestamp, ) -> Result, ParquetError>>, ParquetError> where - E: Executor, + E: Executor + 'scan, { let mut streams = Vec::>::with_capacity(self.immutables.len() + 1); streams.push(self.mutable.scan((lower, uppwer), ts).into()); diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index 4616e738..dda3d1dc 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -18,6 +18,7 @@ pin_project! { #[derive(Debug)] pub struct SsTableScan where + R: Record, E: Executor, { #[pin] @@ -28,6 +29,7 @@ pin_project! { impl SsTableScan where + R: Record, E: Executor, { pub fn new(stream: ParquetRecordBatchStream>) -> Self { diff --git a/src/oracle/timestamp.rs b/src/oracle/timestamp.rs index d26c2d2c..655118b5 100644 --- a/src/oracle/timestamp.rs +++ b/src/oracle/timestamp.rs @@ -1,6 +1,11 @@ -use std::{borrow::Borrow, cmp::Ordering, marker::PhantomData, mem::transmute}; +use std::{ + borrow::Borrow, + cmp::Ordering, + marker::PhantomData, + mem::{size_of, transmute}, +}; -use crate::oracle::Timestamp; +use crate::{oracle::Timestamp, serdes::Encode}; #[derive(PartialEq, Eq, Debug, Clone)] pub(crate) struct Timestamped { @@ -35,6 +40,15 @@ impl Timestamped { } } +impl Timestamped +where + V: Encode, +{ + pub(crate) fn size(&self) -> usize { + self.value.size() + size_of::() + } +} + impl PartialOrd for Timestamped where V: PartialOrd, diff --git a/src/record/mod.rs b/src/record/mod.rs index fa805c19..b76cc376 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -1,7 +1,7 @@ pub(crate) mod internal; mod str; -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use arrow::{ array::{Datum, RecordBatch}, @@ -14,7 +14,7 @@ use crate::{ serdes::{Decode, Encode}, }; -pub trait Key: 'static + Encode + Decode + Ord + Clone + Send { +pub trait Key: 'static + Debug + Encode + Decode + Ord + Clone + Send { type Ref<'r>: KeyRef<'r, Key = Self> + Copy where Self: 'r; @@ -24,7 +24,7 @@ pub trait Key: 'static + Encode + Decode + Ord + Clone + Send { fn to_arrow_datum(&self) -> impl Datum; } -pub trait KeyRef<'r>: Clone + PartialEq + Ord { +pub trait KeyRef<'r>: Clone + Encode + PartialEq + Ord { type Key: Key = Self>; fn to_key(&self) -> Self::Key; diff --git a/src/record/str.rs b/src/record/str.rs index deea23cd..debfe218 100644 --- a/src/record/str.rs +++ b/src/record/str.rs @@ -128,7 +128,7 @@ pub struct StringColumnsBuilder { } impl Builder for StringColumnsBuilder { - fn push(&mut self, key: &Timestamped, row: &Option) { + fn push(&mut self, key: Timestamped<&str>, row: Option<&str>) { self._null.append(row.is_none()); self._ts.append_value(key.ts.into()); if let Some(row) = row { diff --git a/src/scope.rs b/src/scope.rs index 020c71db..19762900 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -39,6 +39,10 @@ where pub(crate) fn meets(&self, target: &Self) -> bool { self.contains(&target.min) || self.contains(&target.max) } + + pub(crate) fn gen(&self) -> FileId { + self.gen + } } impl Encode for Scope diff --git a/src/serdes/string.rs b/src/serdes/string.rs index 3b9ea2da..9d685ff0 100644 --- a/src/serdes/string.rs +++ b/src/serdes/string.rs @@ -5,7 +5,7 @@ use futures_util::{AsyncReadExt, AsyncWriteExt}; use super::{Decode, Encode}; -impl Encode for String { +impl<'r> Encode for &'r str { type Error = io::Error; async fn encode( @@ -21,6 +21,21 @@ impl Encode for String { } } +impl Encode for String { + type Error = io::Error; + + async fn encode( + &self, + writer: &mut W, + ) -> Result<(), Self::Error> { + self.as_str().encode(writer).await + } + + fn size(&self) -> usize { + self.as_str().size() + } +} + impl Decode for String { type Error = io::Error; diff --git a/src/stream/level.rs b/src/stream/level.rs index ad5de4a1..50de6284 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -16,7 +16,9 @@ use crate::{ ondisk::{scan::SsTableScan, sstable::SsTable}, oracle::Timestamp, record::Record, + scope::Scope, stream::record_batch::RecordBatchEntry, + version::Version, DbOption, }; @@ -25,6 +27,7 @@ where R: Record, E: Executor, { + Init(FileId), Ready(SsTableScan), OpenFile(Pin> + 'level>>), LoadStream(Pin, ParquetError>> + 'level>>), @@ -43,6 +46,39 @@ where status: FutureStatus<'level, R, E>, } +impl<'level, R, E> LevelStream<'level, R, E> +where + R: Record, + E: Executor, +{ + // Kould: only used by Compaction now, and the start and end of the sstables range are known + pub(crate) fn new( + version: &Version, + level: usize, + start: usize, + end: usize, + range: (Bound<&'level R::Key>, Bound<&'level R::Key>), + ts: Timestamp, + ) -> Option { + let (lower, upper) = range; + let mut gens: VecDeque = version.level_slice[level][start..end + 1] + .iter() + .map(Scope::gen) + .collect(); + let first_gen = gens.pop_front()?; + let status = FutureStatus::Init(first_gen); + + Some(LevelStream { + lower, + upper, + ts, + option: version.option().clone(), + gens, + status, + }) + } +} + impl<'level, R, E> Stream for LevelStream<'level, R, E> where R: Record, @@ -53,6 +89,12 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { return match &mut self.status { + FutureStatus::Init(gen) => { + let gen = *gen; + self.status = + FutureStatus::OpenFile(Box::pin(E::open(self.option.table_path(&gen)))); + continue; + } FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) { Poll::Ready(None) => match self.gens.pop_front() { None => Poll::Ready(None), diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 2734572b..e8f1f36a 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -27,7 +27,7 @@ pin_project! { impl<'merge, R, E> MergeStream<'merge, R, E> where R: Record, - E: Executor, + E: Executor + 'merge, { pub(crate) async fn from_vec( mut streams: Vec>, @@ -54,7 +54,7 @@ where impl<'merge, R, E> Stream for MergeStream<'merge, R, E> where R: Record, - E: Executor, + E: Executor + 'merge, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 733a252d..3b89f315 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -20,6 +20,7 @@ use crate::{ ondisk::scan::SsTableScan, oracle::timestamp::Timestamped, record::{Key, Record}, + stream::level::LevelStream, }; pub enum Entry<'entry, R> @@ -29,6 +30,7 @@ where Mutable(crossbeam_skiplist::map::Entry<'entry, Timestamped, Option>), Immutable(RecordBatchEntry), SsTable(RecordBatchEntry), + Level(RecordBatchEntry), } impl Entry<'_, R> @@ -42,6 +44,7 @@ where .map(|key| unsafe { transmute(key.as_key_ref()) }), Entry::SsTable(entry) => entry.internal_key(), Entry::Immutable(entry) => entry.internal_key(), + Entry::Level(entry) => entry.internal_key(), } } @@ -50,6 +53,7 @@ where Entry::Mutable(entry) => entry.value().as_ref().map(R::as_record_ref).unwrap(), Entry::SsTable(entry) => entry.get(), Entry::Immutable(entry) => entry.get(), + Entry::Level(entry) => entry.get(), } } } @@ -69,6 +73,7 @@ where ), Entry::SsTable(sstable) => write!(f, "Entry::SsTable({:?})", sstable), Entry::Immutable(immutable) => write!(f, "Entry::Immutable({:?})", immutable), + Entry::Level(level) => write!(f, "Entry::Level({:?})", level), } } } @@ -92,6 +97,10 @@ pin_project! { #[pin] inner: SsTableScan, }, + Level { + #[pin] + inner: LevelStream<'scan, R, E>, + } } } @@ -129,6 +138,16 @@ where } } +impl<'scan, R, E> From> for ScanStream<'scan, R, E> +where + R: Record, + E: Executor, +{ + fn from(inner: LevelStream<'scan, R, E>) -> Self { + ScanStream::Level { inner } + } +} + impl fmt::Debug for ScanStream<'_, R, E> where R: Record, @@ -139,6 +158,7 @@ where ScanStream::Mutable { .. } => write!(f, "ScanStream::Mutable"), ScanStream::SsTable { .. } => write!(f, "ScanStream::SsTable"), ScanStream::Immutable { .. } => write!(f, "ScanStream::Immutable"), + ScanStream::Level { .. } => write!(f, "ScanStream::Level"), } } } @@ -146,7 +166,7 @@ where impl<'scan, R, E> Stream for ScanStream<'scan, R, E> where R: Record, - E: Executor, + E: Executor + 'scan, { type Item = Result, parquet::errors::ParquetError>; @@ -161,6 +181,9 @@ where ScanStreamProject::Immutable { inner } => { Poll::Ready(ready!(inner.poll_next(cx)).map(|entry| Ok(Entry::Immutable(entry)))) } + ScanStreamProject::Level { inner } => { + Poll::Ready(ready!(inner.poll_next(cx)).map(|entry| entry.map(Entry::Level))) + } } } } diff --git a/src/version/mod.rs b/src/version/mod.rs index fa077e26..a22066c6 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -1,6 +1,6 @@ mod cleaner; -mod edit; -mod set; +pub(crate) mod edit; +pub(crate) mod set; use std::{marker::PhantomData, ops::Bound, sync::Arc}; @@ -21,7 +21,7 @@ use crate::{ DbOption, }; -const MAX_LEVEL: usize = 7; +pub(crate) const MAX_LEVEL: usize = 7; pub(crate) type VersionRef = Arc>; @@ -29,13 +29,33 @@ pub(crate) struct Version where R: Record, { - pub(crate) num: usize, + num: usize, pub(crate) level_slice: [Vec>; MAX_LEVEL], - pub(crate) clean_sender: Sender, - pub(crate) option: Arc, + clean_sender: Sender, + option: Arc, _p: PhantomData, } +impl Version +where + R: Record, + E: Executor, +{ + pub(crate) fn new(option: Arc, clean_sender: Sender) -> Self { + Version { + num: 0, + level_slice: [const { Vec::new() }; MAX_LEVEL], + clean_sender, + option: option.clone(), + _p: Default::default(), + } + } + + pub(crate) fn option(&self) -> &Arc { + &self.option + } +} + impl Clone for Version where R: Record, From 24f052bdf0509c5773cccf3238009abe2984d754 Mon Sep 17 00:00:00 2001 From: Kould Date: Thu, 18 Jul 2024 17:35:35 +0800 Subject: [PATCH 2/7] fix: `SsTableScan` returns only the first element --- src/compaction/mod.rs | 8 +++++--- src/ondisk/sstable.rs | 28 ++++++++++++++++++---------- src/stream/level.rs | 19 +++++++++++++++---- src/version/mod.rs | 6 +++++- 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 5fc4d437..7c58ddec 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -218,13 +218,13 @@ where streams.push(ScanStream::SsTable { inner: SsTable::open(file) - .scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into()) + .scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into(), None) .await .map_err(CompactionError::Parquet)?, }); } } else { - let (lower, upper) = Self::full_scope(&mut meet_scopes_l)?; + let (lower, upper) = Self::full_scope(&meet_scopes_l)?; let level_scan_l = LevelStream::new( version, level, @@ -232,6 +232,7 @@ where end_l, (Bound::Included(lower), Bound::Included(upper)), u32::MAX.into(), + None, ) .ok_or(CompactionError::EmptyLevel)?; @@ -240,7 +241,7 @@ where }); } // Next Level - let (lower, upper) = Self::full_scope(&mut meet_scopes_ll)?; + let (lower, upper) = Self::full_scope(&meet_scopes_ll)?; let level_scan_ll = LevelStream::new( version, level + 1, @@ -248,6 +249,7 @@ where end_ll, (Bound::Included(lower), Bound::Included(upper)), u32::MAX.into(), + None, ) .ok_or(CompactionError::EmptyLevel)?; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 59b2cb83..78ffde81 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -75,33 +75,41 @@ where async fn into_parquet_builder( self, - limit: usize, + limit: Option, ) -> parquet::errors::Result>>> { - Ok(ParquetRecordBatchStreamBuilder::new_with_options( + let mut builder = ParquetRecordBatchStreamBuilder::new_with_options( self.file.compat(), ArrowReaderOptions::default().with_page_index(true), ) - .await? - .with_limit(limit)) + .await?; + if let Some(limit) = limit { + builder = builder.with_limit(limit); + } + Ok(builder) } pub(crate) async fn get( self, key: &TimestampedRef, ) -> parquet::errors::Result>> { - self.scan((Bound::Included(key.value()), Bound::Unbounded), key.ts()) - .await? - .next() - .await - .transpose() + self.scan( + (Bound::Included(key.value()), Bound::Unbounded), + key.ts(), + Some(1), + ) + .await? + .next() + .await + .transpose() } pub(crate) async fn scan<'scan>( self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, + limit: Option, ) -> Result, parquet::errors::ParquetError> { - let builder = self.into_parquet_builder(1).await?; + let builder = self.into_parquet_builder(limit).await?; let schema_descriptor = builder.metadata().file_metadata().schema_descr(); let filter = unsafe { get_range_filter::(schema_descriptor, range, ts) }; diff --git a/src/stream/level.rs b/src/stream/level.rs index 50de6284..a05ab981 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -43,6 +43,7 @@ where ts: Timestamp, option: Arc, gens: VecDeque, + limit: Option, status: FutureStatus<'level, R, E>, } @@ -59,6 +60,7 @@ where end: usize, range: (Bound<&'level R::Key>, Bound<&'level R::Key>), ts: Timestamp, + limit: Option, ) -> Option { let (lower, upper) = range; let mut gens: VecDeque = version.level_slice[level][start..end + 1] @@ -74,6 +76,7 @@ where ts, option: version.option().clone(), gens, + limit, status, }) } @@ -105,13 +108,21 @@ where continue; } }, - poll => poll, + Poll::Ready(Some(result)) => { + if let Some(limit) = &mut self.limit { + *limit -= 1; + } + Poll::Ready(Some(result)) + } + Poll::Pending => Poll::Pending, }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - self.status = FutureStatus::LoadStream(Box::pin( - SsTable::open(file).scan((self.lower, self.upper), self.ts), - )); + self.status = FutureStatus::LoadStream(Box::pin(SsTable::open(file).scan( + (self.lower, self.upper), + self.ts, + self.limit, + ))); continue; } Poll::Ready(Err(err)) => Poll::Ready(Some(Err(ParquetError::from(err)))), diff --git a/src/version/mod.rs b/src/version/mod.rs index a22066c6..f4b9fc45 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -140,6 +140,7 @@ where iters: &mut Vec>, range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>), ts: Timestamp, + limit: Option, ) -> Result<(), VersionError> { for scope in self.level_slice[0].iter() { let file = E::open(self.option.table_path(&scope.gen)) @@ -148,7 +149,10 @@ where let table = SsTable::open(file); iters.push(ScanStream::SsTable { - inner: table.scan(range, ts).await.map_err(VersionError::Parquet)?, + inner: table + .scan(range, ts, limit) + .await + .map_err(VersionError::Parquet)?, }) } for scopes in self.level_slice[1..].iter() { From 86c476b235c66a52ac75d633ebc045a2f6ed0d3d Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Fri, 19 Jul 2024 01:03:48 +0800 Subject: [PATCH 3/7] fix: unnecessary borrow --- src/inmem/immutable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index a2206a9f..53c3b5ca 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -240,7 +240,7 @@ pub(crate) mod tests { impl Builder for TestBuilder { fn push(&mut self, key: Timestamped<&str>, row: Option) { - self.vstring.append_value(&key.value); + self.vstring.append_value(key.value); match row { Some(row) => { self.vu32.append_value(row.vu32); From 48cc40fd0549fb0509caaaeeea88ba5d9ee5d7f0 Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Mon, 22 Jul 2024 00:05:59 +0800 Subject: [PATCH 4/7] refactor: use file provider instead of executor --- src/compaction/mod.rs | 37 ++++++++++++++++++------------------- src/lib.rs | 3 ++- src/ondisk/scan.rs | 20 +++++++++----------- src/ondisk/sstable.rs | 19 +++++++++---------- src/stream/level.rs | 33 ++++++++++++++++----------------- src/stream/merge.rs | 18 +++++++++--------- src/stream/mod.rs | 38 +++++++++++++++++++------------------- src/version/mod.rs | 30 ++++++++++++++---------------- src/version/set.rs | 35 +++++++++++++++++------------------ 9 files changed, 113 insertions(+), 120 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 7c58ddec..f8bec6c1 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -14,8 +14,7 @@ use tokio_util::compat::FuturesAsyncReadCompatExt; use ulid::Ulid; use crate::{ - executor::Executor, - fs::FileId, + fs::{FileId, FileProvider}, inmem::immutable::{ArrowArrays, Builder, Immutable}, ondisk::sstable::SsTable, record::{KeyRef, Record}, @@ -25,27 +24,27 @@ use crate::{ DbOption, Schema, }; -pub(crate) struct Compactor +pub(crate) struct Compactor where R: Record, - E: Executor, + FP: FileProvider, { pub(crate) option: Arc, pub(crate) schema: Arc>>, - pub(crate) version_set: VersionSet, + pub(crate) version_set: VersionSet, } -impl Compactor +impl Compactor where R: Record, - E: Executor, + FP: FileProvider, { pub(crate) fn new( schema: Arc>>, option: Arc, - version_set: VersionSet, + version_set: VersionSet, ) -> Self { - Compactor:: { + Compactor:: { option, schema, version_set, @@ -109,7 +108,7 @@ where // let mut wal_ids = Vec::with_capacity(batches.len()); let mut writer = AsyncArrowWriter::try_new( - E::open(option.table_path(&gen)) + FP::open(option.table_path(&gen)) .await .map_err(CompactionError::Io)? .compat(), @@ -148,7 +147,7 @@ where } pub(crate) async fn major_compaction( - version: &Version, + version: &Version, option: &DbOption, mut min: &R::Key, mut max: &R::Key, @@ -163,7 +162,7 @@ where } let mut meet_scopes_l = Vec::new(); - let start_l = Version::::scope_search(min, &version.level_slice[level]); + let start_l = Version::::scope_search(min, &version.level_slice[level]); let mut end_l = start_l; { for scope in version.level_slice[level][start_l..].iter() { @@ -192,9 +191,9 @@ where max = max_key; start_ll = - Version::::scope_search(min_key, &version.level_slice[level + 1]); + Version::::scope_search(min_key, &version.level_slice[level + 1]); end_ll = - Version::::scope_search(max_key, &version.level_slice[level + 1]); + Version::::scope_search(max_key, &version.level_slice[level + 1]); let next_level_len = version.level_slice[level + 1].len(); for scope in version.level_slice[level + 1] @@ -212,7 +211,7 @@ where // This Level if level == 0 { for scope in meet_scopes_l.iter() { - let file = E::open(option.table_path(&scope.gen)) + let file = FP::open(option.table_path(&scope.gen)) .await .map_err(CompactionError::Io)?; @@ -256,7 +255,7 @@ where streams.push(ScanStream::Level { inner: level_scan_ll, }); - let mut stream = MergeStream::::from_vec(streams) + let mut stream = MergeStream::::from_vec(streams) .await .map_err(CompactionError::Parquet)?; @@ -344,7 +343,7 @@ where let gen = Ulid::new(); let columns = builder.finish(); let mut writer = AsyncArrowWriter::try_new( - E::open(option.table_path(&gen)) + FP::open(option.table_path(&gen)) .await .map_err(CompactionError::Io)? .compat(), @@ -415,14 +414,14 @@ mod tests { Immutable::from(mutable) } - async fn build_parquet_table( + async fn build_parquet_table( option: &DbOption, gen: FileId, fn_mutable: impl FnOnce(&mut Mutable), ) -> Result<(), ParquetError> { let immutable = build_immutable(fn_mutable); let mut writer = AsyncArrowWriter::try_new( - E::open(option.table_path(&gen)) + FP::open(option.table_path(&gen)) .await .map_err(ParquetError::from)? .compat(), diff --git a/src/lib.rs b/src/lib.rs index 2b004907..6d7d4493 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ use std::{ }; use async_lock::{RwLock, RwLockReadGuard}; +use fs::FileProvider; use futures_core::Stream; use futures_util::StreamExt; use inmem::{immutable::Immutable, mutable::Mutable}; @@ -86,7 +87,7 @@ impl DbOption { ) -> bool where R: Record, - E: Executor, + E: FileProvider, { Version::::tables_len(version, level) >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32)) diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index dda3d1dc..fa5d397a 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -9,38 +9,36 @@ use pin_project_lite::pin_project; use tokio_util::compat::Compat; use crate::{ - executor::Executor, + fs::FileProvider, record::Record, stream::record_batch::{RecordBatchEntry, RecordBatchIterator}, }; pin_project! { #[derive(Debug)] - pub struct SsTableScan + pub struct SsTableScan where - R: Record, - E: Executor, + FP: FileProvider, { #[pin] - stream: ParquetRecordBatchStream>, + stream: ParquetRecordBatchStream>, iter: Option>, } } -impl SsTableScan +impl SsTableScan where - R: Record, - E: Executor, + FP: FileProvider, { - pub fn new(stream: ParquetRecordBatchStream>) -> Self { + pub fn new(stream: ParquetRecordBatchStream>) -> Self { SsTableScan { stream, iter: None } } } -impl Stream for SsTableScan +impl Stream for SsTableScan where R: Record, - E: Executor, + FP: FileProvider, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 78ffde81..0295acb6 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -17,28 +17,27 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; use super::scan::SsTableScan; use crate::{ arrows::get_range_filter, - executor::Executor, - fs::AsyncFile, + fs::{AsyncFile, FileProvider}, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, stream::record_batch::RecordBatchEntry, }; -pub(crate) struct SsTable +pub(crate) struct SsTable where R: Record, - E: Executor, + FP: FileProvider, { - file: E::File, + file: FP::File, _marker: PhantomData, } -impl SsTable +impl SsTable where R: Record, - E: Executor, + FP: FileProvider, { - pub(crate) fn open(file: E::File) -> Self { + pub(crate) fn open(file: FP::File) -> Self { SsTable { file, _marker: PhantomData, @@ -76,7 +75,7 @@ where async fn into_parquet_builder( self, limit: Option, - ) -> parquet::errors::Result>>> { + ) -> parquet::errors::Result>>> { let mut builder = ParquetRecordBatchStreamBuilder::new_with_options( self.file.compat(), ArrowReaderOptions::default().with_page_index(true), @@ -108,7 +107,7 @@ where range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, limit: Option, - ) -> Result, parquet::errors::ParquetError> { + ) -> Result, parquet::errors::ParquetError> { let builder = self.into_parquet_builder(limit).await?; let schema_descriptor = builder.metadata().file_metadata().schema_descr(); diff --git a/src/stream/level.rs b/src/stream/level.rs index a05ab981..7a62a3da 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -11,8 +11,7 @@ use futures_core::Stream; use parquet::errors::ParquetError; use crate::{ - executor::Executor, - fs::FileId, + fs::{FileId, FileProvider}, ondisk::{scan::SsTableScan, sstable::SsTable}, oracle::Timestamp, record::Record, @@ -22,21 +21,21 @@ use crate::{ DbOption, }; -enum FutureStatus<'level, R, E> +enum FutureStatus<'level, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { Init(FileId), - Ready(SsTableScan), - OpenFile(Pin> + 'level>>), - LoadStream(Pin, ParquetError>> + 'level>>), + Ready(SsTableScan), + OpenFile(Pin> + 'level>>), + LoadStream(Pin, ParquetError>> + 'level>>), } -pub(crate) struct LevelStream<'level, R, E> +pub(crate) struct LevelStream<'level, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { lower: Bound<&'level R::Key>, upper: Bound<&'level R::Key>, @@ -44,17 +43,17 @@ where option: Arc, gens: VecDeque, limit: Option, - status: FutureStatus<'level, R, E>, + status: FutureStatus<'level, R, FP>, } -impl<'level, R, E> LevelStream<'level, R, E> +impl<'level, R, FP> LevelStream<'level, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { // Kould: only used by Compaction now, and the start and end of the sstables range are known pub(crate) fn new( - version: &Version, + version: &Version, level: usize, start: usize, end: usize, @@ -82,10 +81,10 @@ where } } -impl<'level, R, E> Stream for LevelStream<'level, R, E> +impl<'level, R, FP> Stream for LevelStream<'level, R, FP> where R: Record, - E: Executor + 'level, + FP: FileProvider + 'level, { type Item = Result, ParquetError>; @@ -95,14 +94,14 @@ where FutureStatus::Init(gen) => { let gen = *gen; self.status = - FutureStatus::OpenFile(Box::pin(E::open(self.option.table_path(&gen)))); + FutureStatus::OpenFile(Box::pin(FP::open(self.option.table_path(&gen)))); continue; } FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) { Poll::Ready(None) => match self.gens.pop_front() { None => Poll::Ready(None), Some(gen) => { - self.status = FutureStatus::OpenFile(Box::pin(E::open( + self.status = FutureStatus::OpenFile(Box::pin(FP::open( self.option.table_path(&gen), ))); continue; diff --git a/src/stream/merge.rs b/src/stream/merge.rs index e8f1f36a..739ad469 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -10,27 +10,27 @@ use futures_util::stream::StreamExt; use pin_project_lite::pin_project; use super::{Entry, ScanStream}; -use crate::{executor::Executor, record::Record}; +use crate::{fs::FileProvider, record::Record}; pin_project! { - pub(crate) struct MergeStream<'merge, R, E> + pub(crate) struct MergeStream<'merge, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { - streams: Vec>, + streams: Vec>, peeked: BinaryHeap>, buf: Option>, } } -impl<'merge, R, E> MergeStream<'merge, R, E> +impl<'merge, R, FP> MergeStream<'merge, R, FP> where R: Record, - E: Executor + 'merge, + FP: FileProvider + 'merge, { pub(crate) async fn from_vec( - mut streams: Vec>, + mut streams: Vec>, ) -> Result { let mut peeked = BinaryHeap::with_capacity(streams.len()); @@ -51,10 +51,10 @@ where } } -impl<'merge, R, E> Stream for MergeStream<'merge, R, E> +impl<'merge, R, FP> Stream for MergeStream<'merge, R, FP> where R: Record, - E: Executor + 'merge, + FP: FileProvider + 'merge, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 3b89f315..119af3b7 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -15,7 +15,7 @@ use pin_project_lite::pin_project; use record_batch::RecordBatchEntry; use crate::{ - executor::Executor, + fs::FileProvider, inmem::{immutable::ImmutableScan, mutable::MutableScan}, ondisk::scan::SsTableScan, oracle::timestamp::Timestamped, @@ -80,10 +80,10 @@ where pin_project! { #[project = ScanStreamProject] - pub enum ScanStream<'scan, R, E> + pub enum ScanStream<'scan, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { Mutable { #[pin] @@ -95,19 +95,19 @@ pin_project! { }, SsTable { #[pin] - inner: SsTableScan, + inner: SsTableScan, }, Level { #[pin] - inner: LevelStream<'scan, R, E>, + inner: LevelStream<'scan, R, FP>, } } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R, FP> From> for ScanStream<'scan, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { fn from(inner: MutableScan<'scan, R>) -> Self { ScanStream::Mutable { @@ -116,10 +116,10 @@ where } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R, FP> From> for ScanStream<'scan, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { fn from(inner: ImmutableScan<'scan, R>) -> Self { ScanStream::Immutable { @@ -128,30 +128,30 @@ where } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R, FP> From> for ScanStream<'scan, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { - fn from(inner: SsTableScan) -> Self { + fn from(inner: SsTableScan) -> Self { ScanStream::SsTable { inner } } } -impl<'scan, R, E> From> for ScanStream<'scan, R, E> +impl<'scan, R, FP> From> for ScanStream<'scan, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { - fn from(inner: LevelStream<'scan, R, E>) -> Self { + fn from(inner: LevelStream<'scan, R, FP>) -> Self { ScanStream::Level { inner } } } -impl fmt::Debug for ScanStream<'_, R, E> +impl fmt::Debug for ScanStream<'_, R, FP> where R: Record, - E: Executor, + FP: FileProvider, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -163,10 +163,10 @@ where } } -impl<'scan, R, E> Stream for ScanStream<'scan, R, E> +impl<'scan, R, FP> Stream for ScanStream<'scan, R, FP> where R: Record, - E: Executor + 'scan, + FP: FileProvider + 'scan, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/version/mod.rs b/src/version/mod.rs index f4b9fc45..288c2c6e 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -9,8 +9,7 @@ use thiserror::Error; use tracing::error; use crate::{ - executor::Executor, - fs::FileId, + fs::{FileId, FileProvider}, ondisk::sstable::SsTable, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, @@ -23,9 +22,9 @@ use crate::{ pub(crate) const MAX_LEVEL: usize = 7; -pub(crate) type VersionRef = Arc>; +pub(crate) type VersionRef = Arc>; -pub(crate) struct Version +pub(crate) struct Version where R: Record, { @@ -33,13 +32,13 @@ where pub(crate) level_slice: [Vec>; MAX_LEVEL], clean_sender: Sender, option: Arc, - _p: PhantomData, + _p: PhantomData, } -impl Version +impl Version where R: Record, - E: Executor, + FP: FileProvider, { pub(crate) fn new(option: Arc, clean_sender: Sender) -> Self { Version { @@ -56,10 +55,9 @@ where } } -impl Clone for Version +impl Clone for Version where R: Record, - E: Executor, { fn clone(&self) -> Self { let mut level_slice = [const { Vec::new() }; MAX_LEVEL]; @@ -78,10 +76,10 @@ where } } -impl Version +impl Version where R: Record, - E: Executor, + FP: FileProvider, { pub(crate) async fn query( &self, @@ -116,10 +114,10 @@ where key: &TimestampedRef<::Key>, gen: &FileId, ) -> Result>, VersionError> { - let file = E::open(self.option.table_path(gen)) + let file = FP::open(self.option.table_path(gen)) .await .map_err(VersionError::Io)?; - SsTable::::open(file) + SsTable::::open(file) .get(key) .await .map_err(VersionError::Parquet) @@ -137,13 +135,13 @@ where pub(crate) async fn iters<'iters>( &self, - iters: &mut Vec>, + iters: &mut Vec>, range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>), ts: Timestamp, limit: Option, ) -> Result<(), VersionError> { for scope in self.level_slice[0].iter() { - let file = E::open(self.option.table_path(&scope.gen)) + let file = FP::open(self.option.table_path(&scope.gen)) .await .map_err(VersionError::Io)?; let table = SsTable::open(file); @@ -169,7 +167,7 @@ where } } -impl Drop for Version +impl Drop for Version where R: Record, { diff --git a/src/version/set.rs b/src/version/set.rs index c5c26f42..b58ca1f9 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -6,37 +6,36 @@ use futures_util::{AsyncSeekExt, AsyncWriteExt}; use super::MAX_LEVEL; use crate::{ - executor::Executor, - fs::FileId, + fs::{FileId, FileProvider}, record::Record, serdes::Encode, version::{cleaner::CleanTag, edit::VersionEdit, Version, VersionError, VersionRef}, DbOption, }; -pub(crate) struct VersionSetInner +pub(crate) struct VersionSetInner where R: Record, - E: Executor, + FP: FileProvider, { - current: VersionRef, - log: E::File, + current: VersionRef, + log: FP::File, } -pub(crate) struct VersionSet +pub(crate) struct VersionSet where R: Record, - E: Executor, + FP: FileProvider, { - inner: Arc>>, + inner: Arc>>, clean_sender: Sender, option: Arc, } -impl Clone for VersionSet +impl Clone for VersionSet where R: Record, - E: Executor, + FP: FileProvider, { fn clone(&self) -> Self { VersionSet { @@ -47,24 +46,24 @@ where } } -impl VersionSet +impl VersionSet where R: Record, - E: Executor, + FP: FileProvider, { pub(crate) async fn new( clean_sender: Sender, option: Arc, ) -> Result> { - let mut log = E::open(option.version_path()) + let mut log = FP::open(option.version_path()) .await .map_err(VersionError::Io)?; let edits = VersionEdit::recover(&mut log).await; log.seek(SeekFrom::End(0)).await.map_err(VersionError::Io)?; - let set = VersionSet:: { + let set = VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { - current: Arc::new(Version:: { + current: Arc::new(Version:: { num: 0, level_slice: [const { Vec::new() }; MAX_LEVEL], clean_sender: clean_sender.clone(), @@ -81,7 +80,7 @@ where Ok(set) } - pub(crate) async fn current(&self) -> VersionRef { + pub(crate) async fn current(&self) -> VersionRef { self.inner.read().await.current.clone() } @@ -106,7 +105,7 @@ where VersionEdit::Add { mut scope, level } => { if let Some(wal_ids) = scope.wal_ids.take() { for wal_id in wal_ids { - E::remove(self.option.wal_path(&wal_id)) + FP::remove(self.option.wal_path(&wal_id)) .await .map_err(VersionError::Io)?; } From 0c31d94a4830ef02058e628b052de7228949ad2a Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Mon, 22 Jul 2024 00:20:04 +0800 Subject: [PATCH 5/7] refactor: use ? instead of map_err in compaction mod --- src/compaction/mod.rs | 56 ++++++++++++++----------------------------- 1 file changed, 18 insertions(+), 38 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index f8bec6c1..0ce88a76 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -84,8 +84,7 @@ where self.version_set .apply_edits(version_edits, Some(delete_gens), false) - .await - .map_err(CompactionError::Version)?; + .await?; } } // TODO @@ -108,14 +107,10 @@ where // let mut wal_ids = Vec::with_capacity(batches.len()); let mut writer = AsyncArrowWriter::try_new( - FP::open(option.table_path(&gen)) - .await - .map_err(CompactionError::Io)? - .compat(), + FP::open(option.table_path(&gen)).await?.compat(), R::arrow_schema().clone(), option.write_parquet_option.clone(), - ) - .map_err(CompactionError::Parquet)?; + )?; for batch in batches { if let (Some(batch_min), Some(batch_max)) = batch.scope() { @@ -126,14 +121,11 @@ where max = Some(batch_max.clone()) } } - writer - .write(batch.as_record_batch()) - .await - .map_err(CompactionError::Parquet)?; + writer.write(batch.as_record_batch()).await?; // TODO: WAL CLEAN // wal_ids.push(wal_id); } - writer.close().await.map_err(CompactionError::Parquet)?; + writer.close().await?; return Ok(Some(Scope { min: min.ok_or(CompactionError::EmptyLevel)?, max: max.ok_or(CompactionError::EmptyLevel)?, @@ -211,15 +203,12 @@ where // This Level if level == 0 { for scope in meet_scopes_l.iter() { - let file = FP::open(option.table_path(&scope.gen)) - .await - .map_err(CompactionError::Io)?; + let file = FP::open(option.table_path(&scope.gen)).await?; streams.push(ScanStream::SsTable { inner: SsTable::open(file) .scan((Bound::Unbounded, Bound::Unbounded), u32::MAX.into(), None) - .await - .map_err(CompactionError::Parquet)?, + .await?, }); } } else { @@ -255,9 +244,7 @@ where streams.push(ScanStream::Level { inner: level_scan_ll, }); - let mut stream = MergeStream::::from_vec(streams) - .await - .map_err(CompactionError::Parquet)?; + let mut stream = MergeStream::::from_vec(streams).await?; // Kould: is the capacity parameter necessary? let mut builder = R::Columns::builder(8192); @@ -266,7 +253,7 @@ where let mut max = None; while let Some(result) = Pin::new(&mut stream).next().await { - let entry = result.map_err(CompactionError::Parquet)?; + let entry = result?; let key = entry.key(); if min.is_none() { @@ -337,25 +324,18 @@ where min: &mut Option, max: &mut Option, ) -> Result<(), CompactionError> { - assert!(min.is_some()); - assert!(max.is_some()); + debug_assert!(min.is_some()); + debug_assert!(max.is_some()); let gen = Ulid::new(); let columns = builder.finish(); let mut writer = AsyncArrowWriter::try_new( - FP::open(option.table_path(&gen)) - .await - .map_err(CompactionError::Io)? - .compat(), + FP::open(option.table_path(&gen)).await?.compat(), R::arrow_schema().clone(), option.write_parquet_option.clone(), - ) - .map_err(CompactionError::Parquet)?; - writer - .write(columns.as_record_batch()) - .await - .map_err(CompactionError::Parquet)?; - writer.close().await.map_err(CompactionError::Parquet)?; + )?; + writer.write(columns.as_record_batch()).await?; + writer.close().await?; version_edits.push(VersionEdit::Add { level: (level + 1) as u8, scope: Scope { @@ -375,11 +355,11 @@ where R: Record, { #[error("compaction io error: {0}")] - Io(#[source] std::io::Error), + Io(#[from] std::io::Error), #[error("compaction parquet error: {0}")] - Parquet(#[source] parquet::errors::ParquetError), + Parquet(#[from] parquet::errors::ParquetError), #[error("compaction version error: {0}")] - Version(#[source] VersionError), + Version(#[from] VersionError), #[error("the level being compacted does not have a table")] EmptyLevel, } From 870d74f7759f70b801b4017a0438202bf1e7748e Mon Sep 17 00:00:00 2001 From: Kould Date: Mon, 22 Jul 2024 11:59:15 +0800 Subject: [PATCH 6/7] chore: `Compactor::major_compaction` split it to several submethods --- src/compaction/mod.rs | 214 ++++++++++++++++++++++++------------------ 1 file changed, 125 insertions(+), 89 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 0ce88a76..53e1fa70 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -152,54 +152,15 @@ where if !option.is_threshold_exceeded_major(version, level) { break; } + let (meet_scopes_l, start_l, end_l) = + match Self::this_level_scopes(version, min, max, level) { + Some(value) => value, + None => return Ok(()), + }; + let (meet_scopes_ll, start_ll, end_ll) = + Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?; - let mut meet_scopes_l = Vec::new(); - let start_l = Version::::scope_search(min, &version.level_slice[level]); - let mut end_l = start_l; - { - for scope in version.level_slice[level][start_l..].iter() { - if scope.contains(min) || scope.contains(max) { - meet_scopes_l.push(scope); - end_l += 1; - } else { - break; - } - } - if meet_scopes_l.is_empty() { - return Ok(()); - } - } - let mut meet_scopes_ll = Vec::new(); - let mut start_ll = 0; - let mut end_ll = 0; - { - if !version.level_slice[level + 1].is_empty() { - let min_key = &meet_scopes_l - .first() - .ok_or(CompactionError::EmptyLevel)? - .min; - let max_key = &meet_scopes_l.last().ok_or(CompactionError::EmptyLevel)?.max; - min = min_key; - max = max_key; - - start_ll = - Version::::scope_search(min_key, &version.level_slice[level + 1]); - end_ll = - Version::::scope_search(max_key, &version.level_slice[level + 1]); - - let next_level_len = version.level_slice[level + 1].len(); - for scope in version.level_slice[level + 1] - [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] - .iter() - { - if scope.contains(min) || scope.contains(max) { - meet_scopes_ll.push(scope); - } - } - } - } let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len()); - // This Level if level == 0 { for scope in meet_scopes_l.iter() { @@ -244,50 +205,8 @@ where streams.push(ScanStream::Level { inner: level_scan_ll, }); - let mut stream = MergeStream::::from_vec(streams).await?; - - // Kould: is the capacity parameter necessary? - let mut builder = R::Columns::builder(8192); - let mut written_size = 0; - let mut min = None; - let mut max = None; + Self::build_tables(option, version_edits, level, streams).await?; - while let Some(result) = Pin::new(&mut stream).next().await { - let entry = result?; - let key = entry.key(); - - if min.is_none() { - min = Some(key.value.to_key()) - } - max = Some(key.value.to_key()); - - written_size += key.size(); - builder.push(key, Some(entry.value())); - - if written_size >= option.max_sst_file_size { - Self::build_table( - option, - version_edits, - level, - &mut builder, - &mut min, - &mut max, - ) - .await?; - written_size = 0; - } - } - if written_size > 0 { - Self::build_table( - option, - version_edits, - level, - &mut builder, - &mut min, - &mut max, - ) - .await?; - } for scope in meet_scopes_l { version_edits.push(VersionEdit::Remove { level: level as u8, @@ -308,6 +227,123 @@ where Ok(()) } + fn next_level_scopes<'a>( + version: &'a Version, + min: &mut &'a ::Key, + max: &mut &'a ::Key, + level: usize, + meet_scopes_l: &[&'a Scope<::Key>], + ) -> Result<(Vec<&'a Scope<::Key>>, usize, usize), CompactionError> { + let mut meet_scopes_ll = Vec::new(); + let mut start_ll = 0; + let mut end_ll = 0; + { + if !version.level_slice[level + 1].is_empty() { + *min = &meet_scopes_l + .first() + .ok_or(CompactionError::EmptyLevel)? + .min; + *max = &meet_scopes_l + .iter() + .last() + .ok_or(CompactionError::EmptyLevel)? + .max; + + start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); + end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); + + let next_level_len = version.level_slice[level + 1].len(); + for scope in version.level_slice[level + 1] + [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] + .iter() + { + if scope.contains(min) || scope.contains(max) { + meet_scopes_ll.push(scope); + } + } + } + } + Ok((meet_scopes_ll, start_ll, end_ll)) + } + + fn this_level_scopes<'a>( + version: &'a Version, + min: &::Key, + max: &::Key, + level: usize, + ) -> Option<(Vec<&'a Scope<::Key>>, usize, usize)> { + let mut meet_scopes_l = Vec::new(); + let start_l = Version::::scope_search(min, &version.level_slice[level]); + let mut end_l = start_l; + { + for scope in version.level_slice[level][start_l..].iter() { + if scope.contains(min) || scope.contains(max) { + meet_scopes_l.push(scope); + end_l += 1; + } else { + break; + } + } + if meet_scopes_l.is_empty() { + return None; + } + } + Some((meet_scopes_l, start_l, end_l)) + } + + async fn build_tables<'scan>( + option: &DbOption, + version_edits: &mut Vec::Key>>, + level: usize, + streams: Vec>, + ) -> Result<(), CompactionError> { + let mut stream = MergeStream::::from_vec(streams).await?; + + // Kould: is the capacity parameter necessary? + let mut builder = R::Columns::builder(8192); + let mut written_size = 0; + let mut min = None; + let mut max = None; + + while let Some(result) = Pin::new(&mut stream).next().await { + let entry = result?; + let key = entry.key(); + + if min.is_none() { + min = Some(key.value.to_key()) + } + max = Some(key.value.to_key()); + + written_size += key.size(); + builder.push(key, Some(entry.value())); + + if written_size >= option.max_sst_file_size { + Self::build_table( + option, + version_edits, + level, + &mut builder, + &mut min, + &mut max, + ) + .await?; + written_size = 0; + } + } + if written_size > 0 { + Self::build_table( + option, + version_edits, + level, + &mut builder, + &mut min, + &mut max, + ) + .await?; + } + Ok(()) + } + fn full_scope<'a>( meet_scopes: &[&'a Scope<::Key>], ) -> Result<(&'a ::Key, &'a ::Key), CompactionError> { From d260406dca0b374861310f2ade5e33dc8b4c258e Mon Sep 17 00:00:00 2001 From: Kould Date: Mon, 22 Jul 2024 12:46:41 +0800 Subject: [PATCH 7/7] chore: remove useless braces --- src/compaction/mod.rs | 66 +++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 53e1fa70..d5ba0569 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -237,29 +237,28 @@ where let mut meet_scopes_ll = Vec::new(); let mut start_ll = 0; let mut end_ll = 0; - { - if !version.level_slice[level + 1].is_empty() { - *min = &meet_scopes_l - .first() - .ok_or(CompactionError::EmptyLevel)? - .min; - *max = &meet_scopes_l - .iter() - .last() - .ok_or(CompactionError::EmptyLevel)? - .max; - - start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); - end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); - - let next_level_len = version.level_slice[level + 1].len(); - for scope in version.level_slice[level + 1] - [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] - .iter() - { - if scope.contains(min) || scope.contains(max) { - meet_scopes_ll.push(scope); - } + + if !version.level_slice[level + 1].is_empty() { + *min = &meet_scopes_l + .first() + .ok_or(CompactionError::EmptyLevel)? + .min; + *max = &meet_scopes_l + .iter() + .last() + .ok_or(CompactionError::EmptyLevel)? + .max; + + start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); + end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); + + let next_level_len = version.level_slice[level + 1].len(); + for scope in version.level_slice[level + 1] + [start_ll..cmp::min(end_ll + 1, next_level_len - 1)] + .iter() + { + if scope.contains(min) || scope.contains(max) { + meet_scopes_ll.push(scope); } } } @@ -275,19 +274,18 @@ where let mut meet_scopes_l = Vec::new(); let start_l = Version::::scope_search(min, &version.level_slice[level]); let mut end_l = start_l; - { - for scope in version.level_slice[level][start_l..].iter() { - if scope.contains(min) || scope.contains(max) { - meet_scopes_l.push(scope); - end_l += 1; - } else { - break; - } - } - if meet_scopes_l.is_empty() { - return None; + + for scope in version.level_slice[level][start_l..].iter() { + if scope.contains(min) || scope.contains(max) { + meet_scopes_l.push(scope); + end_l += 1; + } else { + break; } } + if meet_scopes_l.is_empty() { + return None; + } Some((meet_scopes_l, start_l, end_l)) }