From 3c6af88cd461b2db13cdd26ac2ecd47eaa080893 Mon Sep 17 00:00:00 2001 From: KAAANG <79990647+SAKURA-CAT@users.noreply.github.com> Date: Tue, 26 Nov 2024 20:57:52 +0800 Subject: [PATCH 1/3] feat: license --- LICENSE | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 LICENSE diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5f9c837 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 KAAANG + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file From 750f6b1495cdde83e4d57b2f9893698e52952e7e Mon Sep 17 00:00:00 2001 From: KAAANG <79990647+SAKURA-CAT@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:09:54 +0800 Subject: [PATCH 2/3] feat: bg task --- src/db.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/src/db.rs b/src/db.rs index f456f9c..584871e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use std::collections::{BTreeSet, HashMap}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::sync::Notify; use tokio::time::Instant; /// A wrapper around a `Db` instance. @@ -24,11 +25,11 @@ pub(crate) struct Db { #[derive(Debug)] struct Shared { state: Mutex, - // TODO background task to remove expired entries + bg_task_notify: Notify, } /// DB state entry. -#[derive(Debug)] +#[derive(Debug, Default)] struct State { entries: HashMap, /// Tracks key TTLs. @@ -66,7 +67,10 @@ impl Db { entries: HashMap::new(), expirations: BTreeSet::new(), }), + bg_task_notify: Notify::new(), }); + // Create a background task to purge expired keys. + tokio::spawn(purge_expired_keys(shared.clone())); Db { shared } } @@ -92,20 +96,53 @@ impl Db { if let Some(expires_at) = expires_at { state.expirations.insert((expires_at, key)); } + + // Notify the background task to check the expiration time. + // Before notifying, we need to drop the lock to avoid deadlock. + drop(state); + + self.shared.bg_task_notify.notify_one(); } pub(crate) fn get(&self, key: &str) -> Option { - let mut state = self.shared.state.lock().unwrap(); + let state = self.shared.state.lock().unwrap(); let entry = state.entries.get(key)?; - if let Some(expires_at) = entry.expires_at { - if Instant::now() >= expires_at { - // Entry has expired, remove it. - // TODO It will be better to use a background task to remove expired entries. - state.entries.remove(key); - state.expirations.remove(&(expires_at, key.to_string())); - return None; + Some(entry.data.clone()) + } +} + +impl Shared { + /// Remove expired keys. And return the next expiration time if any. + pub(crate) fn purge_expired_keys(&self) -> Option { + let mut state = self.state.lock().unwrap(); + let now = Instant::now(); + // This is needed to make the borrow checker happy. + // `state.expirations.iter()` borrows `state` immutably, but `state.entries.remove` borrows `state` mutably. + // So we need to split the borrow and make sure the mutable borrow is dropped before the immutable borrow. + let state = &mut *state; + if let Some(&(when, ref key)) = state.expirations.iter().next() { + if when > now { + // No more keys to expire. + return Some(when); } + state.entries.remove(key); + state.expirations.remove(&(when, key.clone())); + } + None + } +} + +async fn purge_expired_keys(shared: Arc) { + loop { + if let Some(when) = shared.purge_expired_keys() { + // Wait until the next key expires, or notified by someone. + tokio::select! { + _ = tokio::time::sleep_until(when) => {} + _ = shared.bg_task_notify.notified() => {} + }; + } else { + // Wait until notified by someone. + shared.bg_task_notify.notified().await; } - Some(entry.data.clone()) } } From 1db7f3d64271aeba13238bf7acac46f992ba41a0 Mon Sep 17 00:00:00 2001 From: KAAANG <79990647+SAKURA-CAT@users.noreply.github.com> Date: Wed, 27 Nov 2024 12:37:03 +0800 Subject: [PATCH 3/3] feat: opt bg task --- src/db.rs | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 159 insertions(+), 8 deletions(-) diff --git a/src/db.rs b/src/db.rs index 584871e..b2ce300 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,8 @@ use bytes::Bytes; use std::collections::{BTreeSet, HashMap}; use std::sync::{Arc, Mutex}; -use std::time::Duration; use tokio::sync::Notify; -use tokio::time::Instant; +use tokio::time::{self, Duration, Instant}; /// A wrapper around a `Db` instance. #[derive(Debug)] @@ -76,7 +75,15 @@ impl Db { pub(crate) fn set(&self, key: String, value: Bytes, expire: Option) { let mut state = self.shared.state.lock().unwrap(); - let expires_at = expire.map(|d| Instant::now() + d); + // In addition to reduce the bg task's work, we need to judge this key is the next expiration time. + let mut notify = false; + let expires_at = expire.map(|d| { + let when = Instant::now() + d; + // If the new key is the next expiration time, notify the bg task. + // First key or earlier than the current next expiration time. + notify = state.next_expiration().map(|t| t > when).unwrap_or(true); + when + }); // Insert the entry into the `HashMap`. let prev = state.entries.insert( key.clone(), @@ -101,7 +108,10 @@ impl Db { // Before notifying, we need to drop the lock to avoid deadlock. drop(state); - self.shared.bg_task_notify.notify_one(); + if notify { + // Only notify the background task if it needs + self.shared.bg_task_notify.notify_one(); + } } pub(crate) fn get(&self, key: &str) -> Option { @@ -111,6 +121,50 @@ impl Db { } } +#[cfg(test)] +mod test_db { + use crate::db::Db; + use bytes::Bytes; + use std::time::Duration; + + #[tokio::test] + async fn test_set_get() { + let db = Db::new(); + db.set("key1".to_string(), Bytes::from("value1"), None); + db.set("key2".to_string(), Bytes::from("value2"), Some(Duration::from_secs(1))); + + assert_eq!(db.get("key1").unwrap(), Bytes::from("value1")); + assert_eq!(db.get("key2").unwrap(), Bytes::from("value2")); + } + + #[tokio::test] + async fn test_expire() { + let db = Db::new(); + db.set( + "key1".to_string(), + Bytes::from("value1"), + Some(Duration::from_millis(100)), + ); + db.set( + "key2".to_string(), + Bytes::from("value2"), + Some(Duration::from_millis(200)), + ); + + assert_eq!(db.get("key1").unwrap(), Bytes::from("value1")); + assert_eq!(db.get("key2").unwrap(), Bytes::from("value2")); + + tokio::time::sleep(Duration::from_millis(110)).await; + + assert_eq!(db.get("key1"), None); + assert_eq!(db.get("key2").unwrap(), Bytes::from("value2")); + + tokio::time::sleep(Duration::from_millis(110)).await; + + assert_eq!(db.get("key2"), None); + } +} + impl Shared { /// Remove expired keys. And return the next expiration time if any. pub(crate) fn purge_expired_keys(&self) -> Option { @@ -120,15 +174,81 @@ impl Shared { // `state.expirations.iter()` borrows `state` immutably, but `state.entries.remove` borrows `state` mutably. // So we need to split the borrow and make sure the mutable borrow is dropped before the immutable borrow. let state = &mut *state; - if let Some(&(when, ref key)) = state.expirations.iter().next() { + let when = if let Some(&(when, ref key)) = state.expirations.iter().next() { if when > now { // No more keys to expire. return Some(when); } state.entries.remove(key); state.expirations.remove(&(when, key.clone())); + // Return the next expiration time if any. + // It's different from the mini-redis, which always returns None. + state.expirations.iter().next().map(|x| x.0) + } else { + None + }; + when + } +} + +#[cfg(test)] +mod test_shared { + use crate::db::{Db, Shared}; + use bytes::Bytes; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + use tokio::time::Instant; + + fn roughly_equal(a: Instant, b: Instant) -> bool { + a <= b + Duration::from_millis(10) && a >= b - Duration::from_millis(10) + } + + impl Db { + fn delete(&self, key: &str) -> Option { + let mut state = self.shared.state.lock().unwrap(); + let entry = state.entries.remove(key)?; + if let Some(expires_at) = entry.expires_at { + state.expirations.remove(&(expires_at, key.to_string())); + } + Some(entry.data) } - None + } + + #[tokio::test] + async fn test_purge_expired_keys() { + let shared = Arc::new(Shared { + state: Mutex::new(crate::db::State { + entries: std::collections::HashMap::new(), + expirations: std::collections::BTreeSet::new(), + }), + bg_task_notify: tokio::sync::Notify::new(), + }); + let db = Db { shared: shared.clone() }; + + // Insert a key that will expire in 1 second. + let first_when = Duration::from_secs(1); + let second_when = Duration::from_secs(2); + db.set("key1".to_string(), Bytes::from("value1"), Some(first_when)); + // Insert a key that will expire in 2 seconds. + db.set("key2".to_string(), Bytes::from("value2"), Some(second_when)); + + // The first key should expire in 1 second. + assert!( + roughly_equal(shared.purge_expired_keys().unwrap(), Instant::now() + first_when), + "first key should expire in 1 second" + ); + + // delete the first key + db.delete("key1"); + + assert!( + roughly_equal(shared.purge_expired_keys().unwrap(), Instant::now() + second_when), + "second key should expire in 2 seconds" + ); + // delete the second key + db.delete("key2"); + // No more keys to expire. + assert_eq!(shared.purge_expired_keys(), None); } } @@ -137,12 +257,43 @@ async fn purge_expired_keys(shared: Arc) { if let Some(when) = shared.purge_expired_keys() { // Wait until the next key expires, or notified by someone. tokio::select! { - _ = tokio::time::sleep_until(when) => {} + _ = time::sleep_until(when) => {}, _ = shared.bg_task_notify.notified() => {} - }; + } } else { // Wait until notified by someone. shared.bg_task_notify.notified().await; } } } + +impl State { + fn next_expiration(&self) -> Option { + self.expirations.iter().next().map(|x| x.0) + } +} + +#[cfg(test)] +mod test_state { + use crate::db::State; + use std::collections::{BTreeSet, HashMap}; + use tokio::time::Instant; + + #[test] + fn test_next_expiration() { + let mut state = State { + entries: HashMap::new(), + expirations: BTreeSet::new(), + }; + assert_eq!(state.next_expiration(), None); + + let now = Instant::now(); + state.expirations.insert((now, "key1".to_string())); + assert_eq!(state.next_expiration(), Some(now)); + + // Only return the earliest expiration time. + let next_now = Instant::now(); + state.expirations.insert((next_now, "key2".to_string())); + assert_eq!(state.next_expiration(), Some(now)); + } +}