Skip to content

Commit

Permalink
Merge branch 'main' into feature/improve-benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
PickingUpPieces committed Mar 13, 2024
2 parents 860856a + a877efc commit 22fd495
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 49 deletions.
8 changes: 4 additions & 4 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"env": {
"RUST_LOG": "TRACE",
},
"args": ["client", "--with-mmsg", "--with-gso"],
"args": ["client", "--with-msg", "--single-socket", "--parallel", "2"],
"cwd": "${workspaceFolder}",
"preLaunchTask": "delay"
},
Expand All @@ -44,7 +44,7 @@
"env": {
"RUST_LOG": "TRACE",
},
"args": ["server", "--with-mmsg", "--with-gro"],
"args": ["server", "--with-msg", "--single-socket", "--parallel", "2"],
"cwd": "${workspaceFolder}"
},
{
Expand All @@ -55,7 +55,7 @@
"RUST_LOG": "INFO",
},
"program": "${workspaceFolder}/target/debug/nperf",
"args": ["client", "--with-msg", "--with-gso"],
"args": ["client", "--with-msg", "--single-socket", "--parallel", "2"],
"cwd": "${workspaceFolder}",
"stopOnEntry": false,
"preLaunchTask": "delay"
Expand All @@ -68,7 +68,7 @@
"RUST_LOG": "INFO",
},
"program": "${workspaceFolder}/target/debug/nperf",
"args": ["server", "--with-msg", "--with-gro"],
"args": ["server", "--with-msg", "--single-socket", "--parallel", "2"],
"cwd": "${workspaceFolder}",
"stopOnEntry": false,
},
Expand Down
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ A network performance measurement tool
* `-t`, `--time <TIME>` — Time to run the test

Default value: `10`
* `--with-gso` — Enable GSO on sending socket
* `--with-gsro` — Enable GSO/GRO on socket

Default value: `false`

Expand All @@ -53,12 +53,6 @@ A network performance measurement tool
* `--with-mss <WITH_MSS>` — Set transmit buffer size. Gets overwritten by GSO/GRO buffer size if GSO/GRO is enabled

Default value: `1472`
* `--with-gro` — Enable GRO on receiving socket

Default value: `false`

Possible values: `true`, `false`

* `--with-ip-frag` — Disable fragmentation on sending socket

Default value: `false`
Expand All @@ -80,7 +74,13 @@ A network performance measurement tool
* `--with-mmsg-amount <WITH_MMSG_AMOUNT>` — Amount of message packs of gso_buffers to send when using sendmmsg

Default value: `1024`
* `--without-non-blocking` — Enable non-blocking socket
* `--with-socket-buffer` — Enable setting udp socket buffer size

Default value: `false`

Possible values: `true`, `false`

* `--without-non-blocking` — Disable non-blocking socket

Default value: `false`

Expand All @@ -95,6 +95,12 @@ A network performance measurement tool

Possible values: `true`, `false`

* `--single-connection` — Only one socket descriptor is used for all threads

Default value: `false`

Possible values: `true`, `false`

* `--markdown-help`

Possible values: `true`, `false`
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def run_test(run_config):

def write_results_to_csv(test_results, test_name, csv_file_path):
# FIXME: If new measurement parameters are added, the header should be updated
header = ['test_name', 'run_number', 'run_name', 'amount_threads_client', 'amount_threads_server', 'amount_used_ports_server', 'test_runtime_length', 'datagram_size', 'packet_buffer_size', 'exchange_function', 'io_model', 'total_data_gbyte', 'amount_datagrams', 'amount_data_bytes', 'amount_reordered_datagrams', 'amount_duplicated_datagrams', 'amount_omitted_datagrams', 'amount_syscalls', 'amount_io_model_syscalls', 'data_rate_gbit', 'packet_loss', 'nonblocking', 'ip_fragmentation', 'gso', 'gro']
header = ['test_name', 'run_number', 'run_name', 'amount_threads_client', 'amount_threads_server', 'amount_used_ports_server', 'test_runtime_length', 'datagram_size', 'packet_buffer_size', 'exchange_function', 'io_model', 'total_data_gbyte', 'amount_datagrams', 'amount_data_bytes', 'amount_reordered_datagrams', 'amount_duplicated_datagrams', 'amount_omitted_datagrams', 'amount_syscalls', 'amount_io_model_syscalls', 'data_rate_gbit', 'packet_loss', 'nonblocking', 'ip_fragmentation', 'gso', 'gro', 'single-socket']
file_exists = os.path.isfile(csv_file_path)

