Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(redis): Implement parallel cmd execution of Redis calls #4118

Merged
merged 12 commits into from
Oct 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- Use custom wildcard matching instead of regular expressions. ([#4073](https://github.com/getsentry/relay/pull/4073))
- Allowlist the SentryUptimeBot user-agent. ([#4068](https://github.com/getsentry/relay/pull/4068))
- Feature flags of graduated features are now hard-coded in Relay so they can be removed from Sentry. ([#4076](https://github.com/getsentry/relay/pull/4076), [#4080](https://github.com/getsentry/relay/pull/4080))
- Add parallelization in Redis commands. ([#4118](https://github.com/getsentry/relay/pull/4118))

## 24.9.0

Expand Down
95 changes: 52 additions & 43 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::error::Error;
use std::fmt;
use std::time::Duration;

use r2d2::{Builder, ManageConnection, Pool, PooledConnection};
pub use redis;
use redis::ConnectionLike;
use std::error::Error;
use std::thread::Scope;
use std::time::Duration;
use std::{fmt, thread};
use thiserror::Error;

use crate::config::RedisConfigOptions;
Expand All @@ -25,6 +25,30 @@ pub enum RedisError {
Redis(#[source] redis::RedisError),
}

fn log_secondary_redis_error<T>(result: redis::RedisResult<T>) {
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"sending cmds to the secondary Redis instance failed",
);
}
}

fn spawn_secondary_thread<'scope, 'env: 'scope, T>(
scope: &'scope Scope<'scope, 'env>,
block: impl FnOnce() -> redis::RedisResult<T> + Send + 'scope,
) {
let result = thread::Builder::new().spawn_scoped(scope, move || {
log_secondary_redis_error(block());
});
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"spawning the thread for the secondary Redis connection failed",
);
}
}

enum ConnectionInner<'a> {
Cluster(&'a mut redis::cluster::ClusterConnection),
MultiWrite {
Expand All @@ -37,24 +61,17 @@ enum ConnectionInner<'a> {
impl ConnectionLike for ConnectionInner<'_> {
fn req_packed_command(&mut self, cmd: &[u8]) -> redis::RedisResult<redis::Value> {
match self {
ConnectionInner::Cluster(ref mut con) => con.req_packed_command(cmd),
ConnectionInner::Cluster(con) => con.req_packed_command(cmd),
ConnectionInner::Single(con) => con.req_packed_command(cmd),
ConnectionInner::MultiWrite {
primary: primary_connection,
secondaries: secondary_connections,
} => {
let primary_result = primary_connection.req_packed_command(cmd);
for secondary_connection in secondary_connections.iter_mut() {
if let Err(error) = secondary_connection.req_packed_command(cmd) {
relay_log::error!(
error = &error as &dyn Error,
"sending cmd to the secondary Redis instance failed",
);
}
primary,
secondaries,
} => thread::scope(|s| {
for connection in secondaries {
spawn_secondary_thread(s, || connection.req_packed_command(cmd))
}

primary_result
}
ConnectionInner::Single(ref mut con) => con.req_packed_command(cmd),
primary.req_packed_command(cmd)
}),
}
}

Expand All @@ -65,58 +82,50 @@ impl ConnectionLike for ConnectionInner<'_> {
count: usize,
) -> redis::RedisResult<Vec<redis::Value>> {
match self {
ConnectionInner::Cluster(ref mut con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::Cluster(con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::Single(con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::MultiWrite {
primary: primary_connection,
secondaries: secondary_connections,
} => {
let primary_result = primary_connection.req_packed_commands(cmd, offset, count);
for secondary_connection in secondary_connections.iter_mut() {
if let Err(error) = secondary_connection.req_packed_commands(cmd, offset, count)
{
relay_log::error!(
error = &error as &dyn Error,
"sending cmds to the secondary Redis instance failed",
);
}
primary,
secondaries,
} => thread::scope(|s| {
for connection in secondaries {
spawn_secondary_thread(s, || connection.req_packed_commands(cmd, offset, count))
}

primary_result
}
ConnectionInner::Single(ref mut con) => con.req_packed_commands(cmd, offset, count),
primary.req_packed_commands(cmd, offset, count)
}),
}
}

fn get_db(&self) -> i64 {
match self {
ConnectionInner::Cluster(ref con) => con.get_db(),
ConnectionInner::Cluster(con) => con.get_db(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.get_db(),
ConnectionInner::Single(ref con) => con.get_db(),
ConnectionInner::Single(con) => con.get_db(),
}
}

fn check_connection(&mut self) -> bool {
match self {
ConnectionInner::Cluster(ref mut con) => con.check_connection(),
ConnectionInner::Cluster(con) => con.check_connection(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.check_connection(),
ConnectionInner::Single(ref mut con) => con.check_connection(),
ConnectionInner::Single(con) => con.check_connection(),
}
}

fn is_open(&self) -> bool {
match self {
ConnectionInner::Cluster(ref con) => con.is_open(),
ConnectionInner::Cluster(con) => con.is_open(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.is_open(),
ConnectionInner::Single(ref con) => con.is_open(),
ConnectionInner::Single(con) => con.is_open(),
}
}
}
Expand Down
Loading