Skip to content

Commit 12fd1aa

Browse files
authored
Release/0.4.6 (#68)
* Add some unit tests to mqtt fetcher * Update api doc * update README and changelog + Bump version * remove unneeded references * cargo fmt * add .vscode settings to enable all features
1 parent 9dbc4b3 commit 12fd1aa

File tree

9 files changed

+146
-40
lines changed

9 files changed

+146
-40
lines changed

.gitignore

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,3 @@ test_files/
2323
*.iws
2424
*.iml
2525
*.ipr
26-
27-
.vscode/

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"rust-analyzer.cargo.features": "all"
3+
}

dsh_sdk/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.4.6] - 2024-08-24
9+
### Added
10+
- Add token fetcher for DSH MQTT
11+
- fetch token for MQTT
12+
- cache tokens till expiration time
13+
- support fetching tokens for multiple clients
14+
### Changed
15+
- Updated dependencies
16+
- Lazy_static to 1.5
17+
18+
819
## [0.4.5] - 2024-07-24
920
### Added
1021
- Add additional/optional config for producers and consumer

dsh_sdk/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ license.workspace = true
99
name = "dsh_sdk"
1010
readme = 'README.md'
1111
repository.workspace = true
12-
version = "0.4.5"
12+
version = "0.4.6"
1313

1414
[package.metadata.docs.rs]
1515
all-features = true
@@ -21,7 +21,7 @@ dashmap = {version = "6.0", optional = true}
2121
http-body-util = { version = "0.1", optional = true }
2222
hyper = { version = "1.3", features = ["server", "http1"], optional = true }
2323
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
24-
lazy_static = { version = "1.4", optional = true }
24+
lazy_static = { version = "1.5", optional = true }
2525
log = "0.4"
2626
pem = "3"
2727
prometheus = { version = "0.13", features = ["process"], optional = true }
@@ -33,10 +33,10 @@ serde_json = { version = "1.0", optional = true }
3333
sha2 = { version = "0.10", optional = true}
3434
thiserror = "1.0"
3535
tokio = { version = "^1.35", features = ["signal", "sync", "time", "macros"], optional = true }
36-
tokio-util = { version = "0.7", optional = true }
36+
tokio-util = { version = "0.7", default-features = false, optional = true }
3737

3838
[features]
39-
default = ["bootstrap", "graceful_shutdown", "metrics", "rdkafka-ssl", "rest-token-fetcher", "mqtt-token-fetcher"]
39+
default = ["bootstrap", "graceful_shutdown", "metrics", "rdkafka-ssl"]
4040

4141
bootstrap = ["rcgen", "serde_json", "reqwest"]
4242
metrics = ["prometheus", "hyper", "hyper-util", "http-body-util", "lazy_static", "tokio", "bytes"]

dsh_sdk/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ The following features are available in this library and can be enabled/disabled
6060
| **feature** | **default** | **Description** |
6161
|---|---|---|
6262
| `bootstrap` | &check; | Generate signed certificate and fetch datastreams info <br> Also makes certificates available, to be used as lowlevel API |
63-
| `rest-token-fetcher` | &check; | Fetch a token to use the DSH Rest API |
63+
| `rest-token-fetcher` | &cross; | Fetch tokens to use DSH Rest API |
64+
| `mqtt-token-fetcher` | &cross; | Fetch tokens to use DSH MQTT |
6465
| `metrics` | &check; | Enable (custom) metrics for your service |
6566
| `graceful_shutdown` | &check; | Create a signal handler for implementing a graceful shutdown |
6667
| `dlq` | &cross; | Dead Letter Queue implementation (experimental) |

dsh_sdk/src/dsh/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! # DSH Properties
1+
//! # DSH
22
//!
33
//! This module contains logic to connect to Kafka on DSH and retreive all properties of your tenant.
44
//!

dsh_sdk/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! Error types for the DSH SDK
2+
13
use thiserror::Error;
24

35
#[derive(Error, Debug)]

dsh_sdk/src/mqtt_token_fetcher.rs

