Skip to content

Commit

Permalink
Added bandwidth limits to prevent receiver overflow (--bandwidth_limi…
Browse files Browse the repository at this point in the history
…t in Mbit/s) (#12)

Co-authored-by: define42 <define42@github.com>
  • Loading branch information
define42 and define42 authored Jul 8, 2024
1 parent fa7ffeb commit 3137369
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 23 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ services:
- "172.16.0.2:5000"
- "--to_udp"
- "172.16.0.3:6000"
- "--bandwidth_limit"
- "10"

receive:
image: diode:receive
Expand Down
16 changes: 16 additions & 0 deletions src/bin/diode-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct Config {
to_udp: net::SocketAddr,
to_udp_mtu: u16,
heartbeat: Option<time::Duration>,
bandwidth_limit: f64,
}

fn command_args() -> Config {
Expand Down Expand Up @@ -120,6 +121,14 @@ fn command_args() -> Config {
.value_parser(clap::value_parser!(u16))
.help("Duration between two emitted heartbeat messages, 0 to disable"),
)
.arg(
Arg::new("bandwidth_limit")
.long("bandwidth_limit")
.value_name("bandwidth_limit_mbit")
.default_value("0")
.value_parser(clap::value_parser!(f64))
.help("Set the bandwidth limit for transfer speed between pitcher and catcher in Mbit/s. Use 0 to disable the limit."),
)
.get_matches();

let from_tcp = net::SocketAddr::from_str(args.get_one::<String>("from_tcp").expect("default"))
Expand Down Expand Up @@ -148,6 +157,11 @@ fn command_args() -> Config {
(hb != 0).then(|| time::Duration::from_secs(hb))
};

let bandwidth_limit = {
let target_bandwidth_mbps = *args.get_one::<f64>("bandwidth_limit").expect("default");// Target bandwidth in Mbps
target_bandwidth_mbps * 1_000_000.0 / 8.0 // Convert Mbps to bytes per second
};

Config {
from_tcp,
from_unix,
Expand All @@ -161,6 +175,7 @@ fn command_args() -> Config {
to_udp,
to_udp_mtu,
heartbeat,
bandwidth_limit,
}
}

Expand Down Expand Up @@ -248,6 +263,7 @@ fn main() {
to_bind: config.to_bind,
to_udp: config.to_udp,
to_mtu: config.to_udp_mtu,
bandwidth_limit: config.bandwidth_limit,
});

thread::scope(|scope| {
Expand Down
1 change: 1 addition & 0 deletions src/send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct Config {
pub to_bind: net::SocketAddr,
pub to_udp: net::SocketAddr,
pub to_mtu: u16,
pub bandwidth_limit: f64,
}

impl Config {
Expand Down
1 change: 1 addition & 0 deletions src/send/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) fn start<C>(sender: &send::Sender<C>) -> Result<(), send::Error> {
socket,
usize::from(sender.to_max_messages),
sender.config.to_udp,
sender.config.bandwidth_limit,
);

loop {
Expand Down
82 changes: 59 additions & 23 deletions src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
use std::marker::PhantomData;
use std::os::fd::AsRawFd;
use std::{io, mem, net};
use std::time::{Duration, Instant};
use std::{io, mem, net, thread};

pub struct UdpRecv;
pub struct UdpSend;
Expand All @@ -20,6 +21,7 @@ pub struct UdpMessages<D> {
iovecs: Vec<libc::iovec>,
buffers: Vec<Vec<u8>>,
marker: PhantomData<D>,
bandwidth_limit: f64,
}

impl<D> UdpMessages<D> {
Expand All @@ -28,6 +30,7 @@ impl<D> UdpMessages<D> {
vlen: usize,
msglen: Option<usize>,
addr: Option<net::SocketAddr>,
bandwidth_limit: f64,
) -> Self {
let (mut msgvec, mut iovecs, mut buffers);

Expand Down Expand Up @@ -89,14 +92,15 @@ impl<D> UdpMessages<D> {
iovecs,
buffers,
marker: PhantomData,
bandwidth_limit,
}
}
}

impl UdpMessages<UdpRecv> {
pub fn new_receiver(socket: net::UdpSocket, vlen: usize, msglen: usize) -> Self {
log::info!("UDP configured to receive {vlen} messages (datagrams)");
Self::new(socket, vlen, Some(msglen), None)
Self::new(socket, vlen, Some(msglen), None, 0.0)
}

pub fn recv_mmsg(&mut self) -> Result<impl Iterator<Item = &[u8]>, io::Error> {
Expand Down Expand Up @@ -128,35 +132,67 @@ impl UdpMessages<UdpSend> {
socket: net::UdpSocket,
vlen: usize,
dest: net::SocketAddr,
bandwidth_limit: f64,
) -> UdpMessages<UdpSend> {
log::info!("UDP configured to send {vlen} messages (datagrams) at a time");
Self::new(socket, vlen, None, Some(dest))
//std::println!("{}",udpdelay.unwrap().as_micros());
Self::new(socket, vlen, None, Some(dest), bandwidth_limit)
}

pub fn send_mmsg(&mut self, mut buffers: Vec<Vec<u8>>) -> Result<(), io::Error> {
for bufchunk in buffers.chunks_mut(self.vlen) {
let to_send = bufchunk.len();
if self.bandwidth_limit > 0.0 {
for (i, buf) in bufchunk.iter_mut().enumerate() {
self.msgvec[i].msg_len = buf.len() as u32;
self.iovecs[i].iov_base = buf.as_mut_ptr().cast::<libc::c_void>();
self.iovecs[i].iov_len = buf.len();

for (i, buf) in bufchunk.iter_mut().enumerate() {
self.msgvec[i].msg_len = buf.len() as u32;
self.iovecs[i].iov_base = buf.as_mut_ptr().cast::<libc::c_void>();
self.iovecs[i].iov_len = buf.len();
}
let start_time = Instant::now();
let nb_msg;
unsafe {
nb_msg = libc::sendmmsg(self.socket.as_raw_fd(), &mut self.msgvec[i], 1, 0);
}

let nb_msg;
unsafe {
nb_msg = libc::sendmmsg(
self.socket.as_raw_fd(),
self.msgvec.as_mut_ptr(),
to_send as u32,
0,
);
}
if nb_msg == -1 {
return Err(io::Error::new(io::ErrorKind::Other, "libc::sendmmsg"));
}
if nb_msg as usize != to_send {
log::warn!("nb prepared messages doesn't match with nb sent messages");
if nb_msg == -1 {
return Err(io::Error::new(io::ErrorKind::Other, "libc::sendmmsg"));
}

let send_duration = start_time.elapsed().as_secs_f64();
let bytes_sent = buf.len() as f64;
let ideal_time_per_byte = 1.0 / self.bandwidth_limit;
let ideal_send_duration = bytes_sent * ideal_time_per_byte;
let sleep_duration = if ideal_send_duration > send_duration {
Duration::from_secs_f64(ideal_send_duration - send_duration)
} else {
Duration::from_secs(0)
};

thread::sleep(sleep_duration);
}
}else {
let to_send = bufchunk.len();

for (i, buf) in bufchunk.iter_mut().enumerate() {
self.msgvec[i].msg_len = buf.len() as u32;
self.iovecs[i].iov_base = buf.as_mut_ptr().cast::<libc::c_void>();
self.iovecs[i].iov_len = buf.len();
}

let nb_msg;
unsafe {
nb_msg = libc::sendmmsg(
self.socket.as_raw_fd(),
self.msgvec.as_mut_ptr(),
to_send as u32,
0,
);
}
if nb_msg == -1 {
return Err(io::Error::new(io::ErrorKind::Other, "libc::sendmmsg"));
}
if nb_msg as usize != to_send {
log::warn!("nb prepared messages doesn't match with nb sent messages");
}
}
}
Ok(())
Expand Down

0 comments on commit 3137369

Please sign in to comment.