-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
remove ambiguous rexport lazy_static
- Loading branch information
Showing
2 changed files
with
87 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// make sure to use the dlq feature in your Cargo.toml | ||
// dsh_sdk = { version = "0.1.0", features = ["dlq"] } | ||
// | ||
// To run this example, run the following command: | ||
// cargo run --features dlq --example dlq_implementation | ||
|
||
use dsh_sdk::bootstrap::Bootstrap; | ||
use dsh_sdk::dlq; | ||
use dsh_sdk::dlq::ErrorToDlq; | ||
use dsh_sdk::graceful_shutdown::Shutdown; | ||
use dsh_sdk::rdkafka::consumer::stream_consumer::StreamConsumer; | ||
use dsh_sdk::rdkafka::Message; | ||
use std::backtrace::Backtrace; | ||
use thiserror::Error; | ||
use tokio::sync::mpsc; | ||
|
||
// Define your custom error type | ||
#[derive(Error, Debug)] | ||
enum ConsumerError { | ||
#[error("Deserialization error: {0}")] | ||
DeserializeError(#[from] std::string::FromUtf8Error), | ||
} | ||
|
||
// implement the ErrorToDlq trait for your custom error type (or exusting error types) | ||
impl dlq::ErrorToDlq for ConsumerError { | ||
fn to_dlq(&self, kafka_message: rdkafka::message::OwnedMessage) -> dlq::SendToDlq { | ||
let backtrace = Backtrace::force_capture(); | ||
dlq::SendToDlq::new( | ||
kafka_message, | ||
self.retryable(), | ||
self.to_string(), | ||
backtrace.to_string(), | ||
) | ||
} | ||
// Definition if error is retryable or not | ||
fn retryable(&self) -> dlq::Retryable { | ||
match self { | ||
ConsumerError::DeserializeError(_) => dlq::Retryable::NonRetryable, | ||
} | ||
} | ||
} | ||
|
||
// simple deserialization function, that returns a Result of string or defined ConsumerError | ||
fn deserialize(msg: dsh_sdk::rdkafka::message::OwnedMessage) -> Result<String, ConsumerError> { | ||
match msg.payload() { | ||
Some(payload) => Ok(String::from_utf8(payload.to_vec())?), | ||
None => Ok("".to_string()), | ||
} | ||
} | ||
|
||
// simple consumer function | ||
async fn consume(consumer: &mut StreamConsumer, dlq_tx: &mut mpsc::Sender<dlq::SendToDlq>) { | ||
while let Ok(msg) = consumer.recv().await { | ||
match deserialize(msg.detach()) { | ||
// send message to dlq if error occurs | ||
Err(e) => e.to_dlq(msg.detach()).send(dlq_tx).await, | ||
// process message, in this case print payload | ||
Ok(payload) => { | ||
println!("Payload: {}", payload) | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
std::env::set_var("DLQ_DEAD_TOPIC", "scratch.dlq.local-tenant"); | ||
std::env::set_var("DLQ_RETRY_TOPIC", "scratch.dlq.local-tenant"); | ||
let bootstrap = Bootstrap::new().await?; | ||
let shutdown = Shutdown::new(); | ||
let mut dlq = dlq::Dlq::new(&bootstrap, shutdown.clone())?; | ||
// get the dlq channel sender to send messages to the dlq | ||
// for example in your consumer | ||
let mut dlq_tx = dlq.get_dlq_tx(); | ||
// run the dlq in a separate tokio task | ||
let dlq_handle = tokio::spawn(async move { | ||
dlq.run().await; | ||
}); | ||
Ok(()) | ||
} | ||
|
||
async fn procuce(bootstrap: Bootstrap) -> Result<(), Box<dyn std::error::Error>> { | ||
let mut producer = bootstrap.producer_rdkafka_config(); | ||
|
||
producer.send(topic, None, Some()).await?; | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters