Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Redis crate and use pipeline to update counters #382

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ metrics = "0.22.3"

# Optional dependencies
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
redis = { version = "0.25", optional = true, features = [
redis = { version = "0.27", optional = true, features = [
"connection-manager",
"tokio-comp",
"tls-native-tls",
"tokio-native-tls-comp",
"script",
] }
r2d2 = { version = "0.8", optional = true }
tokio = { version = "1", optional = true, features = [
Expand All @@ -62,8 +63,8 @@ time = "0.3.36"
[dev-dependencies]
serial_test = "3.0"
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
redis-test = { version = "0.4.0", features = ["aio"] }
redis = { version = "0.25", features = [
redis-test = { version = "0.6.0", features = ["aio"] }
redis = { version = "0.27", features = [
"connection-manager",
"tokio-comp",
"tls-native-tls",
Expand Down
67 changes: 49 additions & 18 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::storage::redis::is_limited;
use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use redis::{AsyncCommands, RedisError};
use redis::{AsyncCommands, ErrorKind, RedisError};
use std::collections::HashSet;
use std::ops::Deref;
use std::str::FromStr;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.window().as_secs())
.arg(delta)
.invoke_async::<_, ()>(&mut con)
.invoke_async::<()>(&mut con)
.instrument(info_span!("datastore"))
.await?;

Expand Down Expand Up @@ -112,17 +112,35 @@ impl AsyncCounterStorage for AsyncRedisStorage {
}
}

// TODO: this can be optimized by using pipelines with multiple updates
for (counter_idx, key) in counter_keys.into_iter().enumerate() {
let script = redis::Script::new(SCRIPT_UPDATE_COUNTER);
let mut pipeline = redis::pipe();
let mut pipeline = &mut pipeline;
for (counter_idx, key) in counter_keys.iter().enumerate() {
let counter = &counters[counter_idx];
redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.window().as_secs())
.arg(delta)
.invoke_async::<_, _>(&mut con)
.instrument(info_span!("datastore"))
.await?
pipeline = pipeline
.invoke_script(
script
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.window().as_secs())
.arg(delta),
)
.ignore()
}
if let Err(err) = pipeline
.query_async::<()>(&mut con)
.instrument(info_span!("datastore"))
.await
{
if err.kind() == ErrorKind::NoScriptError {
script.prepare_invoke().load_async(&mut con).await?;
pipeline
.query_async::<()>(&mut con)
.instrument(info_span!("datastore"))
.await?;
} else {
Err(err)?;
}
}

Ok(Authorization::Ok)
Expand Down Expand Up @@ -191,7 +209,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
async fn clear(&self) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();
redis::cmd("FLUSHDB")
.query_async::<_, ()>(&mut con)
.query_async::<()>(&mut con)
.instrument(info_span!("datastore"))
.await?;
Ok(())
Expand All @@ -201,17 +219,23 @@ impl AsyncCounterStorage for AsyncRedisStorage {
impl AsyncRedisStorage {
pub async fn new(redis_url: &str) -> Result<Self, RedisError> {
let info = ConnectionInfo::from_str(redis_url)?;
Ok(Self {
conn_manager: ConnectionManager::new(
Self::new_with_conn_manager(
ConnectionManager::new(
redis::Client::open(info)
.expect("This couldn't fail in the past, yet now it did somehow!"),
)
.await?,
})
)
.await
}

pub fn new_with_conn_manager(conn_manager: ConnectionManager) -> Self {
Self { conn_manager }
pub async fn new_with_conn_manager(
conn_manager: ConnectionManager,
) -> Result<Self, RedisError> {
let store = Self { conn_manager };
store.load_script(SCRIPT_UPDATE_COUNTER).await?;
store.load_script(VALUES_AND_TTLS).await?;
Ok(store)
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
Expand All @@ -231,6 +255,13 @@ impl AsyncRedisStorage {

Ok(())
}

pub(super) async fn load_script(&self, script: &str) -> Result<(), RedisError> {
let mut con = self.conn_manager.clone();
let script = redis::Script::new(script);
script.prepare_invoke().load_async(&mut con).await?;
Ok(())
}
}

#[cfg(test)]
Expand Down
24 changes: 13 additions & 11 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::storage::redis::{
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use metrics::gauge;
use redis::aio::{ConnectionLike, ConnectionManager};
use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig};
use redis::{ConnectionInfo, RedisError};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
Expand Down Expand Up @@ -170,15 +170,13 @@ impl CachedRedisStorage {
response_timeout: Duration,
) -> Result<Self, RedisError> {
let info = ConnectionInfo::from_str(redis_url)?;
let redis_conn_manager = ConnectionManager::new_with_backoff_and_timeouts(
let redis_conn_manager = ConnectionManager::new_with_config(
redis::Client::open(info)
.expect("This couldn't fail in the past, yet now it did somehow!"),
2,
100,
1,
response_timeout,
// TLS handshake might result in an additional 2 RTTs to Redis, adding some headroom as well
(response_timeout * 3) + Duration::from_millis(50),
ConnectionManagerConfig::default()
.set_connection_timeout((response_timeout * 3) + Duration::from_millis(50))
.set_response_timeout(response_timeout)
.set_number_of_retries(1),
)
.await?;

Expand All @@ -189,7 +187,7 @@ impl CachedRedisStorage {
let counters_cache = Arc::new(cached_counters);
let partitioned = Arc::new(AtomicBool::new(false));
let async_redis_storage =
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()).await?;

{
let counters_cache_clone = counters_cache.clone();
Expand All @@ -208,6 +206,10 @@ impl CachedRedisStorage {
});
}

async_redis_storage
.load_script(BATCH_UPDATE_COUNTERS)
.await?;

Ok(Self {
cached_counters: counters_cache,
async_redis_storage,
Expand Down Expand Up @@ -456,7 +458,7 @@ mod tests {
counters_and_deltas.insert(counter.clone(), arc);

let one_sec_from_now = SystemTime::now().add(Duration::from_secs(1));
let mock_response = Value::Bulk(vec![
let mock_response = Value::Array(vec![
Value::Int(NEW_VALUE_FROM_REDIS as i64),
Value::Int(
one_sec_from_now
Expand Down Expand Up @@ -510,7 +512,7 @@ mod tests {
Default::default(),
);

let mock_response = Value::Bulk(vec![
let mock_response = Value::Array(vec![
Value::Int(8),
Value::Int(
SystemTime::now()
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/redis_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CounterStorage for RedisStorage {
#[tracing::instrument(skip_all)]
fn clear(&self) -> Result<(), StorageErr> {
let mut con = self.conn_pool.get()?;
redis::cmd("FLUSHDB").execute(&mut *con);
redis::cmd("FLUSHDB").exec(&mut *con)?;
Ok(())
}
}
Expand Down
Loading