Skip to content

Commit

Permalink
Made examples smaller with NOP handler
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed May 16, 2023
1 parent ce07f69 commit ee141e6
Showing 1 changed file with 61 additions and 136 deletions.
197 changes: 61 additions & 136 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! A pure rust MQTT client which is easy to use, efficient and provides both sync and async options.
//!
//!
//! Because this crate aims to be runtime agnostic the user is required to provide their own data stream.
//! For an async approach the stream has to implement the smol or tokio [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
//! For a sync approach the stream has to implement the [`std::io::Read`] and [`std::io::Write`] traits.
//!
//!
//!
//! Features:
//! ----------------------------
//! - MQTT v5
Expand All @@ -13,87 +12,57 @@
//! - TLS/TCP
//! - Lean
//! - Keep alive depends on actual communication
//!
//!
//!
//! To do
//! ----------------------------
//! - Enforce size of outbound messages (e.g. Publish)
//! - QUIC via QUINN
//! - Even More testing
//! - More documentation
//! - Remove logging calls or move all to test flag
//!
//!
//!
//! Notes:
//! ----------------------------
//! - Your handler should not wait too long
//! - Create a new connection when an error or disconnect is encountered
//! - Handlers only get incoming packets
//! - Sync mode requires a non blocking stream
//!
//!
//!
//! Smol example:
//! ----------------------------
//! ```rust
//! use mqrstt::{
//! MqttClient,
//! NOP,
//! ConnectOptions,
//! new_smol,
//! packets::{self, Packet},
//! AsyncEventHandler,
//! smol::NetworkStatus,
//! };
//! use async_trait::async_trait;
//! use bytes::Bytes;
//! pub struct PingPong {
//! pub client: MqttClient,
//! }
//! #[async_trait]
//! impl AsyncEventHandler for PingPong {
//! // Handlers only get INCOMING packets. This can change later.
//! async fn handle(&mut self, event: packets::Packet) -> () {
//! match event {
//! Packet::Publish(p) => {
//! if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
//! if payload.to_lowercase().contains("ping") {
//! self.client
//! .publish(
//! p.topic.clone(),
//! p.qos,
//! p.retain,
//! Bytes::from_static(b"pong"),
//! )
//! .await
//! .unwrap();
//! println!("Received Ping, Send pong!");
//! }
//! }
//! },
//! Packet::ConnAck(_) => { println!("Connected!") },
//! _ => (),
//! }
//! }
//! }
//!
//! smol::block_on(async {
//! let options = ConnectOptions::new("mqrsttSmolExample".to_string());
//!
//! // Construct a no op handler
//! let mut nop = NOP{};
//!
//! // In normal operations you would want to loop this connection
//! // To reconnect after a disconnect or error
//! let (mut network, client) = new_smol(options);
//! let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
//! .await
//! .unwrap();
//! network.connect(stream, &mut nop).await.unwrap();
//!
//! let mut pingpong = PingPong {
//! client: client.clone(),
//! };
//!
//! network.connect(stream, &mut pingpong).await.unwrap();
//!
//! // This subscribe is only processed when we run the network
//! client.subscribe("mqrstt").await.unwrap();
//!
//!
//! let (n, t) = futures::join!(
//! async {
//! loop {
//! return match network.poll(&mut pingpong).await {
//! return match network.poll(&mut nop).await {
//! Ok(NetworkStatus::Active) => continue,
//! otherwise => otherwise,
//! };
Expand All @@ -107,13 +76,14 @@
//! assert!(n.is_ok());
//! });
//! ```
//!
//!
//!
//!
//! Tokio example:
//! ----------------------------
//! ```rust
//! use mqrstt::{
//! MqttClient,
//! NOP,
//! ConnectOptions,
//! new_tokio,
//! packets::{self, Packet},
Expand All @@ -122,61 +92,28 @@
//! };
//! use tokio::time::Duration;
//! use async_trait::async_trait;
//! use bytes::Bytes;
//!
//! pub struct PingPong {
//! pub client: MqttClient,
//! }
//! #[async_trait]
//! impl AsyncEventHandler for PingPong {
//! // Handlers only get INCOMING packets. This can change later.
//! async fn handle(&mut self, event: packets::Packet) -> () {
//! match event {
//! Packet::Publish(p) => {
//! if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
//! if payload.to_lowercase().contains("ping") {
//! self.client
//! .publish(
//! p.topic.clone(),
//! p.qos,
//! p.retain,
//! Bytes::from_static(b"pong"),
//! )
//! .await
//! .unwrap();
//! println!("Received Ping, Send pong!");
//! }
//! }
//! },
//! Packet::ConnAck(_) => { println!("Connected!") },
//! _ => (),
//! }
//! }
//! }
//!
//!
//! #[tokio::main]
//! async fn main() {
//! let options = ConnectOptions::new("TokioTcpPingPongExample".to_string());
//!
//! let (mut network, client) = new_tokio(options);
//!
//!
//! // Construct a no op handler
//! let mut nop = NOP{};
//!
//! // In normal operations you would want to loop this connection
//! // To reconnect after a disconnect or error
//! let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
//! .await
//! .unwrap();
//!
//! let mut pingpong = PingPong {
//! client: client.clone(),
//! };
//!
//! network.connect(stream, &mut pingpong).await.unwrap();
//! network.connect(stream, &mut nop).await.unwrap();
//!
//! client.subscribe("mqrstt").await.unwrap();
//!
//!
//! let (n, _) = tokio::join!(
//! async {
//! loop {
//! return match network.poll(&mut pingpong).await {
//! return match network.poll(&mut nop).await {
//! Ok(NetworkStatus::Active) => continue,
//! otherwise => otherwise,
//! };
Expand All @@ -190,71 +127,42 @@
//! assert!(n.is_ok());
//! }
//! ```
//!
//!
//! Sync example:
//! ----------------------------
//! ```rust
//! use mqrstt::{
//! MqttClient,
//! NOP,
//! ConnectOptions,
//! new_sync,
//! packets::{self, Packet},
//! EventHandler,
//! sync::NetworkStatus,
//! };
//! use std::net::TcpStream;
//! use bytes::Bytes;
//!
//! pub struct PingPong {
//! pub client: MqttClient,
//! }
//!
//! impl EventHandler for PingPong {
//! // Handlers only get INCOMING packets. This can change later.
//! fn handle(&mut self, event: packets::Packet) -> () {
//! match event {
//! Packet::Publish(p) => {
//! if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
//! if payload.to_lowercase().contains("ping") {
//! self.client
//! .publish_blocking(
//! p.topic.clone(),
//! p.qos,
//! p.retain,
//! Bytes::from_static(b"pong"),
//! ).unwrap();
//! println!("Received Ping, Send pong!");
//! }
//! }
//! },
//! Packet::ConnAck(_) => { println!("Connected!") },
//! _ => (),
//! }
//! }
//! }
//!
//!
//!
//! let mut client_id: String = "SyncTcpPingReqTestExample".to_string();
//! let options = ConnectOptions::new(client_id);
//!
//!
//! let address = "broker.emqx.io";
//! let port = 1883;
//!
//!
//! let (mut network, client) = new_sync(options);
//!
//! // IMPORTANT: Set nonblocking to true! No progression will be made when stream reads block!
//!
//! // Construct a no op handler
//! let mut nop = NOP{};
//!
//! // In normal operations you would want to loop connect
//! // To reconnect after a disconnect or error
//! let stream = TcpStream::connect((address, port)).unwrap();
//! // IMPORTANT: Set nonblocking to true! No progression will be made when stream reads block!
//! stream.set_nonblocking(true).unwrap();
//!
//! let mut pingpong = PingPong {
//! client: client.clone(),
//! };
//!
//! network.connect(stream, &mut pingpong).unwrap();
//!
//! network.connect(stream, &mut nop).unwrap();
//!
//! let res_join_handle = std::thread::spawn(move ||
//! loop {
//! match network.poll(&mut pingpong) {
//! match network.poll(&mut nop) {
//! Ok(NetworkStatus::ActivePending) => {
//! std::thread::sleep(std::time::Duration::from_millis(100));
//! },
Expand All @@ -265,7 +173,7 @@
//! }
//! }
//! );
//!
//!
//! std::thread::sleep(std::time::Duration::from_secs(30));
//! client.disconnect_blocking().unwrap();
//! let join_res = res_join_handle.join();
Expand Down Expand Up @@ -312,6 +220,23 @@ pub trait EventHandler {
fn handle(&mut self, incoming_packet: Packet);
}

/// Most basic no op handler
/// This handler performs no operations on incoming messages.
pub struct NOP{}

#[async_trait::async_trait]
impl AsyncEventHandler for NOP{
async fn handle(&mut self, _: Packet){

}
}

impl EventHandler for NOP{
fn handle(&mut self, _: Packet){

}
}

#[cfg(feature = "smol")]
/// Creates the needed components to run the MQTT client using a stream that implements [`smol::io::AsyncReadExt`] and [`smol::io::AsyncWriteExt`]
/// ```
Expand Down

0 comments on commit ee141e6

Please sign in to comment.