Skip to content

Commit

Permalink
0.3.0-alpha.3, made HandlerExt public
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Jan 2, 2024
1 parent c9c01f0 commit d5eb571
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqrstt"
version = "0.3.0-alpha.2"
version = "0.3.0-alpha.3"
homepage = "https://github.com/GunnarMorrigan/mqrstt"
repository = "https://github.com/GunnarMorrigan/mqrstt"
documentation = "https://docs.rs/mqrstt"
Expand Down
24 changes: 12 additions & 12 deletions benches/benchmarks/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {
let read_res = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_some());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);
assert_eq!(read_res, NetworkStatus::IncomingDisconnect);
assert_eq!(write_res.unwrap().unwrap(), NetworkStatus::ShutdownSignal);
},
)
});
Expand All @@ -122,9 +122,9 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {
let read_res = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_some());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);
assert_eq!(read_res, NetworkStatus::IncomingDisconnect);
assert_eq!(102, pingpong.number.load(std::sync::atomic::Ordering::SeqCst));
assert_eq!(write_res.unwrap().unwrap(), NetworkStatus::ShutdownSignal);

let _server_box = black_box(client.clone());
let _server_box = black_box(server);
Expand Down Expand Up @@ -159,9 +159,9 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_some());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);

assert_eq!(write_res.unwrap().unwrap(), NetworkStatus::ShutdownSignal);

let _network_box = black_box(network);
},
Expand Down Expand Up @@ -192,9 +192,9 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_some());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);

assert_eq!(write_res.unwrap().unwrap(), NetworkStatus::ShutdownSignal);
}
)
});
Expand All @@ -220,11 +220,11 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_some());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);
assert_eq!(102, num_packets_received.load(std::sync::atomic::Ordering::SeqCst));

assert_eq!(write_res.unwrap().unwrap(), NetworkStatus::ShutdownSignal);

let _server_box = black_box(client.clone());
let _server_box = black_box(server);
let _addr_box = black_box(addr);
Expand Down Expand Up @@ -258,9 +258,9 @@ fn tokio_concurrent_benchmarks(c: &mut Criterion) {
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_ok());
let read_res = read_res.unwrap();
assert!(read_res.is_some());
assert_eq!(read_res.unwrap(), NetworkStatus::IncomingDisconnect);

assert_eq!(write_res.unwrap().unwrap(), NetworkStatus::ShutdownSignal);

