diff --git a/docker-compose.yml b/docker-compose.yml index 7544351..8c60252 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,8 @@ services: - "172.16.0.2:5000" - "--to_udp" - "172.16.0.3:6000" + - "--bandwidth_limit" + - "10" receive: image: diode:receive diff --git a/src/bin/diode-send.rs b/src/bin/diode-send.rs index 5b2fa21..e9ad269 100644 --- a/src/bin/diode-send.rs +++ b/src/bin/diode-send.rs @@ -23,6 +23,7 @@ struct Config { to_udp: net::SocketAddr, to_udp_mtu: u16, heartbeat: Option, + bandwidth_limit: f64, } fn command_args() -> Config { @@ -120,6 +121,14 @@ fn command_args() -> Config { .value_parser(clap::value_parser!(u16)) .help("Duration between two emitted heartbeat messages, 0 to disable"), ) + .arg( + Arg::new("bandwidth_limit") + .long("bandwidth_limit") + .value_name("bandwidth_limit_mbit") + .default_value("0") + .value_parser(clap::value_parser!(f64)) + .help("Set the bandwidth limit for transfer speed between pitcher and catcher in Mbit/s. Use 0 to disable the limit."), + ) .get_matches(); let from_tcp = net::SocketAddr::from_str(args.get_one::("from_tcp").expect("default")) @@ -148,6 +157,11 @@ fn command_args() -> Config { (hb != 0).then(|| time::Duration::from_secs(hb)) }; + let bandwidth_limit = { + let target_bandwidth_mbps = *args.get_one::("bandwidth_limit").expect("default");// Target bandwidth in Mbps + target_bandwidth_mbps * 1_000_000.0 / 8.0 // Convert Mbps to bytes per second + }; + Config { from_tcp, from_unix, @@ -161,6 +175,7 @@ fn command_args() -> Config { to_udp, to_udp_mtu, heartbeat, + bandwidth_limit, } } @@ -248,6 +263,7 @@ fn main() { to_bind: config.to_bind, to_udp: config.to_udp, to_mtu: config.to_udp_mtu, + bandwidth_limit: config.bandwidth_limit, }); thread::scope(|scope| { diff --git a/src/send/mod.rs b/src/send/mod.rs index b15242f..478350d 100644 --- a/src/send/mod.rs +++ b/src/send/mod.rs @@ -44,6 +44,7 @@ pub struct Config { pub to_bind: net::SocketAddr, pub to_udp: net::SocketAddr, pub to_mtu: u16, + pub bandwidth_limit: f64, } impl Config { diff --git a/src/send/udp.rs b/src/send/udp.rs index b13d8f7..779832d 100644 --- a/src/send/udp.rs +++ b/src/send/udp.rs @@ -25,6 +25,7 @@ pub(crate) fn start(sender: &send::Sender) -> Result<(), send::Error> { socket, usize::from(sender.to_max_messages), sender.config.to_udp, + sender.config.bandwidth_limit, ); loop { diff --git a/src/udp.rs b/src/udp.rs index 304f501..848d618 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -2,7 +2,8 @@ use std::marker::PhantomData; use std::os::fd::AsRawFd; -use std::{io, mem, net}; +use std::time::{Duration, Instant}; +use std::{io, mem, net, thread}; pub struct UdpRecv; pub struct UdpSend; @@ -20,6 +21,7 @@ pub struct UdpMessages { iovecs: Vec, buffers: Vec>, marker: PhantomData, + bandwidth_limit: f64, } impl UdpMessages { @@ -28,6 +30,7 @@ impl UdpMessages { vlen: usize, msglen: Option, addr: Option, + bandwidth_limit: f64, ) -> Self { let (mut msgvec, mut iovecs, mut buffers); @@ -89,6 +92,7 @@ impl UdpMessages { iovecs, buffers, marker: PhantomData, + bandwidth_limit, } } } @@ -96,7 +100,7 @@ impl UdpMessages { impl UdpMessages { pub fn new_receiver(socket: net::UdpSocket, vlen: usize, msglen: usize) -> Self { log::info!("UDP configured to receive {vlen} messages (datagrams)"); - Self::new(socket, vlen, Some(msglen), None) + Self::new(socket, vlen, Some(msglen), None, 0.0) } pub fn recv_mmsg(&mut self) -> Result, io::Error> { @@ -128,35 +132,67 @@ impl UdpMessages { socket: net::UdpSocket, vlen: usize, dest: net::SocketAddr, + bandwidth_limit: f64, ) -> UdpMessages { log::info!("UDP configured to send {vlen} messages (datagrams) at a time"); - Self::new(socket, vlen, None, Some(dest)) + //std::println!("{}",udpdelay.unwrap().as_micros()); + Self::new(socket, vlen, None, Some(dest), bandwidth_limit) } pub fn send_mmsg(&mut self, mut buffers: Vec>) -> Result<(), io::Error> { for bufchunk in buffers.chunks_mut(self.vlen) { - let to_send = bufchunk.len(); + if self.bandwidth_limit > 0.0 { + for (i, buf) in bufchunk.iter_mut().enumerate() { + self.msgvec[i].msg_len = buf.len() as u32; + self.iovecs[i].iov_base = buf.as_mut_ptr().cast::(); + self.iovecs[i].iov_len = buf.len(); - for (i, buf) in bufchunk.iter_mut().enumerate() { - self.msgvec[i].msg_len = buf.len() as u32; - self.iovecs[i].iov_base = buf.as_mut_ptr().cast::(); - self.iovecs[i].iov_len = buf.len(); - } + let start_time = Instant::now(); + let nb_msg; + unsafe { + nb_msg = libc::sendmmsg(self.socket.as_raw_fd(), &mut self.msgvec[i], 1, 0); + } - let nb_msg; - unsafe { - nb_msg = libc::sendmmsg( - self.socket.as_raw_fd(), - self.msgvec.as_mut_ptr(), - to_send as u32, - 0, - ); - } - if nb_msg == -1 { - return Err(io::Error::new(io::ErrorKind::Other, "libc::sendmmsg")); - } - if nb_msg as usize != to_send { - log::warn!("nb prepared messages doesn't match with nb sent messages"); + if nb_msg == -1 { + return Err(io::Error::new(io::ErrorKind::Other, "libc::sendmmsg")); + } + + let send_duration = start_time.elapsed().as_secs_f64(); + let bytes_sent = buf.len() as f64; + let ideal_time_per_byte = 1.0 / self.bandwidth_limit; + let ideal_send_duration = bytes_sent * ideal_time_per_byte; + let sleep_duration = if ideal_send_duration > send_duration { + Duration::from_secs_f64(ideal_send_duration - send_duration) + } else { + Duration::from_secs(0) + }; + + thread::sleep(sleep_duration); + } + }else { + let to_send = bufchunk.len(); + + for (i, buf) in bufchunk.iter_mut().enumerate() { + self.msgvec[i].msg_len = buf.len() as u32; + self.iovecs[i].iov_base = buf.as_mut_ptr().cast::(); + self.iovecs[i].iov_len = buf.len(); + } + + let nb_msg; + unsafe { + nb_msg = libc::sendmmsg( + self.socket.as_raw_fd(), + self.msgvec.as_mut_ptr(), + to_send as u32, + 0, + ); + } + if nb_msg == -1 { + return Err(io::Error::new(io::ErrorKind::Other, "libc::sendmmsg")); + } + if nb_msg as usize != to_send { + log::warn!("nb prepared messages doesn't match with nb sent messages"); + } } } Ok(())