Skip to content

Commit

Permalink
0.3.0-alpha.1
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Jan 1, 2024
1 parent 0dce384 commit 98fff6d
Show file tree
Hide file tree
Showing 20 changed files with 1,464 additions and 1,068 deletions.
295 changes: 64 additions & 231 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 12 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqrstt"
version = "0.2.2"
version = "0.3.0-alpha.1"
homepage = "https://github.com/GunnarMorrigan/mqrstt"
repository = "https://github.com/GunnarMorrigan/mqrstt"
documentation = "https://docs.rs/mqrstt"
Expand All @@ -9,50 +9,46 @@ readme = "README.md"
edition = "2021"
license = "MPL-2.0"
keywords = [ "MQTT", "IoT", "MQTTv5", "messaging", "client" ]
description = "Pure rust MQTTv5 client implementation for sync and async (Smol & Tokio)"
description = "Pure rust MQTTv5 client implementation Smol and Tokio"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["smol", "tokio_mqtt", "concurrent_tokio"]
concurrent_tokio = ["dep:tokio", "tokio/rt"]
tokio_mqtt = ["dep:tokio"]
default = ["smol", "tokio", "tokio_concurrent"]
tokio_concurrent = ["dep:tokio", "tokio/rt"]
tokio = ["dep:tokio"]
smol = ["dep:smol"]
sync = []
logs = ["dep:tracing"]
# quic = ["dep:quinn"]

[dependencies]
# Packets
bytes = "1.5.0"

# Errors
thiserror = "1.0.49"
tracing = { version = "0.1.39", optional = true }
thiserror = "1.0.53"
tracing = { version = "0.1.40", optional = true }

async-channel = "2.1.1"
#async-mutex = "1.4.0"
futures = { version = "0.3.28", default-features = false, features = ["std", "async-await"] }
futures = { version = "0.3.30", default-features = false, features = ["std", "async-await"] }

# quic feature flag
# quinn = {version = "0.9.0", optional = true }

# tokio feature flag
tokio = { version = "1.33.0", features = ["macros", "io-util", "net", "time"], optional = true }
tokio = { version = "1.35.1", features = ["macros", "io-util", "net", "time"], optional = true }

# smol feature flag
smol = { version = "1.3.0", optional = true }
smol = { version = "2.0.0", optional = true }

[dev-dependencies]
criterion = {version="0.5.1", features=["async_tokio"]}

tracing-subscriber = {version = "0.3.16", features = ["env-filter"]}
tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}

smol = { version = "1.3.0" }
smol = { version = "2.0.0" }
tokio = { version = "1.33.0", features = ["rt-multi-thread", "rt", "macros", "sync", "io-util", "net", "time"] }

pretty_assertions = "1.3.0"

rustls = { version = "0.21.7" }
rustls-pemfile = { version = "1.0.3" }
webpki = { version = "0.22.4" }
Expand Down
13 changes: 2 additions & 11 deletions benches/bench_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@ use criterion::criterion_main;
mod benchmarks;

criterion_main! {
benchmarks::tokio_concurrent::tokio,
// benchmarks::external_process::benches,
// benchmarks::iter_with_large_drop::benches,
// benchmarks::iter_with_large_setup::benches,
// benchmarks::iter_with_setup::benches,
// benchmarks::with_inputs::benches,
// benchmarks::special_characters::benches,
// benchmarks::measurement_overhead::benches,
// benchmarks::custom_measurement::benches,
// benchmarks::sampling_mode::benches,
// benchmarks::async_measurement_overhead::benches,
benchmarks::tokio::tokio_concurrent,
benchmarks::tokio::tokio_synchronous,
}
87 changes: 85 additions & 2 deletions benches/benchmarks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::{BufMut, Bytes, BytesMut};
use mqrstt::packets::{Disconnect, Packet, Publish};

pub mod tokio_concurrent;
pub mod tokio;

fn fill_stuff(buffer: &mut BytesMut, publ_count: usize, publ_size: usize) {
empty_connect(buffer);
Expand Down Expand Up @@ -65,8 +65,91 @@ fn very_large_publish(id: u16, repeat: usize) -> Packet {
topic: "BlaBla".into(),
packet_identifier: Some(id),
publish_properties: Default::default(),
payload: Bytes::from_iter([0u8, 1u8, 2, 3, 4].repeat(repeat)),
payload: Bytes::from_iter("ping".repeat(repeat).into_bytes()),
};

Packet::Publish(publ)
}


mod test_handlers{
use std::{sync::{atomic::AtomicU16, Arc}, ops::AddAssign, time::Duration};

use bytes::Bytes;
use mqrstt::{AsyncEventHandler, packets::{self, Packet}, MqttClient, AsyncEventHandlerMut};

pub struct PingPong {
pub client: MqttClient,
pub number: Arc<AtomicU16>,
}

impl PingPong{
pub fn new(client: MqttClient) -> Self {
Self {
client,
number: Arc::new(AtomicU16::new(0)),
}
}
}

impl AsyncEventHandler for PingPong {
async fn handle(&self, event: packets::Packet) -> () {
self.number.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
let max_len = payload.len().min(10);
let a = &payload[0..max_len];
if payload.to_lowercase().contains("ping") {
self.client.publish(p.topic.clone(), p.qos, p.retain, Bytes::from_static(b"pong")).await.unwrap();
}
}
}
Packet::ConnAck(_) => (),
_ => (),
}
}
}

impl AsyncEventHandlerMut for PingPong {
async fn handle(&mut self, event: packets::Packet) -> () {
self.number.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
let max_len = payload.len().min(10);
let a = &payload[0..max_len];
if payload.to_lowercase().contains("ping") {
self.client.publish(p.topic.clone(), p.qos, p.retain, Bytes::from_static(b"pong")).await.unwrap();
}
}
}
Packet::ConnAck(_) => (),
_ => (),
}
}
}

pub struct SimpleDelay{
delay: Duration,
}

impl SimpleDelay{
pub fn new(delay: Duration) -> Self{
Self {
delay,
}
}
}

impl AsyncEventHandler for SimpleDelay {
fn handle(&self, _: Packet) -> impl futures::prelude::Future<Output = ()> + Send + Sync {
tokio::time::sleep(self.delay)
}
}
impl AsyncEventHandlerMut for SimpleDelay{
fn handle(&mut self, _: Packet) -> impl futures::prelude::Future<Output = ()> + Send + Sync {
tokio::time::sleep(self.delay)
}
}
}
Loading

0 comments on commit 98fff6d

Please sign in to comment.