Skip to content

Commit

Permalink
get smpte2110 payload framed properly (#39)
Browse files Browse the repository at this point in the history
* print tr101290 errors if requested each PAT emit

error messages added to video packet detection. WIP still fails.

* smpte2110 offset adjustment to focus on payload

---------

Co-authored-by: Chris Kennedy <chris.kennedy@ltnglobal.com>
  • Loading branch information
groovybits and ltn-chriskennedy committed Jan 3, 2024
1 parent ef9cbf5 commit 6c52ba0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 38 deletions.
5 changes: 0 additions & 5 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ struct Args {
#[clap(long, env = "NO_PROGRESS", default_value_t = false)]
no_progress: bool,

/// Force smpte2110 mode
#[clap(long, env = "SMPT2110", default_value_t = false)]
smpte2110: bool,

/// Output Filename
#[clap(long, env = "OUTPUT_FILE", default_value = "output.ts")]
output_file: String,
Expand All @@ -84,7 +80,6 @@ async fn main() {
let recv_raw_stream = args.recv_raw_stream;
let packet_count = args.packet_count;
let no_progress = args.no_progress;
let smpte2110 = args.smpte2110;
let output_file: String = args.output_file;

if silent {
Expand Down
99 changes: 66 additions & 33 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,23 +502,27 @@ const PES_HEADER_START_CODE: u8 = 0xE0; // Start codes for video streams range f

fn pes_start_offset(packet: &[u8]) -> Option<usize> {
if packet.len() < TS_PACKET_SIZE {
error!("PESStartOffset: Packet size is incorrect {}", packet.len());
return None;
}

let payload_unit_start_indicator = (packet[1] & 0x40) != 0;
if !payload_unit_start_indicator {
error!("PESStartOffset: Packet does not have Payload Unit Start Indicator set");
return None;
}

let adaptation_field_control = (packet[3] & 0x30) >> 4;
if adaptation_field_control == 0x02 || adaptation_field_control == 0x03 {
let adaptation_field_length = packet[4] as usize;
if 4 + adaptation_field_length + 4 >= packet.len() {
error!("PESStartOffset: Packet Adaption Field size is incorrect {}", packet.len());
return None;
}

let start_code = &packet[4 + adaptation_field_length..4 + adaptation_field_length + 3];
if start_code != PES_START_CODE_PREFIX {
error!("PESStartOffset: Packet does not have PES start code prefix");
return None;
}

Expand All @@ -536,6 +540,8 @@ fn pes_start_offset(packet: &[u8]) -> Option<usize> {
}
}

error!("PESStartOffset: Failed to find PES start offset");

None
}

Expand All @@ -547,6 +553,7 @@ const H265_IDR_N_LP: u8 = 20;

#[derive(Clone, PartialEq)]
enum Codec {
NONE,
MPEG2,
H264,
H265,
Expand All @@ -555,6 +562,7 @@ enum Codec {
impl fmt::Display for Codec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Codec::NONE => write!(f, "NONE"),
Codec::MPEG2 => write!(f, "MPEG2"),
Codec::H264 => write!(f, "H264"),
Codec::H265 => write!(f, "H265"),
Expand All @@ -564,6 +572,7 @@ impl fmt::Display for Codec {

fn is_idr_frame(buffer: &[u8], codec: Codec) -> bool {
match codec {
Codec::NONE => false,
Codec::MPEG2 => is_mpeg2_keyframe(buffer),
Codec::H264 => is_h264_idr_frame(buffer),
Codec::H265 => is_h265_idr_frame(buffer),
Expand Down Expand Up @@ -659,7 +668,7 @@ fn parse_pat(packet: &[u8]) -> Vec<PatEntry> {
// Check if Payload Unit Start Indicator (PUSI) is set
let pusi = (packet[1] & 0x40) != 0;
if !pusi {
// If PUSI is not set, this packet does not start a new PAT
// If Payload Unit Start Indicator is not set, this packet does not start a new PAT
return entries;
}

Expand Down Expand Up @@ -960,6 +969,10 @@ struct Args {
/// Use promiscuous mode
#[clap(long, env = "PROMISCUOUS", default_value_t = false)]
promiscuous: bool,

/// Show the TR101290 p1, p2 and p3 errors if any
#[clap(long, env = "SHOW_TR101290", default_value_t = false)]
show_tr101290: bool,
}

fn main() {
Expand Down Expand Up @@ -997,6 +1010,7 @@ fn rscap() {
let no_progress = args.no_progress;
let no_zmq = args.no_zmq;
let promiscuous = args.promiscuous;
let show_tr101290 = args.show_tr101290;

if args.smpte2110 {
packet_size = 1500; // set packet size to 1500 for smpte2110
Expand Down Expand Up @@ -1286,8 +1300,8 @@ fn rscap() {
// Start packet capture
let mut batch = Vec::new();
let mut video_pid: Option<u16> = Some(0xFFFF);
let mut video_codec: Option<Codec> = Some(Codec::H264);
let mut current_video_frame = Vec::new();
let mut video_codec: Option<Codec> = Some(Codec::NONE);
let mut current_video_frame = Vec::<StreamData>::new();
let mut is_frame_start = false;
let mut pmt_info: PmtInfo = PmtInfo {
pid: 0xFFFF,
Expand Down Expand Up @@ -1342,12 +1356,16 @@ fn rscap() {
let mut pid: u16 = 0xFFFF;

if is_mpegts {
pid = extract_pid(&packet_chunk);
pid = stream_data.pid;
// Handle PAT and PMT packets
match pid {
PAT_PID => {
debug!("ProcessPacket: PAT packet detected with PID {}", pid);
pmt_info = parse_and_store_pat(&packet_chunk);
// Print TR 101 290 errors
if show_tr101290 {
info!("STATUS::TR101290:ERRORS: {}", tr101290_errors);
}
}
_ => {
// Check if this is a PMT packet
Expand Down Expand Up @@ -1385,14 +1403,20 @@ fn rscap() {
}

// Check if this is a video packet
if video_pid != Some(0xFFFF) {
/*if video_pid != Some(0xFFFF) {
if let Some(vid_pid) = video_pid {
if pid == vid_pid {
if let Some(pes_payload_offset) =
pes_start_offset(&stream_data.packet)
pes_start_offset(&packet_chunk)
{
let pes_payload =
&stream_data.packet[pes_payload_offset..];
&packet_chunk[pes_payload_offset..];
info!("Video PID: Comparing {} to {} with pes payload offset of {} with codec {}",
pid,
vid_pid,
pes_payload_offset,
video_codec.as_ref().unwrap());
// Ensure video_codec is not None before calling is_idr_frame
if let Some(codec) = video_codec.as_ref() {
Expand All @@ -1415,14 +1439,22 @@ fn rscap() {
);
}
}
}
if is_frame_start {
current_video_frame.push(packet.clone());
if is_frame_start {
// copy stream data into current_video_frame using Arc to avoid copying
//current_video_frame.push(stream_data.clone());
}
} else {
error!("Video PID: PES payload offset not found in {} size chunk.", packet_chunk.len());
hexdump(
&packet,
0,
packet_chunk.len(),
);
}
}
}
}
}*/
}
}
}
Expand All @@ -1435,12 +1467,6 @@ fn rscap() {
pmt_info.pid,
);

// Print TR 101 290 errors
// print out each field of structure similar to json but not wrapping into json

// TODO: print only after an error occurs or on a time interval
//debug!("STATUS::TR101290:ERRORS: {}", tr101290_errors);

if pid == 0x1FFF {
// clear the Arc so it can be reused
stream_data.packet = Arc::new(Vec::new()); // Create a new Arc<Vec<u8>> for the next packet
Expand Down Expand Up @@ -1542,25 +1568,27 @@ fn process_smpte2110_packet(
packet_size: usize,
start_time: u64,
) -> Vec<StreamData> {
let start = payload_offset;
let mut streams = Vec::new();

if packet_size > start + 12 {
if packet[start] == 0x80 || packet[start] == 0x81 {
let rtp_packet = &packet[start..];
// Check if the packet is large enough to contain an RTP header
if packet_size > payload_offset + 12 {
// Check for RTP header marker
if packet[payload_offset] == 0x80 || packet[payload_offset] == 0x81 {
let rtp_packet = &packet[payload_offset..];

// Create an RtpReader
if let Ok(rtp) = RtpReader::new(rtp_packet) {
// Extract the timestamp
// Extract the timestamp and payload type
let timestamp = rtp.timestamp();

let payload_type = rtp.payload_type();

let payload_offset = rtp.payload_offset();
// Calculate the actual start of the RTP payload
let rtp_payload_offset = payload_offset + rtp.payload_offset();

// size of rtp payload
let chunk_len = packet_size - payload_offset;
// Calculate the length of the RTP payload
let rtp_payload_length = packet_size - rtp_payload_offset;

// Extract SMPTE 2110 specific fields
let line_length = get_line_length(rtp_packet);
let line_number = get_line_number(rtp_packet);
let extended_sequence_number = get_extended_sequence_number(rtp_packet);
Expand All @@ -1569,20 +1597,24 @@ fn process_smpte2110_packet(

let line_continuation = get_line_continuation(rtp_packet);

let pid = payload_type as u16; /* FIXME */
let stream_type = payload_type.to_string(); /* FIXME */
// Use payload type as PID (for the purpose of this example)
let pid = payload_type as u16;
let stream_type = payload_type.to_string();

// Create new StreamData instance
let mut stream_data = StreamData::new(
Arc::clone(packet),
payload_offset,
chunk_len,
rtp_payload_offset,
rtp_payload_length,
pid,
stream_type,
start_time,
timestamp as u64,
0, /* fix me */
0,
);
// update streams details in stream_data structure
stream_data.update_stats(chunk_len, current_unix_timestamp_ms().unwrap_or(0));

// Update StreamData stats and RTP fields
stream_data.update_stats(rtp_payload_length, current_unix_timestamp_ms().unwrap_or(0));
stream_data.set_rtp_fields(
timestamp,
payload_type,
Expand All @@ -1594,6 +1626,7 @@ fn process_smpte2110_packet(
line_continuation,
);

// Add the StreamData to the stream list
streams.push(stream_data);
} else {
hexdump(&packet, 0, packet_size);
Expand Down

0 comments on commit 6c52ba0

Please sign in to comment.