From a1304f0ef0ff3f3d001a8e2f83e6ace123043a8a Mon Sep 17 00:00:00 2001 From: pyakushin Date: Thu, 14 May 2020 18:33:02 +0300 Subject: [PATCH 1/2] libs update --- Cargo.toml | 4 ++-- src/benchmark/bin.rs | 2 +- src/storage/core.rs | 12 ++++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index aa0ba507ba..9d24ec9a09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,13 +26,13 @@ log = "0.4" rand = "0.7" serde = "1.0" serde_derive = "1.0" -futures = "0.3.4" +futures = "0.3.5" clap = { version = "2.33", optional = true } bitvec = "0.17" ahash = "0.3" [dependencies.tokio] -version = "0.2.13" +version = "0.2.21" features = ["time", "rt-core", "macros"] [features] diff --git a/src/benchmark/bin.rs b/src/benchmark/bin.rs index f06d2537ad..1f0b5a720a 100644 --- a/src/benchmark/bin.rs +++ b/src/benchmark/bin.rs @@ -127,7 +127,7 @@ async fn start_app() { ); let write_limit = limit * 1000 / value_size_kb; let mut prev_p = 0; - while let Some(_) = futures_pool.next().await { + while futures_pool.next().await.is_some() { let percent = counter * 1000 / write_limit; if prev_p != percent { print!( diff --git a/src/storage/core.rs b/src/storage/core.rs index d9aeedc20b..6c19d83388 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -356,12 +356,12 @@ impl Storage { let safe_locked = self.inner.safe.lock(); let next = self.inner.next_blob_name()?; let config = self.filter_config(); - safe_locked.await.active_blob = Some( - Blob::open_new(next, config) - .await - .map_err(Error::new)? - .boxed(), - ); + let mut safe = safe_locked.await; + let blob = Blob::open_new(next, config) + .await + .map_err(Error::new)? + .boxed(); + safe.active_blob = Some(blob); Ok(()) } From 29e07673f43bca9d4f7b12d9be420479f4a42a93 Mon Sep 17 00:00:00 2001 From: pyakushin Date: Fri, 15 May 2020 19:28:11 +0300 Subject: [PATCH 2/2] add 3 count methods for storage --- src/blob/index/simple.rs | 2 +- src/storage/core.rs | 58 ++++++++++++++++++++++++++++++++++ tests/tests.rs | 67 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/blob/index/simple.rs b/src/blob/index/simple.rs index 6a917a0b4e..8fd0563a6c 100644 --- a/src/blob/index/simple.rs +++ b/src/blob/index/simple.rs @@ -315,7 +315,7 @@ impl Simple { if let State::InMemory(index) = self.inner { index.len() } else { - 0 + self.header.records_count } } } diff --git a/src/storage/core.rs b/src/storage/core.rs index 6c19d83388..5bb12e8156 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -450,6 +450,21 @@ impl Storage { in_active || in_closed } + /// Total records count in storage. + pub async fn records_count(&self) -> usize { + self.inner.records_count().await + } + + /// Records count per blob. Format: (blob_id, count). Last value is from active blob. + pub async fn records_count_detailed(&self) -> Vec<(usize, usize)> { + self.inner.records_count_detailed().await + } + + /// Records count in active blob. Returns None if active blob not set or any IO error occured. + pub async fn records_count_in_active_blob(&self) -> Option { + self.inner.records_count_in_active_blob().await + } + fn filter_config(&self) -> BloomConfig { self.inner.config.filter() } @@ -504,6 +519,18 @@ impl Inner { dir, )) } + + pub(crate) async fn records_count(&self) -> usize { + self.safe.lock().await.records_count().await + } + + pub(crate) async fn records_count_detailed(&self) -> Vec<(usize, usize)> { + self.safe.lock().await.records_count_detailed().await + } + + pub(crate) async fn records_count_in_active_blob(&self) -> Option { + self.safe.lock().await.records_count_in_active_blob().await + } } impl Safe { @@ -520,6 +547,37 @@ impl Safe { let blobs_max_id = self.blobs.last().map(Blob::id); active_blob_id.max(blobs_max_id) } + + pub(crate) async fn records_count(&self) -> usize { + let details = self.records_count_detailed().await; + details.iter().fold(0, |acc, (_, count)| acc + count) + } + + pub(crate) async fn records_count_detailed(&self) -> Vec<(usize, usize)> { + let mut results = Vec::new(); + for blob in self.blobs.iter() { + let count = blob.records_count().await; + if let Ok(c) = count { + let value = (blob.id(), c); + debug!("push: {:?}", value); + results.push(value); + } + } + if let Some(count) = self.records_count_in_active_blob().await { + let value = (self.blobs.len(), count); + debug!("push: {:?}", value); + results.push(value); + } + results + } + + pub(crate) async fn records_count_in_active_blob(&self) -> Option { + if let Some(ref blob) = self.active_blob { + blob.records_count().await.ok() + } else { + None + } + } } fn launch_observer(inner: Inner) { diff --git a/tests/tests.rs b/tests/tests.rs index 553c4e3203..be380e3e48 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -572,3 +572,70 @@ fn meta_with(version: &str) -> Meta { meta.insert("version".to_owned(), version); meta } + +#[tokio::test] +async fn test_records_count() { + let now = Instant::now(); + let path = common::init("records_count"); + let storage = common::create_test_storage(&path, 20000).await.unwrap(); + + let count = 30; + let records = common::generate_records(count, 1_000); + for (key, data) in &records { + write_one(&storage, *key, data, None).await.unwrap(); + delay_for(Duration::from_millis(10)).await; + } + + assert_eq!(storage.records_count().await, count); + assert!(storage.records_count_in_active_blob().await < Some(count)); + + common::clean(storage, path) + .await + .expect("work dir clean failed"); + warn!("elapsed: {:.3}", now.elapsed().as_secs_f64()); +} + +#[tokio::test] +async fn test_records_count_in_active() { + let now = Instant::now(); + let path = common::init("records_count_in_active"); + let storage = common::create_test_storage(&path, 20000).await.unwrap(); + + let count = 10; + let records = common::generate_records(count, 1_000); + for (key, data) in &records { + write_one(&storage, *key, data, None).await.unwrap(); + delay_for(Duration::from_millis(10)).await; + } + + assert_eq!(storage.records_count_in_active_blob().await, Some(count)); + + common::clean(storage, path) + .await + .expect("work dir clean failed"); + warn!("elapsed: {:.3}", now.elapsed().as_secs_f64()); +} + +#[tokio::test] +async fn test_records_count_detailed() { + let now = Instant::now(); + let path = common::init("records_count_detailed"); + let storage = common::create_test_storage(&path, 20000).await.unwrap(); + + let count = 30; + let records = common::generate_records(count, 1000); + for (key, data) in &records { + write_one(&storage, *key, data, None).await.unwrap(); + delay_for(Duration::from_millis(10)).await; + } + delay_for(Duration::from_millis(100)).await; + assert_eq!( + storage.records_count_detailed().await, + vec![(0, 19), (1, 11)] + ); + + common::clean(storage, path) + .await + .expect("work dir clean failed"); + warn!("elapsed: {:.3}", now.elapsed().as_secs_f64()); +}