Skip to content

Commit a333213

Browse files
Merge pull request #20 from PickingUpPieces/feature/add-reuseport-sender
Feature/add reuseport sender
2 parents 93bd707 + 5bd0911 commit a333213

13 files changed

+101
-47
lines changed

benchmarks/benchmark.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def run_test(run_config):
122122

123123
def write_results_to_csv(test_results, test_name, csv_file_path):
124124
# FIXME: If new measurement parameters are added, the header should be updated
125-
header = ['test_name', 'run_number', 'run_name', 'amount_threads_client', 'amount_threads_server', 'amount_used_ports_server', 'test_runtime_length', 'datagram_size', 'packet_buffer_size', 'exchange_function', 'io_model', 'total_data_gbyte', 'amount_datagrams', 'amount_data_bytes', 'amount_reordered_datagrams', 'amount_duplicated_datagrams', 'amount_omitted_datagrams', 'amount_syscalls', 'amount_io_model_syscalls', 'data_rate_gbit', 'packet_loss', 'nonblocking', 'ip_fragmentation', 'gso', 'gro', 'single-socket', 'receive_buffer_size', 'send_buffer_size']
125+
header = ['test_name', 'run_number', 'run_name', 'amount_threads_client', 'amount_threads_server', 'amount_used_ports_server', 'test_runtime_length', 'datagram_size', 'packet_buffer_size', 'exchange_function', 'io_model', 'total_data_gbyte', 'amount_datagrams', 'amount_data_bytes', 'amount_reordered_datagrams', 'amount_duplicated_datagrams', 'amount_omitted_datagrams', 'amount_syscalls', 'amount_io_model_syscalls', 'data_rate_gbit', 'packet_loss', 'nonblocking', 'ip_fragmentation', 'reuseport', 'gso', 'gro', 'single-socket', 'receive_buffer_size', 'send_buffer_size']
126126
file_exists = os.path.isfile(csv_file_path)
127127

128128
with open(csv_file_path, 'a', newline='') as csvfile:
@@ -161,6 +161,7 @@ def write_results_to_csv(test_results, test_name, csv_file_path):
161161
'packet_loss': server_result['packet_loss'],
162162
'nonblocking': server_result['parameter']['socket_options']['nonblocking'],
163163
'ip_fragmentation': client_result['parameter']['socket_options']['ip_fragmentation'],
164+
'reuseport': server_result['parameter']['socket_options']['reuseport'],
164165
'gso': client_result['parameter']['socket_options']['gso'],
165166
'gro': server_result['parameter']['socket_options']['gro'],
166167
'single-socket': server_result['parameter']['single_socket'],

