Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "netmap-rs"
version = "0.1.0"
edition = "2024"
edition = "2021"
authors = ["Meshack Bahati Ouma <bahatikylemeshack@gmail.com"]
description = "Safe, zero-cost abstractions for Netmap kernel-bypass networking"
license = "MIT OR Apache-2.0"
Expand All @@ -12,8 +12,8 @@ readme = 'README.md'

[features]
default = []
sys = ['netmap-min-sys']
fallback = []
sys = ['netmap-min-sys', 'core_affinity', 'reed-solomon-erasure'] # For FEC example
fallback = ['core_affinity'] # Also include for thread_per_ring example under fallback

[dependencies]
bitflags = "2.0"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Add to your `Cargo.toml`:

```toml

[dependecies]
[dependencies]
netmap-rs ="0.1"
```

Expand All @@ -37,11 +37,11 @@ fn main() -> Result<(), Error> {
.open()?;

let mut tx_ring = nm.tx_ring(0)?;
let mut rx-ring = nm.rx_ring(0)?;
let mut rx_ring = nm.rx_ring(0)?;

// send a packet
tx_ring.send(b"hello world")?;
tx.ring.sync();
tx_ring.sync();

// receive packets
while let Some(frame) = rx_ring.recv(){
Expand Down
126 changes: 96 additions & 30 deletions examples/fec.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,122 @@
//! Forward Error Correction (FEC) example using Reed-Solomon
#![cfg(feature = "sys")]

use netmap_rs::prelude::*;
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::time::Duration;

const DATA_SHARDS: usize = 4;
const PARITY_SHARDS: usize = 2;
// Example: 2 data shards, 1 parity shard
const DATA_SHARDS: usize = 2;
const PARITY_SHARDS: usize = 1;
const TOTAL_SHARDS: usize = DATA_SHARDS + PARITY_SHARDS;

fn main() -> Result<(), Error> {
let nm = NetmapBuilder::new("netmap:eth0")
let nm = NetmapBuilder::new("netmap:eth0") // Replace with your interface
.num_tx_rings(1)
.num_rx_rings(1)
.open()?;
.build()?;

let mut tx_ring = nm.tx_ring(0)?;
let mut rx_ring = nm.rx_ring(0)?;

// Create encoder/decoder
let rs = ReedSolomon::new(DATA_SHARDS, PARITY_SHARDS)?;
let r = ReedSolomon::new(DATA_SHARDS, PARITY_SHARDS).unwrap();

// Original data
let mut data: Vec<Vec<u8>> = (0..DATA_SHARDS).map(|i| vec![i as u8; 128]).collect();
let original_data = b"Hello Netmap with FEC!".to_vec();
let chunk_size = (original_data.len() + DATA_SHARDS - 1) / DATA_SHARDS;
let mut shards = Vec::with_capacity(TOTAL_SHARDS);

// Add parity shards
let mut shards = data.clone();
shards.resize(DATA_SHARDS + PARITY_SHARDS, vec![0; 128]);
rs.encode(&mut shards)?;
for i in 0..DATA_SHARDS {
let start = i * chunk_size;
let end = std::cmp::min(start + chunk_size, original_data.len());
let mut shard = original_data[start..end].to_vec();
shard.resize(chunk_size, 0); // Pad if necessary
shards.push(shard);
}
for _ in 0..PARITY_SHARDS {
shards.push(vec![0u8; chunk_size]);
}

// Send all shards
for shard in &shards {
tx_ring.send(shard)?;
// Encode
r.encode(&mut shards).unwrap();

// Simulate sending shards (e.g., as separate packets)
println!("Sending shards...");
for (i, shard) in shards.iter().enumerate() {
// In a real scenario, prepend shard index or other metadata
let mut packet_data = vec![i as u8]; // Shard index
packet_data.extend_from_slice(shard);
tx_ring.send(&packet_data)?;
println!("Sent shard {}: len {}", i, shard.len());
}
tx_ring.sync();

// Simulate packet loss (drop 2 random shards)
let mut received_shards = shards.clone();
received_shards[1] = vec![0; 128]; // Mark as missing
received_shards[4] = vec![0; 128]; // Mark as missing
// Simulate receiving shards (and potentially losing one)
let mut received_shards: Vec<Option<Vec<u8>>> = vec![None; TOTAL_SHARDS];
let mut received_count = 0;

// Receive and reconstruct
let mut reconstructed = received_shards.clone();
rs.reconstruct(&mut reconstructed)?;
println!("Receiving shards (simulating loss of shard 0)...");
for _ in 0..10 { // Try to receive for a bit
rx_ring.sync();
while let Some(frame) = rx_ring.recv() {
let payload = frame.payload();
if payload.is_empty() { continue; }
let shard_index = payload[0] as usize;

// Verify reconstruction
for i in 0..DATA_SHARDS {
assert_eq!(
reconstructed[i], data[i],
"Reconstruction failed for shard {}",
i
);
// SIMULATE LOSS OF SHARD 0
if shard_index == 0 && received_shards[0].is_none() && received_count < DATA_SHARDS {
println!("Simulated loss of shard 0");
received_shards[0] = Some(vec![]); // Mark as lost for reconstruction logic
// but don't actually store it / increment received_count for it yet
// to ensure reconstruction is attempted.
// For this test, we'll actually skip storing it to force reconstruction.
continue;
}

if shard_index < TOTAL_SHARDS && received_shards[shard_index].is_none() {
received_shards[shard_index] = Some(payload[1..].to_vec());
received_count += 1;
println!("Received shard {}", shard_index);
}
if received_count >= DATA_SHARDS { break; }
}
if received_count >= DATA_SHARDS { break; }
std::thread::sleep(Duration::from_millis(50));
}


if received_count < DATA_SHARDS {
eprintln!("Did not receive enough shards to reconstruct.");
return Ok(());
}

println!("Attempting reconstruction...");
match r.reconstruct(&mut received_shards) {
Ok(_) => {
println!("Reconstruction successful!");
let mut reconstructed_data = Vec::new();
for i in 0..DATA_SHARDS {
if let Some(shard_data) = &received_shards[i] {
reconstructed_data.extend_from_slice(shard_data);
} else {
eprintln!("Missing data shard {} after reconstruction attempt.", i);
return Ok(());
}
}
// Trim padding if original length known, or handle as per application logic
reconstructed_data.truncate(original_data.len());

if reconstructed_data == original_data {
println!("Data successfully reconstructed: {:?}", String::from_utf8_lossy(&reconstructed_data));
} else {
eprintln!("Data mismatch after reconstruction!");
eprintln!("Original: {:?}", String::from_utf8_lossy(&original_data));
eprintln!("Reconstructed: {:?}", String::from_utf8_lossy(&reconstructed_data));
}
}
Err(e) => {
eprintln!("Reconstruction failed: {:?}", e);
}
}

println!("FEC test successful - data reconstructed correctly");
Ok(())
}
46 changes: 23 additions & 23 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
//! A simple ping-pong example demonstrating basic Netmap usage
#![cfg(feature = "sys")]

use netmap_rs::prelude::*;
use std::time::{Duration, Instant};
use std::time::Duration;

fn main() -> Result<(), Error> {
let nm = NetmapBuilder::new("netmap:eth0")
.num_tx_rings(1)
.nm_tx_rings(1)
.num_rx_rings(1)
.open()?;
.build()?;

let mut tx_ring = nm.tx_ring(0)?;
let mut rx_ring = nm.rx_ring(0)?;

let ping_data = b"PING";
let pong_data = b"PONG";

// Ping
tx_ring.send(ping_data)?;
// send a packet
tx_ring.send(b"hello world")?;
tx_ring.sync();

let start = Instant::now();
let timeout = Duration::from_secs(1);

// Wait for pong
loop {
if let Some(frame) = rx_ring.recv() {
if frame.payload() == pong_data {
let rtt = start.elapsed();
println!("Ping-pong round trip: {:?}", rtt);
break;
}
// receive packets
let mut received = false;
for _ in 0..10 {
// try a few times
while let Some(frame) = rx_ring.recv() {
println!("Received packet: {:?}", frame.payload());
assert_eq!(frame.payload(), b"hello world");
received = true;
break;
}

if start.elapsed() > timeout {
return Err(Error::WouldBlock);
if received {
break;
}
std::thread::sleep(Duration::from_millis(100)); // wait for packets
rx_ring.sync(); // tell kernel we are done with previous packets
}

if !received {
return Err(Error::WouldBlock); // or some other error
}

Ok(())
Expand Down
Loading
Loading