Skip to content

Commit

Permalink
fix blocking issues
Browse files Browse the repository at this point in the history
  • Loading branch information
menghaoyu2002 committed Jun 9, 2024
1 parent 76ffdc9 commit a3a8bdb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
11 changes: 5 additions & 6 deletions src/client/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub async fn send_message(stream: &TcpStream, message: &Message) -> Result<(), S
let mut bytes_written = 0;
let serialized_message = message.serialize();
while bytes_written < serialized_message.len() {
stream.writable().await.unwrap();
// stream.writable().await.unwrap();
match stream.try_write(&serialized_message[bytes_written..]) {
Ok(0) => {
return Err(SendError::SendError(SendMessageError {
Expand All @@ -197,6 +197,7 @@ pub async fn send_message(stream: &TcpStream, message: &Message) -> Result<(), S
}));
}
};
yield_now().await;
}
Ok(())
}
Expand All @@ -205,7 +206,6 @@ pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError
let mut len = [0u8; 4];
let mut bytes_read = 0;
while bytes_read < 4 {
stream.readable().await.unwrap();
match stream.try_read(&mut len[bytes_read..]) {
Ok(0) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
Expand All @@ -224,6 +224,7 @@ pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError
}));
}
}
yield_now().await;
}
let len = u32::from_be_bytes(len);
if len == 0 {
Expand All @@ -237,7 +238,6 @@ pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError
let mut message = vec![0u8; len as usize];
let mut bytes_read = 0;
while bytes_read < len as usize {
stream.readable().await.unwrap();
match stream.try_read(&mut message[bytes_read..]) {
Ok(0) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
Expand All @@ -247,15 +247,14 @@ pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError
Ok(n) => {
bytes_read += n;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
yield_now().await;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
error: format!("Failed to read message: {}", e),
}));
}
}
yield_now().await;
}
let id = message[0];
let payload = message[1..].to_vec();
Expand Down
4 changes: 3 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Client {
}

pub async fn download(&mut self) -> Result<(), ClientError> {
self.connect_to_peers(10).await?;
self.connect_to_peers(30).await?;

let _ = tokio::join!(
self.send_messages(),
Expand Down Expand Up @@ -368,6 +368,7 @@ impl Client {
loop {
// println!("Retrieving messages...");
for (peer_id, peer) in peers.read().await.iter() {
// println!("pre-receive");
match receive_message(&peer.lock().await.stream).await {
Ok(message) => {
println!(
Expand All @@ -392,6 +393,7 @@ impl Client {
peers_to_remove.push(peer_id.clone());
}
}
// println!("post-receive");
peer.lock().await.last_touch = Utc::now();
yield_now().await;
}
Expand Down

0 comments on commit a3a8bdb

Please sign in to comment.