Skip to content

Commit

Permalink
Improve fidelity of error counters and iat per packet pcap capture me…
Browse files Browse the repository at this point in the history
…trics (#69)

* global iat and cc errors

* add source ip/port and current cc errors vs. tally

* reset cc error stateful counter, only count 1 per cc disruption

---------

Co-authored-by: Chris Kennedy <chris.kennedy@ltnglobal.com>
  • Loading branch information
groovybits and ltn-chriskennedy authored Apr 5, 2024
1 parent bd4e841 commit d847b2d
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 34 deletions.
38 changes: 21 additions & 17 deletions schema/stream_data.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@ struct StreamDataCapnp {
iatMin @12 :UInt64;
iatAvg @13 :UInt64;
errorCount @14 :UInt32;
lastArrivalTime @15 :UInt64;
lastSampleTime @16 :UInt64;
startTime @17 :UInt64;
totalBits @18 :UInt64;
count @19 :UInt32;
rtpTimestamp @20 :UInt32;
rtpPayloadType @21 :UInt8;
rtpPayloadTypeName @22 :Text;
rtpLineNumber @23 :UInt16;
rtpLineOffset @24 :UInt16;
rtpLineLength @25 :UInt16;
rtpFieldId @26 :UInt8;
rtpLineContinuation @27 :UInt8;
rtpExtendedSequenceNumber @28 :UInt16;
streamTypeNumber @29 :UInt8;
totalBitsSample @30 :UInt64;
captureTime @31 :UInt64;
currentErrorCount @15 :UInt32;
lastArrivalTime @16 :UInt64;
lastSampleTime @17 :UInt64;
startTime @18 :UInt64;
totalBits @19 :UInt64;
count @20 :UInt32;
rtpTimestamp @21 :UInt32;
rtpPayloadType @22 :UInt8;
rtpPayloadTypeName @23 :Text;
rtpLineNumber @24 :UInt16;
rtpLineOffset @25 :UInt16;
rtpLineLength @26 :UInt16;
rtpFieldId @27 :UInt8;
rtpLineContinuation @28 :UInt8;
rtpExtendedSequenceNumber @29 :UInt16;
streamTypeNumber @30 :UInt8;
totalBitsSample @31 :UInt64;
captureTime @32 :UInt64;
captureIat @33 :UInt64;
sourceIp @34 :Text;
sourcePort @35 :UInt32;
}
63 changes: 59 additions & 4 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ struct CombinedSchema {
streams: HashMap<u16, CombinedStreamData>,
bitrate_avg_global: u32,
iat_avg_global: u64,
cc_errors_global: u32,
current_cc_errors_global: u32,
}

fn is_cea_608(itu_t_t35_data: &sei::user_data_registered_itu_t_t35::ItuTT35) -> bool {
Expand Down Expand Up @@ -182,6 +184,11 @@ fn capnp_to_stream_data(bytes: &[u8]) -> capnp::Result<StreamData> {
|text_reader| text_reader.to_string().unwrap_or_default(),
);

let source_ip = reader.get_source_ip().map_or_else(
|_| String::new(),
|text_reader| text_reader.to_string().unwrap_or_default(),
);

// Same conversion for rtp_payload_type_name
let rtp_payload_type_name = reader.get_rtp_payload_type_name().map_or_else(
|_| String::new(),
Expand All @@ -204,6 +211,7 @@ fn capnp_to_stream_data(bytes: &[u8]) -> capnp::Result<StreamData> {
iat_min: reader.get_iat_min(),
iat_avg: reader.get_iat_avg(),
error_count: reader.get_error_count(),
current_error_count: reader.get_current_error_count(),
last_arrival_time: reader.get_last_arrival_time(),
last_sample_time: reader.get_last_sample_time(),
start_time: reader.get_start_time(),
Expand All @@ -224,6 +232,9 @@ fn capnp_to_stream_data(bytes: &[u8]) -> capnp::Result<StreamData> {
packet_len: 0,
stream_type_number: reader.get_stream_type_number(),
capture_time: reader.get_capture_time(),
capture_iat: reader.get_capture_iat(),
source_ip,
source_port: reader.get_source_port() as i32,
};

Ok(stream_data)
Expand Down Expand Up @@ -312,6 +323,10 @@ fn flatten_streams(
format!("{}.error_count", prefix),
json!(stream_data.error_count),
);
flat_structure.insert(
format!("{}.current_error_count", prefix),
json!(stream_data.current_error_count),
);
flat_structure.insert(
format!("{}.last_arrival_time", prefix),
json!(stream_data.last_arrival_time),
Expand Down Expand Up @@ -385,6 +400,18 @@ fn flatten_streams(
format!("{}.capture_time", prefix),
json!(stream_data.capture_time),
);
flat_structure.insert(
format!("{}.capture_iat", prefix),
json!(stream_data.capture_iat),
);
flat_structure.insert(
format!("{}.source_ip", prefix),
json!(stream_data.source_ip),
);
flat_structure.insert(
format!("{}.source_port", prefix),
json!(stream_data.source_port),
);
}

flat_structure
Expand Down Expand Up @@ -1103,26 +1130,38 @@ async fn main() {
// Initialize variables to accumulate global averages
let mut total_bitrate_avg: u64 = 0;
let mut total_iat_avg: u64 = 0;
let mut total_cc_errors: u64 = 0;
let mut total_cc_errors_current: u64 = 0;
let mut stream_count: u64 = 0;
let mut source_ip: String = String::new();
let mut source_port: u32 = 0;

// Process each stream to accumulate averages
for (_, grouping) in stream_groupings.iter() {
for stream_data in &grouping.stream_data_list {
total_bitrate_avg += stream_data.bitrate_avg as u64;
if stream_data.pmt_pid != 65535 {
total_iat_avg += stream_data.iat_avg;
}
total_iat_avg += stream_data.capture_iat;
total_cc_errors += stream_data.error_count as u64;
total_cc_errors_current += stream_data.current_error_count as u64;
source_port = stream_data.source_port as u32;
source_ip = stream_data.source_ip.clone();
stream_count += 1;
}
}

// Continuity Counter errors
let global_cc_errors = total_cc_errors;
let global_cc_errors_current = total_cc_errors_current;

// avg IAT
let global_iat_avg = if stream_count > 0 { total_iat_avg as f64 / stream_count as f64 } else { 0.0 };

// Calculate global averages
let global_bitrate_avg = if stream_count > 0 {
total_bitrate_avg
} else {
0
};
let global_iat_avg = if stream_count > 0 { total_iat_avg } else { 0 };
let current_timestamp = current_unix_timestamp_ms().unwrap_or(0); // stream_data.capture_time;

// Directly insert global statistics and timestamp into the flattened_data map
Expand All @@ -1134,10 +1173,26 @@ async fn main() {
"iat_avg_global".to_string(),
serde_json::json!(global_iat_avg),
);
flattened_data.insert(
"cc_errors_global".to_string(),
serde_json::json!(global_cc_errors),
);
flattened_data.insert(
"current_cc_errors_global".to_string(),
serde_json::json!(global_cc_errors_current),
);
flattened_data.insert(
"timestamp".to_string(),
serde_json::json!(current_timestamp),
);
flattened_data.insert(
"source_ip".to_string(),
serde_json::json!(source_ip),
);
flattened_data.insert(
"source_port".to_string(),
serde_json::json!(source_port),
);

// Convert the Map directly to a Value for serialization
let combined_stats = serde_json::Value::Object(flattened_data);
Expand Down
35 changes: 28 additions & 7 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ fn stream_data_to_capnp(stream_data: &StreamData) -> capnp::Result<Builder<HeapA
stream_data_msg.set_iat_min(stream_data.iat_min);
stream_data_msg.set_iat_avg(stream_data.iat_avg);
stream_data_msg.set_error_count(stream_data.error_count);
stream_data_msg.set_current_error_count(stream_data.current_error_count);
stream_data_msg.set_last_arrival_time(stream_data.last_arrival_time);
stream_data_msg.set_capture_time(stream_data.capture_time);
stream_data_msg.set_capture_iat(stream_data.capture_iat);
stream_data_msg.set_last_sample_time(stream_data.last_sample_time);
stream_data_msg.set_start_time(stream_data.start_time);
stream_data_msg.set_total_bits(stream_data.total_bits);
Expand All @@ -200,6 +202,8 @@ fn stream_data_to_capnp(stream_data: &StreamData) -> capnp::Result<Builder<HeapA
stream_data_msg.set_rtp_field_id(stream_data.rtp_field_id);
stream_data_msg.set_rtp_line_continuation(stream_data.rtp_line_continuation);
stream_data_msg.set_rtp_extended_sequence_number(stream_data.rtp_extended_sequence_number);
stream_data_msg.set_source_ip(stream_data.source_ip.as_str().into());
stream_data_msg.set_source_port(stream_data.source_port as u32);
}

Ok(message)
Expand Down Expand Up @@ -639,7 +643,7 @@ async fn rscap() {
let target_port = args.target_port;
let target_ip = args.target_ip;
let source_device = args.source_device;
let source_ip = args.source_ip;
let source_ip = args.source_ip.clone();
let source_protocol = args.source_protocol;
let source_port = args.source_port;
let debug_on = args.debug_on;
Expand Down Expand Up @@ -701,7 +705,7 @@ async fn rscap() {
// Initialize logging
let _ = env_logger::try_init();

let (ptx, mut prx) = mpsc::channel::<(Arc<Vec<u8>>, u64)>(pcap_channel_size);
let (ptx, mut prx) = mpsc::channel::<(Arc<Vec<u8>>, u64, u64)>(pcap_channel_size);

let running = Arc::new(AtomicBool::new(true));
let running_capture = running.clone();
Expand All @@ -726,6 +730,7 @@ async fn rscap() {
};

let mut packets = Vec::new();
let mut last_iat = 0;
while running_capture.load(Ordering::SeqCst) {
match port.rx_burst(&mut packets) {
Ok(_) => {
Expand All @@ -736,9 +741,15 @@ async fn rscap() {
// Convert to Arc<Vec<u8>> to maintain consistency with pcap logic
let packet_data = Arc::new(data.to_vec());
let timestamp = current_unix_timestamp_ms().unwrap_or(0);
let iat = if last_iat == 0 {
0
} else {
timestamp - last_iat
};
last_iat = timestamp;

// Send packet data to processing channel
ptx.send((packet_data, timestamp)).await.unwrap();
ptx.send((packet_data, timestamp, iat)).await.unwrap();

// Here you can implement additional processing such as parsing the packet,
// updating statistics, handling specific packet types, etc.
Expand Down Expand Up @@ -780,6 +791,7 @@ async fn rscap() {

let mut stats_last_sent_ts = Instant::now();
let mut packets_dropped = 0;
let mut last_iat = 0;

while running_capture.load(Ordering::SeqCst) {
while let Some(packet) = stream.next().await {
Expand All @@ -797,8 +809,14 @@ async fn rscap() {
.expect("Time went backwards");
let timestamp_ms = duration_since_epoch.as_secs() * 1_000
+ duration_since_epoch.subsec_millis() as u64;
let iat = if last_iat == 0 {
0
} else {
timestamp_ms - last_iat
};
last_iat = timestamp_ms;

ptx.send((packet_data, timestamp_ms)).await.unwrap();
ptx.send((packet_data, timestamp_ms, iat)).await.unwrap();

if !running_capture.load(Ordering::SeqCst) {
break;
Expand Down Expand Up @@ -1370,7 +1388,7 @@ async fn rscap() {
};

let mut dot_last_sent_ts = Instant::now();
while let Some((packet, timestamp)) = prx.recv().await {
while let Some((packet, timestamp, iat)) = prx.recv().await {
if packet_count > 0 && packets_captured > packet_count {
println!(
"\nPacket count limit reached {}, signaling termination...",
Expand Down Expand Up @@ -1399,7 +1417,7 @@ async fn rscap() {
}

let chunks = if is_mpegts {
process_mpegts_packet(payload_offset, packet, packet_size, start_time, timestamp)
process_mpegts_packet(payload_offset, packet, packet_size, start_time, timestamp, iat, args.source_ip.clone(), args.source_port)
} else {
process_smpte2110_packet(
payload_offset,
Expand All @@ -1408,6 +1426,9 @@ async fn rscap() {
start_time,
debug_smpte2110,
timestamp,
iat,
args.source_ip.clone(),
args.source_port,
)
};

Expand Down Expand Up @@ -1444,7 +1465,7 @@ async fn rscap() {
if pid == pmt_info.pid {
debug!("ProcessPacket: PMT packet detected with PID {}", pid);
// Update PID_MAP with new stream types
update_pid_map(&packet_chunk, &pmt_info.packet, timestamp);
update_pid_map(&packet_chunk, &pmt_info.packet, timestamp, iat, args.source_ip.clone(), args.source_port);
// Identify the video PID (if not already identified)
if let Some((new_pid, new_codec)) = identify_video_pid(&packet_chunk) {
if video_pid.map_or(true, |vp| vp != new_pid) {
Expand Down
Loading

0 comments on commit d847b2d

Please sign in to comment.