benchmarks/configs/benchmark_test_config.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"with-mmsg": false,
1818
"with-mmsg-amount": 1024,
1919
"without-non-blocking": false,
20+
"with-reuseport": false,
2021
"io-model": "select"
2122
},
2223
"server": {
@@ -50,6 +51,7 @@
5051
"with-mmsg": false,
5152
"with-mmsg-amount": 1024,
5253
"without-non-blocking": false,
54+
"with-reuseport": false,
5355
"io-model": "select"
5456
},
5557
"server": {
@@ -85,6 +87,7 @@
8587
"with-mmsg": false,
8688
"with-mmsg-amount": 1024,
8789
"without-non-blocking": false,
90+
"with-reuseport": false,
8891
"io-model": "select"
8992
},
9093
"server": {
@@ -118,6 +121,7 @@
118121
"with-mmsg": false,
119122
"with-mmsg-amount": 1024,
120123
"without-non-blocking": false,
124+
"with-reuseport": false,
121125
"io-model": "select"
122126
},
123127
"server": {

benchmarks/configs/send_methods_with_threads-max-mss.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"with-mmsg-amount": 20,
1414
"with-ip-frag": false,
1515
"without-non-blocking": false,
16+
"with-reuseport": false,
1617
"io-model": "select"
1718
},
1819
"sendmmsg": {

benchmarks/configs/send_methods_with_threads.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
"with-mmsg-amount": 20,
1414
"with-ip-frag": false,
1515
"without-non-blocking": false,
16-
"single-socket": true,
16+
"with-reuseport": true,
17+
"single-socket": false,
1718
"io-model": "select"
1819
},
1920
"sendmmsg": {

benchmarks/configs/sendmmsg_mmsg-vec_with_threads.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"with-gsro": true,
1313
"with-ip-frag": false,
1414
"without-non-blocking": false,
15+
"with-reuseport": false,
1516
"io-model": "select"
1617
},
1718
"|mmsg-vec| 1": {

src/command.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use log::{debug, error, info};
33

44
use std::{cmp::max, sync::mpsc::{self, Sender}, thread};
55

6-
use crate::{net::socket::Socket, node::{client::Client, server::Server, Node}, util::NPerfMode};
6+
use crate::{net::socket::Socket, node::{client::Client, server::Server, Node}, util::NPerfMode, DEFAULT_CLIENT_PORT};
77
use crate::net::{self, socket_options::SocketOptions};
88
use crate::util::{self, statistic::Statistic, ExchangeFunction};
99

@@ -20,7 +20,7 @@ pub struct nPerf {
2020
ip: String,
2121

2222
/// Port number to measure against/listen on. If port is defined with parallel mode, all client threads will measure against the same port.
23-
#[arg(short, long, default_value_t = crate::DEFAULT_PORT)]
23+
#[arg(short, long, default_value_t = crate::DEFAULT_SERVER_PORT)]
2424
port: u16,
2525

2626
/// Start multiple client/server threads in parallel. The port number will be incremented automatically.
@@ -87,6 +87,11 @@ pub struct nPerf {
8787
#[arg(long, default_value_t = false)]
8888
single_socket: bool,
8989

90+
// Enable reuseport option on socket. For parallel mode same port number for sending is used for all threads.
91+
#[arg(long, default_value_t = false)]
92+
with_reuseport: bool,
93+
94+
/// Show help in markdown format
9095
#[arg(long, hide = true)]
9196
markdown_help: bool,
9297
}
@@ -105,7 +110,7 @@ impl nPerf {
105110
None => { error!("Error running app"); return None; },
106111
};
107112

108-
match Self::parameter_check(&parameter) {
113+
match self.parameter_check(&parameter) {
109114
false => { error!("Invalid parameter!"); return None; },
110115
true => {}
111116
}
@@ -118,17 +123,21 @@ impl nPerf {
118123

119124
// If single-connection, creating the socket and bind to port/connect must happen before the threads are spawned
120125
let socket = if self.single_socket {
121-
let socket = Socket::new(parameter.ip, self.port, parameter.socket_options).expect("Error creating socket");
122126
if parameter.mode == util::NPerfMode::Client {
127+
let socket = Socket::new(parameter.ip, None, Some(self.port), parameter.socket_options).expect("Error creating socket");
123128
socket.connect().expect("Error connecting to remote host");
129+
Some(socket)
124130
} else {
131+
let socket = Socket::new(parameter.ip, Some(self.port), None, parameter.socket_options).expect("Error creating socket");
125132
socket.bind().expect("Error binding to local port");
133+
Some(socket)
126134
}
127-
Some(socket)
128135
} else {
129136
None
130137
};
131138

139+
let local_port_client: Option<u16> = if self.with_reuseport { Some(DEFAULT_CLIENT_PORT) } else { None };
140+
132141
for i in 0..self.parallel {
133142
let tx: Sender<_> = tx.clone();
134143
let port = if self.port != 45001 || self.single_socket {
@@ -138,15 +147,12 @@ impl nPerf {
138147
self.port + i
139148
};
140149

141-
let i = if self.single_socket {
142-
0
143-
} else {
144-
i
145-
};
150+
// Use same test id for all threads
151+
let i = if self.single_socket { 0 } else { i };
146152

147153
fetch_handle.push(thread::spawn(move || {
148154
let mut node:Box<dyn Node> = if parameter.mode == util::NPerfMode::Client {
149-
Box::new(Client::new(i as u64, parameter.ip, port, socket, parameter))
155+
Box::new(Client::new(i as u64, parameter.ip, local_port_client, port, socket, parameter))
150156
} else {
151157
Box::new(Server::new(parameter.ip, port, socket, parameter))
152158
};
@@ -265,12 +271,21 @@ impl nPerf {
265271
))
266272
}
267273

268-
fn parameter_check(parameter: &util::statistic::Parameter)-> bool {
274+
fn parameter_check(&self, parameter: &util::statistic::Parameter)-> bool {
269275
if parameter.datagram_size > crate::MAX_UDP_DATAGRAM_SIZE {
270276
error!("UDP datagram size is too big! Maximum is {}", crate::MAX_UDP_DATAGRAM_SIZE);
271277
return false;
272278
}
273279

280+
if self.with_reuseport && self.single_socket {
281+
error!("Reuseport and single socket option can't be used simultaneously!");
282+
return false;
283+
}
284+
285+
if self.with_reuseport && parameter.mode == util::NPerfMode::Server {
286+
error!("Reuseport option is enabled, but it is only available for client mode!");
287+
}
288+
274289
true
275290
}
276291

@@ -294,10 +309,11 @@ impl nPerf {
294309
SocketOptions::new(
295310
!self.without_non_blocking,
296311
self.with_ip_frag,
312+
self.with_reuseport,
297313
gso,
298314
gro,
299315
recv_buffer_size,
300316
send_buffer_size
301317
)
302318
}
303-
}
319+
}

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ const MAX_SOCKET_RECEIVE_BUFFER_SIZE: u32 = 212992 ; // 25MB; // The buffer size
1414
const DEFAULT_SOCKET_SEND_BUFFER_SIZE: u32 = 212992;
1515
const DEFAULT_SOCKET_RECEIVE_BUFFER_SIZE: u32 = 212992;
1616
const DEFAULT_DURATION: u64 = 10; // /* seconds */
17-
const DEFAULT_PORT: u16 = 45001;
17+
const DEFAULT_SERVER_PORT: u16 = 45001;
18+
const DEFAULT_CLIENT_PORT: u16 = 46001;
1819
const WAIT_CONTROL_MESSAGE: u64 = 200; // /* milliseconds */
1920

2021
// /* Maximum datagram size UDP is (64K - 1) - IP and UDP header sizes */

src/net/socket.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,22 @@ use super::socket_options::{self, SocketOptions};
77
#[derive(Debug, Copy, Clone)]
88
pub struct Socket {
99
ip: Ipv4Addr,
10-
port: u16,
10+
local_port: Option<u16>,
11+
remote_port: Option<u16>,
1112
socket: i32,
1213
sendmmsg_econnrefused_counter: u16
1314
}
1415

1516
impl Socket {
16-
pub fn new(ip: Ipv4Addr, port: u16, mut socket_options: SocketOptions) -> Option<Socket> {
17+
pub fn new(ip: Ipv4Addr, local_port: Option<u16>, remote_port: Option<u16>, mut socket_options: SocketOptions) -> Option<Socket> {
1718
let socket = Self::create_socket()?;
1819

1920
socket_options.set_socket_options(socket).expect("Error updating socket options");
2021

2122
Some(Socket {
2223
ip,
23-
port,
24+
local_port,
25+
remote_port,
2426
socket,
2527
sendmmsg_econnrefused_counter: 0
2628
})
@@ -39,7 +41,11 @@ impl Socket {
3941

4042

4143
pub fn connect(&self) -> Result<(), &'static str> {
42-
let sockaddr = Self::create_sockaddr(self.ip, self.port);
44+
if self.local_port.is_some() {
45+
self.bind()?;
46+
}
47+
48+
let sockaddr = Self::create_sockaddr(self.ip, self.remote_port.unwrap());
4349

4450
let connect_result = unsafe {
4551
libc::connect(
@@ -58,7 +64,7 @@ impl Socket {
5864
}
5965

6066
pub fn bind(&self) -> Result<(), &'static str> {
61-
let sockaddr = Self::create_sockaddr(self.ip, self.port);
67+
let sockaddr = Self::create_sockaddr(self.ip, self.local_port.unwrap());
6268

6369
let bind_result = unsafe {
6470
libc::bind(

src/net/socket_options.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ use std::io::Error;
77
pub struct SocketOptions {
88
nonblocking: bool,
99
ip_fragmentation: bool,
10-
pub gso: Option<u32>,
11-
pub gro: bool,
10+
reuseport: bool,
11+
gso: Option<u32>,
12+
gro: bool,
1213
recv_buffer_size: Option<u32>,
1314
send_buffer_size: Option<u32>,
1415
}
1516

1617
impl SocketOptions {
17-
pub fn new(nonblocking: bool, ip_fragmentation: bool, gso: Option<u32>, gro: bool, recv_buffer_size: Option<u32>, send_buffer_size: Option<u32>) -> Self {
18+
pub fn new(nonblocking: bool, ip_fragmentation: bool, reuseport: bool, gso: Option<u32>, gro: bool, recv_buffer_size: Option<u32>, send_buffer_size: Option<u32>) -> Self {
1819
SocketOptions {
1920
nonblocking,
2021
ip_fragmentation,
22+
reuseport,
2123
gso,
2224
gro,
2325
recv_buffer_size,
@@ -36,9 +38,9 @@ impl SocketOptions {
3638
if let Some(size) = self.gso {
3739
set_gso(socket, size)?;
3840
}
39-
if self.gro {
40-
set_gro(socket)?;
41-
}
41+
42+
set_gro(socket, self.gro)?;
43+
set_reuseport(socket, self.reuseport)?;
4244

4345
if let Some(size) = self.recv_buffer_size {
4446
set_buffer_size(socket, size, libc::SO_SNDBUF)?;
@@ -145,9 +147,9 @@ fn set_gso(socket: i32, gso_size: u32) -> Result<(), &'static str> {
145147
set_socket_option(socket, libc::SOL_UDP, libc::UDP_SEGMENT, gso_size)
146148
}
147149

148-
fn set_gro(socket: i32) -> Result<(), &'static str> {
149-
let value = 1;
150-
info!("Set socket option GRO to {}", value);
150+
fn set_gro(socket: i32, status: bool) -> Result<(), &'static str> {
151+
let value: u32 = if status { 1 } else { 0 };
152+
info!("Set socket option GRO to {}", status);
151153
set_socket_option(socket, libc::SOL_UDP, libc::UDP_GRO, value)
152154
}
153155

@@ -167,4 +169,10 @@ pub fn get_mss(socket: i32) -> Result<u32, &'static str> {
167169

168170
pub fn _get_gso_size(socket: i32) -> Result<u32, &'static str> {
169171
get_socket_option(socket, libc::SOL_UDP, libc::UDP_SEGMENT)
172+
}
173+
174+
pub fn set_reuseport(socket: i32, status: bool) -> Result<(), &'static str> {
175+
let value: u32 = if status { 1 } else { 0 };
176+
info!("Set socket option REUSEPORT to {}", status);
177+
set_socket_option(socket, libc::SOL_SOCKET, libc::SO_REUSEPORT, value)
170178
}

src/node/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ pub struct Client {
1616
}
1717

1818
impl Client {
19-
pub fn new(test_id: u64, ip: Ipv4Addr, remote_port: u16, socket: Option<Socket>, parameter: Parameter) -> Self {
20-
let socket: Socket = socket.unwrap_or_else(|| Socket::new(ip, remote_port, parameter.socket_options).expect("Error creating socket"));
21-
info!("Current mode 'client' sending to remote host {}:{} with test ID {} on socketID {}", ip, remote_port, test_id, socket.get_socket_id());
19+
pub fn new(test_id: u64, ip: Ipv4Addr, local_port: Option<u16>, remote_port: u16, socket: Option<Socket>, parameter: Parameter) -> Self {
20+
let socket: Socket = socket.unwrap_or_else(|| Socket::new(ip, local_port, Some(remote_port), parameter.socket_options).expect("Error creating socket"));
21+
info!("Current mode 'client' sending to remote host {}:{} from {}:{} with test ID {} on socketID {}", ip, remote_port, ip, local_port.unwrap_or_default(), test_id, socket.get_socket_id());
2222
let packet_buffer = Vec::from_iter((0..parameter.packet_buffer_size).map(|_| PacketBuffer::new(parameter.mss, parameter.datagram_size).expect("Error creating packet buffer")));
2323

2424
Client {

src/node/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub struct Server {
1717

1818
impl Server {
1919
pub fn new(ip: Ipv4Addr, local_port: u16, socket: Option<Socket>, parameter: Parameter) -> Server {
20-
let socket = socket.unwrap_or_else(|| Socket::new(ip, local_port, parameter.socket_options).expect("Error creating socket"));
20+
let socket = socket.unwrap_or_else(|| Socket::new(ip, Some(local_port), None, parameter.socket_options).expect("Error creating socket"));
2121
info!("Current mode 'server' listening on {}:{} with socketID {}", ip, local_port, socket.get_socket_id());
2222
let packet_buffer = Vec::from_iter((0..parameter.packet_buffer_size).map(|_| PacketBuffer::new(parameter.mss, parameter.datagram_size).expect("Error creating packet buffer")));
2323

tests/multithreading_tests.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,3 @@ fn multiple_clients_multiple_server() -> Result<(), Box<dyn std::error::Error>>{
2525
handle.join().unwrap();
2626
Ok(())
2727
}
28-
29-
#[test]
30-
fn multiple_clients_multiple_server_single_socket() -> Result<(), Box<dyn std::error::Error>>{
31-
let handle = common::start_nperf_server(Some(vec!["--parallel".to_string(), "2".to_string(), "--single-socket".to_string(), "--port".to_string(), "45003".to_string()]));
32-
33-
let args = vec!["client", "--with-msg", "--parallel", "2", "--port", "45003", "--single-socket"];
34-
if let Some(x) = nperf::nPerf::new().set_args(args).exec() {
35-
assert!(x.amount_datagrams > 10000);
36-
};
37-
38-
handle.join().unwrap();
39-
Ok(())
40-
}

tests/single_connection.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
2+
mod common;
3+
4+
#[test]
5+
fn test_client_reuseport() -> Result<(), Box<dyn std::error::Error>>{
6+
let handle = common::start_nperf_server(Some(vec!["--port".to_string(), "45001".to_string(), "--parallel".to_string(), "2".to_string()]));
7+
8+
let args = vec!["client", "--port", "45001", "--parallel", "2", "--with-reuseport"];
9+
if let Some(x) = nperf::nPerf::new().set_args(args).exec() {
10+
assert!(x.amount_datagrams > 10000);
11+
};
12+
13+
handle.join().unwrap();
14+
Ok(())
15+
}
16+
17+
#[test]
18+
fn multiple_clients_multiple_server_single_socket() -> Result<(), Box<dyn std::error::Error>>{
19+
let handle = common::start_nperf_server(Some(vec!["--parallel".to_string(), "2".to_string(), "--single-socket".to_string(), "--port".to_string(), "45003".to_string()]));
20+
21+
let args = vec!["client", "--with-msg", "--parallel", "2", "--port", "45003", "--single-socket"];
22+
if let Some(x) = nperf::nPerf::new().set_args(args).exec() {
23+
assert!(x.amount_datagrams > 10000);
24+
};
25+
26+
handle.join().unwrap();
27+
Ok(())
28+
}

0 commit comments

Comments
 (0)