diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e9333d66c..bc96f5f8af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - Use custom wildcard matching instead of regular expressions. ([#4073](https://github.com/getsentry/relay/pull/4073)) - Allowlist the SentryUptimeBot user-agent. ([#4068](https://github.com/getsentry/relay/pull/4068)) - Feature flags of graduated features are now hard-coded in Relay so they can be removed from Sentry. ([#4076](https://github.com/getsentry/relay/pull/4076), [#4080](https://github.com/getsentry/relay/pull/4080)) +- Add parallelization in Redis commands. ([#4118](https://github.com/getsentry/relay/pull/4118)) ## 24.9.0 diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index e535c43125..f47d4c929e 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -1,10 +1,10 @@ -use std::error::Error; -use std::fmt; -use std::time::Duration; - use r2d2::{Builder, ManageConnection, Pool, PooledConnection}; pub use redis; use redis::ConnectionLike; +use std::error::Error; +use std::thread::Scope; +use std::time::Duration; +use std::{fmt, thread}; use thiserror::Error; use crate::config::RedisConfigOptions; @@ -25,6 +25,30 @@ pub enum RedisError { Redis(#[source] redis::RedisError), } +fn log_secondary_redis_error(result: redis::RedisResult) { + if let Err(error) = result { + relay_log::error!( + error = &error as &dyn Error, + "sending cmds to the secondary Redis instance failed", + ); + } +} + +fn spawn_secondary_thread<'scope, 'env: 'scope, T>( + scope: &'scope Scope<'scope, 'env>, + block: impl FnOnce() -> redis::RedisResult + Send + 'scope, +) { + let result = thread::Builder::new().spawn_scoped(scope, move || { + log_secondary_redis_error(block()); + }); + if let Err(error) = result { + relay_log::error!( + error = &error as &dyn Error, + "spawning the thread for the secondary Redis connection failed", + ); + } +} + enum ConnectionInner<'a> { Cluster(&'a mut redis::cluster::ClusterConnection), MultiWrite { @@ -37,24 +61,17 @@ enum ConnectionInner<'a> { impl ConnectionLike for ConnectionInner<'_> { fn req_packed_command(&mut self, cmd: &[u8]) -> redis::RedisResult { match self { - ConnectionInner::Cluster(ref mut con) => con.req_packed_command(cmd), + ConnectionInner::Cluster(con) => con.req_packed_command(cmd), + ConnectionInner::Single(con) => con.req_packed_command(cmd), ConnectionInner::MultiWrite { - primary: primary_connection, - secondaries: secondary_connections, - } => { - let primary_result = primary_connection.req_packed_command(cmd); - for secondary_connection in secondary_connections.iter_mut() { - if let Err(error) = secondary_connection.req_packed_command(cmd) { - relay_log::error!( - error = &error as &dyn Error, - "sending cmd to the secondary Redis instance failed", - ); - } + primary, + secondaries, + } => thread::scope(|s| { + for connection in secondaries { + spawn_secondary_thread(s, || connection.req_packed_command(cmd)) } - - primary_result - } - ConnectionInner::Single(ref mut con) => con.req_packed_command(cmd), + primary.req_packed_command(cmd) + }), } } @@ -65,58 +82,50 @@ impl ConnectionLike for ConnectionInner<'_> { count: usize, ) -> redis::RedisResult> { match self { - ConnectionInner::Cluster(ref mut con) => con.req_packed_commands(cmd, offset, count), + ConnectionInner::Cluster(con) => con.req_packed_commands(cmd, offset, count), + ConnectionInner::Single(con) => con.req_packed_commands(cmd, offset, count), ConnectionInner::MultiWrite { - primary: primary_connection, - secondaries: secondary_connections, - } => { - let primary_result = primary_connection.req_packed_commands(cmd, offset, count); - for secondary_connection in secondary_connections.iter_mut() { - if let Err(error) = secondary_connection.req_packed_commands(cmd, offset, count) - { - relay_log::error!( - error = &error as &dyn Error, - "sending cmds to the secondary Redis instance failed", - ); - } + primary, + secondaries, + } => thread::scope(|s| { + for connection in secondaries { + spawn_secondary_thread(s, || connection.req_packed_commands(cmd, offset, count)) } - - primary_result - } - ConnectionInner::Single(ref mut con) => con.req_packed_commands(cmd, offset, count), + primary.req_packed_commands(cmd, offset, count) + }), } } fn get_db(&self) -> i64 { match self { - ConnectionInner::Cluster(ref con) => con.get_db(), + ConnectionInner::Cluster(con) => con.get_db(), ConnectionInner::MultiWrite { primary: primary_connection, .. } => primary_connection.get_db(), - ConnectionInner::Single(ref con) => con.get_db(), + ConnectionInner::Single(con) => con.get_db(), } } fn check_connection(&mut self) -> bool { match self { - ConnectionInner::Cluster(ref mut con) => con.check_connection(), + ConnectionInner::Cluster(con) => con.check_connection(), ConnectionInner::MultiWrite { primary: primary_connection, .. } => primary_connection.check_connection(), - ConnectionInner::Single(ref mut con) => con.check_connection(), + ConnectionInner::Single(con) => con.check_connection(), } } fn is_open(&self) -> bool { match self { - ConnectionInner::Cluster(ref con) => con.is_open(), + ConnectionInner::Cluster(con) => con.is_open(), ConnectionInner::MultiWrite { primary: primary_connection, .. } => primary_connection.is_open(), - ConnectionInner::Single(ref con) => con.is_open(), + ConnectionInner::Single(con) => con.is_open(), } } }