Skip to content

Commit

Permalink
Merge pull request #12 from hdoordt/test-topic-chan
Browse files Browse the repository at this point in the history
Add simple test for TopicChannels
  • Loading branch information
hdoordt authored Mar 19, 2023
2 parents 6662f58 + 2d3e4a8 commit 0d05660
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions src/chan/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ impl<B: TopicBus> TryFrom<String> for ConsumerRoutingKey<B> {
}
}

impl<B: TopicBus> TryFrom<&str> for ConsumerRoutingKey<B> {
type Error = <Self as TryFrom<String>>::Error;

fn try_from(key: &str) -> std::result::Result<Self, Self::Error> {
<Self as TryFrom<String>>::try_from(key.to_owned())
}
}

impl<B> Display for ConsumerRoutingKey<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.key.fmt(f)
Expand Down Expand Up @@ -244,6 +252,14 @@ impl<B: TopicBus> TryFrom<String> for PublisherRoutingKey<B> {
}
}

impl<B: TopicBus> TryFrom<&str> for PublisherRoutingKey<B> {
type Error = <Self as TryFrom<String>>::Error;

fn try_from(key: &str) -> std::result::Result<Self, Self::Error> {
<Self as TryFrom<String>>::try_from(key.to_owned())
}
}

impl<B> Display for PublisherRoutingKey<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.key.fmt(f)
Expand Down Expand Up @@ -277,12 +293,57 @@ impl std::error::Error for RoutingKeyError {}

#[cfg(test)]
mod tests {
use crate::{topic_bus, topic_exchange, ConsumerRoutingKey, PublisherRoutingKey};
use std::time::Duration;

use crate::{
topic_bus, topic_exchange, Connection, Consumer, ConsumerRoutingKey, FramePayload,
Publisher, PublisherRoutingKey, TopicChannel, RABBIT_MQ_URL,
};
use futures::StreamExt;
use test_case::test_case;
use tokio::{sync::oneshot, time::timeout};
use uuid::Uuid;

topic_exchange!(MyExchange, "the_exchange");

topic_bus!(MyTopic, (), MyExchange, "frame.*.*");
topic_bus!(MyTopic, FramePayload, MyExchange, "frame.*.*");

#[tokio::test]
async fn publish() -> crate::Result<()> {
let connection = Connection::connect(RABBIT_MQ_URL).await.unwrap();
let uuid = Uuid::new_v4();
let (tx, rx) = oneshot::channel();
tokio::task::spawn({
let channel: TopicChannel<MyExchange> = TopicChannel::new(&connection).await.unwrap();
let mut consumer: Consumer<_, MyTopic> = channel
.consumer("frame.*.*".try_into().unwrap(), &Uuid::new_v4().to_string())
.await?;
async move {
let msg = consumer.next().await.unwrap().unwrap();
msg.ack(false).await.unwrap();
let payload = msg.get_payload().unwrap();
assert_eq!(payload.message, uuid.to_string());
tx.send(()).unwrap();
}
});

let channel: TopicChannel<MyExchange> = TopicChannel::new(&connection).await.unwrap();
let publisher: Publisher<_, MyTopic> = channel.publisher();

publisher
.publish_topic(
"frame.1.2".try_into().unwrap(),
&FramePayload {
message: uuid.to_string(),
},
)
.await
.unwrap();

timeout(Duration::from_secs(3), rx).await.unwrap().unwrap();

Ok(())
}

#[test_case("frame.123.456"; "1")]
fn test_valid_publish_routing_key(key: &str) {
Expand Down

0 comments on commit 0d05660

Please sign in to comment.