Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Oct 9, 2024
1 parent e9dff31 commit 0521a82
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::error::Error;
use std::time::Duration;
use std::{fmt, thread};

use r2d2::{Builder, ManageConnection, Pool, PooledConnection};
pub use redis;
use redis::ConnectionLike;
use std::error::Error;
use std::thread::ScopedJoinHandle;
use std::time::Duration;
use std::{fmt, io, thread};
use thiserror::Error;

use crate::config::RedisConfigOptions;
Expand All @@ -25,7 +25,7 @@ pub enum RedisError {
Redis(#[source] redis::RedisError),
}

fn log_secondary<T>(result: redis::RedisResult<T>) {
fn log_secondary_redis_error<T>(result: redis::RedisResult<T>) {
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
Expand All @@ -34,6 +34,15 @@ fn log_secondary<T>(result: redis::RedisResult<T>) {
}
}

fn log_secondary_thread_error<T>(result: io::Result<T>) {
if let Err(error) = result {
relay_log::error!(
error = &error as &dyn Error,
"spawning the thread for the secondary Redis connection failed",
);
}
}

enum ConnectionInner<'a> {
Cluster(&'a mut redis::cluster::ClusterConnection),
MultiWrite {
Expand All @@ -53,7 +62,10 @@ impl ConnectionLike for ConnectionInner<'_> {
secondaries,
} => thread::scope(|s| {
for connection in secondaries {
s.spawn(move || log_secondary(connection.req_packed_command(cmd)));
let result = thread::Builder::new().spawn_scoped(s, move || {
log_secondary_redis_error(connection.req_packed_command(cmd));
});
log_secondary_thread_error(result);
}
primary.req_packed_command(cmd)
}),
Expand All @@ -74,9 +86,12 @@ impl ConnectionLike for ConnectionInner<'_> {
secondaries,
} => thread::scope(|s| {
for connection in secondaries {
s.spawn(move || {
log_secondary(connection.req_packed_commands(cmd, offset, count))
let result = thread::Builder::new().spawn_scoped(s, move || {
log_secondary_redis_error(
connection.req_packed_commands(cmd, offset, count),
);
});
log_secondary_thread_error(result);
}
primary.req_packed_commands(cmd, offset, count)
}),
Expand Down

0 comments on commit 0521a82

Please sign in to comment.