Skip to content

Commit 0bac319

Browse files
authored
Add retry mechanism (#102)
1 parent 3646b8a commit 0bac319

File tree

6 files changed

+47
-17
lines changed

6 files changed

+47
-17
lines changed

dsh_sdk/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.4.11] -2024-09-30
9+
### Fixed
10+
- Retry mechanism for when PKI endpoint is not yet avaialble during rolling restart DSH ([#101](https://github.com/kpn-dsh/dsh-sdk-platform-rs/issues/101))
11+
812
## [0.4.10] -2024-09-30
913
### Added
1014
- Add new with client methods to REST and MQTTT token fetcher

dsh_sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ license.workspace = true
99
name = "dsh_sdk"
1010
readme = 'README.md'
1111
repository.workspace = true
12-
version = "0.4.10"
12+
version = "0.4.11"
1313

1414
[package.metadata.docs.rs]
1515
all-features = true

dsh_sdk/src/dsh/bootstrap.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub(crate) fn bootstrap(
2424
) -> Result<Cert, DshError> {
2525
let dsh_config = DshConfig::new(config_host, tenant_name, task_id)?;
2626
let client = reqwest_ca_client(dsh_config.dsh_ca_certificate.as_bytes())?;
27-
let dn = DshBootstapCall::Dn(&dsh_config).perform_call(&client)?;
27+
let dn = DshBootstapCall::Dn(&dsh_config).retryable_call(&client)?;
2828
let dn = Dn::parse_string(&dn)?;
2929
let certificates = Cert::get_signed_client_cert(dn, &dsh_config, &client)?;
3030
info!("Successfully connected to DSH");
@@ -115,7 +115,7 @@ impl DshBootstapCall<'_> {
115115
}
116116
}
117117

118-
pub(crate) fn perform_call(&self, client: &Client) -> Result<String, DshError> {
118+
fn perform_call(&self, client: &Client) -> Result<String, DshError> {
119119
let response = self.request_builder(client).send()?;
120120
if !response.status().is_success() {
121121
return Err(DshError::DshCallError {
@@ -126,6 +126,28 @@ impl DshBootstapCall<'_> {
126126
}
127127
Ok(response.text()?)
128128
}
129+
130+
pub(crate) fn retryable_call(&self, client: &Client) -> Result<String, DshError> {
131+
let mut retries = 0;
132+
loop {
133+
match self.perform_call(client) {
134+
Ok(response) => return Ok(response),
135+
Err(err) => {
136+
if retries >= 30 {
137+
return Err(err);
138+
}
139+
retries += 1;
140+
// sleep exponentially
141+
let sleep: u64 = std::cmp::min(2u64.pow(retries), 60);
142+
log::warn!(
143+
"Retrying call to DSH in {sleep} seconds due to error: {}",
144+
crate::error::report(&err)
145+
);
146+
std::thread::sleep(std::time::Duration::from_secs(sleep));
147+
}
148+
}
149+
}
150+
}
129151
}
130152

131153
/// Struct to parse DN string into separate fields.

dsh_sdk/src/dsh/certificates.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl Cert {
7171
config: dsh_config,
7272
csr: &csr.pem()?,
7373
}
74-
.perform_call(client)?;
74+
.retryable_call(client)?;
7575
let ca_cert = pem::parse_many(dsh_config.dsh_ca_certificate())?;
7676
let client_cert = pem::parse_many(client_certificate)?;
7777
Ok(Self::new(

dsh_sdk/src/dsh/properties.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -129,24 +129,21 @@ impl Properties {
129129
}
130130
};
131131
let task_id = utils::get_env_var(VAR_TASK_ID).unwrap_or("local_task_id".to_string());
132-
let config_host = utils::get_env_var(VAR_KAFKA_CONFIG_HOST)
133-
.map(|host| format!("https://{}", host))
134-
.unwrap_or_else(|_| {
135-
warn!(
136-
"{} is not set, using default value {}",
137-
VAR_KAFKA_CONFIG_HOST, DEFAULT_CONFIG_HOST
138-
);
139-
DEFAULT_CONFIG_HOST.to_string()
140-
});
132+
let config_host =
133+
utils::get_env_var(VAR_KAFKA_CONFIG_HOST).map(|host| format!("https://{}", host));
141134
let certificates = if let Ok(cert) = pki_config_dir::get_pki_cert() {
142135
Some(cert)
143-
} else {
144-
bootstrap(&config_host, &tenant_name, &task_id)
136+
} else if let Ok(config_host) = &config_host {
137+
bootstrap(config_host, &tenant_name, &task_id)
145138
.inspect_err(|e| {
146139
warn!("Could not bootstrap to DSH, due to: {}", e);
147140
})
148141
.ok()
142+
} else {
143+
warn!("Could not bootstrap to DSH, as it does not seem to be running on DSH due to missing enivironment variables");
144+
None
149145
};
146+
let config_host = config_host.unwrap_or(DEFAULT_CONFIG_HOST.to_string()); // Default is for running on local machine with VPN
150147
let fetched_datastreams = certificates.as_ref().and_then(|cert| {
151148
cert.reqwest_blocking_client_config()
152149
.ok()

dsh_sdk/src/error.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
//! Error types for the DSH SDK
2-
31
use thiserror::Error;
42

53
#[derive(Error, Debug)]
@@ -55,6 +53,15 @@ pub enum DshError {
5553
HyperError(#[from] hyper::http::Error),
5654
}
5755

56+
pub(crate) fn report(mut err: &dyn std::error::Error) -> String {
57+
let mut s = format!("{}", err);
58+
while let Some(src) = err.source() {
59+
s.push_str(&format!("\n\nCaused by: {}", src));
60+
err = src;
61+
}
62+
s
63+
}
64+
5865
#[cfg(feature = "rest-token-fetcher")]
5966
#[derive(Error, Debug)]
6067
#[non_exhaustive]

0 commit comments

Comments
 (0)