Skip to content

Commit

Permalink
fix message receive + send
Browse files Browse the repository at this point in the history
  • Loading branch information
menghaoyu2002 committed Jun 8, 2024
1 parent 344e71c commit 3656eae
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 16 deletions.
15 changes: 7 additions & 8 deletions src/client/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

use tokio::{io::AsyncReadExt, net::TcpStream, task::yield_now};
use tokio::{net::TcpStream, task::yield_now};

pub enum MessageId {
Choke = 0,
Expand Down Expand Up @@ -174,9 +174,10 @@ impl Display for Message {

pub async fn send_message(stream: &TcpStream, message: &Message) -> Result<(), SendError> {
let mut bytes_written = 0;
while bytes_written < message.serialize().len() {
let serialized_message = message.serialize();
while bytes_written < serialized_message.len() {
stream.writable().await.unwrap();
match stream.try_write(&message.serialize()) {
match stream.try_write(&serialized_message[bytes_written..]) {
Ok(0) => {
return Err(SendError::SendError(SendMessageError {
message: message.clone(),
Expand Down Expand Up @@ -205,7 +206,7 @@ pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError
let mut bytes_read = 0;
while bytes_read < 4 {
stream.readable().await.unwrap();
match stream.try_read(&mut len) {
match stream.try_read(&mut len[bytes_read..]) {
Ok(0) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
error: "stream was closed".to_string(),
Expand Down Expand Up @@ -233,20 +234,18 @@ pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError
});
}

let mut message = Vec::new();
let mut message = vec![0u8; len as usize];
let mut bytes_read = 0;
while bytes_read < len as usize {
let mut buffer = vec![0u8; len as usize];
stream.readable().await.unwrap();
match stream.try_read(&mut buffer) {
match stream.try_read(&mut message[bytes_read..]) {
Ok(0) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
error: "stream was closed".to_string(),
}))
}
Ok(n) => {
bytes_read += n;
message.extend_from_slice(&buffer[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
yield_now().await;
Expand Down
9 changes: 1 addition & 8 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::{Mutex, RwLock},
task::{yield_now, JoinHandle, JoinSet},
task::{JoinHandle, JoinSet},
time::timeout,
};

Expand Down Expand Up @@ -170,7 +170,6 @@ impl Client {
tokio::spawn(async move {
loop {
let Some((peer_id, message)) = receive_queue.lock().await.pop_front() else {
yield_now().await;
continue;
};

Expand Down Expand Up @@ -338,8 +337,6 @@ impl Client {
peers.write().await.remove(&peer_id);
piece_scheduler.write().await.remove_peer_count(&peer_id);
}

yield_now().await;
}
})
}
Expand Down Expand Up @@ -384,7 +381,6 @@ impl Client {
.push_back((peer_id.clone(), message));
}
Err(ReceiveError::WouldBlock) => {
yield_now().await;
continue;
}
Err(e) => {
Expand All @@ -400,7 +396,6 @@ impl Client {

peer.write().await.last_touch = Utc::now();
}
yield_now().await;

for peer_id in &peers_to_remove {
if peers.write().await.remove(peer_id).is_some() {
Expand All @@ -422,7 +417,6 @@ impl Client {
tokio::spawn(async move {
loop {
let Some((peer_id, message)) = send_queue.lock().await.pop_front() else {
yield_now().await;
continue;
};

Expand Down Expand Up @@ -450,7 +444,6 @@ impl Client {
}
Err(SendError::WouldBlock) => {
send_queue.lock().await.push_back((peer_id, message));
yield_now().await;
}
Err(_) => {
println!(
Expand Down

0 comments on commit 3656eae

Please sign in to comment.