Commit 3c5bef3

Feature/config via env variables (#43)
* Add example service back to cargo.toml
* Add env variables to overwrite config and remove Result from config
* update readme and changelog
* Add test
* Fix DLQ
* cargo fmt
1 parent 7263475 commit 3c5bef3

12 files changed: +321 −97 lines changed


CHANGELOG.md

Lines changed: 4 additions & 1 deletion
@@ -14,10 +14,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Add Default implementation for Dsh::Properties
 - Points to localhost:9092 for kafka, localhost:8081 for schemastore
-- local_datastreams.json is now optional
+- local_datastreams.json is now optional as it falls back to default values
+- Overwrite Kafka config via environment variables for producer and consumer
 - Add extra check in github actions to check for compile issues for all features independently

 ### Changed
+- **Breaking change:** consumer_rdkafka_config and producer_rdkafka_config returns `ClientConfig` instead of `Result<ClientConfig>`
+- **Breaking change:** certificates and keys are now returned as `T` instead of `Result<T>`
 - **Breaking change:** Private key is based on ECDSA instead of RSA
 - **Breaking change:** Error enum is now non_exhaustive
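For downstream users, the first two breaking changes mostly mean dropping a `?` or `unwrap()` at the call site; a minimal before/after sketch (the import path of `Properties` and the surrounding function are assumptions, not part of this changelog):

```rust
use dsh_sdk::rdkafka::consumer::StreamConsumer;

fn build_consumer() -> Result<StreamConsumer, Box<dyn std::error::Error>> {
    // Import path of Properties assumed; check the crate docs for the exact module.
    let dsh_properties = dsh_sdk::Properties::get();

    // Before: consumer_rdkafka_config() returned Result<ClientConfig>, so it needed its own `?`
    // let consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config()?.create()?;

    // After: it returns ClientConfig directly, so only rdkafka's create() can fail
    let consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config().create()?;
    Ok(consumer)
}
```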

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 [workspace]
 members = [
     "dsh_sdk",
+    "example_dsh_service",
 ]
 readme = "README.md"
 resolver = "2"

dsh_sdk/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -45,4 +45,5 @@ rdkafka-ssl = ["rdkafka", "rdkafka/ssl"]
 mockito = "1.1.1"
 openssl = "0.10"
 tokio = "^1.35"
-hyper = { version = "1.2.0", features = ["full"]}
+hyper = { version = "1.2.0", features = ["full"]}
+serial_test = "3.1.0"
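`serial_test` is typically pulled in as a dev-dependency so that tests mutating process-global state, such as the new `KAFKA_*` environment variables, do not run concurrently; a hedged sketch of such a test (module, test name, and body are illustrative, not taken from this commit):

```rust
#[cfg(test)]
mod env_override_tests {
    use serial_test::serial;

    #[test]
    #[serial] // run exclusively so the env var cannot bleed into tests running in parallel
    fn bootstrap_servers_can_be_overridden() {
        std::env::set_var("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        // ... build the rdkafka config here and assert on its "bootstrap.servers" value ...
        std::env::remove_var("KAFKA_BOOTSTRAP_SERVERS");
    }
}
```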

dsh_sdk/README.md

Lines changed: 35 additions & 4 deletions
@@ -42,7 +42,7 @@ use dsh_sdk::rdkafka::consumer::{Consumer, StreamConsumer};
 fn main() -> Result<(), Box<dyn std::error::Error>>{
     let dsh_properties = Properties::get();
     // get a rdkafka consumer config for example
-    let consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config()?.create()?;
+    let consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config().create()?;
 }
 ```

@@ -53,7 +53,6 @@ The following features are available in this library and can be enabled/disabled
 | **feature** | **default** | **Description** |
 |---|---|---|
 | `bootstrap` | &check; | Generate signed certificate and fetch datastreams info <br> Also makes certificates available, to be used as lowlevel API |
-| ~~`local`~~ | &check; | Will be deprecated in v0.4.0 |
 | `metrics` | &check; | Enable (custom) metrics for your service |
 | `graceful_shutdown` | &check; | Create a signal handler for implementing a graceful shutdown |
 | `dlq` | &cross; | Dead Letter Queue implementation (experimental) |
@@ -62,11 +61,43 @@ The following features are available in this library and can be enabled/disabled

 See api documentation for more information on how to use these features including.

+## Environment variables
+The default RDKafka config can be overwritten by setting the following environment variables:
+
+### `KAFKA_BOOTSTRAP_SERVERS`
+- Usage: Overwrite hostnames of brokers (useful for local testing)
+- Default: Brokers based on datastreams
+- Required: `false`
+
+### `KAFKA_CONSUMER_GROUP_TYPE`
+- Usage: Picks group_id based on type from datastreams
+- Default: Shared
+- Options: private, shared
+- Required: `false`
+
+### `KAFKA_GROUP_ID`
+- Usage: Custom group id
+- Default: NA
+- Required: `false`
+- Remark: Overrules `KAFKA_CONSUMER_GROUP_TYPE`. Mandatory to start with tenant name. (will prefix tenant name automatically if not set)
+
+### `KAFKA_ENABLE_AUTO_COMMIT`
+- Usage: Enable/Disable auto commit
+- Default: `false`
+- Required: `false`
+- Options: `true`, `false`
+
+### `KAFKA_AUTO_OFFSET_RESET`
+- Usage: Set the offset reset settings to start consuming from set option.
+- Default: earliest
+- Required: `false`
+- Options: smallest, earliest, beginning, largest, latest, end
+
 ## Api doc
 See the [api documentation](https://docs.rs/dsh_sdk/latest/dsh_sdk/) for more information on how to use this library.

 ### Local development
-You can start the [docker-compose](../docker/docker-compose.yml) file in the root of this project to start a local Kafka broker and Schema Registry.
+You can start the [docker-compose](../docker/docker-compose.yml) file to start a local Kafka broker and Schema Registry.

 When running the SDK on your local machine, it will automatically try to connect to the local Kafka broker and Schema Registry

@@ -75,7 +106,7 @@ When running the SDK on your local machine, it will automatically try to connect
 | Kafka | `localhost:9092` |
 | Schema Registry | `localhost:8081/apis/ccompat/v7` |

-If you want manipulate these endpoints, or want to use specific datastream info, you can add a [local_datastreams.json](local_datastreams.json) to your project root to overwrite the default values.
+If you want manipulate these endpoints, or want to use specific datastream info, you can add a [local_datastreams.json](local_datastreams.json) to your project root to overwrite the default values or set the environment variables accordingly.

 ### Note
 Rdkafka and thereby this library is dependent on CMAKE. Make sure it is installed in your environment and/or Dockerfile where you are compiling.
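A minimal sketch of exercising the new overrides during local development, using the variable names documented in the README above (the import path of `Properties` and the exact point at which the variables are read are assumptions, not spelled out in this diff). In practice these would normally be exported in the shell or container environment; they are set in code here only to keep the sketch self-contained:

```rust
use dsh_sdk::rdkafka::consumer::StreamConsumer;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point the SDK at a non-default broker and tweak consumer behaviour for local testing.
    std::env::set_var("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
    std::env::set_var("KAFKA_AUTO_OFFSET_RESET", "latest");
    std::env::set_var("KAFKA_CONSUMER_GROUP_TYPE", "private");

    // Import path of Properties assumed; see the crate docs.
    let dsh_properties = dsh_sdk::Properties::get();
    let _consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config().create()?;
    Ok(())
}
```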

dsh_sdk/examples/produce_consume.rs

Lines changed: 2 additions & 2 deletions
@@ -46,13 +46,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let topic = "scratch.local.local-tenant";

     // Create a new producer based on the properties default config
-    let mut producer: FutureProducer = dsh_properties.producer_rdkafka_config()?.create()?;
+    let mut producer: FutureProducer = dsh_properties.producer_rdkafka_config().create()?;

     // Produce messages towards topic
     produce(&mut producer, topic).await;

     // Create a new consumer based on the properties default config
-    let mut consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config()?.create()?;
+    let mut consumer: StreamConsumer = dsh_properties.consumer_rdkafka_config().create()?;

     consume(&mut consumer, topic).await;
     Ok(())
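The `produce` and `consume` helpers called above are outside this diff; for context, a produce helper along these lines would be typical with rdkafka's `FutureProducer` (the loop, payload, and key are illustrative, not the repository's actual helper):

```rust
use std::time::Duration;

use dsh_sdk::rdkafka::producer::{FutureProducer, FutureRecord};

async fn produce(producer: &mut FutureProducer, topic: &str) {
    for i in 0..5 {
        let payload = format!("hello world {}", i);
        let record = FutureRecord::to(topic).payload(&payload).key("greeting");
        // Await the delivery report; log failures instead of panicking.
        match producer.send(record, Duration::from_secs(5)).await {
            Ok((partition, offset)) => println!("delivered to partition {partition} at offset {offset}"),
            Err((e, _msg)) => eprintln!("delivery failed: {e}"),
        }
    }
}
```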

dsh_sdk/src/dlq.rs

Lines changed: 1 addition & 11 deletions
@@ -230,17 +230,7 @@ impl Dlq {
     }

     fn build_producer(dsh_prop: &Properties) -> Result<FutureProducer, rdkafka::error::KafkaError> {
-        let producer_config = match dsh_prop.producer_rdkafka_config() {
-            Ok(config) => config,
-            Err(e) => {
-                error!("Error creating producer config");
-                return Err(rdkafka::error::KafkaError::ClientCreation(format!(
-                    "Error creating producer config: {}",
-                    e
-                )));
-            }
-        };
-        producer_config.create()
+        dsh_prop.producer_rdkafka_config().create()
     }
 }

dsh_sdk/src/dsh/certificates.rs

Lines changed: 16 additions & 16 deletions
@@ -76,7 +76,7 @@ impl Cert {
     pub fn reqwest_client_config(&self) -> Result<reqwest::ClientBuilder, DshError> {
         let pem_identity = Cert::create_identity(
             self.dsh_kafka_certificate_pem().as_bytes(),
-            self.private_key_pem()?.as_bytes(),
+            self.private_key_pem().as_bytes(),
         )?;
         let reqwest_cert =
             reqwest::tls::Certificate::from_pem(self.dsh_ca_certificate_pem().as_bytes())?;
@@ -91,7 +91,7 @@ impl Cert {
     pub fn reqwest_blocking_client_config(&self) -> Result<ClientBuilder, DshError> {
         let pem_identity = Cert::create_identity(
             self.dsh_kafka_certificate_pem().as_bytes(),
-            self.private_key_pem()?.as_bytes(),
+            self.private_key_pem().as_bytes(),
         )?;
         let reqwest_cert =
             reqwest::tls::Certificate::from_pem(self.dsh_ca_certificate_pem().as_bytes())?;
@@ -112,23 +112,23 @@ impl Cert {
     }

     /// Get the private key as PKCS8 and return bytes based on asn1 DER format.
-    pub fn private_key_pkcs8(&self) -> Result<Vec<u8>, DshError> {
-        Ok(self.key_pair.serialize_der())
+    pub fn private_key_pkcs8(&self) -> Vec<u8> {
+        self.key_pair.serialize_der()
     }

     /// Get the private key as PEM string. Equivalent to client.key.
-    pub fn private_key_pem(&self) -> Result<String, DshError> {
-        Ok(self.key_pair.serialize_pem())
+    pub fn private_key_pem(&self) -> String {
+        self.key_pair.serialize_pem()
     }

     /// Get the public key as PEM string.
-    pub fn public_key_pem(&self) -> Result<String, DshError> {
-        Ok(self.key_pair.public_key_pem())
+    pub fn public_key_pem(&self) -> String {
+        self.key_pair.public_key_pem()
     }

     /// Get the public key as DER bytes.
-    pub fn public_key_der(&self) -> Result<Vec<u8>, DshError> {
-        Ok(self.key_pair.public_key_der())
+    pub fn public_key_der(&self) -> Vec<u8> {
+        self.key_pair.public_key_der()
     }

     /// Create the ca.crt, client.pem, and client.key files in a desired directory.
@@ -152,7 +152,7 @@
         std::fs::create_dir_all(dir)?;
         Self::create_file(dir.join("ca.crt"), self.dsh_ca_certificate_pem())?;
         Self::create_file(dir.join("client.pem"), self.dsh_kafka_certificate_pem())?;
-        Self::create_file(dir.join("client.key"), self.private_key_pem()?)?;
+        Self::create_file(dir.join("client.key"), self.private_key_pem())?;
         Ok(())
     }

@@ -212,7 +212,7 @@ mod tests {
         let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
         let pkey_pem_bytes = pkey.private_key_to_pem_pkcs8().unwrap();

-        let key_pem = cert.private_key_pem().unwrap();
+        let key_pem = cert.private_key_pem();
         let pkey_pem = String::from_utf8_lossy(pkey_pem_bytes.as_slice());
         assert_eq!(key_pem, pkey_pem);
     }
@@ -224,7 +224,7 @@
         let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
         let pkey_pub_pem_bytes = pkey.public_key_to_pem().unwrap();

-        let pub_pem = cert.public_key_pem().unwrap();
+        let pub_pem = cert.public_key_pem();
         let pkey_pub_pem = String::from_utf8_lossy(pkey_pub_pem_bytes.as_slice());
         assert_eq!(pub_pem, pkey_pub_pem);
     }
@@ -236,7 +236,7 @@
         let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
         let pkey_pub_der = pkey.public_key_to_der().unwrap();

-        let pub_der = cert.public_key_der().unwrap();
+        let pub_der = cert.public_key_der();
         assert_eq!(pub_der, pkey_pub_der);
     }

@@ -247,7 +247,7 @@
         let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
         let pkey = pkey.private_key_to_pkcs8().unwrap();

-        let key = cert.private_key_pkcs8().unwrap();
+        let key = cert.private_key_pkcs8();
         assert_eq!(key, pkey);
     }

@@ -292,7 +292,7 @@
         let dn = Dn::parse_string("CN=Test CN,OU=Test OU,O=Test Org").unwrap();
         let csr = Cert::generate_csr(&cert.key_pair, dn).unwrap();
         let csr_pem = csr.pem().unwrap();
-        let key = cert.private_key_pkcs8().unwrap();
+        let key = cert.private_key_pkcs8();
         let pkey = PKey::private_key_from_der(&key).unwrap();

         let req = X509Req::from_pem(csr_pem.as_bytes()).unwrap();
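With these changes the key and certificate getters are infallible, so call sites simply drop the error handling; a small sketch, assuming a `Cert` instance is already in hand and that the type lives at the module path suggested by the file location (both assumptions, and the file names are illustrative):

```rust
// Module path inferred from dsh_sdk/src/dsh/certificates.rs; confirm against the crate docs.
use dsh_sdk::dsh::certificates::Cert;

fn dump_key_material(cert: &Cert) -> std::io::Result<()> {
    // These getters now return values directly instead of Result<_, DshError>.
    let key_pem: String = cert.private_key_pem();
    let public_pem: String = cert.public_key_pem();
    let key_pkcs8: Vec<u8> = cert.private_key_pkcs8();

    std::fs::write("client.key", key_pem)?;
    std::fs::write("client_public.pem", public_pem)?;
    std::fs::write("client_key_pkcs8.der", key_pkcs8)?;
    Ok(())
}
```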

dsh_sdk/src/dsh/datastream.rs

Lines changed: 1 addition & 1 deletion
@@ -134,7 +134,7 @@ impl Datastream {
         let mut file = match file_result {
             Ok(file) => file,
             Err(e) => {
-                error!(
+                warn!(
                     "Error opening local_datastreams.json ({}): {}",
                     path_buf.display(),
                     e