with open(csv_file_path, 'a', newline='') as csvfile:
Expand Down Expand Up @@ -162,7 +162,8 @@ def write_results_to_csv(test_results, test_name, csv_file_path):
'nonblocking': server_result['parameter']['socket_options']['nonblocking'],
'ip_fragmentation': client_result['parameter']['socket_options']['ip_fragmentation'],
'gso': client_result['parameter']['socket_options']['gso'],
'gro': server_result['parameter']['socket_options']['gro']
'gro': server_result['parameter']['socket_options']['gro'],
'single-socket': server_result['parameter']['single_socket']
}
writer.writerow(row)

Expand Down
3 changes: 2 additions & 1 deletion benchmarks/configs/send_methods_with_threads.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"parameters": {
"repetitions": 7,
"repetitions": 3,
"ip": "0.0.0.0",
"port": 45001,
"datagram-size": 1472,
Expand All @@ -13,6 +13,7 @@
"with-mmsg-amount": 20,
"with-ip-frag": false,
"without-non-blocking": false,
"single-socket": true,
"io-model": "select"
},
"sendmmsg": {
Expand Down
57 changes: 46 additions & 11 deletions src/command.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use clap::Parser;
use log::{debug, error, info};

use std::{sync::mpsc::{self, Sender}, thread};
use std::{cmp::max, sync::mpsc::{self, Sender}, thread};

use crate::{node::{client::Client, server::Server, Node}, util::NPerfMode};
use crate::{net::socket::Socket, node::{client::Client, server::Server, Node}, util::NPerfMode};
use crate::net::{self, socket_options::SocketOptions};
use crate::util::{self, statistic::Statistic, ExchangeFunction};

Expand Down Expand Up @@ -83,6 +83,10 @@ pub struct nPerf {
#[arg(long, default_value_t = false)]
json: bool,

/// Only one socket descriptor is used for all threads
#[arg(long, default_value_t = false)]
single_socket: bool,

#[arg(long, hide = true)]
markdown_help: bool,
}
Expand Down Expand Up @@ -112,20 +116,39 @@ impl nPerf {
let mut fetch_handle: Vec<thread::JoinHandle<()>> = Vec::new();
let (tx, rx) = mpsc::channel();

// If single-connection, creating the socket and bind to port/connect must happen before the threads are spawned
let socket = if self.single_socket {
let socket = Socket::new(parameter.ip, self.port, parameter.socket_options).expect("Error creating socket");
if parameter.mode == util::NPerfMode::Client {
socket.connect().expect("Error connecting to remote host");
} else {
socket.bind().expect("Error binding to local port");
}
Some(socket)
} else {
None
};

for i in 0..self.parallel {
let tx: Sender<_> = tx.clone();
let port = if self.port != 45001 {
info!("Port is set to different port than 45001. Incrementing port number is disabled.");
let port = if self.port != 45001 || self.single_socket {
info!("Port is set to different port than 45001 or single_socket mode is enabled. Incrementing port number is disabled.");
self.port
} else {
self.port + i
};


let i = if self.single_socket {
0
} else {
i
};

fetch_handle.push(thread::spawn(move || {
let mut node:Box<dyn Node> = if parameter.mode == util::NPerfMode::Client {
Box::new(Client::new(i as u64, parameter.ip, port, parameter))
Box::new(Client::new(i as u64, parameter.ip, port, socket, parameter))
} else {
Box::new(Server::new(parameter.ip, port, parameter))
Box::new(Server::new(parameter.ip, port, socket, parameter))
};

match node.run(parameter.io_model) {
Expand All @@ -143,19 +166,30 @@ impl nPerf {

info!("Waiting for all threads to finish...");
let mut statistic = fetch_handle.into_iter().fold(Statistic::new(parameter), |acc: Statistic, handle| {
let stat = acc + match rx.recv_timeout(std::time::Duration::from_secs(parameter.test_runtime_length * 2)).expect("Timeout") {
Some(x) => x,
None => Statistic::new(parameter)
let stat = acc + match rx.recv_timeout(std::time::Duration::from_secs(max(parameter.test_runtime_length * 2, 120))) {
Ok(x) => {
match x {
Some(x) => x,
None => Statistic::new(parameter)
}
},
Err(_) => Statistic::new(parameter)
};

handle.join().unwrap();
stat
});

info!("All threads finished!");
if let Some(socket) = socket {
socket.close().expect("Error closing socket");
}

if statistic.amount_datagrams != 0 {
statistic.calculate_statistics();
statistic.print(parameter.output_format);
}

if !(self.run_infinite && parameter.mode == util::NPerfMode::Server) {
return Some(statistic);
}
Expand Down Expand Up @@ -226,7 +260,8 @@ impl nPerf {
self.datagram_size,
packet_buffer_size,
socket_options,
exchange_function
exchange_function,
self.single_socket
))
}

Expand Down
39 changes: 29 additions & 10 deletions src/net/socket.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@

use log::{info, trace, debug, error};
use log::{debug, error, info, trace, warn};
use std::{self, net::Ipv4Addr, io::Error};

use super::socket_options::{self, SocketOptions};

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct Socket {
ip: Ipv4Addr,
port: u16,
socket: i32
socket: i32,
sendmmsg_econnrefused_counter: u16
}

impl Socket {
Expand All @@ -20,7 +21,8 @@ impl Socket {
Some(Socket {
ip,
port,
socket
socket,
sendmmsg_econnrefused_counter: 0
})
}

Expand Down Expand Up @@ -89,10 +91,9 @@ impl Socket {
if buffer_len == 0 {
error!("Buffer is empty");
return Err("Buffer is empty");
} else {
debug!("Sending on socket {} with buffer size: {}", self.socket, buffer_len);
trace!("Buffer: {:?}", buffer)
}
debug!("Sending on socket {} with buffer size: {}", self.socket, buffer_len);
trace!("Buffer: {:?}", buffer);

let send_result = unsafe {
libc::send(
Expand Down Expand Up @@ -145,6 +146,10 @@ impl Socket {
error!("Connection refused while trying to send data!");
return Err("ECONNREFUSED");
},
Some(libc::EAGAIN) => {
warn!("Error EGAIN: Probably socket buffer is full!");
return Err("EAGAIN");
},
_ => {
error!("Errno when trying to send data with sendmsg(): {}", errno);
return Err("Failed to send data");
Expand All @@ -156,7 +161,7 @@ impl Socket {
Ok(send_result as usize)
}

pub fn sendmmsg(&self, mmsgvec: &mut [libc::mmsghdr]) -> Result<usize, &'static str> {
pub fn sendmmsg(&mut self, mmsgvec: &mut [libc::mmsghdr]) -> Result<usize, &'static str> {
let send_result: i32 = unsafe {
libc::sendmmsg(
self.socket,
Expand All @@ -173,14 +178,24 @@ impl Socket {
error!("Connection refused while trying to send data!");
return Err("ECONNREFUSED");
},
Some(libc::EAGAIN) => {
warn!("Error EGAIN: Probably socket buffer is full!");
return Err("EAGAIN");
},
_ => {
error!("Errno when trying to send data with sendmmsg(): {}", errno);
return Err("Failed to send data");
}
}
// sendmmsg() always returns 1, even when it should return ECONNREFUSED (when the server isn't up yet, similar to send()/sendmsg()). This is a workaround to detect ECONNREFUSED.
} else if send_result == 1 && mmsgvec.len() > 1 {
error!("sendmmsg() returned 1, but mmsgvec.len() > 1. This probably means that the first message was sent successfully, but the second one failed. We assume that the server is not running.");
return Err("ECONNREFUSED");
if self.sendmmsg_econnrefused_counter > 5 {
error!("sendmmsg() returned 1, but mmsgvec.len() > 1. This probably means that the first message was sent successfully, but the second one failed. We assume that the server is not running.");
return Err("ECONNREFUSED");
}
self.sendmmsg_econnrefused_counter += 1;
} else {
self.sendmmsg_econnrefused_counter = 0;
}

debug!("Sent {} mmsghdr(s)", send_result);
Expand Down Expand Up @@ -277,6 +292,10 @@ impl Socket {
socket_options::get_mss(self.socket)
}

pub fn get_socket_id(&self) -> i32 {
self.socket
}

fn create_sockaddr(address: Ipv4Addr, port: u16) -> libc::sockaddr_in {
let addr_u32: u32 = address.into();

Expand Down
26 changes: 22 additions & 4 deletions src/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub struct Client {
}

impl Client {
pub fn new(test_id: u64, ip: Ipv4Addr, remote_port: u16, parameter: Parameter) -> Self {
info!("Current mode 'client' sending to remote host {}:{} with test ID {}", ip, remote_port, test_id);
let socket = Socket::new(ip, remote_port, parameter.socket_options).expect("Error creating socket");
pub fn new(test_id: u64, ip: Ipv4Addr, remote_port: u16, socket: Option<Socket>, parameter: Parameter) -> Self {
let socket: Socket = socket.unwrap_or_else(|| Socket::new(ip, remote_port, parameter.socket_options).expect("Error creating socket"));
info!("Current mode 'client' sending to remote host {}:{} with test ID {} on socketID {}", ip, remote_port, test_id, socket.get_socket_id());
let packet_buffer = Vec::from_iter((0..parameter.packet_buffer_size).map(|_| PacketBuffer::new(parameter.mss, parameter.datagram_size).expect("Error creating packet buffer")));

Client {
Expand Down Expand Up @@ -62,6 +62,11 @@ impl Client {
trace!("Sent datagram to remote host");
Ok(())
},
Err("EAGAIN") => {
// Reset next_packet_id to the last packet_id that was sent
self.next_packet_id -= amount_datagrams;
Ok(())
},
Err("ECONNREFUSED") => Err("Start the server first! Abort measurement..."),
Err(x) => Err(x)
}
Expand All @@ -80,6 +85,11 @@ impl Client {
Ok(())
},
Err("ECONNREFUSED") => Err("Start the server first! Abort measurement..."),
Err("EAGAIN") => {
// Reset next_packet_id to the last packet_id that was sent
self.next_packet_id -= amount_datagrams;
Ok(())
},
Err(x) => Err(x)
}
}
Expand All @@ -104,6 +114,11 @@ impl Client {
Ok(())
},
Err("ECONNREFUSED") => Err("Start the server first! Abort measurement..."),
Err("EAGAIN") => {
// Reset next_packet_id to the last packet_id that was sent
self.next_packet_id -= amount_datagrams;
Ok(())
},
Err(x) => Err(x)
}
}
Expand Down Expand Up @@ -141,7 +156,10 @@ impl Node for Client {
fn run(&mut self, io_model: IOModel) -> Result<Statistic, &'static str> {
self.fill_packet_buffers_with_repeating_pattern();
self.add_message_headers();
self.socket.connect().expect("Error connecting to remote host");

if !self.statistic.parameter.single_socket {
self.socket.connect().expect("Error connecting to remote host");
}

if let Ok(mss) = self.socket.get_mss() {
info!("On the current socket the MSS is {}", mss);
Expand Down
Loading

0 comments on commit 22fd495

Please sign in to comment.