Skip to content

Commit

Permalink
Merge pull request #47 from qoollo/count_methods
Browse files Browse the repository at this point in the history
Count methods
  • Loading branch information
agend authored May 15, 2020
2 parents 14bc889 + 29e0767 commit 1318f48
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 10 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ log = "0.4"
rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
futures = "0.3.4"
futures = "0.3.5"
clap = { version = "2.33", optional = true }
bitvec = "0.17"
ahash = "0.3"

[dependencies.tokio]
version = "0.2.13"
version = "0.2.21"
features = ["time", "rt-core", "macros"]

[features]
Expand Down
2 changes: 1 addition & 1 deletion src/benchmark/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn start_app() {
);
let write_limit = limit * 1000 / value_size_kb;
let mut prev_p = 0;
while let Some(_) = futures_pool.next().await {
while futures_pool.next().await.is_some() {
let percent = counter * 1000 / write_limit;
if prev_p != percent {
print!(
Expand Down
2 changes: 1 addition & 1 deletion src/blob/index/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Simple {
if let State::InMemory(index) = self.inner {
index.len()
} else {
0
self.header.records_count
}
}
}
Expand Down
70 changes: 64 additions & 6 deletions src/storage/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,12 @@ impl<K> Storage<K> {
let safe_locked = self.inner.safe.lock();
let next = self.inner.next_blob_name()?;
let config = self.filter_config();
safe_locked.await.active_blob = Some(
Blob::open_new(next, config)
.await
.map_err(Error::new)?
.boxed(),
);
let mut safe = safe_locked.await;
let blob = Blob::open_new(next, config)
.await
.map_err(Error::new)?
.boxed();
safe.active_blob = Some(blob);
Ok(())
}

Expand Down Expand Up @@ -450,6 +450,21 @@ impl<K> Storage<K> {
in_active || in_closed
}

/// Total records count in storage.
pub async fn records_count(&self) -> usize {
self.inner.records_count().await
}

/// Records count per blob. Format: (blob_id, count). Last value is from active blob.
pub async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
self.inner.records_count_detailed().await
}

/// Records count in active blob. Returns None if active blob not set or any IO error occured.
pub async fn records_count_in_active_blob(&self) -> Option<usize> {
self.inner.records_count_in_active_blob().await
}

fn filter_config(&self) -> BloomConfig {
self.inner.config.filter()
}
Expand Down Expand Up @@ -504,6 +519,18 @@ impl Inner {
dir,
))
}

pub(crate) async fn records_count(&self) -> usize {
self.safe.lock().await.records_count().await
}

pub(crate) async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
self.safe.lock().await.records_count_detailed().await
}

pub(crate) async fn records_count_in_active_blob(&self) -> Option<usize> {
self.safe.lock().await.records_count_in_active_blob().await
}
}

impl Safe {
Expand All @@ -520,6 +547,37 @@ impl Safe {
let blobs_max_id = self.blobs.last().map(Blob::id);
active_blob_id.max(blobs_max_id)
}

pub(crate) async fn records_count(&self) -> usize {
let details = self.records_count_detailed().await;
details.iter().fold(0, |acc, (_, count)| acc + count)
}

pub(crate) async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
let mut results = Vec::new();
for blob in self.blobs.iter() {
let count = blob.records_count().await;
if let Ok(c) = count {
let value = (blob.id(), c);
debug!("push: {:?}", value);
results.push(value);
}
}
if let Some(count) = self.records_count_in_active_blob().await {
let value = (self.blobs.len(), count);
debug!("push: {:?}", value);
results.push(value);
}
results
}

pub(crate) async fn records_count_in_active_blob(&self) -> Option<usize> {
if let Some(ref blob) = self.active_blob {
blob.records_count().await.ok()
} else {
None
}
}
}

fn launch_observer(inner: Inner) {
Expand Down
67 changes: 67 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,70 @@ fn meta_with(version: &str) -> Meta {
meta.insert("version".to_owned(), version);
meta
}

#[tokio::test]
async fn test_records_count() {
let now = Instant::now();
let path = common::init("records_count");
let storage = common::create_test_storage(&path, 20000).await.unwrap();

let count = 30;
let records = common::generate_records(count, 1_000);
for (key, data) in &records {
write_one(&storage, *key, data, None).await.unwrap();
delay_for(Duration::from_millis(10)).await;
}

assert_eq!(storage.records_count().await, count);
assert!(storage.records_count_in_active_blob().await < Some(count));

common::clean(storage, path)
.await
.expect("work dir clean failed");
warn!("elapsed: {:.3}", now.elapsed().as_secs_f64());
}

#[tokio::test]
async fn test_records_count_in_active() {
let now = Instant::now();
let path = common::init("records_count_in_active");
let storage = common::create_test_storage(&path, 20000).await.unwrap();

let count = 10;
let records = common::generate_records(count, 1_000);
for (key, data) in &records {
write_one(&storage, *key, data, None).await.unwrap();
delay_for(Duration::from_millis(10)).await;
}

assert_eq!(storage.records_count_in_active_blob().await, Some(count));

common::clean(storage, path)
.await
.expect("work dir clean failed");
warn!("elapsed: {:.3}", now.elapsed().as_secs_f64());
}

#[tokio::test]
async fn test_records_count_detailed() {
let now = Instant::now();
let path = common::init("records_count_detailed");
let storage = common::create_test_storage(&path, 20000).await.unwrap();

let count = 30;
let records = common::generate_records(count, 1000);
for (key, data) in &records {
write_one(&storage, *key, data, None).await.unwrap();
delay_for(Duration::from_millis(10)).await;
}
delay_for(Duration::from_millis(100)).await;
assert_eq!(
storage.records_count_detailed().await,
vec![(0, 19), (1, 11)]
);

common::clean(storage, path)
.await
.expect("work dir clean failed");
warn!("elapsed: {:.3}", now.elapsed().as_secs_f64());
}

0 comments on commit 1318f48

Please sign in to comment.