Skip to content

Commit

Permalink
feat(redis): Implement parallel cmd execution of Redis calls (#4118)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Oct 14, 2024
1 parent 2f2e7d0 commit e4761f0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- Use custom wildcard matching instead of regular expressions. ([#4073](https://github.com/getsentry/relay/pull/4073))
- Allowlist the SentryUptimeBot user-agent. ([#4068](https://github.com/getsentry/relay/pull/4068))
- Feature flags of graduated features are now hard-coded in Relay so they can be removed from Sentry. ([#4076](https://github.com/getsentry/relay/pull/4076), [#4080](https://github.com/getsentry/relay/pull/4080))
- Add parallelization in Redis commands. ([#4118](https://github.com/getsentry/relay/pull/4118))

## 24.9.0

Expand Down
95 changes: 52 additions & 43 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::error::Error;
use std::fmt;
use std::time::Duration;

use r2d2::{Builder, ManageConnection, Pool, PooledConnection};
pub use redis;
use redis::ConnectionLike;
use std::error::Error;
use std::thread::Scope;
use std::time::Duration;
use std::{fmt, thread};
use thiserror::Error;

use crate::config::RedisConfigOptions;
Expand All @@ -25,6 +25,30 @@ pub enum RedisError {
Redis(#[source] redis::RedisError),
}

fn log_secondary_redis_error<T>(result: redis::RedisResult<T>) {
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"sending cmds to the secondary Redis instance failed",
);
}
}

fn spawn_secondary_thread<'scope, 'env: 'scope, T>(
scope: &'scope Scope<'scope, 'env>,
block: impl FnOnce() -> redis::RedisResult<T> + Send + 'scope,
) {
let result = thread::Builder::new().spawn_scoped(scope, move || {
log_secondary_redis_error(block());
});
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"spawning the thread for the secondary Redis connection failed",
);
}
}

enum ConnectionInner<'a> {
Cluster(&'a mut redis::cluster::ClusterConnection),
MultiWrite {
Expand All @@ -37,24 +61,17 @@ enum ConnectionInner<'a> {
impl ConnectionLike for ConnectionInner<'_> {
fn req_packed_command(&mut self, cmd: &[u8]) -> redis::RedisResult<redis::Value> {
match self {
ConnectionInner::Cluster(ref mut con) => con.req_packed_command(cmd),
ConnectionInner::Cluster(con) => con.req_packed_command(cmd),
ConnectionInner::Single(con) => con.req_packed_command(cmd),
ConnectionInner::MultiWrite {
primary: primary_connection,
secondaries: secondary_connections,
} => {
let primary_result = primary_connection.req_packed_command(cmd);
for secondary_connection in secondary_connections.iter_mut() {
if let Err(error) = secondary_connection.req_packed_command(cmd) {
relay_log::error!(
error = &error as &dyn Error,
"sending cmd to the secondary Redis instance failed",
);
}
primary,
secondaries,
} => thread::scope(|s| {
for connection in secondaries {
spawn_secondary_thread(s, || connection.req_packed_command(cmd))
}

primary_result
}
ConnectionInner::Single(ref mut con) => con.req_packed_command(cmd),
primary.req_packed_command(cmd)
}),
}
}

Expand All @@ -65,58 +82,50 @@ impl ConnectionLike for ConnectionInner<'_> {
count: usize,
) -> redis::RedisResult<Vec<redis::Value>> {
match self {
ConnectionInner::Cluster(ref mut con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::Cluster(con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::Single(con) => con.req_packed_commands(cmd, offset, count),
ConnectionInner::MultiWrite {
primary: primary_connection,
secondaries: secondary_connections,
} => {
let primary_result = primary_connection.req_packed_commands(cmd, offset, count);
for secondary_connection in secondary_connections.iter_mut() {
if let Err(error) = secondary_connection.req_packed_commands(cmd, offset, count)
{
relay_log::error!(
error = &error as &dyn Error,
"sending cmds to the secondary Redis instance failed",
);
}
primary,
secondaries,
} => thread::scope(|s| {
for connection in secondaries {
spawn_secondary_thread(s, || connection.req_packed_commands(cmd, offset, count))
}

primary_result
}
ConnectionInner::Single(ref mut con) => con.req_packed_commands(cmd, offset, count),
primary.req_packed_commands(cmd, offset, count)
}),
}
}

fn get_db(&self) -> i64 {
match self {
ConnectionInner::Cluster(ref con) => con.get_db(),
ConnectionInner::Cluster(con) => con.get_db(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.get_db(),
ConnectionInner::Single(ref con) => con.get_db(),
ConnectionInner::Single(con) => con.get_db(),
}
}

fn check_connection(&mut self) -> bool {
match self {
ConnectionInner::Cluster(ref mut con) => con.check_connection(),
ConnectionInner::Cluster(con) => con.check_connection(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.check_connection(),
ConnectionInner::Single(ref mut con) => con.check_connection(),
ConnectionInner::Single(con) => con.check_connection(),
}
}

fn is_open(&self) -> bool {
match self {
ConnectionInner::Cluster(ref con) => con.is_open(),
ConnectionInner::Cluster(con) => con.is_open(),
ConnectionInner::MultiWrite {
primary: primary_connection,
..
} => primary_connection.is_open(),
ConnectionInner::Single(ref con) => con.is_open(),
ConnectionInner::Single(con) => con.is_open(),
}
}
}
Expand Down

0 comments on commit e4761f0

Please sign in to comment.