Skip to content

Commit

Permalink
examples in readme and example dir
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Nov 28, 2024
1 parent 5caa701 commit 3439e95
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 101 deletions.
9 changes: 9 additions & 0 deletions examples/tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license = "MIT"

[dependencies]
smol = { version = "2" }
futures = "0.3.31"

tokio = { version = "1", features = ["full"] }

Expand All @@ -15,6 +16,14 @@ mqrstt = { path = "../../mqrstt", features = ["logs"] }
name = "tokio"
path = "src/tokio.rs"

[[bin]]
name = "ping_pong"
path = "src/ping_pong.rs"

[[bin]]
name = "ping_pong_smol"
path = "src/ping_pong_smol.rs"

[[bin]]
name = "smol"
path = "src/smol.rs"
54 changes: 54 additions & 0 deletions examples/tcp/src/ping_pong.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use mqrstt::{
packets::{self, Packet},
AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus,
};
use tokio::time::Duration;

pub struct PingPong {
pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets.
async fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
println!("Received Ping, Send pong!");
}
}
}
Packet::ConnAck(_) => {
println!("Connected!")
}
_ => (),
}
}
}

#[tokio::main]
async fn main() {
let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network();

let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
let stream = tokio::io::BufStream::new(stream);

let mut pingpong = PingPong { client: client.clone() };

network.connect(stream, &mut pingpong).await.unwrap();

client.subscribe("mqrstt").await.unwrap();

let network_handle = tokio::spawn(async move {
let result = network.run(&mut pingpong).await;
(result, pingpong)
});

tokio::time::sleep(Duration::from_secs(30)).await;
client.disconnect().await.unwrap();

let (result, _pingpong) = network_handle.await.unwrap();
assert!(result.is_ok());
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
}
52 changes: 52 additions & 0 deletions examples/tcp/src/ping_pong_smol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use mqrstt::{
packets::{self, Packet},
AsyncEventHandler, ConnectOptions, MqttClient, NetworkBuilder, NetworkStatus,
};
pub struct PingPong {
pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
println!("Received Ping, Send pong!");
}
}
}
Packet::ConnAck(_) => {
println!("Connected!")
}
_ => (),
}
}
}
fn main() {
smol::block_on(async {
let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network();
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();

let mut pingpong = PingPong { client: client.clone() };

network.connect(stream, &mut pingpong).await.unwrap();

// This subscribe is only processed when we run the network
client.subscribe("mqrstt").await.unwrap();

let task_handle = smol::spawn(async move {
let result = network.run(&mut pingpong).await;
(result, pingpong)
});

smol::Timer::after(std::time::Duration::from_secs(30)).await;
client.disconnect().await.unwrap();

let (result, _pingpong) = task_handle.await;

assert!(result.is_ok());
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
});
}
1 change: 1 addition & 0 deletions examples/tcp/src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async fn main() {
let mut handler = Handler { byte_count: 0 };

let stream = tokio::net::TcpStream::connect(hostname).await.unwrap();
let stream = tokio::io::BufStream::new(stream);
let (mut network, client) = mqrstt::NetworkBuilder::new_from_client_id("TestClientABCDEFG").tokio_network();

network.connect(stream, &mut handler).await.unwrap();
Expand Down
160 changes: 60 additions & 100 deletions README.md → mqrstt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ For a sync approach the stream has to implement the [`std::io::Read`] and [`std:
- Keep alive depends on actual communication

### To do
- no_std (Requires a lot of work to use no heap allocations and depend on stack)
- Even More testing
- More documentation
- Add TLS examples to repository

## MSRV
From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in trait feature.
Expand All @@ -38,159 +37,121 @@ From 0.3 the tokio and smol variants will require MSRV: 1.75 due to async fn in
- Create a new connection when an error or disconnect is encountered
- Handlers only get incoming packets

### TLS:
TLS examples are too larger for a README. [TLS examples](https://github.com/GunnarMorrigan/mqrstt/tree/main/examples).

### Smol example:
```rust
use mqrstt::{
MqttClient,
ConnectOptions,
new_smol,
packets::{self, Packet},
AsyncEventHandler,
smol::NetworkStatus,
AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus,
};
use bytes::Bytes;
pub struct PingPong {
pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet {
async fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish(
p.topic.clone(),
p.qos,
p.retain,
Bytes::from_static(b"pong"),
)
.await
.unwrap();
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
}
Packet::ConnAck(_) => {
println!("Connected!")
}
_ => (),
}
}
}
smol::block_on(async {
let options = ConnectOptions::new("mqrsttSmolExample");
let (mut network, client) = new_smol(options);
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
.await
.unwrap();

let mut pingpong = PingPong {
client: client.clone(),
};
fn main() {
smol::block_on(async {
let (mut network, client) = NetworkBuilder::new_from_client_id("mqrsttSmolExample").smol_network();
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();

network.connect(stream, &mut pingpong).await.unwrap();
let mut pingpong = PingPong { client: client.clone() };

// This subscribe is only processed when we run the network
client.subscribe("mqrstt").await.unwrap();
network.connect(stream, &mut pingpong).await.unwrap();

// This subscribe is only processed when we run the network
client.subscribe("mqrstt").await.unwrap();

let task_handle = smol::spawn(async move {
let result = network.run(&mut pingpong).await;
(result, pingpong)
});

smol::Timer::after(std::time::Duration::from_secs(30)).await;
client.disconnect().await.unwrap();

let (result, _pingpong) = task_handle.await;

assert!(result.is_ok());
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
});
}

let (n, t) = futures::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
smol::Timer::after(std::time::Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());
});
```

### Tokio example:
```rust
use mqrstt::{
MqttClient,
ConnectOptions,
new_tokio,
packets::{self, Packet},
AsyncEventHandler,
tokio::NetworkStatus,
AsyncEventHandler, MqttClient, NetworkBuilder, NetworkStatus,
};
use tokio::time::Duration;
use bytes::Bytes;

