Skip to content

Commit

Permalink
Use async_io to simplify zero_copy
Browse files Browse the repository at this point in the history
Signed-off-by: yuguorui <yuguorui@pku.edu.cn>
  • Loading branch information
yuguorui committed Jan 1, 2025
1 parent f799cfd commit 807a995
Showing 1 changed file with 16 additions and 38 deletions.
54 changes: 16 additions & 38 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(dead_code)]

use ipnet::{IpNet, Ipv6Net, PrefixLenError};
use tokio::io::AsyncWriteExt;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::{
Expand Down Expand Up @@ -166,7 +167,7 @@ fn splice_n(r: i32, w: i32, n: usize, has_more_data: bool) -> std::io::Result<us
}
}

pub async fn zero_copy(r: ReadHalf<'_>, w: WriteHalf<'_>) -> io::Result<()>
pub async fn zero_copy(r: ReadHalf<'_>, mut w: WriteHalf<'_>) -> io::Result<usize>
where
{
// create pipe
Expand All @@ -179,47 +180,24 @@ where
let rfd = rx.as_raw_fd();
let wfd = wx.as_raw_fd();

let mut bytes = 0;

loop {
let mut n;

rx.readable().await?;
match splice_n(rfd, wpipe, PIPE_BUF_SIZE, false) {
Ok(ret) => {
if ret > 0 {
n = ret;
} else {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "zero_copy"));
}
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
let _ = rx.try_io(Interest::READABLE, || {
Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, ""))
as std::io::Result<()>
});
continue;
}
Err(err) => {
return Err(err);
}
let mut n = rx.async_io(Interest::READABLE, || {
splice_n(rfd, wpipe, PIPE_BUF_SIZE, false)
}).await?;

if n == 0 {
w.shutdown().await?;
return Ok(bytes);
}

bytes += n;

while n > 0 {
wx.writable().await?;
match splice_n(rpipe, wfd, n, false) {
Ok(ret) => {
n -= ret;
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
let _ = wx.try_io(Interest::WRITABLE, || {
Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, ""))
as std::io::Result<()>
});
continue;
}
Err(err) => {
return Err(err);
}
}
n -= wx.async_io(Interest::WRITABLE, || {
splice_n(rpipe, wfd, n, false)
}).await?;
}
}
}
Expand Down

0 comments on commit 807a995

Please sign in to comment.