From 0521a822a61aec1394c86b69fdf2e5dd63710814 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 9 Oct 2024 15:55:14 +0200 Subject: [PATCH] Fix --- relay-redis/src/real.rs | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/relay-redis/src/real.rs b/relay-redis/src/real.rs index 2fc630f49d..1da3f91785 100644 --- a/relay-redis/src/real.rs +++ b/relay-redis/src/real.rs @@ -1,10 +1,10 @@ -use std::error::Error; -use std::time::Duration; -use std::{fmt, thread}; - use r2d2::{Builder, ManageConnection, Pool, PooledConnection}; pub use redis; use redis::ConnectionLike; +use std::error::Error; +use std::thread::ScopedJoinHandle; +use std::time::Duration; +use std::{fmt, io, thread}; use thiserror::Error; use crate::config::RedisConfigOptions; @@ -25,7 +25,7 @@ pub enum RedisError { Redis(#[source] redis::RedisError), } -fn log_secondary(result: redis::RedisResult) { +fn log_secondary_redis_error(result: redis::RedisResult) { if let Err(error) = result { relay_log::error!( error = &error as &dyn Error, @@ -34,6 +34,15 @@ fn log_secondary(result: redis::RedisResult) { } } +fn log_secondary_thread_error(result: io::Result) { + if let Err(error) = result { + relay_log::error!( + error = &error as &dyn Error, + "spawning the thread for the secondary Redis connection failed", + ); + } +} + enum ConnectionInner<'a> { Cluster(&'a mut redis::cluster::ClusterConnection), MultiWrite { @@ -53,7 +62,10 @@ impl ConnectionLike for ConnectionInner<'_> { secondaries, } => thread::scope(|s| { for connection in secondaries { - s.spawn(move || log_secondary(connection.req_packed_command(cmd))); + let result = thread::Builder::new().spawn_scoped(s, move || { + log_secondary_redis_error(connection.req_packed_command(cmd)); + }); + log_secondary_thread_error(result); } primary.req_packed_command(cmd) }), @@ -74,9 +86,12 @@ impl ConnectionLike for ConnectionInner<'_> { secondaries, } => thread::scope(|s| { for connection in secondaries { - s.spawn(move || { - log_secondary(connection.req_packed_commands(cmd, offset, count)) + let result = thread::Builder::new().spawn_scoped(s, move || { + log_secondary_redis_error( + connection.req_packed_commands(cmd, offset, count), + ); }); + log_secondary_thread_error(result); } primary.req_packed_commands(cmd, offset, count) }),