Skip to content

Commit

Permalink
Merge pull request #17 from PickingUpPieces/feature/single-connection
Browse files Browse the repository at this point in the history
Feature/single connection
  • Loading branch information
PickingUpPieces authored Feb 27, 2024
2 parents d366408 + feed204 commit a86a8b1
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 37 deletions.
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ A network performance measurement tool
* `-t`, `--time <TIME>` — Time to run the test

Default value: `10`
* `--with-gso` — Enable GSO on sending socket
* `--with-gsro` — Enable GSO/GRO on socket

Default value: `false`

Expand All @@ -53,12 +53,6 @@ A network performance measurement tool
* `--with-mss <WITH_MSS>` — Set transmit buffer size. Gets overwritten by GSO/GRO buffer size if GSO/GRO is enabled

Default value: `1472`
* `--with-gro` — Enable GRO on receiving socket

Default value: `false`

Possible values: `true`, `false`

* `--with-ip-frag` — Disable fragmentation on sending socket

Default value: `false`
Expand All @@ -80,7 +74,13 @@ A network performance measurement tool
* `--with-mmsg-amount <WITH_MMSG_AMOUNT>` — Amount of message packs of gso_buffers to send when using sendmmsg

Default value: `1024`
* `--without-non-blocking` — Enable non-blocking socket
* `--with-socket-buffer` — Enable setting udp socket buffer size

Default value: `false`

Possible values: `true`, `false`

* `--without-non-blocking` — Disable non-blocking socket

Default value: `false`

Expand All @@ -95,6 +95,12 @@ A network performance measurement tool

Possible values: `true`, `false`

* `--single-connection` — Only one socket descriptor is used for all threads

Default value: `false`

Possible values: `true`, `false`

* `--markdown-help`

Possible values: `true`, `false`
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def run_test(run_config):

def write_results_to_csv(test_results, test_name, csv_file_path):
# FIXME: If new measurement parameters are added, the header should be updated
header = ['test_name', 'run_number', 'run_name', 'amount_threads_client', 'amount_threads_server', 'amount_used_ports_server', 'test_runtime_length', 'datagram_size', 'packet_buffer_size', 'exchange_function', 'io_model', 'total_data_gbyte', 'amount_datagrams', 'amount_data_bytes', 'amount_reordered_datagrams', 'amount_duplicated_datagrams', 'amount_omitted_datagrams', 'amount_syscalls', 'amount_io_model_syscalls', 'data_rate_gbit', 'packet_loss', 'nonblocking', 'ip_fragmentation', 'gso', 'gro']
header = ['test_name', 'run_number', 'run_name', 'amount_threads_client', 'amount_threads_server', 'amount_used_ports_server', 'test_runtime_length', 'datagram_size', 'packet_buffer_size', 'exchange_function', 'io_model', 'total_data_gbyte', 'amount_datagrams', 'amount_data_bytes', 'amount_reordered_datagrams', 'amount_duplicated_datagrams', 'amount_omitted_datagrams', 'amount_syscalls', 'amount_io_model_syscalls', 'data_rate_gbit', 'packet_loss', 'nonblocking', 'ip_fragmentation', 'gso', 'gro', 'single-socket']
file_exists = os.path.isfile(csv_file_path)

with open(csv_file_path, 'a', newline='') as csvfile:
Expand Down Expand Up @@ -162,7 +162,8 @@ def write_results_to_csv(test_results, test_name, csv_file_path):
'nonblocking': server_result['parameter']['socket_options']['nonblocking'],
'ip_fragmentation': client_result['parameter']['socket_options']['ip_fragmentation'],
'gso': client_result['parameter']['socket_options']['gso'],
'gro': server_result['parameter']['socket_options']['gro']
'gro': server_result['parameter']['socket_options']['gro'],
'single-socket': server_result['parameter']['single_socket']
}
writer.writerow(row)

