Skip to content

Commit

Permalink
Terminate the packet writer early if stop signal is received
Browse files Browse the repository at this point in the history
If SIGTERM or SIGINT is received, we should not drain the channel before
stopping.
  • Loading branch information
jtt committed Dec 21, 2023
1 parent 46c20d4 commit ae3e50f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
11 changes: 9 additions & 2 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
fmt::Display,
sync::{
atomic::AtomicBool,
mpsc::{self, Receiver, SendError, Sender},
Arc, Condvar, Mutex,
},
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct Rx {
recv: Receiver<Packet>,
ctx: Arc<(Mutex<ChannelContext>, Condvar)>,
watermark_lo: u64,
stop: Arc<AtomicBool>,
}

/// Iterator for reading packets.
Expand All @@ -57,6 +59,9 @@ impl Iterator for IntoRxIter {
type Item = Packet;

fn next(&mut self) -> Option<Self::Item> {
if self.rx.stop.load(std::sync::atomic::Ordering::Relaxed) {
return None;
}
let (mux, cvar) = &*self.rx.ctx;
let packet = self.rx.recv.recv().ok();
if packet.is_some() {
Expand Down Expand Up @@ -126,12 +131,13 @@ impl Tx {
}

/// Creates a channel, returning [Tx] and [Rx] for a channel that allows
/// `hi` number of packets to be queued.
/// `hi` number of packets to be queued. `stop` can be used to signal that
/// [Rx] should terminate immediately instead of draining the buffer.
///
/// When hi number of packets are queued, the [Tx::write_packet()] will
/// block until packets are consumed from channel and only `lo` number of
/// packets are left.
pub fn create(hi: u64, lo: u64) -> (Tx, Rx) {
pub fn create(hi: u64, lo: u64, stop: Arc<AtomicBool>) -> (Tx, Rx) {
let (sender, recv) = mpsc::channel();
let ctx = Arc::new((
Mutex::new(ChannelContext {
Expand All @@ -151,6 +157,7 @@ pub fn create(hi: u64, lo: u64) -> (Tx, Rx) {
recv,
ctx: ctx2,
watermark_lo: lo,
stop,
},
)
}
18 changes: 12 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,19 @@ fn input_task(
terminate: Arc<AtomicBool>,
limit: Option<usize>,
) -> i32 {
let stop = terminate.clone();
let rd_handle: thread::JoinHandle<anyhow::Result<()>> = thread::Builder::new()
.name("pcap-reader".to_string())
.spawn(move || {
loop {
let inp = method.to_pcap_input()?;
let it = match limit {
Some(n) => Box::new(inp.packets(&terminate)?.take(n))
Some(n) => Box::new(inp.packets(&stop)?.take(n))
as Box<dyn Iterator<Item = input::Packet>>,
None => Box::new(inp.packets(&terminate)?),
None => Box::new(inp.packets(&stop)?),
};
pipe::read_packets_to(it, &tx)?;
if !loop_file || terminate.load(std::sync::atomic::Ordering::Relaxed) {
if !loop_file || stop.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
tracing::info!("pcap file iteration complete");
Expand All @@ -94,8 +95,13 @@ fn input_task(
.unwrap();
let mut ret = 0;
if let Err(err) = rd_handle.join().unwrap() {
tracing::error!("Error while reading packets: {}", err);
ret = -1;
// if we have received signal indicating we should stop, discard
// reader errors as the packet writer might have terminated
// already and reader just complains about closed channel.
if !terminate.load(std::sync::atomic::Ordering::Relaxed) {
tracing::error!("Error while reading packets: {}", err);
ret = -1;
}
}
tracing::trace!("Reader terminated");
match pipe.wait() {
Expand Down Expand Up @@ -238,7 +244,7 @@ fn main() {
rate = Rate::Full;
}

let (tx, rx) = channel::create(ch_hi, ch_low);
let (tx, rx) = channel::create(ch_hi, ch_low, terminate.clone());
let stat_period = params.stats.map(Duration::from_secs);
let (stats, stat_printer) = if let Some(period) = stat_period {
let (s, r) = pipe::Stats::periodic(period);
Expand Down

0 comments on commit ae3e50f

Please sign in to comment.