From fa9ad2efb12db41c867c4b7423b961bf4fbeb1eb Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Tue, 3 Dec 2024 10:04:10 +0100 Subject: [PATCH] chore: remove unnecessary MQTT trait impls --- mqrstt/src/packets/connect/mod.rs | 6 +- .../src/packets/mqtt_trait/primitive_impl.rs | 164 ++++++------------ 2 files changed, 58 insertions(+), 112 deletions(-) diff --git a/mqrstt/src/packets/connect/mod.rs b/mqrstt/src/packets/connect/mod.rs index 8dfbd2b..12aa620 100644 --- a/mqrstt/src/packets/connect/mod.rs +++ b/mqrstt/src/packets/connect/mod.rs @@ -71,8 +71,10 @@ impl Default for Connect { impl PacketRead for Connect { fn read(_: u8, _: usize, mut buf: Bytes) -> Result { - if String::read(&mut buf)? != "MQTT" { - return Err(DeserializeError::MalformedPacketWithInfo("Protocol not MQTT".to_string())); + let expected_protocol = [b'M', b'Q', b'T', b'T']; + let received_protocol = Vec::::read(&mut buf)?; + if &received_protocol != &expected_protocol { + return Err(DeserializeError::MalformedPacketWithInfo("Protocol not MQTT".to_owned())); } let protocol_version = ProtocolVersion::read(&mut buf)?; diff --git a/mqrstt/src/packets/mqtt_trait/primitive_impl.rs b/mqrstt/src/packets/mqtt_trait/primitive_impl.rs index b0612cb..819ee31 100644 --- a/mqrstt/src/packets/mqtt_trait/primitive_impl.rs +++ b/mqrstt/src/packets/mqtt_trait/primitive_impl.rs @@ -10,10 +10,10 @@ use super::MqttAsyncWrite; impl MqttRead for Box { #[inline] fn read(buf: &mut Bytes) -> Result { - let content = Bytes::read(buf)?; + let content = Vec::::read(buf)?; - match String::from_utf8(content.to_vec()) { - Ok(s) => Ok(s.into()), + match String::from_utf8(content) { + Ok(s) => Ok(s.into_boxed_str()), Err(e) => Err(DeserializeError::Utf8Error(e)), } } @@ -86,117 +86,61 @@ impl WireLength for &str { } } -impl MqttRead for String { - #[inline] - fn read(buf: &mut Bytes) -> Result { - let content = Bytes::read(buf)?; +// impl MqttRead for Bytes { +// #[inline] +// fn read(buf: &mut Bytes) -> Result { +// if buf.len() < 2 { +// return Err(DeserializeError::InsufficientData(std::any::type_name::(), buf.len(), 2)); +// } +// let len = buf.get_u16() as usize; - match String::from_utf8(content.to_vec()) { - Ok(s) => Ok(s), - Err(e) => Err(DeserializeError::Utf8Error(e)), - } - } -} +// if len > buf.len() { +// return Err(DeserializeError::InsufficientData(std::any::type_name::(), buf.len(), len)); +// } -impl MqttAsyncRead for String -where - T: tokio::io::AsyncReadExt + std::marker::Unpin, -{ - async fn async_read(buf: &mut T) -> Result<(Self, usize), ReadError> { - let (content, read_bytes) = Bytes::async_read(buf).await?; - match String::from_utf8(content.to_vec()) { - Ok(s) => Ok((s, read_bytes)), - Err(e) => Err(ReadError::DeserializeError(DeserializeError::Utf8Error(e))), - } - } -} +// Ok(buf.split_to(len)) +// } +// } +// impl MqttAsyncRead for Bytes +// where +// S: tokio::io::AsyncReadExt + std::marker::Unpin, +// { +// async fn async_read(stream: &mut S) -> Result<(Self, usize), ReadError> { +// let size = stream.read_u16().await? as usize; +// // let mut data = BytesMut::with_capacity(size); +// let mut data = Vec::with_capacity(size); +// let read_bytes = stream.read_exact(&mut data).await?; +// assert_eq!(size, read_bytes); +// Ok((data.into(), 2 + size)) +// } +// } +// impl MqttWrite for Bytes { +// #[inline] +// fn write(&self, buf: &mut BytesMut) -> Result<(), SerializeError> { +// buf.put_u16(self.len() as u16); +// buf.extend(self); -impl MqttWrite for String { - #[inline] - fn write(&self, buf: &mut BytesMut) -> Result<(), SerializeError> { - if self.len() > 65535 { - return Err(SerializeError::StringTooLong(self.len())); - } +// Ok(()) +// } +// } +// impl MqttAsyncWrite for Bytes +// where +// S: tokio::io::AsyncWrite + Unpin, +// { +// async fn async_write(&self, stream: &mut S) -> Result { +// let size = (self.len() as u16).to_be_bytes(); +// stream.write_all(&size).await?; +// stream.write_all(self.as_ref()).await?; +// Ok(2 + self.len()) +// } +// } - buf.put_u16(self.len() as u16); - buf.extend(self.as_bytes()); - Ok(()) - } -} -impl MqttAsyncWrite for String -where - S: tokio::io::AsyncWrite + Unpin, -{ - async fn async_write(&self, stream: &mut S) -> Result { - let size = (self.len() as u16).to_be_bytes(); - stream.write_all(&size).await?; - stream.write_all(self.as_bytes()).await?; - Ok(2 + self.len()) - } -} - -impl WireLength for String { - #[inline(always)] - fn wire_len(&self) -> usize { - self.len() + 2 - } -} - -impl MqttRead for Bytes { - #[inline] - fn read(buf: &mut Bytes) -> Result { - if buf.len() < 2 { - return Err(DeserializeError::InsufficientData(std::any::type_name::(), buf.len(), 2)); - } - let len = buf.get_u16() as usize; - - if len > buf.len() { - return Err(DeserializeError::InsufficientData(std::any::type_name::(), buf.len(), len)); - } - - Ok(buf.split_to(len)) - } -} -impl MqttAsyncRead for Bytes -where - S: tokio::io::AsyncReadExt + std::marker::Unpin, -{ - async fn async_read(stream: &mut S) -> Result<(Self, usize), ReadError> { - let size = stream.read_u16().await? as usize; - // let mut data = BytesMut::with_capacity(size); - let mut data = Vec::with_capacity(size); - let read_bytes = stream.read_exact(&mut data).await?; - assert_eq!(size, read_bytes); - Ok((data.into(), 2 + size)) - } -} -impl MqttWrite for Bytes { - #[inline] - fn write(&self, buf: &mut BytesMut) -> Result<(), SerializeError> { - buf.put_u16(self.len() as u16); - buf.extend(self); - - Ok(()) - } -} -impl MqttAsyncWrite for Bytes -where - S: tokio::io::AsyncWrite + Unpin, -{ - async fn async_write(&self, stream: &mut S) -> Result { - let size = (self.len() as u16).to_be_bytes(); - stream.write_all(&size).await?; - stream.write_all(self.as_ref()).await?; - Ok(2 + self.len()) - } -} - -impl WireLength for Bytes { - #[inline(always)] - fn wire_len(&self) -> usize { - self.len() + 2 - } -} +// impl WireLength for Bytes { +// #[inline(always)] +// fn wire_len(&self) -> usize { +// self.len() + 2 +// } +// } impl MqttRead for Vec { #[inline]