Skip to content

Commit

Permalink
test: setting up test for bidi
Browse files Browse the repository at this point in the history
Co-authored-by: Benno Zeeman <bzeeman@live.nl>
  • Loading branch information
joshuef and b-zee committed Mar 14, 2023
1 parent e80c859 commit 6b52319
Showing 1 changed file with 54 additions and 62 deletions.
116 changes: 54 additions & 62 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use futures::future;
use std::sync::Arc;
use std::{collections::BTreeSet, time::Duration};
use tracing::info;
// use tracing_test::traced_test;

#[tokio::test(flavor = "multi_thread")]
async fn successful_connection() -> Result<()> {
Expand Down Expand Up @@ -523,9 +522,9 @@ async fn multiple_connections_with_many_large_concurrent_messages() -> Result<()
while let Ok(Ok(Some(WireMsg((_, _, msg))))) =
recv_incoming_messages.next().timeout().await
{
let remote_port = connection.remote_address().port();
info!(
"received from {:?} with message size {}",
connection.remote_address(),
"[server] received from {remote_port} with message size {}",
msg.len()
);
task_connection = Some(connection.clone());
Expand All @@ -539,7 +538,7 @@ async fn multiple_connections_with_many_large_concurrent_messages() -> Result<()
}

// Send the hash result back.
info!("About to send hash from receiver");
info!("[server][to {remote_port}] About to send hash from receiver");
connection
.send((Bytes::new(), Bytes::new(), hash_result.to_vec().into()))
.await?;
Expand All @@ -557,7 +556,7 @@ async fn multiple_connections_with_many_large_concurrent_messages() -> Result<()

let _res = future::try_join_all(sending_tasks).await?;

info!("RECEIVER CLOSED");
info!("[server] RECEIVER CLOSED");

Ok(task_connection)
}
Expand All @@ -578,14 +577,17 @@ async fn multiple_connections_with_many_large_concurrent_messages() -> Result<()
assert_eq!(messages.len(), num_messages_each);
for (index, message) in messages.iter().enumerate() {
let _ = hash_results.insert(hash(message));
info!("sender #{} sending message #{}", id, index);
info!(
"[client][{}] sending #{index}",
send_endpoint.local_addr().port()
);
connection
.send((Bytes::new(), Bytes::new(), message.clone()))
.await?;
}

info!(
"sender #{} completed sending messages, starts listening",
"[client] sender #{} completed sending messages, starts listening",
id
);

