Skip to content

Commit

Permalink
Added tests for when tcp mqtt clients read half closed
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed May 16, 2023
1 parent d857fc9 commit 0a53171
Showing 1 changed file with 81 additions and 1 deletion.
82 changes: 81 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ mod lib_test {
}
},
async move {
smol::Timer::after(std::time::Duration::from_secs(150)).await;
tokio::time::sleep(Duration::new(150, 0)).await;
client.disconnect().await.unwrap();
}
)
Expand All @@ -743,6 +743,45 @@ mod lib_test {
assert_eq!(2, pingresp.ping_resp_received);
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_close_write_tcp_stream_tokio() {
use std::io::ErrorKind;
use crate::{error::ConnectionError};

let address = ("127.0.0.1", 1883);

let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
client_id += "_TokioTcpPingReqTest";
let options = ConnectOptions::new(client_id);

let (n, _) = tokio::join!(
async move {
let (mut network, client) = new_tokio(options);

let stream = tokio::net::TcpStream::connect(address).await.unwrap();

let mut pingresp = PingResp::new(client.clone());

return network.connect(stream, &mut pingresp).await;
},
async move {
let listener = smol::net::TcpListener::bind(address).await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
tokio::time::sleep(Duration::new(5, 0)).await;
stream.shutdown(std::net::Shutdown::Write).unwrap();
}
);

if let ConnectionError::Io(err) = n.unwrap_err() {
assert_eq!(ErrorKind::ConnectionReset, err.kind());
assert_eq!("Connection reset by peer".to_string(), err.to_string());
}
else{
assert!(false);
}
}

#[cfg(feature = "smol")]
#[test]
fn test_smol_ping_req() {
Expand Down Expand Up @@ -783,4 +822,45 @@ mod lib_test {
assert_eq!(2, pingreq.ping_resp_received);
});
}



#[cfg(feature = "smol")]
#[test]
fn test_close_write_tcp_stream_smol() {
use std::io::ErrorKind;
use crate::{error::ConnectionError};

smol::block_on(async {
let mut client_id: String = rand::thread_rng().sample_iter(&rand::distributions::Alphanumeric).take(7).map(char::from).collect();
client_id += "_SmolTcpPingReqTest";
let options = ConnectOptions::new(client_id);

let address = "127.0.0.1";
let port = 1883;


let (n, _) = futures::join!(
async {
let (mut network, client) = new_smol(options);
let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();
let mut pingresp = PingResp::new(client.clone());
return network.connect(stream, &mut pingresp).await;
},
async {
let listener = smol::net::TcpListener::bind((address, port)).await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
smol::Timer::after(std::time::Duration::from_secs(2)).await;
stream.shutdown(std::net::Shutdown::Write).unwrap();
}
);
if let ConnectionError::Io(err) = n.unwrap_err() {
assert_eq!(ErrorKind::ConnectionReset, err.kind());
assert_eq!("Connection reset by peer".to_string(), err.to_string());
}
else{
assert!(false);
}
});
}
}

0 comments on commit 0a53171

Please sign in to comment.