Expand Down
3 changes: 2 additions & 1 deletion benchmarks/configs/send_methods_with_threads.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"parameters": {
"repetitions": 7,
"repetitions": 3,
"ip": "0.0.0.0",
"port": 45001,
"datagram-size": 1472,
Expand All @@ -13,6 +13,7 @@
"with-mmsg-amount": 20,
"with-ip-frag": false,
"without-non-blocking": false,
"single-socket": true,
"io-model": "select"
},
"sendmmsg": {
Expand Down
43 changes: 36 additions & 7 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use log::{debug, error, info};

use std::{cmp::max, sync::mpsc::{self, Sender}, thread};

use crate::{node::{client::Client, server::Server, Node}, util::NPerfMode};
use crate::{net::socket::Socket, node::{client::Client, server::Server, Node}, util::NPerfMode};
use crate::net::{self, socket_options::SocketOptions};
use crate::util::{self, statistic::Statistic, ExchangeFunction};

Expand Down Expand Up @@ -83,6 +83,10 @@ pub struct nPerf {
#[arg(long, default_value_t = false)]
json: bool,

/// Only one socket descriptor is used for all threads
#[arg(long, default_value_t = false)]
single_socket: bool,

#[arg(long, hide = true)]
markdown_help: bool,
}
Expand Down Expand Up @@ -112,20 +116,39 @@ impl nPerf {
let mut fetch_handle: Vec<thread::JoinHandle<()>> = Vec::new();
let (tx, rx) = mpsc::channel();

// If single-connection, creating the socket and bind to port/connect must happen before the threads are spawned
let socket = if self.single_socket {
let socket = Socket::new(parameter.ip, self.port, parameter.socket_options).expect("Error creating socket");
if parameter.mode == util::NPerfMode::Client {
socket.connect().expect("Error connecting to remote host");
} else {
socket.bind().expect("Error binding to local port");
}
Some(socket)
} else {
None
};

for i in 0..self.parallel {
let tx: Sender<_> = tx.clone();
let port = if self.port != 45001 {
info!("Port is set to different port than 45001. Incrementing port number is disabled.");
let port = if self.port != 45001 || self.single_socket {
info!("Port is set to different port than 45001 or single_socket mode is enabled. Incrementing port number is disabled.");
self.port
} else {
self.port + i
};


let i = if self.single_socket {
0
} else {
i
};

fetch_handle.push(thread::spawn(move || {
let mut node:Box<dyn Node> = if parameter.mode == util::NPerfMode::Client {
Box::new(Client::new(i as u64, parameter.ip, port, parameter))
Box::new(Client::new(i as u64, parameter.ip, port, socket, parameter))
} else {
Box::new(Server::new(parameter.ip, port, parameter))
Box::new(Server::new(parameter.ip, port, socket, parameter))
};

match node.run(parameter.io_model) {
Expand Down Expand Up @@ -156,12 +179,17 @@ impl nPerf {
handle.join().unwrap();
stat
});

info!("All threads finished!");
if let Some(socket) = socket {
socket.close().expect("Error closing socket");
}

if statistic.amount_datagrams != 0 {
statistic.calculate_statistics();
statistic.print(parameter.output_format);
}

if !(self.run_infinite && parameter.mode == util::NPerfMode::Server) {
return Some(statistic);
}
Expand Down Expand Up @@ -232,7 +260,8 @@ impl nPerf {
self.datagram_size,
packet_buffer_size,
socket_options,
exchange_function
exchange_function,
self.single_socket
))
}

Expand Down
35 changes: 25 additions & 10 deletions src/net/socket.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@

use log::{info, trace, debug, error};
use log::{debug, error, info, trace, warn};
use std::{self, net::Ipv4Addr, io::Error};

use super::socket_options::{self, SocketOptions};

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct Socket {
ip: Ipv4Addr,
port: u16,
socket: i32
socket: i32,
sendmmsg_econnrefused_counter: u16
}

impl Socket {
Expand All @@ -20,7 +21,8 @@ impl Socket {
Some(Socket {
ip,
port,
socket
socket,
sendmmsg_econnrefused_counter: 0
})
}

Expand Down Expand Up @@ -89,10 +91,9 @@ impl Socket {
if buffer_len == 0 {
error!("Buffer is empty");
return Err("Buffer is empty");
} else {
debug!("Sending on socket {} with buffer size: {}", self.socket, buffer_len);
trace!("Buffer: {:?}", buffer)
}
debug!("Sending on socket {} with buffer size: {}", self.socket, buffer_len);
trace!("Buffer: {:?}", buffer);

let send_result = unsafe {
libc::send(
Expand Down Expand Up @@ -145,6 +146,10 @@ impl Socket {
error!("Connection refused while trying to send data!");
return Err("ECONNREFUSED");
},
Some(libc::EAGAIN) => {
warn!("Error EGAIN: Probably socket buffer is full!");
return Err("EAGAIN");
},
_ => {
error!("Errno when trying to send data with sendmsg(): {}", errno);
return Err("Failed to send data");
Expand All @@ -156,7 +161,7 @@ impl Socket {
Ok(send_result as usize)
}

pub fn sendmmsg(&self, mmsgvec: &mut [libc::mmsghdr]) -> Result<usize, &'static str> {
pub fn sendmmsg(&mut self, mmsgvec: &mut [libc::mmsghdr]) -> Result<usize, &'static str> {
let send_result: i32 = unsafe {
libc::sendmmsg(
self.socket,
Expand All @@ -173,14 +178,24 @@ impl Socket {
error!("Connection refused while trying to send data!");
return Err("ECONNREFUSED");
},
Some(libc::EAGAIN) => {
warn!("Error EGAIN: Probably socket buffer is full!");
return Err("EAGAIN");
},
_ => {
error!("Errno when trying to send data with sendmmsg(): {}", errno);
return Err("Failed to send data");
}
}
// sendmmsg() always returns 1, even when it should return ECONNREFUSED (when the server isn't up yet, similar to send()/sendmsg()). This is a workaround to detect ECONNREFUSED.
} else if send_result == 1 && mmsgvec.len() > 1 {
error!("sendmmsg() returned 1, but mmsgvec.len() > 1. This probably means that the first message was sent successfully, but the second one failed. We assume that the server is not running.");
return Err("ECONNREFUSED");
if self.sendmmsg_econnrefused_counter > 5 {
error!("sendmmsg() returned 1, but mmsgvec.len() > 1. This probably means that the first message was sent successfully, but the second one failed. We assume that the server is not running.");
return Err("ECONNREFUSED");
}
self.sendmmsg_econnrefused_counter += 1;
} else {
self.sendmmsg_econnrefused_counter = 0;
}

debug!("Sent {} mmsghdr(s)", send_result);
Expand Down
19 changes: 16 additions & 3 deletions src/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub struct Client {
}

impl Client {
pub fn new(test_id: u64, ip: Ipv4Addr, remote_port: u16, parameter: Parameter) -> Self {
pub fn new(test_id: u64, ip: Ipv4Addr, remote_port: u16, socket: Option<Socket>, parameter: Parameter) -> Self {
info!("Current mode 'client' sending to remote host {}:{} with test ID {}", ip, remote_port, test_id);
let socket = Socket::new(ip, remote_port, parameter.socket_options).expect("Error creating socket");
let socket = socket.unwrap_or(Socket::new(ip, remote_port, parameter.socket_options).expect("Error creating socket"));
let packet_buffer = Vec::from_iter((0..parameter.packet_buffer_size).map(|_| PacketBuffer::new(parameter.mss, parameter.datagram_size).expect("Error creating packet buffer")));

Client {
Expand Down Expand Up @@ -80,6 +80,11 @@ impl Client {
Ok(())
},
Err("ECONNREFUSED") => Err("Start the server first! Abort measurement..."),
Err("EAGAIN") => {
// Reset next_packet_id to the last packet_id that was sent
self.next_packet_id -= amount_datagrams;
Ok(())
},
Err(x) => Err(x)
}
}
Expand All @@ -104,6 +109,11 @@ impl Client {
Ok(())
},
Err("ECONNREFUSED") => Err("Start the server first! Abort measurement..."),
Err("EAGAIN") => {
// Reset next_packet_id to the last packet_id that was sent
self.next_packet_id -= amount_datagrams;
Ok(())
},
Err(x) => Err(x)
}
}
Expand Down Expand Up @@ -141,7 +151,10 @@ impl Node for Client {
fn run(&mut self, io_model: IOModel) -> Result<Statistic, &'static str> {
self.fill_packet_buffers_with_repeating_pattern();
self.add_message_headers();
self.socket.connect().expect("Error connecting to remote host");

if !self.statistic.parameter.single_socket {
self.socket.connect().expect("Error connecting to remote host");
}

if let Ok(mss) = self.socket.get_mss() {
info!("On the current socket the MSS is {}", mss);
Expand Down
20 changes: 15 additions & 5 deletions src/node/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub struct Server {
}

impl Server {
pub fn new(ip: Ipv4Addr, local_port: u16, parameter: Parameter) -> Server {
pub fn new(ip: Ipv4Addr, local_port: u16, socket: Option<Socket>, parameter: Parameter) -> Server {
info!("Current mode 'server' listening on {}:{}", ip, local_port);
let socket = Socket::new(ip, local_port, parameter.socket_options).expect("Error creating socket");
let socket = socket.unwrap_or(Socket::new(ip, local_port, parameter.socket_options).expect("Error creating socket"));
let packet_buffer = Vec::from_iter((0..parameter.packet_buffer_size).map(|_| PacketBuffer::new(parameter.mss, parameter.datagram_size).expect("Error creating packet buffer")));

Server {
Expand Down Expand Up @@ -133,7 +133,12 @@ impl Server {
Err("INIT_MESSAGE_RECEIVED")
},
MessageType::MEASUREMENT => {
let measurement = self.measurements.get_mut(&test_id).expect("Error getting statistic in last message: test id not found");
let measurement = if let Some(x) = self.measurements.get_mut(&test_id) {
x
} else {
self.measurements.insert(test_id, Measurement::new(self.parameter));
self.measurements.get_mut(&test_id).expect("Error getting statistic in measurement message: test id not found")
};
if !measurement.first_packet_received {
info!("First packet received from test {}!", test_id);
measurement.start_time = Instant::now();
Expand All @@ -157,7 +162,10 @@ impl Server {
impl Node for Server {
fn run(&mut self, io_model: IOModel) -> Result<Statistic, &'static str>{
info!("Current mode: server");
self.socket.bind().expect("Error binding socket");

if !self.parameter.single_socket {
self.socket.bind().expect("Error binding to local port");
}

info!("Start server loop...");
let mut read_fds: libc::fd_set = unsafe { self.socket.create_fdset() };
Expand Down Expand Up @@ -197,7 +205,9 @@ impl Node for Server {
statistic.amount_syscalls += 1;
}

self.socket.close()?;
if !self.parameter.single_socket {
self.socket.close()?;
}

debug!("Finished receiving data from remote host");
// Fold over all statistics, and calculate the final statistic
Expand Down
4 changes: 3 additions & 1 deletion src/util/statistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ pub struct Parameter {
pub packet_buffer_size: usize,
pub socket_options: SocketOptions,
pub exchange_function: super::ExchangeFunction,
pub single_socket: bool,
}

impl Parameter {
#[allow(clippy::too_many_arguments)]
pub fn new(mode: super::NPerfMode, ip: std::net::Ipv4Addr, amount_threads: u16, amount_ports: u16, output_format: OutputFormat, io_model: super::IOModel, test_runtime_length: u64, mss: u32, datagram_size: u32, packet_buffer_size: usize, socket_options: SocketOptions, exchange_function: super::ExchangeFunction) -> Parameter {
pub fn new(mode: super::NPerfMode, ip: std::net::Ipv4Addr, amount_threads: u16, amount_ports: u16, output_format: OutputFormat, io_model: super::IOModel, test_runtime_length: u64, mss: u32, datagram_size: u32, packet_buffer_size: usize, socket_options: SocketOptions, exchange_function: super::ExchangeFunction, single_socket: bool) -> Parameter {
Parameter {
mode,
ip,
Expand All @@ -200,6 +201,7 @@ impl Parameter {
packet_buffer_size,
socket_options,
exchange_function,
single_socket
}
}
}

0 comments on commit a86a8b1

Please sign in to comment.