Skip to content

Commit d380599

Browse files
authored
Merge pull request #37 from kpn-dsh/fix/improve-example
Fix/improve example
2 parents 24843c5 + ad87562 commit d380599

File tree

3 files changed

+14
-15
lines changed

3 files changed

+14
-15
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
3-
"dsh_sdk"
3+
"dsh_sdk",
4+
"example_dsh_service",
45
]
56
readme = "README.md"
67
resolver = "2"

dsh_sdk/examples/produce_consume.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use dsh_sdk::dsh::Properties;
2+
use dsh_sdk::rdkafka::consumer::CommitMode;
23
use dsh_sdk::rdkafka::consumer::{Consumer, StreamConsumer};
34
use dsh_sdk::rdkafka::producer::{FutureProducer, FutureRecord};
45
use dsh_sdk::rdkafka::Message;
@@ -31,6 +32,7 @@ async fn consume(consumer: &mut StreamConsumer, topic: &str) {
3132
let payload = String::from_utf8_lossy(msg.payload().unwrap());
3233
let key = usize::from_be_bytes(msg.key().unwrap().try_into().unwrap());
3334
println!("Received message: key: {}, payload: {}", key, payload);
35+
consumer.commit_message(&msg, CommitMode::Async).unwrap();
3436
i += 1;
3537
}
3638
}

example_dsh_service/src/main.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ use dsh_sdk::graceful_shutdown::Shutdown;
33
use dsh_sdk::rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
44
use dsh_sdk::rdkafka::message::{BorrowedMessage, Message};
55

6+
use log::{info, error};
7+
68
mod custom_metrics;
79

810
fn deserialize_and_print(msg: &BorrowedMessage) {
911
let payload = String::from_utf8_lossy(msg.payload().unwrap_or(b""));
1012
let key = String::from_utf8_lossy(msg.key().unwrap_or(b""));
1113

12-
println!(
14+
info!(
1315
"Received message from topic {} partition {} offset {} with key {:?} and payload {}",
1416
msg.topic(),
1517
msg.partition(),
@@ -28,14 +30,12 @@ async fn consume(consumer: StreamConsumer, shutdown: Shutdown) {
2830
// Deserialize and print the message
2931
deserialize_and_print(&msg);
3032
// Commit the message
31-
match consumer.commit_message(&msg, CommitMode::Sync)
32-
{
33-
Ok(_) => {}
34-
Err(e) => println!("Error while committing message: {:?}", e),
33+
if let Err(e) = consumer.commit_message(&msg, CommitMode::Sync) {
34+
error!("Error while committing message: {:?}", e);
3535
}
3636
},
3737
_ = shutdown.recv() => {
38-
println!("Shutdown requested, breaking out of consumer");
38+
info!("Shutdown requested, breaking out of consumer");
3939
consumer.unsubscribe();
4040
break;
4141
}
@@ -76,14 +76,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7676
consumer_client_config.set("auto.offset.reset", "latest");
7777

7878
// Create a new consumer instance
79-
let consumer: StreamConsumer = consumer_client_config
80-
.create()
81-
.expect("Consumer creation failed");
79+
let consumer: StreamConsumer = consumer_client_config.create()?;
8280

8381
// Subscribe to the configured topics
84-
consumer
85-
.subscribe(&topics)
86-
.expect("Can't subscribe to specified topics");
82+
consumer.subscribe(&topics)?;
8783

8884
// Create handle for consuming messages,
8985
let shutdown_clone = shutdown.clone();
@@ -94,10 +90,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9490
// Wait for shutdown signal or that the consumer has stopped
9591
tokio::select! {
9692
_ = shutdown.signal_listener() => {
97-
println!("Shutdown signal received");
93+
info!("Shutdown signal received");
9894
}
9995
_ = consumer_handle => {
100-
println!("Consumer stopped");
96+
info!("Consumer stopped");
10197
shutdown.start(); // Start the shutdown process (this will stop other potential running tasks that implemented the shutdown listener)
10298
}
10399
}

0 commit comments

Comments
 (0)