pub struct PingPong {
pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
// Handlers only get INCOMING packets.
async fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish(
p.topic.clone(),
p.qos,
p.retain,
Bytes::from_static(b"pong"),
)
.await
.unwrap();
self.client.publish(p.topic.clone(), p.qos, p.retain, b"pong").await.unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
}
Packet::ConnAck(_) => {
println!("Connected!")
}
_ => (),
}
}
}

#[tokio::main]
async fn main() {
let options = ConnectOptions::new("TokioTcpPingPongExample");

let (mut network, client) = new_tokio(options);

let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
.await
.unwrap();

let mut pingpong = PingPong {
client: client.clone(),
};

let (mut network, client) = NetworkBuilder::new_from_client_id("TokioTcpPingPongExample").tokio_network();

let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883)).await.unwrap();
let stream = tokio::io::BufStream::new(stream);

let mut pingpong = PingPong { client: client.clone() };

network.connect(stream, &mut pingpong).await.unwrap();

client.subscribe("mqrstt").await.unwrap();


let (n, _) = tokio::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
tokio::time::sleep(Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());

let network_handle = tokio::spawn(async move {
let result = network.run(&mut pingpong).await;
(result, pingpong)
});

tokio::time::sleep(Duration::from_secs(30)).await;
client.disconnect().await.unwrap();

let (result, _pingpong) = network_handle.await.unwrap();
assert!(result.is_ok());
assert_eq!(result.unwrap(), NetworkStatus::OutgoingDisconnect);
}

```

### Sync example:
Expand Down Expand Up @@ -284,7 +245,6 @@ Licensed under
* Mozilla Public License, Version 2.0, [(MPL-2.0)](https://choosealicense.com/licenses/mpl-2.0/)

## Contribution

Unless you explicitly state otherwise, any contribution intentionally
submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or
conditions.
2 changes: 1 addition & 1 deletion mqrstt/src/packets/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum ReadError {
IoError(#[from] std::io::Error),
}

#[derive(Error, Clone, Debug)]
#[derive(Error, Clone, Debug, PartialEq, Eq)]
pub enum DeserializeError {
#[error("Malformed packet: {0}")]
MalformedPacketWithInfo(String),
Expand Down
2 changes: 2 additions & 0 deletions mqrstt/src/tokio/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ where
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Sized + Unpin + Send + 'static,
{
/// Initializes an MQTT connection with the provided configuration an stream
///
/// It is recommended to use a buffered stream. [`tokio::io::BufStream`] could be used to easily buffer both read and write.
pub async fn connect(&mut self, mut stream: S, handler: &mut H) -> Result<(), ConnectionError> {
let conn_ack = stream.connect(&self.options).await?;
self.last_network_action = Instant::now();
Expand Down

0 comments on commit 3439e95

Please sign in to comment.