Skip to content

Commit

Permalink
add suback and unsuback test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Nov 28, 2024
1 parent fd71dfd commit 0a009ca
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 175 deletions.
2 changes: 1 addition & 1 deletion mqrstt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ mod tokio_lib_test {
network.connect(stream, &mut pingresp).await.unwrap();

let network_handle = tokio::task::spawn(async move {
let result = network.run(&mut pingresp).await;
let _result = network.run(&mut pingresp).await;
// check result and or restart the connection
pingresp
});
Expand Down
6 changes: 3 additions & 3 deletions mqrstt/src/packets/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ where
S: tokio::io::AsyncWrite + Unpin,
{
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
let reason_code_writen = self.reason_code.async_write(stream).await?;
let properties_writen = self.properties.async_write(stream).await?;
Ok(reason_code_writen + properties_writen)
let reason_code_written = self.reason_code.async_write(stream).await?;
let properties_written = self.properties.async_write(stream).await?;
Ok(reason_code_written + properties_written)
}
}

Expand Down
8 changes: 4 additions & 4 deletions mqrstt/src/packets/connack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ where
{
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
use crate::packets::mqtt_trait::MqttAsyncWrite;
let connack_flags_writen = self.connack_flags.async_write(stream).await?;
let reason_code_writen = self.reason_code.async_write(stream).await?;
let connack_properties_writen = self.connack_properties.async_write(stream).await?;
let connack_flags_written = self.connack_flags.async_write(stream).await?;
let reason_code_written = self.reason_code.async_write(stream).await?;
let connack_properties_written = self.connack_properties.async_write(stream).await?;

Ok(connack_flags_writen + reason_code_writen + connack_properties_writen)
Ok(connack_flags_written + reason_code_written + connack_properties_written)
}
}

Expand Down
16 changes: 8 additions & 8 deletions mqrstt/src/packets/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ where
use crate::packets::mqtt_trait::MqttAsyncWrite;
use tokio::io::AsyncWriteExt;
async move {
let mut total_writen_bytes = 6 // protocol header
let mut total_written_bytes = 6 // protocol header
+ 1 // protocol version
+ 1 // connect flags
+ 2; // keep alive
let protocol = [0x00, 0x04, b'M', b'Q', b'T', b'T'];
// We allready start with 6 as total writen bytes thus dont add anymore
// We allready start with 6 as total written bytes thus dont add anymore
stream.write_all(&protocol).await?;

self.protocol_version.async_write(stream).await?;
Expand All @@ -250,21 +250,21 @@ where

stream.write_u16(self.keep_alive).await?;

total_writen_bytes += self.connect_properties.async_write(stream).await?;
total_written_bytes += self.connect_properties.async_write(stream).await?;

total_writen_bytes += self.client_id.async_write(stream).await?;
total_written_bytes += self.client_id.async_write(stream).await?;

if let Some(last_will) = &self.last_will {
total_writen_bytes += last_will.async_write(stream).await?;
total_written_bytes += last_will.async_write(stream).await?;
}
if let Some(username) = &self.username {
total_writen_bytes += username.async_write(stream).await?;
total_written_bytes += username.async_write(stream).await?;
}
if let Some(password) = &self.password {
total_writen_bytes += password.async_write(stream).await?;
total_written_bytes += password.async_write(stream).await?;
}

Ok(total_writen_bytes)
Ok(total_written_bytes)
}
}
}
Expand Down
172 changes: 86 additions & 86 deletions mqrstt/src/packets/macros/properties_macros.rs

Large diffs are not rendered by default.