let _network_box = black_box(network);
},
Expand Down
8 changes: 4 additions & 4 deletions src/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,21 @@ pub mod example_handlers{
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
let max_len = payload.len().min(10);
let a = &payload[0..max_len];
// let a = &payload[0..max_len];
if payload.to_lowercase().contains("ping") {
self.client.publish(p.topic.clone(), p.qos, p.retain, Bytes::from_static(b"pong")).await.unwrap();
println!("Received publish payload: {}", a);
// println!("Received publish payload: {}", a);

if !p.retain{
self.number.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}

println!("DBG: \n {}", &Packet::Publish(p));
// println!("DBG: \n {}", &Packet::Publish(p));
}
}
}
Packet::ConnAck(_) => {
println!("Connected!")
// println!("Connected!")
}
_ => (),
}
Expand Down
39 changes: 27 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ pub mod tests;
/// It is returned when the run handle returns from performing an operation.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum NetworkStatus {
/// The other side indicated a shutdown shutdown, Only used in concurrent context
ShutdownSignal,
/// Indicate that there was an incoming disconnect and the socket has been closed.
IncomingDisconnect,
/// Indicate that an outgoing disconnect has been transmited and the socket is closed
Expand Down Expand Up @@ -441,7 +443,9 @@ mod smol_lib_test {
let (n, _) = futures::join!(
async {
match network.run(&mut pingresp).await {

Ok(crate::NetworkStatus::OutgoingDisconnect) => return Ok(pingresp),
Ok(crate::NetworkStatus::ShutdownSignal) => unreachable!(),
Ok(crate::NetworkStatus::KeepAliveTimeout) => panic!(),
Ok(crate::NetworkStatus::IncomingDisconnect) => panic!(),
Err(err) => return Err(err),
Expand Down Expand Up @@ -472,15 +476,16 @@ mod smol_lib_test {
let address = "127.0.0.1";
let port = 2001;

let listener = smol::net::TcpListener::bind((address, port)).await.unwrap();

let (n, _) = futures::join!(
async {
let (mut network, client) = NetworkBuilder::new_from_options(options).smol_sequential_network();
let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
let mut pingresp = crate::example_handlers::PingResp::new(client.clone());
network.connect(stream, &mut pingresp).await
},
async {
let listener = smol::net::TcpListener::bind((address, port)).await.unwrap();
async move {
let (stream, _) = listener.accept().await.unwrap();
smol::Timer::after(std::time::Duration::from_secs(10)).await;
stream.shutdown(std::net::Shutdown::Write).unwrap();
Expand Down Expand Up @@ -532,7 +537,11 @@ mod tokio_lib_test {

network.connect(stream, &mut pingpong).await.unwrap();

client.subscribe(("mqrstt", QoS::ExactlyOnce)).await.unwrap();
let topic = crate::random_chars() + "_mqrstt";

client.subscribe((topic.as_str(), QoS::ExactlyOnce)).await.unwrap();

tokio::time::sleep(Duration::from_secs(5)).await;

let (read, write) = network.split(pingpong.clone()).unwrap();

Expand All @@ -543,21 +552,27 @@ mod tokio_lib_test {
read_handle,
write_handle,
async {
client.publish("mqrstt".to_string(), QoS::ExactlyOnce, false, b"ping".repeat(500)).await.unwrap();
client.publish("mqrstt".to_string(), QoS::ExactlyOnce, false, b"ping".to_vec()).await.unwrap();
client.publish("mqrstt".to_string(), QoS::ExactlyOnce, false, b"ping".to_vec()).await.unwrap();
client.publish("mqrstt".to_string(), QoS::ExactlyOnce, false, b"ping".repeat(500)).await.unwrap();

client.unsubscribe("mqrstt").await.unwrap();

tokio::time::sleep(Duration::from_secs(30)).await;
client.publish(topic.as_str(), QoS::ExactlyOnce, false, b"ping".repeat(500)).await.unwrap();
client.publish(topic.as_str(), QoS::ExactlyOnce, false, b"ping".to_vec()).await.unwrap();
client.publish(topic.as_str(), QoS::ExactlyOnce, false, b"ping".to_vec()).await.unwrap();
client.publish(topic.as_str(), QoS::ExactlyOnce, false, b"ping".repeat(500)).await.unwrap();

client.unsubscribe(topic.as_str()).await.unwrap();

for _ in 0..30 {
tokio::time::sleep(Duration::from_secs(1)).await;
if pingpong.number.load(std::sync::atomic::Ordering::SeqCst) == 4 {
break;
}
}

client.disconnect().await.unwrap();
}
);

let write_result = write_result.unwrap();
assert!(write_result.is_ok());
assert_eq!(crate::NetworkStatus::OutgoingDisconnect, write_result.unwrap().unwrap());
assert_eq!(crate::NetworkStatus::OutgoingDisconnect, write_result.unwrap());
assert_eq!(4, pingpong.number.load(std::sync::atomic::Ordering::SeqCst));
let _ = black_box(read_result);
}
Expand Down
2 changes: 1 addition & 1 deletion src/sync/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ where
}
}

fn run<H>(&mut self, handler: &mut H) -> Result<Option<NetworkStatus>, ConnectionError>
fn run<H>(&mut self, handler: &mut H) -> Result<NetworkStatus, ConnectionError>
where
H: EventHandler,
{
Expand Down
2 changes: 1 addition & 1 deletion src/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct SequentialHandler;
/// This kind of handler is used for both concurrent message handling and concurrent TCP read and write operations.
pub struct ConcurrentHandler;

trait HandlerExt<H>: Sized{
pub trait HandlerExt<H>: Sized{
/// Should call the handler in the fashion of the handler.
/// (e.g. spawn a task if or await the handle call)
fn call_handler(handler: &mut H, incoming_packet: Packet) -> impl Future<Output = ()> + Send;
Expand Down
18 changes: 9 additions & 9 deletions src/tokio/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,15 @@ where
/// - Ok(None) in the case that the write task requested shutdown.
/// - Ok(Some(reason)) in the case that this task initiates a shutdown.
/// - Err in the case of IO, or protocol errors.
pub async fn run(mut self) -> Result<Option<NetworkStatus>, ConnectionError> {
pub async fn run(mut self) -> Result<NetworkStatus, ConnectionError> {
let ret = self.read().await;
self.run_signal.store(false, std::sync::atomic::Ordering::Release);
while let Some(_) = self.join_set.join_next().await {
()
}
ret
}
async fn read(&mut self) -> Result<Option<NetworkStatus>, ConnectionError> {
async fn read(&mut self) -> Result<NetworkStatus, ConnectionError> {
while self.run_signal.load(std::sync::atomic::Ordering::Acquire) {
let _ = self.read_stream.read_bytes().await?;
loop {
Expand All @@ -316,7 +316,7 @@ where
}
Packet::Disconnect(_) => {
N::call_handler(&mut self.handler, packet).await;
return Ok(Some(NetworkStatus::IncomingDisconnect));
return Ok(NetworkStatus::IncomingDisconnect);
}
Packet::ConnAck(conn_ack) => {
if let Some(retransmit_packets) = self.state_handler.handle_incoming_connack(&conn_ack)? {
Expand All @@ -338,7 +338,7 @@ where
}
}
}
Ok(None)
Ok(NetworkStatus::ShutdownSignal)
}
}

Expand Down Expand Up @@ -371,12 +371,12 @@ where
/// - Ok(None) in the case that the read task requested shutdown
/// - Ok(Some(reason)) in the case that this task initiates a shutdown
/// - Err in the case of IO, or protocol errors.
pub async fn run(mut self) -> Result<Option<NetworkStatus>, ConnectionError> {
pub async fn run(mut self) -> Result<NetworkStatus, ConnectionError> {
let ret = self.write().await;
self.run_signal.store(false, std::sync::atomic::Ordering::Release);
ret
}
async fn write(&mut self) -> Result<Option<NetworkStatus>, ConnectionError> {
async fn write(&mut self) -> Result<NetworkStatus, ConnectionError> {
while self.run_signal.load(std::sync::atomic::Ordering::Acquire) {
if self.await_pingresp_time.is_some() && !self.await_pingresp_bool.load(std::sync::atomic::Ordering::Acquire) {
self.await_pingresp_time = None;
Expand All @@ -400,7 +400,7 @@ where
self.last_network_action = Instant::now();

if disconnect{
return Ok(Some(NetworkStatus::OutgoingDisconnect))
return Ok(NetworkStatus::OutgoingDisconnect);
}
},
from_reader = self.to_writer_r.recv() => {
Expand All @@ -426,11 +426,11 @@ where
if self.await_pingresp_bool.load(std::sync::atomic::Ordering::SeqCst){
let disconnect = Disconnect{ reason_code: DisconnectReasonCode::KeepAliveTimeout, properties: Default::default() };
self.write_stream.write(&Packet::Disconnect(disconnect)).await?;
return Ok(Some(NetworkStatus::KeepAliveTimeout))
return Ok(NetworkStatus::KeepAliveTimeout);
}
}
}
}
Ok(None)
Ok(NetworkStatus::ShutdownSignal)
}
}

0 comments on commit d5eb571

Please sign in to comment.