From 3439e958d96715e78ed8327994bf6dfb33daff11 Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Thu, 28 Nov 2024 10:21:06 +0100 Subject: [PATCH] examples in readme and example dir --- examples/tcp/Cargo.toml | 9 ++ examples/tcp/src/ping_pong.rs | 54 ++++++++++ examples/tcp/src/ping_pong_smol.rs | 52 ++++++++++ examples/tcp/src/tokio.rs | 1 + README.md => mqrstt/README.md | 160 +++++++++++------------------ mqrstt/src/packets/error.rs | 2 +- mqrstt/src/tokio/network.rs | 2 + 7 files changed, 179 insertions(+), 101 deletions(-) create mode 100644 examples/tcp/src/ping_pong.rs create mode 100644 examples/tcp/src/ping_pong_smol.rs rename README.md => mqrstt/README.md (61%) diff --git a/examples/tcp/Cargo.toml b/examples/tcp/Cargo.toml index b42bc70..15e0444 100644 --- a/examples/tcp/Cargo.toml +++ b/examples/tcp/Cargo.toml @@ -6,6 +6,7 @@ license = "MIT" [dependencies] smol = { version = "2" } +futures = "0.3.31" tokio = { version = "1", features = ["full"] } @@ -15,6 +16,14 @@ mqrstt = { path = "../../mqrstt", features = ["logs"] } name = "tokio" path = "src/tokio.rs" +[[bin]] +name = "ping_pong" +path = "src/ping_pong.rs" + +[[bin]] +name = "ping_pong_smol" +path = "src/ping_pong_smol.rs" + [[bin]] name = "smol" path = "src/smol.rs" diff --git a/examples/tcp/src/ping_pong.rs b/examples/tcp/src/ping_pong.rs new file mode 100644 index 0000000..3081554 --- /dev/null +++ b/examples/tcp/src/ping_pong.rs @@ -0,0 +1,54 @@ +use mqrstt::{ + packets::{self, Packet}, + AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus, +}; +use tokio::time::Duration; + +pub struct PingPong { + pub client: MqttClient, +} +impl AsyncEventHandler for PingPong { + // Handlers only get INCOMING packets. + async fn handle(&mut self, event: packets::Packet) { + match event { + Packet::Publish(p) => { + if let Ok(payload) = String::from_utf8(p.payload.to_vec()) { + if payload.to_lowercase().contains("ping") { + self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap(); + println!("Received Ping, Send pong!"); + } + } + } + Packet::ConnAck(_) => { + println!("Connected!") + } + _ => (), + } + } +} + +#[tokio::main] +async fn main() { + let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network(); + + let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap(); + let stream = tokio::io::BufStream::new(stream); + + let mut pingpong = PingPong { client: client.clone() }; + + network.connect(stream, &mut pingpong).await.unwrap(); + + client.subscribe("mqrstt").await.unwrap(); + + let network_handle = tokio::spawn(async move { + let result = network.run(&mut pingpong).await; + (result, pingpong) + }); + + tokio::time::sleep(Duration::from_secs(30)).await; + client.disconnect().await.unwrap(); + + let (result, _pingpong) = network_handle.await.unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect); +} diff --git a/examples/tcp/src/ping_pong_smol.rs b/examples/tcp/src/ping_pong_smol.rs new file mode 100644 index 0000000..2647daa --- /dev/null +++ b/examples/tcp/src/ping_pong_smol.rs @@ -0,0 +1,52 @@ +use mqrstt::{ + packets::{self, Packet}, + AsyncEventHandler, ConnectOptions, MqttClient, NetworkBuilder, NetworkStatus, +}; +pub struct PingPong { + pub client: MqttClient, +} +impl AsyncEventHandler for PingPong { + // Handlers only get INCOMING packets. This can change later. + async fn handle(&mut self, event: packets::Packet) { + match event { + Packet::Publish(p) => { + if let Ok(payload) = String::from_utf8(p.payload.to_vec()) { + if payload.to_lowercase().contains("ping") { + self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap(); + println!("Received Ping, Send pong!"); + } + } + } + Packet::ConnAck(_) => { + println!("Connected!") + } + _ => (), + } + } +} +fn main() { + smol::block_on(async { + let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network(); + let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap(); + + let mut pingpong = PingPong { client: client.clone() }; + + network.connect(stream, &mut pingpong).await.unwrap(); + + // This subscribe is only processed when we run the network + client.subscribe("mqrstt").await.unwrap(); + + let task_handle = smol::spawn(async move { + let result = network.run(&mut pingpong).await; + (result, pingpong) + }); + + smol::Timer::after(std::time::Duration::from_secs(30)).await; + client.disconnect().await.unwrap(); + + let (result, _pingpong) = task_handle.await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect); + }); +} diff --git a/examples/tcp/src/tokio.rs b/examples/tcp/src/tokio.rs index 1e9693b..e3db001 100644 --- a/examples/tcp/src/tokio.rs +++ b/examples/tcp/src/tokio.rs @@ -21,6 +21,7 @@ async fn main() { let mut handler = Handler { byte_count: 0 }; let stream = tokio::net::TcpStream::connect(hostname).await.unwrap(); + let stream = tokio::io::BufStream::new(stream); let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").tokio_network(); network.connect(stream, &mut handler).await.unwrap(); diff --git a/README.md b/mqrstt/README.md similarity index 61% rename from README.md rename to mqrstt/README.md index 7e112dc..8a7fb01 100644 --- a/README.md +++ b/mqrstt/README.md @@ -24,9 +24,8 @@ For a sync approach the stream has to implement the [`std::io::Read`] and [`std: - Keep alive depends on actual communication ### To do -- no_std (Requires a lot of work to use no heap allocations and depend on stack) - Even More testing -- More documentation +- Add TLS examples to repository ## MSRV From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in trait feature. @@ -38,119 +37,90 @@ From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in - Create a new connection when an error or disconnect is encountered - Handlers only get incoming packets -### TLS: -TLS examples are too larger for a README. [TLS examples](https://github.com/GunnarMorrigan/mqrstt/tree/main/examples). ### Smol example: ```rust use mqrstt::{ - MqttClient, - ConnectOptions, - new_smol, packets::{self, Packet}, - AsyncEventHandler, - smol::NetworkStatus, + AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus, }; -use bytes::Bytes; pub struct PingPong { pub client: MqttClient, } impl AsyncEventHandler for PingPong { // Handlers only get INCOMING packets. This can change later. - async fn handle(&mut self, event: packets::Packet { + async fn handle(&mut self, event: packets::Packet) { match event { Packet::Publish(p) => { if let Ok(payload) = String::from_utf8(p.payload.to_vec()) { if payload.to_lowercase().contains("ping") { - self.client - .publish( - p.topic.clone(), - p.qos, - p.retain, - Bytes::from_static(b"pong"), - ) - .await - .unwrap(); + self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap(); println!("Received Ping, Send pong!"); } } - }, - Packet::ConnAck(_) => { println!("Connected!") }, + } + Packet::ConnAck(_) => { + println!("Connected!") + } _ => (), } } } -smol::block_on(async { - let options = ConnectOptions::new("mqrsttSmolExample"); - let (mut network, client) = new_smol(options); - let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)) - .await - .unwrap(); - - let mut pingpong = PingPong { - client: client.clone(), - }; +fn main() { + smol::block_on(async { + let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network(); + let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap(); - network.connect(stream, &mut pingpong).await.unwrap(); + let mut pingpong = PingPong { client: client.clone() }; - // This subscribe is only processed when we run the network - client.subscribe("mqrstt").await.unwrap(); + network.connect(stream, &mut pingpong).await.unwrap(); + + // This subscribe is only processed when we run the network + client.subscribe("mqrstt").await.unwrap(); + + let task_handle = smol::spawn(async move { + let result = network.run(&mut pingpong).await; + (result, pingpong) + }); + + smol::Timer::after(std::time::Duration::from_secs(30)).await; + client.disconnect().await.unwrap(); + + let (result, _pingpong) = task_handle.await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect); + }); +} - let (n, t) = futures::join!( - async { - loop { - return match network.poll(&mut pingpong).await { - Ok(NetworkStatus::Active) => continue, - otherwise => otherwise, - }; - } - }, - async { - smol::Timer::after(std::time::Duration::from_secs(30)).await; - client.disconnect().await.unwrap(); - } - ); - assert!(n.is_ok()); -}); ``` ### Tokio example: ```rust use mqrstt::{ - MqttClient, - ConnectOptions, - new_tokio, packets::{self, Packet}, - AsyncEventHandler, - tokio::NetworkStatus, + AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus, }; use tokio::time::Duration; -use bytes::Bytes; pub struct PingPong { pub client: MqttClient, } impl AsyncEventHandler for PingPong { - // Handlers only get INCOMING packets. This can change later. + // Handlers only get INCOMING packets. async fn handle(&mut self, event: packets::Packet) { match event { Packet::Publish(p) => { if let Ok(payload) = String::from_utf8(p.payload.to_vec()) { if payload.to_lowercase().contains("ping") { - self.client - .publish( - p.topic.clone(), - p.qos, - p.retain, - Bytes::from_static(b"pong"), - ) - .await - .unwrap(); + self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap(); println!("Received Ping, Send pong!"); } } - }, - Packet::ConnAck(_) => { println!("Connected!") }, + } + Packet::ConnAck(_) => { + println!("Connected!") + } _ => (), } } @@ -158,39 +128,30 @@ impl AsyncEventHandler for PingPong { #[tokio::main] async fn main() { - let options = ConnectOptions::new("TokioTcpPingPongExample"); - - let (mut network, client) = new_tokio(options); - - let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)) - .await - .unwrap(); - - let mut pingpong = PingPong { - client: client.clone(), - }; - + let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network(); + + let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap(); + let stream = tokio::io::BufStream::new(stream); + + let mut pingpong = PingPong { client: client.clone() }; + network.connect(stream, &mut pingpong).await.unwrap(); - + client.subscribe("mqrstt").await.unwrap(); - - - let (n, _) = tokio::join!( - async { - loop { - return match network.poll(&mut pingpong).await { - Ok(NetworkStatus::Active) => continue, - otherwise => otherwise, - }; - } - }, - async { - tokio::time::sleep(Duration::from_secs(30)).await; - client.disconnect().await.unwrap(); - } - ); - assert!(n.is_ok()); + + let network_handle = tokio::spawn(async move { + let result = network.run(&mut pingpong).await; + (result, pingpong) + }); + + tokio::time::sleep(Duration::from_secs(30)).await; + client.disconnect().await.unwrap(); + + let (result, _pingpong) = network_handle.await.unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect); } + ``` ### Sync example: @@ -284,7 +245,6 @@ Licensed under * Mozilla Public License, Version 2.0, [(MPL-2.0)](https://choosealicense.com/licenses/mpl-2.0/) ## Contribution - Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or conditions. diff --git a/mqrstt/src/packets/error.rs b/mqrstt/src/packets/error.rs index 68ede44..c6f120a 100644 --- a/mqrstt/src/packets/error.rs +++ b/mqrstt/src/packets/error.rs @@ -20,7 +20,7 @@ pub enum ReadError { IoError(#[from] std::io::Error), } -#[derive(Error, Clone, Debug)] +#[derive(Error, Clone, Debug, PartialEq, Eq)] pub enum DeserializeError { #[error("Malformed packet: {0}")] MalformedPacketWithInfo(String), diff --git a/mqrstt/src/tokio/network.rs b/mqrstt/src/tokio/network.rs index bdfb57b..6b691cb 100644 --- a/mqrstt/src/tokio/network.rs +++ b/mqrstt/src/tokio/network.rs @@ -55,6 +55,8 @@ where S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static, { /// Initializes an MQTT connection with the provided configuration an stream + /// + /// It is recommended to use a buffered stream. [`tokio::io::BufStream`] could be used to easily buffer both read and write. pub async fn connect(&mut self, mut stream: S, handler: &mut H) -> Result<(), ConnectionError> { let conn_ack = stream.connect(&self.options).await?; self.last_network_action = Instant::now();