(path: Option
) -> Result (pki_config_dir: Option ) -> Result (
prefix: &str,
@@ -136,22 +145,26 @@ mod tests {
use serial_test::serial;
const PKI_CONFIG_DIR: &str = "test_files/pki_config_dir";
- const PKI_KEY_FILE_NAME: &str = "client.key";
+ const PKI_KEY_FILE_PEM_NAME: &str = "client.key";
+ const PKI_KEY_FILE_DER_NAME: &str = "client-der.key";
const PKI_CERT_FILE_NAME: &str = "client.pem";
const PKI_CA_FILE_NAME: &str = "ca.crt";
fn create_test_pki_config_dir() {
let path = PathBuf::from(PKI_CONFIG_DIR);
- let path_key = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_NAME);
+ let path_key_pem = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_PEM_NAME);
+ let path_key_der = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_DER_NAME);
let path_cert = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CERT_FILE_NAME);
let path_ca = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CA_FILE_NAME);
- if path_key.exists() && path_cert.exists() && path_ca.exists() {
+ if path_key_pem.exists() && path_cert.exists() && path_ca.exists() && path_key_der.exists()
+ {
return;
}
let _ = std::fs::create_dir(path);
let priv_key = openssl::rsa::Rsa::generate(2048).unwrap();
let pkey = PKey::from_rsa(priv_key).unwrap();
- let key = pkey.private_key_to_pem_pkcs8().unwrap();
+ let key_pem = pkey.private_key_to_pem_pkcs8().unwrap();
+ let key_der = pkey.private_key_to_pkcs8().unwrap();
let mut x509_name = openssl::x509::X509NameBuilder::new().unwrap();
x509_name.append_entry_by_text("CN", "test_ca").unwrap();
let x509_name = x509_name.build();
@@ -168,7 +181,8 @@ mod tests {
let x509 = x509.build();
let ca_cert = x509.to_pem().unwrap();
let cert = x509.to_pem().unwrap();
- std::fs::write(path_key, key).unwrap();
+ std::fs::write(path_key_pem, key_pem).unwrap();
+ std::fs::write(path_key_der, key_der).unwrap();
std::fs::write(path_ca, ca_cert).unwrap();
std::fs::write(path_cert, cert).unwrap();
}
@@ -181,7 +195,7 @@ mod tests {
let result_cert = get_file_path_bufs("client", PkiFileType::Cert, &path).unwrap();
assert_eq!(result_cert.len(), 1);
let result_key = get_file_path_bufs("client", PkiFileType::Key, &path).unwrap();
- assert_eq!(result_key.len(), 1);
+ assert_eq!(result_key.len(), 2);
assert_ne!(result_cert, result_key);
let result_ca = get_file_path_bufs("ca", PkiFileType::Cert, &path).unwrap();
assert_eq!(result_ca.len(), 1);
@@ -194,7 +208,7 @@ mod tests {
#[serial(pki)]
fn test_get_certificate() {
create_test_pki_config_dir();
- let path_key = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_NAME);
+ let path_key = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_PEM_NAME);
let path_cert = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CERT_FILE_NAME);
let path_ca = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CA_FILE_NAME);
let path_ne = PathBuf::from(PKI_CONFIG_DIR).join("not_existing.crt");
@@ -223,9 +237,30 @@ mod tests {
#[test]
#[serial(pki)]
- fn test_get_key_pair() {
+ fn test_get_key_pair_pem() {
+ create_test_pki_config_dir();
+ let path_key = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_PEM_NAME);
+ let path_cert = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CERT_FILE_NAME);
+ let path_ca = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CA_FILE_NAME);
+ let path_ne = PathBuf::from(PKI_CONFIG_DIR).join("not_existing.key");
+ let result = get_key_pair(vec![path_key.clone()]);
+ assert!(result.is_ok());
+ let result = get_key_pair(vec![path_ne.clone(), path_key.clone()]);
+ assert!(result.is_ok());
+ let result =
+ get_key_pair(vec![path_ne.clone(), path_cert.clone(), path_ca.clone()]).unwrap_err();
+ assert!(matches!(result, DshError::NoCertificates));
+ let result = get_key_pair(vec![]).unwrap_err();
+ assert!(matches!(result, DshError::NoCertificates));
+ let result = get_key_pair(vec![path_ne]).unwrap_err();
+ assert!(matches!(result, DshError::NoCertificates));
+ }
+
+ #[test]
+ #[serial(pki)]
+ fn test_get_key_pair_der() {
create_test_pki_config_dir();
- let path_key = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_NAME);
+ let path_key = PathBuf::from(PKI_CONFIG_DIR).join(PKI_KEY_FILE_DER_NAME);
let path_cert = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CERT_FILE_NAME);
let path_ca = PathBuf::from(PKI_CONFIG_DIR).join(PKI_CA_FILE_NAME);
let path_ne = PathBuf::from(PKI_CONFIG_DIR).join("not_existing.key");
@@ -246,10 +281,10 @@ mod tests {
#[serial(pki, env_dependency)]
fn test_get_pki_cert() {
create_test_pki_config_dir();
- let result = get_pki_cert().unwrap_err();
- assert!(matches!(result, DshError::EnvVarError(_)));
+ let result = get_pki_certificates::
(signed when bootstrap is initiated) |
- /// | ssl.ca.pem | CA certifacte | CA certificate, provided by DSH. |
- /// | log_level | Info | Log level of rdkafka |
- ///
- /// ## Environment variables
- /// See [ENV_VARIABLES.md](https://github.com/kpn-dsh/dsh-sdk-platform-rs/blob/main/dsh_sdk/ENV_VARIABLES.md) for more information
- /// configuring the producer via environment variables.
- #[cfg(any(feature = "rdkafka-ssl", feature = "rdkafka-ssl-vendored"))]
- pub fn producer_rdkafka_config(&self) -> rdkafka::config::ClientConfig {
- let producer_config = PRODUCER_CONFIG.get_or_init(config::ProducerConfig::new);
- let mut config = rdkafka::config::ClientConfig::new();
- config
- .set("bootstrap.servers", self.kafka_brokers())
- .set("client.id", self.client_id());
- if let Some(batch_num_messages) = producer_config.batch_num_messages() {
- config.set("batch.num.messages", batch_num_messages.to_string());
- }
- if let Some(queue_buffering_max_messages) = producer_config.queue_buffering_max_messages() {
- config.set(
- "queue.buffering.max.messages",
- queue_buffering_max_messages.to_string(),
- );
- }
- if let Some(queue_buffering_max_kbytes) = producer_config.queue_buffering_max_kbytes() {
- config.set(
- "queue.buffering.max.kbytes",
- queue_buffering_max_kbytes.to_string(),
- );
- }
- if let Some(queue_buffering_max_ms) = producer_config.queue_buffering_max_ms() {
- config.set("queue.buffering.max.ms", queue_buffering_max_ms.to_string());
- }
- debug!("Producer config: {:#?}", config);
-
- // Set SSL if certificates are present
- if let Ok(certificates) = self.certificates() {
- config
- .set("security.protocol", "ssl")
- .set("ssl.key.pem", certificates.private_key_pem())
- .set(
- "ssl.certificate.pem",
- certificates.dsh_kafka_certificate_pem(),
- )
- .set("ssl.ca.pem", certificates.dsh_ca_certificate_pem());
- } else {
- config.set("security.protocol", "plaintext");
- }
- config
- }
-
/// Get reqwest async client config to connect to DSH Schema Registry.
/// If certificates are present, it will use SSL to connect to Schema Registry.
///
@@ -535,8 +369,7 @@ impl Properties {
/// - Required: `false`
/// - Options: `true`, `false`
pub fn kafka_auto_commit(&self) -> bool {
- let consumer_config = CONSUMER_CONFIG.get_or_init(config::ConsumerConfig::new);
- consumer_config.enable_auto_commit()
+ config::KafkaConfig::get().enable_auto_commit()
}
/// Get the kafka auto offset reset settings.
@@ -550,8 +383,173 @@ impl Properties {
/// - Required: `false`
/// - Options: smallest, earliest, beginning, largest, latest, end
pub fn kafka_auto_offset_reset(&self) -> String {
- let consumer_config = CONSUMER_CONFIG.get_or_init(config::ConsumerConfig::new);
- consumer_config.auto_offset_reset()
+ config::KafkaConfig::get().auto_offset_reset()
+ }
+
+ /// Get default RDKafka Consumer config to connect to Kafka on DSH.
+ ///
+ /// Note: This config is set to auto commit to false. You need to manually commit offsets.
+ /// You can overwrite this config by setting the enable.auto.commit and enable.auto.offset.store property to `true`.
+ ///
+ /// # Group ID
+ /// There are 2 types of group id's in DSH: private and shared. Private will have a unique group id per running instance.
+ /// Shared will have the same group id for all running instances. With this you can horizontally scale your service.
+ /// The group type can be manipulated by environment variable KAFKA_CONSUMER_GROUP_TYPE.
+ /// If not set, it will default to shared.
+ ///
+ /// # Example
+ /// ```
+ /// use dsh_sdk::Properties;
+ /// use dsh_sdk::rdkafka::config::RDKafkaLogLevel;
+ /// use dsh_sdk::rdkafka::consumer::stream_consumer::StreamConsumer;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box
(signed when bootstrap is initiated) |
+    /// | ssl.ca.pem | CA certificate | CA certificate, provided by DSH. |
+ /// | log_level | Info | Log level of rdkafka |
+ ///
+ /// ## Environment variables
+ /// See [ENV_VARIABLES.md](https://github.com/kpn-dsh/dsh-sdk-platform-rs/blob/main/dsh_sdk/ENV_VARIABLES.md) for more information
+    /// about configuring the producer via environment variables.
+ #[cfg(any(feature = "rdkafka-ssl", feature = "rdkafka-ssl-vendored"))]
+ pub fn producer_rdkafka_config(&self) -> rdkafka::config::ClientConfig {
+ let producer_config = crate::protocol_adapters::kafka_protocol::config::KafkaConfig::get();
+ let mut config = rdkafka::config::ClientConfig::new();
+ config
+ .set("bootstrap.servers", self.kafka_brokers())
+ .set("client.id", self.client_id());
+ if let Some(batch_num_messages) = producer_config.batch_num_messages() {
+ config.set("batch.num.messages", batch_num_messages.to_string());
+ }
+ if let Some(queue_buffering_max_messages) = producer_config.queue_buffering_max_messages() {
+ config.set(
+ "queue.buffering.max.messages",
+ queue_buffering_max_messages.to_string(),
+ );
+ }
+ if let Some(queue_buffering_max_kbytes) = producer_config.queue_buffering_max_kbytes() {
+ config.set(
+ "queue.buffering.max.kbytes",
+ queue_buffering_max_kbytes.to_string(),
+ );
+ }
+ if let Some(queue_buffering_max_ms) = producer_config.queue_buffering_max_ms() {
+ config.set("queue.buffering.max.ms", queue_buffering_max_ms.to_string());
+ }
+ log::debug!("Producer config: {:#?}", config);
+
+ // Set SSL if certificates are present
+ if let Ok(certificates) = self.certificates() {
+ config
+ .set("security.protocol", "ssl")
+ .set("ssl.key.pem", certificates.private_key_pem())
+ .set(
+ "ssl.certificate.pem",
+ certificates.dsh_kafka_certificate_pem(),
+ )
+ .set("ssl.ca.pem", certificates.dsh_ca_certificate_pem());
+ } else {
+ config.set("security.protocol", "plaintext");
+ }
+ config
}
}
diff --git a/dsh_sdk/src/error.rs b/dsh_sdk/src/error.rs
index 1a47e45..1996b1d 100644
--- a/dsh_sdk/src/error.rs
+++ b/dsh_sdk/src/error.rs
@@ -7,11 +7,11 @@ use thiserror::Error;
pub enum DshError {
#[error("IO Error: {0}")]
IoError(#[from] std::io::Error),
- #[error("Env var error: {0}")]
- EnvVarError(#[from] std::env::VarError),
+ #[error("Env variable {0} error: {1}")]
+ EnvVarError(String, std::env::VarError),
#[error("Convert bytes to utf8 error: {0}")]
Utf8(#[from] std::string::FromUtf8Error),
- #[cfg(any(feature = "bootstrap", feature = "mqtt-token-fetcher"))]
+ #[cfg(any(feature = "bootstrap", feature = "protocol-token-fetcher"))]
#[error("Error calling: {url}, status code: {status_code}, error body: {error_body}")]
DshCallError {
url: String,
@@ -21,24 +21,24 @@ pub enum DshError {
#[cfg(feature = "bootstrap")]
#[error("Certificates are not set")]
NoCertificates,
- #[cfg(feature = "bootstrap")]
+ #[cfg(any(feature = "bootstrap", feature = "pki-config-dir"))]
#[error("Invalid PEM certificate: {0}")]
PemError(#[from] pem::PemError),
- #[cfg(any(feature = "bootstrap", feature = "mqtt-token-fetcher"))]
+ #[cfg(any(feature = "certificate", feature = "protocol-token-fetcher"))]
#[error("Reqwest: {0}")]
ReqwestError(#[from] reqwest::Error),
- #[cfg(any(feature = "bootstrap", feature = "mqtt-token-fetcher"))]
+ #[cfg(any(feature = "bootstrap", feature = "protocol-token-fetcher"))]
#[error("Serde_json error: {0}")]
JsonError(#[from] serde_json::Error),
#[cfg(feature = "bootstrap")]
#[error("Rcgen error: {0}")]
PrivateKeyError(#[from] rcgen::Error),
- #[cfg(any(feature = "bootstrap", feature = "mqtt-token-fetcher"))]
+ #[cfg(any(feature = "bootstrap", feature = "protocol-token-fetcher"))]
#[error("Error parsing: {0}")]
ParseDnError(String),
#[cfg(feature = "bootstrap")]
#[error("Error getting group id, index out of bounds for {0}")]
- IndexGroupIdError(crate::dsh::datastream::GroupType),
+ IndexGroupIdError(crate::datastream::GroupType),
#[error("No tenant name found")]
NoTenantName,
#[cfg(feature = "bootstrap")]
@@ -46,7 +46,7 @@ pub enum DshError {
NotFoundTopicError(String),
#[cfg(feature = "bootstrap")]
#[error("Error in topic permissions: {0} does not have {1:?} permissions.")]
- TopicPermissionsError(String, crate::dsh::datastream::ReadWriteAccess),
+ TopicPermissionsError(String, crate::datastream::ReadWriteAccess),
#[cfg(feature = "metrics")]
#[error("Prometheus error: {0}")]
Prometheus(#[from] prometheus::Error),
@@ -55,7 +55,7 @@ pub enum DshError {
HyperError(#[from] hyper::http::Error),
}
-#[cfg(feature = "rest-token-fetcher")]
+#[cfg(feature = "management-api")]
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum DshRestTokenError {
diff --git a/dsh_sdk/src/graceful_shutdown.rs b/dsh_sdk/src/graceful_shutdown.rs
index 942a0d8..0354bf1 100644
--- a/dsh_sdk/src/graceful_shutdown.rs
+++ b/dsh_sdk/src/graceful_shutdown.rs
@@ -1,4 +1,4 @@
-//! Graceful shutdown for tokio tasks.
+//! Graceful shutdown
//!
//! This module provides a shutdown handle for graceful shutdown of (tokio tasks within) your service.
//! It listens for SIGTERM requests and sends out shutdown requests to all shutdown handles.
diff --git a/dsh_sdk/src/lib.rs b/dsh_sdk/src/lib.rs
index 0276d35..47e58b3 100644
--- a/dsh_sdk/src/lib.rs
+++ b/dsh_sdk/src/lib.rs
@@ -72,28 +72,82 @@
//! The DLQ is implemented by running the `Dlq` struct to push messages towards the DLQ topics.
//! The `ErrorToDlq` trait can be implemented on your defined errors, to be able to send messages towards the DLQ Struct.
-#[cfg(feature = "dlq")]
-pub mod dlq;
+#![allow(deprecated)]
+
+// to be kept in v0.6.0
+#[cfg(feature = "certificate")]
+pub mod certificates;
+#[cfg(feature = "bootstrap")]
+pub mod datastream;
#[cfg(feature = "bootstrap")]
pub mod dsh;
pub mod error;
-#[cfg(feature = "graceful_shutdown")]
+#[cfg(feature = "management-api")]
+pub mod management_api;
+pub mod protocol_adapters;
+pub mod utils;
+
+#[cfg(feature = "bootstrap")]
+#[doc(inline)]
+pub use dsh::Dsh;
+
+#[cfg(feature = "management-api")]
+pub use management_api::token_fetcher::{
+ ManagementApiTokenFetcher, ManagementApiTokenFetcherBuilder,
+};
+
+#[doc(inline)]
+pub use utils::Platform;
+
+// TODO: to be removed in v0.6.0
+#[cfg(feature = "dlq")]
+#[deprecated(since = "0.5.0", note = "The DLQ is moved to `dsh_sdk::utils::dlq`")]
+pub mod dlq;
+
+#[cfg(feature = "bootstrap")]
+#[deprecated(
+ since = "0.5.0",
+ note = "The `dsh` as module is phased out. Use
+ `dsh_sdk::Dsh` for all info about your running container;
+ `dsh_sdk::certificates` for all certificate related info;
+ `dsh_sdk::datastream` for all datastream related info;
+ "
+)]
+pub mod dsh_old;
+
+#[cfg(feature = "graceful-shutdown")]
+#[deprecated(
+ since = "0.5.0",
+ note = "`dsh_sdk::graceful_shutdown` is moved to `dsh_sdk::utils::graceful_shutdown`"
+)]
pub mod graceful_shutdown;
+
#[cfg(feature = "metrics")]
+#[deprecated(
+ since = "0.5.0",
+ note = "`dsh_sdk::metrics` is moved to `dsh_sdk::utils::metrics`"
+)]
pub mod metrics;
+
#[cfg(any(feature = "rdkafka-ssl", feature = "rdkafka-ssl-vendored"))]
pub use rdkafka;
-#[cfg(feature = "mqtt-token-fetcher")]
+#[cfg(feature = "protocol-token-fetcher")]
+#[deprecated(
+ since = "0.5.0",
+ note = "`dsh_sdk::mqtt_token_fetcher` is moved to `dsh_sdk::protocol_adapters::token_fetcher`"
+)]
pub mod mqtt_token_fetcher;
-#[cfg(feature = "rest-token-fetcher")]
-mod rest_api_token_fetcher;
-mod utils;
-
#[cfg(feature = "bootstrap")]
-pub use dsh::Properties;
-#[cfg(feature = "rest-token-fetcher")]
+pub use dsh_old::Properties;
+
+#[cfg(feature = "management-api")]
+#[deprecated(
+ since = "0.5.0",
+ note = "`RestTokenFetcher` and `RestTokenFetcherBuilder` are renamed to `ManagementApiTokenFetcher` and `ManagementApiTokenFetcherBuilder`"
+)]
+mod rest_api_token_fetcher;
+#[cfg(feature = "management-api")]
pub use rest_api_token_fetcher::{RestTokenFetcher, RestTokenFetcherBuilder};
-pub use utils::Platform;
// Environment variables
const VAR_APP_ID: &str = "MARATHON_APP_ID";
diff --git a/dsh_sdk/src/management_api/mod.rs b/dsh_sdk/src/management_api/mod.rs
new file mode 100644
index 0000000..a3e273a
--- /dev/null
+++ b/dsh_sdk/src/management_api/mod.rs
@@ -0,0 +1 @@
+pub mod token_fetcher;
diff --git a/dsh_sdk/src/management_api/token_fetcher.rs b/dsh_sdk/src/management_api/token_fetcher.rs
new file mode 100644
index 0000000..eda43be
--- /dev/null
+++ b/dsh_sdk/src/management_api/token_fetcher.rs
@@ -0,0 +1,597 @@
+//! Module for fetching and storing access tokens for the DSH Management Rest API client
+//!
+//! This module is meant to be used together with the [dsh_rest_api_client].
+//!
+//! The TokenFetcher will fetch and store access tokens to be used in the DSH Rest API client.
+//!
+//! ## Example
+//! Recommended usage is to use the [ManagementApiTokenFetcherBuilder] to create a new instance of the token fetcher.
+//! However, you can also create a new instance of the token fetcher directly.
+//! ```no_run
+//! use dsh_sdk::{RestTokenFetcherBuilder, Platform};
+//! use dsh_rest_api_client::Client;
+//!
+//! const CLIENT_SECRET: &str = "";
+//! const TENANT: &str = "tenant-name";
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let platform = Platform::NpLz;
+//! let client = Client::new(platform.endpoint_rest_api());
+//!
+//! let tf = RestTokenFetcherBuilder::new(platform)
+//! .tenant_name(TENANT.to_string())
+//! .client_secret(CLIENT_SECRET.to_string())
+//! .build()
+//! .unwrap();
+//!
+//! let response = client
+//! .topic_get_by_tenant_topic(TENANT, &tf.get_token().await.unwrap())
+//! .await;
+//! println!("Available topics: {:#?}", response);
+//! }
+//! ```
+
+use std::fmt::Debug;
+use std::ops::Add;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+use log::debug;
+use serde::Deserialize;
+
+use crate::error::DshRestTokenError;
+use crate::utils::Platform;
+
+/// Access token of the authentication service of DSH.
+///
+/// This is the response when requesting a new access token.
+///
+/// ## Recommended usage
+/// Use [ManagementApiTokenFetcher::get_token] to get the bearer token; the `TokenFetcher` will automatically fetch a new token if the current token is not valid.
+#[derive(Debug, Clone, Deserialize)]
+pub struct AccessToken {
+ access_token: String,
+ expires_in: u64,
+ refresh_expires_in: u32,
+ token_type: String,
+ #[serde(rename(deserialize = "not-before-policy"))]
+ not_before_policy: u32,
+ scope: String,
+}
+
+impl AccessToken {
+ /// Get the formatted token
+ pub fn formatted_token(&self) -> String {
+ format!("{} {}", self.token_type, self.access_token)
+ }
+
+ /// Get the access token
+ pub fn access_token(&self) -> &str {
+ &self.access_token
+ }
+
+ /// Get the expires in of the access token
+ pub fn expires_in(&self) -> u64 {
+ self.expires_in
+ }
+
+ /// Get the refresh expires in of the access token
+ pub fn refresh_expires_in(&self) -> u32 {
+ self.refresh_expires_in
+ }
+
+ /// Get the token type of the access token
+ pub fn token_type(&self) -> &str {
+ &self.token_type
+ }
+
+ /// Get the not before policy of the access token
+ pub fn not_before_policy(&self) -> u32 {
+ self.not_before_policy
+ }
+
+ /// Get the scope of the access token
+ pub fn scope(&self) -> &str {
+ &self.scope
+ }
+}
+
+impl Default for AccessToken {
+ fn default() -> Self {
+ Self {
+ access_token: "".to_string(),
+ expires_in: 0,
+ refresh_expires_in: 0,
+ token_type: "".to_string(),
+ not_before_policy: 0,
+ scope: "".to_string(),
+ }
+ }
+}
+
+/// Fetch and store access tokens to be used in the DSH Rest API client
+///
+/// This struct will fetch and store access tokens to be used in the DSH Rest API client.
+/// It will automatically fetch a new token if the current token is not valid.
+pub struct ManagementApiTokenFetcher {
+ access_token: Mutex