133 changes: 105 additions & 28 deletions mqrstt/src/packets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ impl Packet {
Packet::PubRel(_) => 0b0110_0010,
Packet::PubComp(_) => 0b0111_0000,
Packet::Subscribe(_) => 0b1000_0010,
Packet::SubAck(_) => {
unreachable!()
}
Packet::SubAck(_) => 0b1001_0000,
Packet::Unsubscribe(_) => 0b1010_0010,
Packet::UnsubAck(_) => 0b1011_0000,
Packet::PingReq => 0b1100_0000,
Expand Down Expand Up @@ -168,17 +166,20 @@ impl Packet {
p.wire_len().write_variable_integer(buf)?;
p.write(buf)?;
}
Packet::SubAck(_) => {
unreachable!()
Packet::SubAck(p) => {
buf.put_u8(0b1001_0000);
p.wire_len().write_variable_integer(buf)?;
p.write(buf)?;
}
Packet::Unsubscribe(p) => {
buf.put_u8(0b1010_0010);
p.wire_len().write_variable_integer(buf)?;
p.write(buf)?;
}
Packet::UnsubAck(_) => {
unreachable!();
// buf.put_u8(0b1011_0000);
Packet::UnsubAck(p) => {
buf.put_u8(0b1011_0000);
p.wire_len().write_variable_integer(buf)?;
p.write(buf)?;
}
Packet::PingReq => {
buf.put_u8(0b1100_0000);
Expand Down Expand Up @@ -259,17 +260,20 @@ impl Packet {
written += p.wire_len().write_async_variable_integer(stream).await?;
written += p.async_write(stream).await?;
}
Packet::SubAck(_) => {
unreachable!()
Packet::SubAck(p) => {
stream.write_u8(0b1001_0000).await?;
written += p.wire_len().write_async_variable_integer(stream).await?;
written += p.async_write(stream).await?;
}
Packet::Unsubscribe(p) => {
stream.write_u8(0b1010_0010).await?;
written += p.wire_len().write_async_variable_integer(stream).await?;
written += p.async_write(stream).await?;
}
Packet::UnsubAck(_) => {
unreachable!();
// stream.write_u8(0b1011_0000).await?;
Packet::UnsubAck(p) => {
stream.write_u8(0b1011_0000).await?;
written += p.wire_len().write_async_variable_integer(stream).await?;
written += p.async_write(stream).await?;
}
Packet::PingReq => {
stream.write_u8(0b1100_0000).await?;
Expand Down Expand Up @@ -396,6 +400,28 @@ impl Display for Packet {
}
}

impl WireLength for Packet {
fn wire_len(&self) -> usize {
match self {
Packet::Connect(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::ConnAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::Publish(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::PubAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::PubRec(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::PubRel(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::PubComp(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::Subscribe(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::SubAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::Unsubscribe(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::UnsubAck(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::PingReq => 2,
Packet::PingResp => 2,
Packet::Disconnect(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
Packet::Auth(p) => 1 + p.wire_len().variable_integer_len() + p.wire_len(),
}
}
}

/// 2.1.2 MQTT Control Packet type
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum PacketType {
Expand Down Expand Up @@ -447,33 +473,47 @@ impl std::fmt::Display for PacketType {

#[cfg(test)]
mod tests {

use bytes::BytesMut;

use crate::packets::Packet;

use crate::tests::test_packets::*;

#[rstest::rstest]
#[case(ping_req_case().1)]
#[case(ping_resp_case().1)]
#[case(connack_case().1)]
#[case(create_subscribe_packet(1))]
#[case(create_subscribe_packet(65335))]
#[case(create_puback_packet(1))]
#[case(create_puback_packet(65335))]
#[case(create_disconnect_packet())]
#[case(create_connack_packet(true))]
#[case(create_connack_packet(false))]
#[case(publish_packet_1())]
#[case(publish_packet_2())]
#[case(publish_packet_3())]
#[case(publish_packet_4())]
#[case(create_empty_publish_packet())]
#[case::ping_req_case(ping_req_case().1)]
#[case::ping_resp_case(ping_resp_case().1)]
#[case::connack_case(connack_case().1)]
#[case::create_subscribe_packet(create_subscribe_packet(1))]
#[case::create_subscribe_packet(create_subscribe_packet(65335))]
#[case::create_puback_packet(create_puback_packet(1))]
#[case::create_puback_packet(create_puback_packet(65335))]
#[case::create_disconnect_packet(create_disconnect_packet())]
#[case::create_connack_packet(create_connack_packet(true))]
#[case::create_connack_packet(create_connack_packet(false))]
#[case::publish_packet_1(publish_packet_1())]
#[case::publish_packet_2(publish_packet_2())]
#[case::publish_packet_3(publish_packet_3())]
#[case::publish_packet_4(publish_packet_4())]
#[case::create_empty_publish_packet(create_empty_publish_packet())]
#[case::subscribe(subscribe_case())]
#[case::suback(suback_case())]
#[case::unsubscribe(unsubscribe_case())]
#[case::unsuback(unsuback_case())]
fn test_write_read_write_read_cases(#[case] packet: Packet) {
use crate::packets::WireLength;

let mut buffer = BytesMut::new();

packet.write(&mut buffer).unwrap();

let wire_len = packet.wire_len();
assert_eq!(wire_len, buffer.len());

let res1 = Packet::read(&mut buffer).unwrap();

assert_eq!(packet, res1);

let mut buffer = BytesMut::new();
res1.write(&mut buffer).unwrap();
let res2 = Packet::read(&mut buffer).unwrap();
Expand Down Expand Up @@ -533,6 +573,43 @@ mod tests {
assert_eq!(out, input)
}

#[rstest::rstest]
#[case::ping_req_case(ping_req_case().1)]
#[case::ping_resp_case(ping_resp_case().1)]
#[case::connack_case(connack_case().1)]
#[case::create_subscribe_packet(create_subscribe_packet(1))]
#[case::create_subscribe_packet(create_subscribe_packet(65335))]
#[case::create_puback_packet(create_puback_packet(1))]
#[case::create_puback_packet(create_puback_packet(65335))]
#[case::create_disconnect_packet(create_disconnect_packet())]
#[case::create_connack_packet(create_connack_packet(true))]
#[case::create_connack_packet(create_connack_packet(false))]
#[case::publish_packet_1(publish_packet_1())]
#[case::publish_packet_2(publish_packet_2())]
#[case::publish_packet_3(publish_packet_3())]
#[case::publish_packet_4(publish_packet_4())]
#[case::create_empty_publish_packet(create_empty_publish_packet())]
#[case::subscribe(subscribe_case())]
#[case::suback(suback_case())]
#[case::unsubscribe(unsubscribe_case())]
#[case::unsuback(unsuback_case())]
#[tokio::test]
async fn test_async_write_read_write_read_cases(#[case] packet: Packet) {
use crate::packets::WireLength;

let mut buffer = Vec::with_capacity(1000);
packet.async_write(&mut buffer).await.unwrap();

let wire_len = packet.wire_len();
assert_eq!(wire_len, buffer.len());

let mut buf = buffer.as_slice();

let res1 = Packet::async_read(&mut buf).await.unwrap();

assert_eq!(packet, res1);
}

// #[rstest::rstest]
// #[case(&[59, 1, 0, 59])]
// #[case(&[16, 14, 0, 4, 77, 81, 84, 84, 5, 247, 247, 252, 1, 17, 247, 247, 247])]
Expand Down
12 changes: 6 additions & 6 deletions mqrstt/src/packets/pubcomp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,18 @@ where
{
async fn async_write(&self, stream: &mut S) -> Result<usize, crate::packets::error::WriteError> {
use crate::packets::mqtt_trait::MqttAsyncWrite;
let mut total_writen_bytes = 2;
let mut total_written_bytes = 2;
self.packet_identifier.async_write(stream).await?;

if self.reason_code == PubCompReasonCode::Success && self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
return Ok(total_writen_bytes);
return Ok(total_written_bytes);
} else if self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
total_writen_bytes += self.reason_code.async_write(stream).await?;
total_written_bytes += self.reason_code.async_write(stream).await?;
} else {
total_writen_bytes += self.reason_code.async_write(stream).await?;
total_writen_bytes += self.properties.async_write(stream).await?;
total_written_bytes += self.reason_code.async_write(stream).await?;
total_written_bytes += self.properties.async_write(stream).await?;
}
Ok(total_writen_bytes)
Ok(total_written_bytes)
}
}

Expand Down
12 changes: 6 additions & 6 deletions mqrstt/src/packets/pubrec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,18 @@ where
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
use crate::packets::mqtt_trait::MqttAsyncWrite;
async move {
let mut total_writen_bytes = 2;
let mut total_written_bytes = 2;
self.packet_identifier.async_write(stream).await?;

if self.reason_code == PubRecReasonCode::Success && self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
return Ok(total_writen_bytes);
return Ok(total_written_bytes);
} else if self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
total_writen_bytes += self.reason_code.async_write(stream).await?;
total_written_bytes += self.reason_code.async_write(stream).await?;
} else {
total_writen_bytes += self.reason_code.async_write(stream).await?;
total_writen_bytes += self.properties.async_write(stream).await?;
total_written_bytes += self.reason_code.async_write(stream).await?;
total_written_bytes += self.properties.async_write(stream).await?;
}
Ok(total_writen_bytes)
Ok(total_written_bytes)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions mqrstt/src/packets/pubrel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,18 @@ where
fn async_write(&self, stream: &mut S) -> impl std::future::Future<Output = Result<usize, crate::packets::error::WriteError>> {
use crate::packets::mqtt_trait::MqttAsyncWrite;
async move {
let mut total_writen_bytes = 2;
let mut total_written_bytes = 2;
self.packet_identifier.async_write(stream).await?;

if self.reason_code == PubRelReasonCode::Success && self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
return Ok(total_writen_bytes);
return Ok(total_written_bytes);
} else if self.properties.reason_string.is_none() && self.properties.user_properties.is_empty() {
total_writen_bytes += self.reason_code.async_write(stream).await?;
total_written_bytes += self.reason_code.async_write(stream).await?;
} else {
total_writen_bytes += self.reason_code.async_write(stream).await?;
total_writen_bytes += self.properties.async_write(stream).await?;
total_written_bytes += self.reason_code.async_write(stream).await?;
total_written_bytes += self.properties.async_write(stream).await?;
}
Ok(total_writen_bytes)
Ok(total_written_bytes)
}
}
}
Expand Down
Loading

0 comments on commit 0a009ca

Please sign in to comment.