diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index e259bb89..fee2cad5 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -58,10 +58,6 @@ struct Args { #[clap(long, env = "NO_PROGRESS", default_value_t = false)] no_progress: bool, - /// Force smpte2110 mode - #[clap(long, env = "SMPT2110", default_value_t = false)] - smpte2110: bool, - /// Output Filename #[clap(long, env = "OUTPUT_FILE", default_value = "output.ts")] output_file: String, @@ -84,7 +80,6 @@ async fn main() { let recv_raw_stream = args.recv_raw_stream; let packet_count = args.packet_count; let no_progress = args.no_progress; - let smpte2110 = args.smpte2110; let output_file: String = args.output_file; if silent { diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 3f902faa..7a67b3dc 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -502,11 +502,13 @@ const PES_HEADER_START_CODE: u8 = 0xE0; // Start codes for video streams range f fn pes_start_offset(packet: &[u8]) -> Option { if packet.len() < TS_PACKET_SIZE { + error!("PESStartOffset: Packet size is incorrect {}", packet.len()); return None; } let payload_unit_start_indicator = (packet[1] & 0x40) != 0; if !payload_unit_start_indicator { + error!("PESStartOffset: Packet does not have Payload Unit Start Indicator set"); return None; } @@ -514,11 +516,13 @@ fn pes_start_offset(packet: &[u8]) -> Option { if adaptation_field_control == 0x02 || adaptation_field_control == 0x03 { let adaptation_field_length = packet[4] as usize; if 4 + adaptation_field_length + 4 >= packet.len() { + error!("PESStartOffset: Packet Adaption Field size is incorrect {}", packet.len()); return None; } let start_code = &packet[4 + adaptation_field_length..4 + adaptation_field_length + 3]; if start_code != PES_START_CODE_PREFIX { + error!("PESStartOffset: Packet does not have PES start code prefix"); return None; } @@ -536,6 +540,8 @@ fn pes_start_offset(packet: &[u8]) -> Option { } } + error!("PESStartOffset: Failed to find PES start offset"); + None } @@ -547,6 +553,7 @@ const H265_IDR_N_LP: u8 = 20; #[derive(Clone, PartialEq)] enum Codec { + NONE, MPEG2, H264, H265, @@ -555,6 +562,7 @@ enum Codec { impl fmt::Display for Codec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + Codec::NONE => write!(f, "NONE"), Codec::MPEG2 => write!(f, "MPEG2"), Codec::H264 => write!(f, "H264"), Codec::H265 => write!(f, "H265"), @@ -564,6 +572,7 @@ impl fmt::Display for Codec { fn is_idr_frame(buffer: &[u8], codec: Codec) -> bool { match codec { + Codec::NONE => false, Codec::MPEG2 => is_mpeg2_keyframe(buffer), Codec::H264 => is_h264_idr_frame(buffer), Codec::H265 => is_h265_idr_frame(buffer), @@ -659,7 +668,7 @@ fn parse_pat(packet: &[u8]) -> Vec { // Check if Payload Unit Start Indicator (PUSI) is set let pusi = (packet[1] & 0x40) != 0; if !pusi { - // If PUSI is not set, this packet does not start a new PAT + // If Payload Unit Start Indicator is not set, this packet does not start a new PAT return entries; } @@ -960,6 +969,10 @@ struct Args { /// Use promiscuous mode #[clap(long, env = "PROMISCUOUS", default_value_t = false)] promiscuous: bool, + + /// Show the TR101290 p1, p2 and p3 errors if any + #[clap(long, env = "SHOW_TR101290", default_value_t = false)] + show_tr101290: bool, } fn main() { @@ -997,6 +1010,7 @@ fn rscap() { let no_progress = args.no_progress; let no_zmq = args.no_zmq; let promiscuous = args.promiscuous; + let show_tr101290 = args.show_tr101290; if args.smpte2110 { packet_size = 1500; // set packet size to 1500 for smpte2110 @@ -1286,8 +1300,8 @@ fn rscap() { // Start packet capture let mut batch = Vec::new(); let mut video_pid: Option = Some(0xFFFF); - let mut video_codec: Option = Some(Codec::H264); - let mut current_video_frame = Vec::new(); + let mut video_codec: Option = Some(Codec::NONE); + let mut current_video_frame = Vec::::new(); let mut is_frame_start = false; let mut pmt_info: PmtInfo = PmtInfo { pid: 0xFFFF, @@ -1342,12 +1356,16 @@ fn rscap() { let mut pid: u16 = 0xFFFF; if is_mpegts { - pid = extract_pid(&packet_chunk); + pid = stream_data.pid; // Handle PAT and PMT packets match pid { PAT_PID => { debug!("ProcessPacket: PAT packet detected with PID {}", pid); pmt_info = parse_and_store_pat(&packet_chunk); + // Print TR 101 290 errors + if show_tr101290 { + info!("STATUS::TR101290:ERRORS: {}", tr101290_errors); + } } _ => { // Check if this is a PMT packet @@ -1385,14 +1403,20 @@ fn rscap() { } // Check if this is a video packet - if video_pid != Some(0xFFFF) { + /*if video_pid != Some(0xFFFF) { if let Some(vid_pid) = video_pid { if pid == vid_pid { if let Some(pes_payload_offset) = - pes_start_offset(&stream_data.packet) + pes_start_offset(&packet_chunk) { let pes_payload = - &stream_data.packet[pes_payload_offset..]; + &packet_chunk[pes_payload_offset..]; + + info!("Video PID: Comparing {} to {} with pes payload offset of {} with codec {}", + pid, + vid_pid, + pes_payload_offset, + video_codec.as_ref().unwrap()); // Ensure video_codec is not None before calling is_idr_frame if let Some(codec) = video_codec.as_ref() { @@ -1415,14 +1439,22 @@ fn rscap() { ); } } - } - if is_frame_start { - current_video_frame.push(packet.clone()); + if is_frame_start { + // copy stream data into current_video_frame using Arc to avoid copying + //current_video_frame.push(stream_data.clone()); + } + } else { + error!("Video PID: PES payload offset not found in {} size chunk.", packet_chunk.len()); + hexdump( + &packet, + 0, + packet_chunk.len(), + ); } } } - } + }*/ } } } @@ -1435,12 +1467,6 @@ fn rscap() { pmt_info.pid, ); - // Print TR 101 290 errors - // print out each field of structure similar to json but not wrapping into json - - // TODO: print only after an error occurs or on a time interval - //debug!("STATUS::TR101290:ERRORS: {}", tr101290_errors); - if pid == 0x1FFF { // clear the Arc so it can be reused stream_data.packet = Arc::new(Vec::new()); // Create a new Arc> for the next packet @@ -1542,25 +1568,27 @@ fn process_smpte2110_packet( packet_size: usize, start_time: u64, ) -> Vec { - let start = payload_offset; let mut streams = Vec::new(); - if packet_size > start + 12 { - if packet[start] == 0x80 || packet[start] == 0x81 { - let rtp_packet = &packet[start..]; + // Check if the packet is large enough to contain an RTP header + if packet_size > payload_offset + 12 { + // Check for RTP header marker + if packet[payload_offset] == 0x80 || packet[payload_offset] == 0x81 { + let rtp_packet = &packet[payload_offset..]; // Create an RtpReader if let Ok(rtp) = RtpReader::new(rtp_packet) { - // Extract the timestamp + // Extract the timestamp and payload type let timestamp = rtp.timestamp(); - let payload_type = rtp.payload_type(); - let payload_offset = rtp.payload_offset(); + // Calculate the actual start of the RTP payload + let rtp_payload_offset = payload_offset + rtp.payload_offset(); - // size of rtp payload - let chunk_len = packet_size - payload_offset; + // Calculate the length of the RTP payload + let rtp_payload_length = packet_size - rtp_payload_offset; + // Extract SMPTE 2110 specific fields let line_length = get_line_length(rtp_packet); let line_number = get_line_number(rtp_packet); let extended_sequence_number = get_extended_sequence_number(rtp_packet); @@ -1569,20 +1597,24 @@ fn process_smpte2110_packet( let line_continuation = get_line_continuation(rtp_packet); - let pid = payload_type as u16; /* FIXME */ - let stream_type = payload_type.to_string(); /* FIXME */ + // Use payload type as PID (for the purpose of this example) + let pid = payload_type as u16; + let stream_type = payload_type.to_string(); + + // Create new StreamData instance let mut stream_data = StreamData::new( Arc::clone(packet), - payload_offset, - chunk_len, + rtp_payload_offset, + rtp_payload_length, pid, stream_type, start_time, timestamp as u64, - 0, /* fix me */ + 0, ); - // update streams details in stream_data structure - stream_data.update_stats(chunk_len, current_unix_timestamp_ms().unwrap_or(0)); + + // Update StreamData stats and RTP fields + stream_data.update_stats(rtp_payload_length, current_unix_timestamp_ms().unwrap_or(0)); stream_data.set_rtp_fields( timestamp, payload_type, @@ -1594,6 +1626,7 @@ fn process_smpte2110_packet( line_continuation, ); + // Add the StreamData to the stream list streams.push(stream_data); } else { hexdump(&packet, 0, packet_size);