Skip to content

Commit

Permalink
fix exit on packet-count
Browse files Browse the repository at this point in the history
fixes issue where giving a packet count didn't exit the capture
thread early.
  • Loading branch information
ltn-chriskennedy committed Jan 17, 2024
1 parent 9e83cb1 commit 32204dd
Showing 1 changed file with 78 additions and 52 deletions.
130 changes: 78 additions & 52 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,9 @@ async fn rscap() {
let (ptx, mut prx) = mpsc::channel::<Arc<Vec<u8>>>(pcap_channel_size);

let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let running_capture = running.clone();
let running_decoder = running.clone();
let running_zmq = running.clone();

// Spawn a new thread for packet capture
let capture_task = if cfg!(feature = "dpdk_enabled") && args.dpdk {
Expand All @@ -794,7 +796,7 @@ async fn rscap() {
};

let mut packets = Vec::new();
while running_clone.load(Ordering::SeqCst) {
while running_capture.load(Ordering::SeqCst) {
match port.rx_burst(&mut packets) {
Ok(_) => {
for packet in packets.drain(..) {
Expand Down Expand Up @@ -848,13 +850,16 @@ async fn rscap() {
let mut stats_last_sent_ts = Instant::now();
let mut packets_dropped = 0;

while running_clone.load(Ordering::SeqCst) {
while running_capture.load(Ordering::SeqCst) {
while let Some(packet) = stream.next().await {
match packet {
Ok(data) => {
count += 1;
let packet_data = Arc::new(data.to_vec());
ptx.send(packet_data).await.unwrap();
if !running_capture.load(Ordering::SeqCst) {
break;
}
let current_ts = Instant::now();
if pcap_stats
&& ((current_ts.duration_since(stats_last_sent_ts).as_secs() >= 30)
Expand All @@ -873,11 +878,15 @@ async fn rscap() {
// Print error and information about it
error!("PCap Capture Error occurred: {}", e);
if e == pcap::Error::TimeoutExpired {
// If timeout expired, check for running_capture
if !running_capture.load(Ordering::SeqCst) {
break;
}
// Timeout expired, continue and try again
continue;
} else {
// Exit the loop if an error occurs
running_clone.store(false, Ordering::SeqCst);
running_capture.store(false, Ordering::SeqCst);
break;
}
}
Expand All @@ -890,6 +899,9 @@ async fn rscap() {
stats.received, stats.dropped, stats.if_dropped
);
}
if !running_capture.load(Ordering::SeqCst) {
break;
}
}

let stats = stream.capture_mut().stats().unwrap();
Expand All @@ -913,28 +925,35 @@ async fn rscap() {
// Spawn a new thread for Decoder communication
let decoder_thread = tokio::spawn(async move {
loop {
if !running_decoder.load(Ordering::SeqCst) {
debug!("Decoder thread received stop signal.");
break;
}

if !decode_video {
// sleep for 1 second
tokio::time::sleep(Duration::from_secs(1)).await;
// Sleep for a short duration to prevent a tight loop
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
while let Some(mut batch) = drx.recv().await {
debug!("processing {} video packets in decoder thread", batch.len());
for stream_data in &batch {
{

// Use tokio::select to simultaneously wait for a new batch or a stop signal
tokio::select! {
Some(mut batch) = drx.recv() => {
debug!("Processing {} video packets in decoder thread", batch.len());
for stream_data in &batch {
let mut locked_processor = processor_clone.lock().unwrap();
locked_processor
.process_packet(&stream_data.packet)
.expect("Failed to process packet");
// MutexGuard is automatically dropped here
}
// Now the mutex is not locked across the await point
debug!(
"processed video packet with continuity counter {}",
stream_data.continuity_counter
);
// Clear the batch after processing
batch.clear();
}
_ = tokio::time::sleep(Duration::from_millis(100)), if !running_decoder.load(Ordering::SeqCst) => {
// This branch allows checking the running flag regularly
continue;
}
batch.clear();
}
}
});
Expand Down Expand Up @@ -965,47 +984,50 @@ async fn rscap() {

let mut dot_last_sent_ts = Instant::now();

while let Some(mut batch) = rx.recv().await {
for stream_data in batch.iter() {
// Serialize StreamData to Cap'n Proto message
let capnp_message = stream_data_to_capnp(stream_data)
.expect("Failed to convert to Cap'n Proto message");
let mut serialized_data = Vec::new();
capnp::serialize::write_message(&mut serialized_data, &capnp_message)
.expect("Failed to serialize Cap'n Proto message");

// Create ZeroMQ message from serialized Cap'n Proto data
let capnp_msg = zmq::Message::from(serialized_data);

// Send the Cap'n Proto message
if !no_zmq {
publisher.send(capnp_msg, zmq::SNDMORE).unwrap();
}
while running_zmq.load(Ordering::SeqCst) {
while let Some(mut batch) = rx.recv().await {
// Process and send messages
for stream_data in batch.iter() {
// Serialize StreamData to Cap'n Proto message
let capnp_message = stream_data_to_capnp(stream_data)
.expect("Failed to convert to Cap'n Proto message");
let mut serialized_data = Vec::new();
capnp::serialize::write_message(&mut serialized_data, &capnp_message)
.expect("Failed to serialize Cap'n Proto message");

// Create ZeroMQ message from serialized Cap'n Proto data
let capnp_msg = zmq::Message::from(serialized_data);

// Send the Cap'n Proto message
if !no_zmq {
publisher.send(capnp_msg, zmq::SNDMORE).unwrap();
}

let packet_slice = &stream_data.packet
[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len];
let packet_msg = if send_raw_stream {
// Write to file if output_file is provided
if let Some(file) = file.as_mut() {
if !no_progress && dot_last_sent_ts.elapsed().as_secs() >= 1 {
dot_last_sent_ts = Instant::now();
print!("*");
// Flush stdout to ensure the progress dots are printed
io::stdout().flush().unwrap();
let packet_slice = &stream_data.packet[stream_data.packet_start
..stream_data.packet_start + stream_data.packet_len];
let packet_msg = if send_raw_stream {
// Write to file if output_file is provided
if let Some(file) = file.as_mut() {
if !no_progress && dot_last_sent_ts.elapsed().as_secs() >= 1 {
dot_last_sent_ts = Instant::now();
print!("*");
// Flush stdout to ensure the progress dots are printed
io::stdout().flush().unwrap();
}
file.write_all(&packet_slice).unwrap();
}
file.write_all(&packet_slice).unwrap();
zmq::Message::from(packet_slice)
} else {
zmq::Message::from(Vec::new())
};

// Send the raw packet
if !no_zmq {
publisher.send(packet_msg, 0).unwrap();
}
zmq::Message::from(packet_slice)
} else {
zmq::Message::from(Vec::new())
};

// Send the raw packet
if !no_zmq {
publisher.send(packet_msg, 0).unwrap();
}
batch.clear();
}
batch.clear();
}
});

Expand All @@ -1031,6 +1053,10 @@ async fn rscap() {
let mut dot_last_sent_ts = Instant::now();
while let Some(packet) = prx.recv().await {
if packet_count > 0 && packets_captured > packet_count {
println!(
"\nPacket count limit reached {}, signaling termination...",
packet_count
);
running.store(false, Ordering::SeqCst);
break;
}
Expand Down

0 comments on commit 32204dd

Please sign in to comment.