Skip to content

Commit

Permalink
try to optimize tcp locking
Browse files Browse the repository at this point in the history
  • Loading branch information
menghaoyu2002 committed Jun 3, 2024
1 parent 2271365 commit d68e2c1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
35 changes: 25 additions & 10 deletions src/client/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

use tokio::{io::AsyncReadExt, net::TcpStream};
use tokio::{io::AsyncReadExt, net::TcpStream, task::yield_now};

pub enum MessageId {
Choke = 0,
Expand Down Expand Up @@ -172,7 +172,7 @@ impl Display for Message {
}
}

pub async fn send_message(stream: &mut TcpStream, message: &Message) -> Result<(), SendError> {
pub async fn send_message(stream: &TcpStream, message: &Message) -> Result<(), SendError> {
let mut bytes_written = 0;
while bytes_written < message.serialize().len() {
stream.writable().await.unwrap();
Expand Down Expand Up @@ -200,7 +200,7 @@ pub async fn send_message(stream: &mut TcpStream, message: &Message) -> Result<(
Ok(())
}

pub async fn receive_message(stream: &mut TcpStream) -> Result<Message, ReceiveError> {
pub async fn receive_message(stream: &TcpStream) -> Result<Message, ReceiveError> {
let mut len = [0u8; 4];

let mut bytes_read = 0;
Expand Down Expand Up @@ -235,13 +235,28 @@ pub async fn receive_message(stream: &mut TcpStream) -> Result<Message, ReceiveE
}

let mut message = vec![0u8; len as usize];
stream.readable().await.unwrap();
stream.read_exact(&mut message).await.map_err(|e| {
ReceiveError::ReceiveError(ReceiveMessageError {
error: format!("Failed to read message: {}", e),
})
})?;

let mut bytes_read = 0;
while bytes_read < len as usize {
stream.readable().await.unwrap();
match stream.try_read(&mut message) {
Ok(0) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
error: "stream was closed".to_string(),
}))
}
Ok(n) => {
bytes_read += n;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
yield_now().await;
}
Err(e) => {
return Err(ReceiveError::ReceiveError(ReceiveMessageError {
error: format!("Failed to read message: {}", e),
}));
}
}
}
let id = message[0];
let payload = message[1..].to_vec();

Expand Down
18 changes: 6 additions & 12 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl Client {
}

pub async fn download(&mut self) -> Result<(), ClientError> {
self.connect_to_peers(10).await?;
self.connect_to_peers(30).await?;

let _ = tokio::join!(
self.send_messages(),
Expand All @@ -165,7 +165,7 @@ impl Client {
tokio::spawn(async move {
loop {
for (peer_id, peer) in peers.read().await.iter() {
if (Utc::now() - peer.read().await.last_sent).num_seconds() > 40 {
if (Utc::now() - peer.read().await.last_sent).num_seconds() > 60 {
println!(
"Sending keep alive to peer: {:?}",
String::from_utf8_lossy(peer_id)
Expand All @@ -176,8 +176,6 @@ impl Client {
));
}
}

yield_now().await;
}
})
}
Expand All @@ -189,11 +187,7 @@ impl Client {
let mut peers_to_remove = Vec::new();
loop {
for (peer_id, peer) in peers.read().await.iter() {
let stream = &mut peer.write().await.stream;
// println!(
// "Receiving message from peer: {:?}",
// String::from_utf8_lossy(peer_id)
// );
let stream = &peer.read().await.stream;

match receive_message(stream).await {
Ok(message) => {
Expand Down Expand Up @@ -250,7 +244,7 @@ impl Client {
continue;
};

let stream = &mut peer.write().await.stream;
let stream = &peer.read().await.stream;
println!(
"Sending \"{}\" message from {}",
message.get_id(),
Expand Down Expand Up @@ -438,8 +432,8 @@ impl Client {
handle.map_err(|e| ClientError::GetPeersError(format!("{}", e)))?;

if let Err(e) = conection_result {
// #[cfg(debug_assertions)]
// eprintln!("{}", e);
#[cfg(debug_assertions)]
eprintln!("{}", e);
}
// println!("{}", handles.len())
}
Expand Down

0 comments on commit d68e2c1

Please sign in to comment.