Skip to content

Commit

Permalink
linter
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Nov 28, 2024
1 parent 4fc4924 commit 1040da6
Show file tree
Hide file tree
Showing 27 changed files with 463 additions and 563 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
# run clippy to verify we have no warnings
- run: cargo fetch
- name: cargo clippy
run: cargo clippy --all-targets --all-features -- -D warnings
run: cargo clippy -p mqrstt

test:
name: Test
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct PingPong {
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
async fn handle(&mut self, event: packets::Packet {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
Expand Down Expand Up @@ -132,7 +132,7 @@ pub struct PingPong {
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
async fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
Expand Down Expand Up @@ -212,7 +212,7 @@ pub struct PingPong {

impl EventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
fn handle(&mut self, event: packets::Packet) -> () {
fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
Expand Down
10 changes: 8 additions & 2 deletions mqrstt/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub enum HandlerError {
#[error("The incoming channel between network and handler is closed")]
IncomingNetworkChannelClosed,

#[error("The outgoing channel between handler and network is closed: {0}")]
OutgoingNetworkChannelClosed(#[from] SendError<Packet>),
#[error("The outgoing channel between handler and network is closed")]
OutgoingNetworkChannelClosed,

#[error("Channel between client and handler closed")]
ClientChannelClosed,
Expand All @@ -88,6 +88,12 @@ pub enum HandlerError {
UnexpectedPacket(PacketType),
}

impl From<SendError<Packet>> for HandlerError {
fn from(_: SendError<Packet>) -> Self {
HandlerError::OutgoingNetworkChannelClosed
}
}

/// Errors producable by the [`crate::MqttClient`]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ClientError {
Expand Down
8 changes: 3 additions & 5 deletions mqrstt/src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ pub trait AsyncEventHandler {

/// This is a simple no operation handler.
impl AsyncEventHandler for () {
fn handle(&mut self, _: Packet) -> impl Future<Output = ()> + Send + Sync {
async {}
}
async fn handle(&mut self, _: Packet) {}
}

pub trait EventHandler {
Expand Down Expand Up @@ -60,7 +58,7 @@ pub mod example_handlers {
}

impl AsyncEventHandler for PingResp {
async fn handle(&mut self, event: packets::Packet) -> () {
async fn handle(&mut self, event: packets::Packet) {
use Packet::*;
if event == PingResp {
self.ping_resp_received += 1;
Expand Down Expand Up @@ -91,7 +89,7 @@ pub mod example_handlers {
}

impl AsyncEventHandler for PingPong {
async fn handle(&mut self, event: packets::Packet) -> () {
async fn handle(&mut self, event: packets::Packet) {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
Expand Down
7 changes: 4 additions & 3 deletions mqrstt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub mod smol;
/// Contains the reader and writer parts for the tokio runtime.
///
/// Module [`crate::tokio`] contains both a synchronized and concurrent approach to call the users `Handler`.
#[cfg(any(feature = "tokio"))]
#[cfg(feature = "tokio")]
pub mod tokio;

/// Error types that the user can see during operation of the client.
Expand Down Expand Up @@ -342,7 +342,7 @@ mod smol_lib_test {
});
}

#[cfg(all(target_family = "windows"))]
#[cfg(target_family = "windows")]
#[test]
fn test_close_write_tcp_stream_smol() {
use crate::error::ConnectionError;
Expand Down Expand Up @@ -409,7 +409,8 @@ mod tokio_lib_test {
network.connect(stream, &mut pingresp).await.unwrap();

let network_handle = tokio::task::spawn(async move {
network.run(&mut pingresp).await;
let result = network.run(&mut pingresp).await;
// check result and or restart the connection
pingresp
});

Expand Down
11 changes: 4 additions & 7 deletions mqrstt/src/packets/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
mod properties;
use std::future::Future;

pub use properties::AuthProperties;
mod reason_code;
Expand Down Expand Up @@ -48,12 +47,10 @@ impl<S> crate::packets::mqtt_trait::PacketAsyncWrite<S> for Auth
where
S: tokio::io::AsyncWrite + Unpin,
{
fn async_write(&self, stream: &mut S) -> impl Future<Output = Result<usize, crate::packets::error::WriteError>> {
async move {
let reason_code_writen = self.reason_code.async_write(stream).await?;
let properties_writen = self.properties.async_write(stream).await?;
Ok(reason_code_writen + properties_writen)
}
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
let reason_code_writen = self.reason_code.async_write(stream).await?;
let properties_writen = self.properties.async_write(stream).await?;
Ok(reason_code_writen + properties_writen)
}
}

Expand Down
74 changes: 33 additions & 41 deletions mqrstt/src/packets/connack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,19 @@ impl<S> PacketAsyncRead<S> for ConnAck
where
S: tokio::io::AsyncRead + Unpin,
{
fn async_read(_: u8, _: usize, stream: &mut S) -> impl std::future::Future<Output = Result<(Self, usize), super::error::ReadError>> {
async move {
let (connack_flags, read_bytes) = ConnAckFlags::async_read(stream).await?;
let (reason_code, reason_code_read_bytes) = ConnAckReasonCode::async_read(stream).await?;
let (connack_properties, connack_properties_read_bytes) = ConnAckProperties::async_read(stream).await?;

Ok((
Self {
connack_flags,
reason_code,
connack_properties,
},
read_bytes + reason_code_read_bytes + connack_properties_read_bytes,
))
}
async fn async_read(_: u8, _: usize, stream: &mut S) -> Result<(Self, usize), super::error::ReadError> {
let (connack_flags, read_bytes) = ConnAckFlags::async_read(stream).await?;
let (reason_code, reason_code_read_bytes) = ConnAckReasonCode::async_read(stream).await?;
let (connack_properties, connack_properties_read_bytes) = ConnAckProperties::async_read(stream).await?;

Ok((
Self {
connack_flags,
reason_code,
connack_properties,
},
read_bytes + reason_code_read_bytes + connack_properties_read_bytes,
))
}
}

Expand All @@ -85,15 +83,13 @@ impl<S> crate::packets::mqtt_trait::PacketAsyncWrite<S> for ConnAck
where
S: tokio::io::AsyncWrite + Unpin,
{
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
async move {
use crate::packets::mqtt_trait::MqttAsyncWrite;
let connack_flags_writen = self.connack_flags.async_write(stream).await?;
let reason_code_writen = self.reason_code.async_write(stream).await?;
let connack_properties_writen = self.connack_properties.async_write(stream).await?;

Ok(connack_flags_writen + reason_code_writen + connack_properties_writen)
}
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
use crate::packets::mqtt_trait::MqttAsyncWrite;
let connack_flags_writen = self.connack_flags.async_write(stream).await?;
let reason_code_writen = self.reason_code.async_write(stream).await?;
let connack_properties_writen = self.connack_properties.async_write(stream).await?;

Ok(connack_flags_writen + reason_code_writen + connack_properties_writen)
}
}

Expand All @@ -114,16 +110,14 @@ impl<S> MqttAsyncRead<S> for ConnAckFlags
where
S: tokio::io::AsyncRead + Unpin,
{
fn async_read(stream: &mut S) -> impl std::future::Future<Output = Result<(Self, usize), super::error::ReadError>> {
async move {
let byte = stream.read_u8().await?;
Ok((
Self {
session_present: (byte & 0b00000001) == 0b00000001,
},
1,
))
}
async fn async_read(stream: &mut S) -> Result<(Self, usize), super::error::ReadError> {
let byte = stream.read_u8().await?;
Ok((
Self {
session_present: (byte & 0b00000001) == 0b00000001,
},
1,
))
}
}

Expand Down Expand Up @@ -154,14 +148,12 @@ impl<S> crate::packets::mqtt_trait::MqttAsyncWrite<S> for ConnAckFlags
where
S: tokio::io::AsyncWrite + Unpin,
{
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
async move {
use tokio::io::AsyncWriteExt;
let byte = self.session_present as u8;
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
use tokio::io::AsyncWriteExt;
let byte = self.session_present as u8;

stream.write_u8(byte).await?;
Ok(1)
}
stream.write_u8(byte).await?;
Ok(1)
}
}

Expand Down
20 changes: 8 additions & 12 deletions mqrstt/src/packets/connect/connect_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,9 @@ impl<S> MqttAsyncRead<S> for ConnectFlags
where
S: tokio::io::AsyncRead + Unpin,
{
fn async_read(stream: &mut S) -> impl std::future::Future<Output = Result<(Self, usize), crate::packets::error::ReadError>> {
async move {
let byte = stream.read_u8().await?;
Ok((ConnectFlags::from_u8(byte)?, 1))
}
async fn async_read(stream: &mut S) -> Result<(Self, usize), crate::packets::error::ReadError> {
let byte = stream.read_u8().await?;
Ok((ConnectFlags::from_u8(byte)?, 1))
}
}

Expand All @@ -96,13 +94,11 @@ impl<S> MqttAsyncWrite<S> for ConnectFlags
where
S: tokio::io::AsyncWrite + Unpin,
{
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
async move {
use tokio::io::AsyncWriteExt;
let byte = self.into_u8()?;
stream.write_u8(byte).await?;
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
use tokio::io::AsyncWriteExt;
let byte = self.into_u8()?;
stream.write_u8(byte).await?;

Ok(1)
}
Ok(1)
}
}
12 changes: 5 additions & 7 deletions mqrstt/src/packets/connect/last_will.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,12 @@ impl<S> MqttAsyncWrite<S> for LastWill
where
S: tokio::io::AsyncWrite + Unpin,
{
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
async move {
let properties_written = self.last_will_properties.async_write(stream).await?;
let topic_written = self.topic.async_write(stream).await?;
let payload_written = self.payload.async_write(stream).await?;
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
let properties_written = self.last_will_properties.async_write(stream).await?;
let topic_written = self.topic.async_write(stream).await?;
let payload_written = self.payload.async_write(stream).await?;

Ok(properties_written + topic_written + payload_written)
}
Ok(properties_written + topic_written + payload_written)
}
}

Expand Down
40 changes: 7 additions & 33 deletions mqrstt/src/packets/macros/properties_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,40 +34,14 @@ macro_rules! define_properties {
}

impl<S> $crate::packets::mqtt_trait::MqttAsyncWrite<S> for $name where S: tokio::io::AsyncWrite + Unpin {
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
async move {
let mut bytes_writen = 0;
$crate::packets::VariableInteger::write_async_variable_integer(&self.wire_len(), stream).await?;
$(
$crate::packets::macros::properties_write!(self, bytes_writen, stream, PropertyType::$prop_variant);
)*

Ok(bytes_writen)
}

// let (len, length_variable_integer) = <usize as crate::packets::primitive::VariableInteger>::read_async_variable_integer(stream).await?;
// if len == 0 {
// return Ok((Self::default(), length_variable_integer));
// }

// let mut properties = $name::default();

// let mut read_property_bytes = 0;
// loop {
// let (prop, read_bytes) = crate::packets::PropertyType::async_read(stream).await?;
// read_property_bytes += read_bytes;
// match prop {
// $(
// $crate::packets::macros::properties_read_match_branch_name!($prop_variant) => $crate::packets::macros::properties_read_match_branch_body!(stream, properties, read_property_bytes, PropertyType::$prop_variant),
// )*
// e => return Err($crate::packets::error::ReadError::DeserializeError(DeserializeError::UnexpectedProperty(e, PacketType::PubRel))),
// }
// if read_property_bytes == len {
// break;
// }
// }
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
let mut bytes_writen = 0;
$crate::packets::VariableInteger::write_async_variable_integer(&self.wire_len(), stream).await?;
$(
$crate::packets::macros::properties_write!(self, bytes_writen, stream, PropertyType::$prop_variant);
)*

// Ok((properties, length_variable_integer + read_property_bytes))
Ok(bytes_writen)
}
}

Expand Down
2 changes: 1 addition & 1 deletion mqrstt/src/packets/macros/reason_code_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ macro_rules! reason_code {
} -> ())
}

pub(crate) fn to_u8(&self) -> u8 {
pub(crate) fn to_u8(self) -> u8 {
$crate::packets::macros::reason_code_match_write!(@ $name, self, {
$($code,)*
} -> ())
Expand Down
2 changes: 1 addition & 1 deletion mqrstt/src/packets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl Packet {
#[cfg(feature = "logs")]
tracing::trace!("Read packet header: {:?}", header);

Ok(Packet::async_read_packet(header, stream).await?)
Packet::async_read_packet(header, stream).await
}

pub fn read(buffer: &mut BytesMut) -> Result<Packet, error::ReadBytes<DeserializeError>> {
Expand Down
2 changes: 1 addition & 1 deletion mqrstt/src/packets/mqtt_trait/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub trait MqttWrite: Sized {
fn write(&self, buf: &mut BytesMut) -> Result<(), crate::packets::error::SerializeError>;
}

impl<'a, T> MqttWrite for &'a T
impl<T> MqttWrite for &T
where
T: MqttWrite,
{
Expand Down
Loading

0 comments on commit 1040da6

Please sign in to comment.