Skip to content

Commit

Permalink
Add Configurable Max Retries for get_config in ConfigService (#242)
Browse files Browse the repository at this point in the history
* add max_retries

* update max_retries type from i32 to u32.
  • Loading branch information
451846939 committed Aug 12, 2024
1 parent b648851 commit 2f6265c
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ pub enum Error {

#[error("Wrong server address: {0}")]
WrongServerAddress(String),

#[error("Exceeded maximum retry attempts: {0}")]
MaxRetriesExceeded(u32),
}
14 changes: 14 additions & 0 deletions src/api/props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct ClientProps {
client_version: String,
/// auth context
auth_context: HashMap<String, String>,
/// max retries
max_retries: Option<u32>,
}

impl ClientProps {
Expand Down Expand Up @@ -129,6 +131,10 @@ impl ClientProps {

Ok(result)
}

pub(crate) fn get_max_retries(&self) -> Option<u32> {
self.max_retries
}
}

#[allow(clippy::new_without_default)]
Expand All @@ -149,6 +155,7 @@ impl ClientProps {
client_version,
auth_context: HashMap::default(),
grpc_port: None,
max_retries: None,
}
}

Expand Down Expand Up @@ -221,6 +228,12 @@ impl ClientProps {
self.auth_context.insert(key.into(), val.into());
self
}

/// Sets the max retries.
pub fn max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = Some(max_retries);
self
}
}

#[cfg(test)]
Expand All @@ -242,6 +255,7 @@ mod tests {
labels: HashMap::new(),
client_version: "test_version".to_string(),
auth_context: HashMap::new(),
max_retries: None,
};

let result = client_props.get_server_list();
Expand Down
8 changes: 8 additions & 0 deletions src/common/remote/grpc/nacos_grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub(crate) struct NacosGrpcClientBuilder {
disconnected_listener: Option<DisconnectedListener>,
unary_call_layer: Option<DynamicUnaryCallLayer>,
bi_call_layer: Option<DynamicBiStreamingCallLayer>,
max_retries: Option<u32>,
}

impl NacosGrpcClientBuilder {
Expand All @@ -89,6 +90,7 @@ impl NacosGrpcClientBuilder {
disconnected_listener: None,
unary_call_layer: None,
bi_call_layer: None,
max_retries: None,
}
}

Expand Down Expand Up @@ -117,6 +119,11 @@ impl NacosGrpcClientBuilder {
Self { ..self }
}

pub(crate) fn max_retries(mut self, max_retries: Option<u32>) -> Self {
self.max_retries = max_retries;
Self { ..self }
}

pub(crate) fn support_remote_connection(mut self, enable: bool) -> Self {
self.client_abilities.support_remote_connection(enable);
Self { ..self }
Expand Down Expand Up @@ -359,6 +366,7 @@ impl NacosGrpcClientBuilder {
self.namespace,
self.labels,
self.client_abilities,
self.max_retries,
);

if let Some(connected_listener) = self.connected_listener {
Expand Down
16 changes: 13 additions & 3 deletions src/common/remote/grpc/nacos_grpc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type ConnectedListener = Arc<dyn Fn(String) + Send + Sync + 'static>;
type DisconnectedListener = Arc<dyn Fn(String) + Send + Sync + 'static>;

type HandlerMap = HashMap<String, Arc<dyn ServerRequestHandler>>;
const MAX_RETRY: i32 = 6;
const MAX_RETRY: u32 = 6;

fn sleep_time(retry_count: i32) -> i32 {
fn sleep_time(retry_count: u32) -> u32 {
if retry_count > MAX_RETRY {
1 << (retry_count % MAX_RETRY)
} else {
Expand All @@ -58,11 +58,12 @@ where
state: State<M::Future, M::Service>,
health: Arc<AtomicBool>,
connection_id: Option<String>,
retry_count: i32,
retry_count: u32,
connection_id_watcher: (
watch::Sender<Option<String>>,
watch::Receiver<Option<String>>,
),
max_retries: Option<u32>,
}

impl<M> NacosGrpcConnection<M>
Expand All @@ -82,6 +83,7 @@ where
namespace: String,
labels: HashMap<String, String>,
client_abilities: NacosClientAbilities,
max_retries: Option<u32>,
) -> Self {
let connection_id_watcher = watch::channel(None);

Expand All @@ -98,6 +100,7 @@ where
connection_id: None,
retry_count: 0,
connection_id_watcher,
max_retries,
}
}

Expand Down Expand Up @@ -480,6 +483,13 @@ where
debug_span!(parent: None, "grpc_connection", id = self.id.clone()).entered();

loop {
if let Some(max_retries) = self.max_retries {
if self.retry_count > max_retries {
error!("Exceeded maximum retry attempts: {}", max_retries);
return Poll::Ready(Err(Self::Error::MaxRetriesExceeded(max_retries)));
}
}

match self.state {
State::Idle => {
info!("create new connection.");
Expand Down
1 change: 1 addition & 0 deletions src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl ConfigWorker {
))
.unary_call_layer(auth_layer.clone())
.bi_call_layer(auth_layer)
.max_retries(client_props.get_max_retries())
.build(client_id);

let remote_client = Arc::new(remote_client);
Expand Down
1 change: 1 addition & 0 deletions src/naming/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl NacosNamingService {
})
.unary_call_layer(auth_layer.clone())
.bi_call_layer(auth_layer)
.max_retries(client_props.get_max_retries())
.build(client_id.clone());

let nacos_grpc_client = Arc::new(nacos_grpc_client);
Expand Down

0 comments on commit 2f6265c

Please sign in to comment.