Expand All @@ -594,15 +596,12 @@ async fn multiple_connections_with_many_large_concurrent_messages() -> Result<()
recv_incoming_messages.next().timeout().await
{
info!(
"#{} received from server {:?} with message size {}",
id,
connection.remote_address(),
msg.len()
len = msg.len(),
"[client][{}] received message",
send_endpoint.local_addr().port(),
);

info!("Hash len before: {:?}", hash_results.len());
assert!(hash_results.remove(&msg[..]));
info!("Hash len after: {:?}", hash_results.len());

all_received_msgs += 1;
if hash_results.is_empty() && all_received_msgs == num_messages_each {
Expand All @@ -624,7 +623,6 @@ async fn multiple_connections_with_many_large_concurrent_messages() -> Result<()
async fn multiple_connections_with_many_large_concurrent_bidi_messages() -> Result<()> {
use futures::future;

info!("woooo");
let num_senders: usize = 10;
let num_messages_each: usize = 100;
let num_messages_total: usize = num_senders * num_messages_each;
Expand All @@ -647,23 +645,19 @@ async fn multiple_connections_with_many_large_concurrent_bidi_messages() -> Resu
async move {
let mut num_received = 0;
let mut sending_tasks = Vec::new();
// let mut task_connection = None;
while let Some((connection, mut recv_incoming_messages)) =
recv_incoming_connections.next().await
{
debug!("Sssssssssssssssssssssssss");
let connection = Arc::new(connection);
while let Ok(Some((WireMsg((_, _, msg)), Some(mut send_stream)))) =
recv_incoming_messages.next_with_stream().await
{
let remote_port = connection.remote_address().port();
info!(
"received from {:?} with message size {}",
connection.remote_address(),
"[server] received from {remote_port} with message size {}",
msg.len()
);
// task_connection = Some(connection.clone());
sending_tasks.push(tokio::spawn({
// let connection = connection.clone();
async move {
let hash_result = hash(&msg);
// to simulate certain workload.
Expand All @@ -672,9 +666,13 @@ async fn multiple_connections_with_many_large_concurrent_bidi_messages() -> Resu
}

// Send the hash result back.
info!("About to send hash from receiver");
info!("[server][to {remote_port}] About to send hash from receiver");
send_stream
.send_user_msg((Bytes::new(), Bytes::new(), hash_result.to_vec().into()))
.send_user_msg((
Bytes::new(),
Bytes::new(),
hash_result.to_vec().into(),
))
.await?;
send_stream.finish().await?;

Expand All @@ -684,20 +682,16 @@ async fn multiple_connections_with_many_large_concurrent_bidi_messages() -> Resu

num_received += 1;
}

debug!("hmmmmmmmmmmmm");
// break;
if num_received >= num_messages_total {
break;
}
}

let _res = future::try_join_all(sending_tasks).await?;

info!("Receiver closed");
info!("[server] RECEIVER CLOSED");

Ok(())
// Ok(task_connection)
}
}));

Expand All @@ -706,55 +700,53 @@ async fn multiple_connections_with_many_large_concurrent_bidi_messages() -> Resu
let messages = sending_msgs.clone();
tasks.push(tokio::spawn({
let (send_endpoint, _) = new_endpoint_with_keepalive().await?;
// let task_connection = None;

async move {
let mut hash_results = BTreeSet::new();
let (connection, mut _recv_incoming_messages) =
send_endpoint.connect_to(&server_addr).await?;

assert_eq!(messages.len(), num_messages_each);
// let mut all_received_msgs = 0;

let mut recv_streams = vec![];
let mut all_received_msgs = 0;

for (index, message) in messages.iter().enumerate() {
let _ = hash_results.insert(hash(message));
info!("sender #{} sending message #{}", id, index);
let (mut send, recv) = connection
.open_bi().await?;

send.send_user_msg((Bytes::new(), Bytes::new(), message.clone())).await?;
send.finish().await?;
// send.((Bytes::new(), Bytes::new(), message.clone()))
// .await?;

let (_,_, msg) = recv.read().await?;
info!(
"#{} received from server {:?} with message size {}",
id,
connection.remote_address(),
msg.len()
);
info!("Hash len before: {:?}", hash_results.len());
assert!(hash_results.remove(&msg[..]));
// all_received_msgs += 1;
info!("Hash len after: {:?}", hash_results.len());
}


// if let Ok(Some(WireMsg((_, _, msg)))) =
info!(
"sender #{} completed sending message on this streams, starts listening",
id
"[client][{}] sending #{index}",
send_endpoint.local_addr().port()
);
let (mut send, recv) = connection.open_bi().await?;
recv_streams.push(recv);

send.send_user_msg((Bytes::new(), Bytes::new(), message.clone()))
.await?;
send.finish().await?;
}

info!(
"[client] sender #{} sent all messages, starts listening",
id
);

for recv in recv_streams {
let (_, _, msg) = recv.read().await?;
info!(
len = msg.len(),
"[client][{}] received message",
send_endpoint.local_addr().port(),
);
// recv.read().timeout().await
// {

// if hash_results.is_empty() && all_received_msgs == num_messages_each {
// break;
// }
// }
assert!(hash_results.remove(&msg[..]));

all_received_msgs += 1;
if hash_results.is_empty() && all_received_msgs == num_messages_each {
break;
}
}

Ok::<_, Report>(())
// Ok::<_, Report>(task_connection)
}
}));
}
Expand Down

0 comments on commit 6b52319

Please sign in to comment.