Lines changed: 121 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
//! # MQTT Token Fetcher
2+
//!
3+
//! `MqttTokenFetcher` is responsible for fetching and managing MQTT tokens for DSH.
14
use std::{
25
fmt::{Display, Formatter},
36
sync::Mutex,
@@ -21,8 +24,9 @@ pub struct MqttTokenFetcher {
2124
tenant_name: String,
2225
rest_api_key: String,
2326
rest_token: Mutex<RestToken>,
27+
rest_auth_url: String,
2428
mqtt_token: DashMap<String, MqttToken>, // Mapping from Client ID to MqttToken
25-
platform: Platform,
29+
mqtt_auth_url: String,
2630
//token_lifetime: Option<i32>, // TODO: Implement option of passing token lifetime to request token for specific duration
2731
// port: Port or connection_type: Connection // TODO: Platform provides two connection options, current implemetation only provides connecting over SSL, enable WebSocket too
2832
}
@@ -45,8 +49,9 @@ impl MqttTokenFetcher {
4549
tenant_name,
4650
rest_api_key,
4751
rest_token: Mutex::new(rest_token),
52+
rest_auth_url: platform.endpoint_rest_token().to_string(),
4853
mqtt_token: DashMap::new(),
49-
platform,
54+
mqtt_auth_url: platform.endpoint_mqtt_token().to_string(),
5055
}
5156
}
5257
/// Retrieves an MQTT token for the specified client ID.
@@ -66,18 +71,20 @@ impl MqttTokenFetcher {
6671
client_id: &str,
6772
claims: Option<Vec<Claims>>,
6873
) -> Result<MqttToken, DshError> {
69-
let mut mqtt_token = self
70-
.mqtt_token
71-
.entry(client_id.to_string())
72-
.or_insert(self.fetch_new_mqtt_token(client_id, claims.clone()).await?);
73-
74-
if !mqtt_token.is_valid() {
75-
*mqtt_token = self
76-
.fetch_new_mqtt_token(client_id, claims.clone())
77-
.await
78-
.unwrap()
79-
};
80-
Ok(mqtt_token.clone())
74+
match self.mqtt_token.entry(client_id.to_string()) {
75+
dashmap::Entry::Occupied(mut entry) => {
76+
let mqtt_token = entry.get_mut();
77+
if !mqtt_token.is_valid() {
78+
*mqtt_token = self.fetch_new_mqtt_token(client_id, claims).await?;
79+
};
80+
Ok(mqtt_token.clone())
81+
}
82+
dashmap::Entry::Vacant(entry) => {
83+
let mqtt_token = self.fetch_new_mqtt_token(client_id, claims).await?;
84+
entry.insert(mqtt_token.clone());
85+
Ok(mqtt_token)
86+
}
87+
}
8188
}
8289
/// Fetches a new MQTT token from the platform.
8390
///
@@ -94,7 +101,7 @@ impl MqttTokenFetcher {
94101

95102
if !rest_token.is_valid() {
96103
*rest_token =
97-
RestToken::get(&self.tenant_name, &self.rest_api_key, &self.platform).await?
104+
RestToken::get(&self.tenant_name, &self.rest_api_key, &self.rest_auth_url).await?
98105
}
99106

100107
let authorization_header = format!("Bearer {}", rest_token.raw_token);
@@ -103,7 +110,7 @@ impl MqttTokenFetcher {
103110
let payload = serde_json::to_value(&mqtt_token_request)?;
104111

105112
let response = mqtt_token_request
106-
.send(&self.platform, &authorization_header, &payload)
113+
.send(&self.mqtt_auth_url, &authorization_header, &payload)
107114
.await?;
108115

109116
MqttToken::new(response)
@@ -202,7 +209,7 @@ impl MqttTokenRequest {
202209

203210
async fn send(
204211
&self,
205-
platform: &Platform,
212+
mqtt_auth_url: &str,
206213
authorization_header: &str,
207214
payload: &serde_json::Value,
208215
) -> Result<String, DshError> {
@@ -215,7 +222,7 @@ impl MqttTokenRequest {
215222
.expect("Failed to build reqwest client");
216223

217224
let response = reqwest_client
218-
.post(platform.endpoint_mqtt_token())
225+
.post(mqtt_auth_url)
219226
.header("Authorization", authorization_header)
220227
.json(payload)
221228
.send()
@@ -225,7 +232,7 @@ impl MqttTokenRequest {
225232
Ok(response.text().await?)
226233
} else {
227234
Err(DshError::DshCallError {
228-
url: platform.endpoint_mqtt_token().to_string(),
235+
url: mqtt_auth_url.to_string(),
229236
status_code: response.status(),
230237
error_body: response.text().await?,
231238
})
@@ -283,7 +290,7 @@ impl MqttToken {
283290
.duration_since(UNIX_EPOCH)
284291
.expect("SystemTime before UNIX EPOCH!")
285292
.as_secs() as i32;
286-
self.exp >= current_unixtime - 5
293+
self.exp >= current_unixtime + 5
287294
}
288295
}
289296

@@ -292,10 +299,10 @@ impl MqttToken {
292299
#[serde(rename_all = "kebab-case")]
293300
struct RestTokenAttributes {
294301
gen: i64,
295-
pub endpoint: String,
302+
endpoint: String,
296303
iss: String,
297-
pub claims: RestClaims,
298-
pub exp: i64,
304+
claims: RestClaims,
305+
exp: i32,
299306
tenant_id: String,
300307
}
301308

@@ -312,7 +319,7 @@ struct DatastreamsData {}
312319
#[derive(Serialize, Deserialize, Debug)]
313320
struct RestToken {
314321
raw_token: String,
315-
exp: i64,
322+
exp: i32,
316323
}
317324

318325
impl RestToken {
@@ -327,8 +334,8 @@ impl RestToken {
327334
/// # Returns
328335
///
329336
/// A Result containing the created `RestToken` or a `DshError`.
330-
async fn get(tenant: &str, api_key: &str, env: &Platform) -> Result<RestToken, DshError> {
331-
let raw_token = Self::fetch_token(tenant, api_key, env).await.unwrap();
337+
async fn get(tenant: &str, api_key: &str, auth_url: &str) -> Result<RestToken, DshError> {
338+
let raw_token = Self::fetch_token(tenant, api_key, auth_url).await.unwrap();
332339

333340
let header_payload = extract_header_and_payload(&raw_token)?;
334341

@@ -347,11 +354,11 @@ impl RestToken {
347354
let current_unixtime = SystemTime::now()
348355
.duration_since(UNIX_EPOCH)
349356
.expect("SystemTime before UNIX EPOCH!")
350-
.as_secs() as i64;
351-
self.exp >= current_unixtime - 5
357+
.as_secs() as i32;
358+
self.exp >= current_unixtime + 5
352359
}
353360

354-
async fn fetch_token(tenant: &str, api_key: &str, env: &Platform) -> Result<String, DshError> {
361+
async fn fetch_token(tenant: &str, api_key: &str, auth_url: &str) -> Result<String, DshError> {
355362
let json_body = json!({"tenant": tenant});
356363

357364
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -364,7 +371,7 @@ impl RestToken {
364371

365372
let rest_client = reqwest_client;
366373
let response = rest_client
367-
.post(env.endpoint_rest_token())
374+
.post(auth_url)
368375
.header("apikey", api_key)
369376
.json(&json_body)
370377
.send()
@@ -375,7 +382,7 @@ impl RestToken {
375382
match status {
376383
reqwest::StatusCode::OK => Ok(body_text),
377384
_ => Err(DshError::DshCallError {
378-
url: env.endpoint_rest_api().to_string(),
385+
url: auth_url.to_string(),
379386
status_code: status,
380387
error_body: body_text,
381388
}),
@@ -437,6 +444,33 @@ fn decode_base64(payload: &str) -> Result<Vec<u8>, DshError> {
437444
mod tests {
438445
use super::*;
439446

447+
fn create_valid_fetcher() -> MqttTokenFetcher {
448+
let exp_time = SystemTime::now()
449+
.duration_since(UNIX_EPOCH)
450+
.unwrap()
451+
.as_secs() as i32
452+
+ 3600;
453+
println!("exp_time: {}", exp_time);
454+
let rest_token: RestToken = RestToken {
455+
exp: exp_time as i32,
456+
raw_token: "valid.token.payload".to_string(),
457+
};
458+
let mqtt_token = MqttToken {
459+
exp: exp_time,
460+
raw_token: "valid.token.payload".to_string(),
461+
};
462+
let mqtt_token_map = DashMap::new();
463+
mqtt_token_map.insert("test_client".to_string(), mqtt_token.clone());
464+
MqttTokenFetcher {
465+
tenant_name: "test_tenant".to_string(),
466+
rest_api_key: "test_api_key".to_string(),
467+
rest_token: Mutex::new(rest_token),
468+
rest_auth_url: "test_auth_url".to_string(),
469+
mqtt_token: mqtt_token_map,
470+
mqtt_auth_url: "test_auth_url".to_string(),
471+
}
472+
}
473+
440474
#[tokio::test]
441475
async fn test_mqtt_token_fetcher_new() {
442476
let tenant_name = "test_tenant".to_string();
@@ -448,6 +482,13 @@ mod tests {
448482
assert!(fetcher.mqtt_token.is_empty());
449483
}
450484

485+
#[tokio::test]
486+
async fn test_mqtt_token_fetcher_get_token() {
487+
let fetcher = create_valid_fetcher();
488+
let token = fetcher.get_token("test_client", None).await.unwrap();
489+
assert_eq!(token.raw_token, "valid.token.payload");
490+
}
491+
451492
#[test]
452493
fn test_claims_new() {
453494
let resource = Resource::new(
@@ -492,18 +533,66 @@ mod tests {
492533

493534
assert!(token.is_valid());
494535
}
536+
#[test]
537+
fn test_mqtt_token_is_invalid() {
538+
let raw_token = "valid.token.payload".to_string();
539+
let token = MqttToken {
540+
exp: SystemTime::now()
541+
.duration_since(UNIX_EPOCH)
542+
.unwrap()
543+
.as_secs() as i32,
544+
raw_token,
545+
};
546+
547+
assert!(!token.is_valid());
548+
}
495549

496550
#[test]
497551
fn test_rest_token_is_valid() {
498552
let token = RestToken {
499553
exp: SystemTime::now()
500554
.duration_since(UNIX_EPOCH)
501555
.unwrap()
502-
.as_secs() as i64
556+
.as_secs() as i32
503557
+ 3600,
504558
raw_token: "valid.token.payload".to_string(),
505559
};
506560

507561
assert!(token.is_valid());
508562
}
563+
564+
#[test]
565+
fn test_rest_token_is_invalid() {
566+
let token = RestToken {
567+
exp: SystemTime::now()
568+
.duration_since(UNIX_EPOCH)
569+
.unwrap()
570+
.as_secs() as i32,
571+
raw_token: "valid.token.payload".to_string(),
572+
};
573+
574+
assert!(!token.is_valid());
575+
}
576+
577+
#[test]
578+
fn test_rest_token_default_is_invalid() {
579+
let token = RestToken::default();
580+
581+
assert!(!token.is_valid());
582+
}
583+
584+
#[test]
585+
fn test_extract_header_and_payload() {
586+
let raw = "header.payload.signature";
587+
let result = extract_header_and_payload(raw).unwrap();
588+
assert_eq!(result, "payload");
589+
590+
let raw = "header.payload";
591+
let result = extract_header_and_payload(raw).unwrap();
592+
assert_eq!(result, "payload");
593+
594+
let raw = "header";
595+
let result = extract_header_and_payload(raw);
596+
assert!(result.is_err());
597+
}
509598
}

dsh_sdk/src/utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ impl Platform {
9292

9393
/// Get the endpoint for fetching DSH Rest Authentication Token
9494
///
95+
/// With this token you can authenticate for the mqtt token endpoint
96+
///
9597
/// It will return the endpoint for DSH Rest authentication token based on the platform
9698
pub fn endpoint_rest_token(&self) -> &str {
9799
match self {

0 commit comments

Comments
 (0)