Skip to content

Commit

Permalink
Massive progress on read and write concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Dec 24, 2023
1 parent 6bad4e1 commit ba989a8
Show file tree
Hide file tree
Showing 11 changed files with 767 additions and 482 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ description = "Pure rust MQTTv5 client implementation for sync and async (Smol &
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["tokio", "smol", "sync"]
default = ["tokio"]
tokio = ["dep:tokio"]
smol = ["dep:smol"]
sync = []
Expand All @@ -29,7 +29,7 @@ bytes = "1.5.0"
thiserror = "1.0.49"
tracing = { version = "0.1.39", optional = true }

async-channel = "1.9.0"
async-channel = "2.1.1"
#async-mutex = "1.4.0"
futures = { version = "0.3.28", default-features = false, features = ["std", "async-await"] }

Expand Down
34 changes: 17 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
#[derive(Debug, Clone)]
pub struct MqttClient {
/// Provides this client with an available packet id or waits on it.
available_packet_ids: Receiver<u16>,
available_packet_ids_r: Receiver<u16>,

/// Sends Publish, Subscribe, Unsubscribe to the event handler to handle later.
to_network_s: Sender<Packet>,
Expand All @@ -27,11 +27,11 @@ pub struct MqttClient {
}

impl MqttClient {
pub fn new(available_packet_ids: Receiver<u16>, to_network_s: Sender<Packet>, max_packet_size: Option<u32>) -> Self {
pub fn new(available_packet_ids_r: Receiver<u16>, to_network_s: Sender<Packet>, max_packet_size: usize) -> Self {
Self {
available_packet_ids,
available_packet_ids_r,
to_network_s,
max_packet_size: max_packet_size.unwrap_or(DEFAULT_MAX_PACKET_SIZE) as usize,
max_packet_size
}
}
}
Expand Down Expand Up @@ -74,7 +74,7 @@ impl MqttClient {
/// # });
/// ```
pub async fn subscribe<A: Into<Subscription>>(&self, into_subscribtions: A) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let subscription: Subscription = into_subscribtions.into();
let sub = Subscribe::new(pkid, subscription.0);

Expand Down Expand Up @@ -144,7 +144,7 @@ impl MqttClient {
/// # });
/// ```
pub async fn subscribe_with_properties<S: Into<Subscription>>(&self, into_sub: S, properties: SubscribeProperties) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let sub = Subscribe {
packet_identifier: pkid,
properties,
Expand Down Expand Up @@ -190,7 +190,7 @@ impl MqttClient {
pub async fn publish<T: Into<String>, P: Into<Bytes>>(&self, topic: T, qos: QoS, retain: bool, payload: P) -> Result<(), ClientError> {
let pkid = match qos {
QoS::AtMostOnce => None,
_ => Some(self.available_packet_ids.recv().await.map_err(|_| ClientError::NoNetworkChannel)?),
_ => Some(self.available_packet_ids_r.recv().await.map_err(|_| ClientError::NoNetworkChannel)?),
};
#[cfg(feature = "logs")]
info!("Published message with ID: {:?}", pkid);
Expand Down Expand Up @@ -276,7 +276,7 @@ impl MqttClient {
pub async fn publish_with_properties<T: Into<String>, P: Into<Bytes>>(&self, topic: T, qos: QoS, retain: bool, payload: P, properties: PublishProperties) -> Result<(), ClientError> {
let pkid = match qos {
QoS::AtMostOnce => None,
_ => Some(self.available_packet_ids.recv().await.map_err(|_| ClientError::NoNetworkChannel)?),
_ => Some(self.available_packet_ids_r.recv().await.map_err(|_| ClientError::NoNetworkChannel)?),
};
let publish = Publish {
dup: false,
Expand Down Expand Up @@ -327,7 +327,7 @@ impl MqttClient {
/// # });
/// ```
pub async fn unsubscribe<T: Into<UnsubscribeTopics>>(&self, into_topics: T) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let unsub = Unsubscribe {
packet_identifier: pkid,
properties: UnsubscribeProperties::default(),
Expand Down Expand Up @@ -396,7 +396,7 @@ impl MqttClient {
/// # });
/// ```
pub async fn unsubscribe_with_properties<T: Into<UnsubscribeTopics>>(&self, into_topics: T, properties: UnsubscribeProperties) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv().await.map_err(|_| ClientError::NoNetworkChannel)?;
let unsub = Unsubscribe {
packet_identifier: pkid,
properties,
Expand Down Expand Up @@ -509,7 +509,7 @@ impl MqttClient {
/// # });
/// ```
pub fn subscribe_blocking<A: Into<Subscription>>(&self, into_subscribtions: A) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let subscription: Subscription = into_subscribtions.into();
let sub = Subscribe::new(pkid, subscription.0);

Expand Down Expand Up @@ -578,7 +578,7 @@ impl MqttClient {
/// mqtt_client.subscribe_with_properties_blocking(("final/test/topic", sub_options), sub_properties);
/// ```
pub fn subscribe_with_properties_blocking<S: Into<Subscription>>(&self, into_subscribtions: S, properties: SubscribeProperties) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let sub = Subscribe {
packet_identifier: pkid,
properties,
Expand Down Expand Up @@ -624,7 +624,7 @@ impl MqttClient {
pub fn publish_blocking<T: Into<String>, P: Into<Bytes>>(&self, topic: T, qos: QoS, retain: bool, payload: P) -> Result<(), ClientError> {
let pkid = match qos {
QoS::AtMostOnce => None,
_ => Some(self.available_packet_ids.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?),
_ => Some(self.available_packet_ids_r.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?),
};
#[cfg(feature = "logs")]
info!("Published message with ID: {:?}", pkid);
Expand Down Expand Up @@ -710,7 +710,7 @@ impl MqttClient {
pub fn publish_with_properties_blocking<T: Into<String>, P: Into<Bytes>>(&self, topic: T, qos: QoS, retain: bool, payload: P, properties: PublishProperties) -> Result<(), ClientError> {
let pkid = match qos {
QoS::AtMostOnce => None,
_ => Some(self.available_packet_ids.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?),
_ => Some(self.available_packet_ids_r.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?),
};
let publish = Publish {
dup: false,
Expand Down Expand Up @@ -760,7 +760,7 @@ impl MqttClient {
///
/// ```
pub fn unsubscribe_blocking<T: Into<UnsubscribeTopics>>(&self, into_topics: T) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let unsub = Unsubscribe {
packet_identifier: pkid,
properties: UnsubscribeProperties::default(),
Expand Down Expand Up @@ -827,7 +827,7 @@ impl MqttClient {
/// mqtt_client.unsubscribe_with_properties_blocking(topics.as_slice(), properties);
/// ```
pub fn unsubscribe_with_properties_blocking<T: Into<UnsubscribeTopics>>(&self, into_topics: T, properties: UnsubscribeProperties) -> Result<(), ClientError> {
let pkid = self.available_packet_ids.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let pkid = self.available_packet_ids_r.recv_blocking().map_err(|_| ClientError::NoNetworkChannel)?;
let unsub = Unsubscribe {
packet_identifier: pkid,
properties,
Expand Down Expand Up @@ -913,7 +913,7 @@ mod tests {
let (client_to_handler_s, client_to_handler_r) = async_channel::bounded(100);
let (_, to_network_r) = async_channel::bounded(100);

let client = MqttClient::new(r, client_to_handler_s, Some(500000));
let client = MqttClient::new(r, client_to_handler_s, 500000);

(client, client_to_handler_r, to_network_r)
}
Expand Down
112 changes: 92 additions & 20 deletions src/connect_options.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,57 @@
use std::{cell::OnceCell, time::Duration};

use bytes::Bytes;

use crate::packets::LastWill;
use crate::{packets::{LastWill, ConnectProperties}, util::constants::DEFAULT_MAX_PACKET_SIZE};
#[cfg(any(feature = "smol-rustls", feature = "tokio-rustls"))]
use crate::stream::transport::TlsConfig;
use crate::util::constants::DEFAULT_RECEIVE_MAXIMUM;

#[derive(Debug, Clone)]
pub struct ConnectOptions {
/// keep alive time to send pingreq to broker when the connection is idle
pub keep_alive_interval_s: u64,
/// clean (or) persistent session
pub clean_start: bool,
pub(crate) keep_alive_interval: Duration,
/// clean or persistent session indicator
pub(crate) clean_start: bool,
/// client identifier
pub client_id: String,
client_id: String,
/// username and password
pub username: Option<String>,
pub password: Option<String>,
username: Option<String>,
password: Option<String>,

// MQTT v5 Connect Properties:
pub session_expiry_interval: Option<u32>,
pub receive_maximum: Option<u16>,
pub maximum_packet_size: Option<u32>,
pub topic_alias_maximum: Option<u16>,
pub request_response_information: Option<u8>,
pub request_problem_information: Option<u8>,
pub user_properties: Vec<(String, String)>,
pub authentication_method: Option<String>,
pub authentication_data: Bytes,
session_expiry_interval: Option<u32>,

/// The maximum number of packets that will be inflight from the broker to this client.
receive_maximum: Option<u16>,

/// The maximum number of packets that can be inflight from this client to the broker.
send_maximum: Option<u16>,

maximum_packet_size: Option<u32>,
topic_alias_maximum: Option<u16>,
request_response_information: Option<u8>,
request_problem_information: Option<u8>,
user_properties: Vec<(String, String)>,
authentication_method: Option<String>,
authentication_data: Bytes,

/// Last will that will be issued on unexpected disconnect
pub last_will: Option<LastWill>,
last_will: Option<LastWill>,
}

impl ConnectOptions {
pub fn new(client_id: String) -> Self {
pub fn new(client_id: String, clean_start: bool) -> Self {
Self {
keep_alive_interval_s: 60,
clean_start: false,
keep_alive_interval: Duration::from_secs(60),
clean_start: clean_start,
client_id,
username: None,
password: None,

session_expiry_interval: None,
receive_maximum: None,
send_maximum: None,
maximum_packet_size: None,
topic_alias_maximum: None,
request_response_information: None,
Expand All @@ -54,7 +63,70 @@ impl ConnectOptions {
}
}

pub(crate) fn create_connect_from_options(&self) -> crate::packets::Packet {
let connect_properties = ConnectProperties {
session_expiry_interval: self.session_expiry_interval,
receive_maximum: self.receive_maximum,
maximum_packet_size: self.maximum_packet_size,
topic_alias_maximum: self.topic_alias_maximum,
request_response_information: self.request_response_information,
request_problem_information: self.request_response_information,
user_properties: self.user_properties.clone(),
authentication_method: self.authentication_method.clone(),
authentication_data: self.authentication_data.clone(),
};

let connect = crate::packets::Connect {
client_id: self.client_id.clone(),
clean_start: self.clean_start,
keep_alive: self.keep_alive_interval.as_secs() as u16,
username: self.username.clone(),
password: self.password.clone(),
connect_properties,
protocol_version: crate::packets::ProtocolVersion::V5,
last_will: self.last_will.clone(),
};

crate::packets::Packet::Connect(connect)
}

pub fn set_keep_alive_interval(&mut self, keep_alive_interval: Duration) {
self.keep_alive_interval = keep_alive_interval;
}
pub fn get_keep_alive_interval(&self) -> Duration {
self.keep_alive_interval
}

pub fn get_clean_start(&self) -> bool {
self.clean_start
}

pub fn get_client_id(&self) -> &str {
&self.client_id
}

pub fn set_last_will(&mut self, last_will: LastWill) {
self.last_will = Some(last_will);
}
pub fn get_last_will(&self) -> Option<&LastWill> {
self.last_will.as_ref()
}

pub fn set_receive_maximum(&mut self, receive_maximum: u16) {
self.receive_maximum = Some(receive_maximum)
}
pub fn receive_maximum(&self) -> u16 {
self.receive_maximum.unwrap_or(DEFAULT_RECEIVE_MAXIMUM)
}

pub fn maximum_packet_size(&self) -> usize {
self.maximum_packet_size.unwrap_or(DEFAULT_MAX_PACKET_SIZE) as usize
}

pub fn set_send_maximum(&mut self, send_maximum: u16) {
self.send_maximum = Some(send_maximum)
}
pub fn send_maximum(&self) -> u16 {
self.send_maximum.unwrap_or(DEFAULT_RECEIVE_MAXIMUM)
}
}
Loading

0 comments on commit ba989a8

Please sign in to comment.