diff --git a/Cargo.toml b/Cargo.toml index eeb3ef44..45ac489c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ path = "src/lib.rs" [features] default = [] dpdk_enabled = ["capsule"] +gst = ["gstreamer", "gstreamer-app", "gstreamer-video"] [profile.release-with-debug] inherits = "release" @@ -49,6 +50,11 @@ scte35-reader = "0.14.0" hex-slice = "0.1.4" chrono = "0.4.37" rdkafka = "0.36.2" +gstreamer = { version = "0.20", optional = true } +gstreamer-app = { version = "0.20", optional = true } +gstreamer-video = { version = "0.20", optional = true } +crossbeam = "0.8.4" +image = "0.25.1" [build-dependencies] capnpc = "0.18.0" diff --git a/schema/stream_data.capnp b/schema/stream_data.capnp index 0ac56cfe..ed5a522e 100644 --- a/schema/stream_data.capnp +++ b/schema/stream_data.capnp @@ -37,4 +37,18 @@ struct StreamDataCapnp { captureIat @33 :UInt64; sourceIp @34 :Text; sourcePort @35 :UInt32; + totalMemory @36 :UInt64; + usedMemory @37 :UInt64; + totalSwap @38 :UInt64; + usedSwap @39 :UInt64; + cpuUsage @40 :Float32; + cpuCount @41 :UInt32; + coreCount @42 :UInt32; + bootTime @43 :UInt64; + loadAvgOne @44 :Float64; + loadAvgFive @45 :Float64; + loadAvgFifteen @46 :Float64; + hostName @47 :Text; + kernelVersion @48 :Text; + osVersion @49 :Text; } diff --git a/scripts/compile.sh b/scripts/compile.sh index e6d035df..f0382be9 100755 --- a/scripts/compile.sh +++ b/scripts/compile.sh @@ -29,6 +29,7 @@ install_rust() { # Function to run a command within the SCL environment for CentOS run_with_scl() { + export PKG_CONFIG_PATH=/usr/local/lib64/pkgconfig:/usr/lib64/pkgconfig:$PKG_CONFIG_PATH scl enable devtoolset-11 -- "$@" } @@ -88,4 +89,3 @@ else fi echo "Build completed successfully." - diff --git a/scripts/install_gstreamer.sh b/scripts/install_gstreamer.sh new file mode 100755 index 00000000..88af9070 --- /dev/null +++ b/scripts/install_gstreamer.sh @@ -0,0 +1,244 @@ +#!/bin/bash +set -e + +# Function to run a command within the SCL environment for CentOS +run_with_scl() { + scl enable devtoolset-11 -- "$@" +} + +# Define versions for dependencies and GStreamer +GLIB_VERSION=2.56.4 +ORC_VERSION=0.4.31 +GST_VERSION=1.20.0 +LIBFFI_VERSION=3.3 +NASM_VERSION=2.15.05 +FFMPEG_VERSION=5.1.4 + +# Define the installation prefix +PREFIX=/usr/local +export PATH=$PREFIX/bin:$PATH + +# For pkg-config to find .pc files +export PKG_CONFIG_PATH=/usr/local/lib64/pkgconfig:/usr/local/lib/pkgconfig:/usr/lib64/pkgconfig:$PKG_CONFIG_PATH +export LD_LIBRARY_PATH=/usr/local/lib64:/usr/local/lib:$LD_LIBRARY_PATH + +export PATH=/usr/local/bin:$PATH + +# Ensure the system is up to date and has the basic build tools +sudo yum groupinstall -y "Development Tools" +sudo yum install -y bison flex python3 wget libffi-devel util-linux-devel util-linux libmount-devel + +# Explicitly use cmake from /usr/local/bin for Meson configuration +echo "[binaries]" > meson-native-file.ini +echo "cmake = 'cmake3'" >> meson-native-file.ini +CWD=$(pwd) +MESON_NATIVE_FILE=$CWD/meson-native-file.ini + +# Ensure Meson and Ninja are installed and use the correct Ninja +sudo pip3 install meson +sudo pip3 install ninja + +echo "------------------------------------------------------------" +echo "Installing GStreamer and essential dependencies..." +echo "------------------------------------------------------------" + +echo "---" +echo "Installing libffi..." +echo "---" +# Download, compile, and install libffi +if [ ! -f libffi-$LIBFFI_VERSION.tar.gz ]; then + wget ftp://sourceware.org/pub/libffi/libffi-$LIBFFI_VERSION.tar.gz +fi +if [ ! -d libffi-$LIBFFI_VERSION ]; then + tar xf libffi-$LIBFFI_VERSION.tar.gz +fi +cd libffi-$LIBFFI_VERSION +run_with_scl ./configure --prefix=$PREFIX +run_with_scl make +sudo make install +cd .. + +echo "---" +echo "Installing ORC..." +echo "---" +# Download, compile, and install ORC +if [ ! -f orc-$ORC_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/orc/orc-$ORC_VERSION.tar.xz +fi +if [ ! -d orc-$ORC_VERSION ]; then + tar xf orc-$ORC_VERSION.tar.xz +fi +cd orc-$ORC_VERSION +run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE +run_with_scl ninja -C _build +sudo /usr/local/bin/ninja -C _build install +cd .. + +echo "---" +echo "Installing Gstreamer core..." +echo "---" +# Download, compile, and install GStreamer core +if [ ! -f gstreamer-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gstreamer/gstreamer-$GST_VERSION.tar.xz +fi +if [ ! -d gstreamer-$GST_VERSION ]; then + tar xf gstreamer-$GST_VERSION.tar.xz +fi +cd gstreamer-$GST_VERSION +run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE +run_with_scl ninja -C _build +sudo /usr/local/bin/ninja -C _build install +cd .. + +echo "---" +echo "Installing Gstreamer base..." +echo "---" +# Download, compile, and install gst-plugins-base +if [ ! -f gst-plugins-base-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-plugins-base/gst-plugins-base-$GST_VERSION.tar.xz +fi +if [ ! -d gst-plugins-base-$GST_VERSION ]; then + tar xf gst-plugins-base-$GST_VERSION.tar.xz +fi +cd gst-plugins-base-$GST_VERSION +run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE +run_with_scl ninja -C _build +sudo /usr/local/bin/ninja -C _build install +cd .. + +# Install GStreamer bad plugins (includes tsdemux) +echo "---" +echo "Installing Gstreamer bad plugins..." +echo "---" +if [ ! -f gst-plugins-bad-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-plugins-bad/gst-plugins-bad-$GST_VERSION.tar.xz +fi +if [ ! -d gst-plugins-bad-$GST_VERSION ]; then + tar xf gst-plugins-bad-$GST_VERSION.tar.xz +fi +cd gst-plugins-bad-$GST_VERSION +run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE +run_with_scl ninja -C _build +sudo /usr/local/bin/ninja -C _build install +cd .. + +echo "---" +echo "Downloading and compiling NASM (Netwide Assembler)..." +echo "---" + +# Download +if [ ! -f nasm-$NASM_VERSION.tar.gz ]; then + wget https://www.nasm.us/pub/nasm/releasebuilds/$NASM_VERSION/nasm-$NASM_VERSION.tar.gz +fi +# Extract +if [ ! -d nasm-$NASM_VERSION ]; then + tar -xzf nasm-$NASM_VERSION.tar.gz +fi +cd nasm-$NASM_VERSION +# Compile and install +./autogen.sh +./configure --prefix=/usr/local +make +sudo make install +cd .. + +echo "---" +echo "Downloading and compiling libx264..." +echo "---" +echo "---" +echo "Cloning and compiling libx264..." +echo "---" +# Ensure git is installed +sudo yum install -y git + +# Clone the repository +if [ ! -d "x264" ]; then + git clone https://code.videolan.org/videolan/x264.git +fi +cd x264 + +# Compile +run_with_scl ./configure --prefix=$PREFIX --enable-shared --enable-static --enable-pic +run_with_scl make +sudo make install +sudo ldconfig +cd .. + +echo "---" +echo "Cloning and compiling x265..." +echo "---" +# Ensure necessary tools are installed +sudo yum install -y cmake3 git + +# Clone the x265 repository if it doesn't already exist +if [ ! -d "x265" ]; then + git clone https://github.com/videolan/x265.git +fi +cd x265 + +# Create a build directory and navigate into it +mkdir -p build +cd build + +# Use cmake3 to configure the build, respecting the PREFIX variable for installation +run_with_scl cmake3 -G "Unix Makefiles" -DCMAKE_INSTALL_PREFIX=$PREFIX -DENABLE_SHARED:bool=on ../source + +# Compile and install +run_with_scl make +sudo make install +sudo ldconfig + +# Navigate back to the initial directory +cd ../../.. + +echo "---" +echo "Downloading and compiling FFmpeg..." +echo "---" +# Download +if [ ! -f ffmpeg-$FFMPEG_VERSION.tar.bz2 ]; then + wget http://ffmpeg.org/releases/ffmpeg-$FFMPEG_VERSION.tar.bz2 +fi +# Extract +if [ ! -d ffmpeg-$FFMPEG_VERSION ]; then + tar xf ffmpeg-$FFMPEG_VERSION.tar.bz2 +fi +# Compile +cd ffmpeg-$FFMPEG_VERSION +run_with_scl ./configure --prefix=$PREFIX \ + --enable-shared --enable-static \ + --enable-pic --enable-gpl --enable-libx264 \ + --enable-libx265 \ + --extra-cflags="-I$PREFIX/include" --extra-ldflags="-L$PREFIX/lib" +run_with_scl make +sudo make install +sudo ldconfig +cd .. + +echo "---" +echo "Installing Gstreamer libav plugins..." +echo "---" + +PWD=$(pwd) +echo "PWD: $PWD" + +if [ ! -f gst-libav-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-libav/gst-libav-$GST_VERSION.tar.xz +fi +if [ ! -d gst-libav-$GST_VERSION ]; then + tar xf gst-libav-$GST_VERSION.tar.xz +fi +cd gst-libav-$GST_VERSION +run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE --pkg-config-path=$PKG_CONFIG_PATH +run_with_scl ninja -C _build +sudo /usr/local/bin/ninja -C _build install +cd .. + +# Verify GStreamer installation +echo "------------------------------------------------------------" +echo "Verifying GStreamer installation..." +echo "------------------------------------------------------------" +gst-launch-1.0 --version + +echo "------------------------------------------------------------" +echo "GStreamer and essential dependencies installed." +echo "------------------------------------------------------------" diff --git a/scripts/probe.sh b/scripts/probe.sh old mode 100644 new mode 100755 diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index c07feb43..661630e0 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -43,9 +43,7 @@ use lazy_static::lazy_static; use mpeg2ts_reader::demultiplex; use rscap::current_unix_timestamp_ms; use rscap::mpegts; -use serde::Serialize; use serde_json::{json, Value}; -use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::RwLock; use tokio::sync::mpsc::{self}; @@ -60,26 +58,6 @@ struct StreamGrouping { stream_data_list: Vec, } -#[derive(Serialize)] -struct CombinedStreamData { - stream_type: String, - program_number: u16, - pmt_pid: u16, - bitrate_avg: u32, - iat_avg: u64, - packet_count: u32, - stream_data: StreamData, -} - -#[derive(Serialize)] -struct CombinedSchema { - streams: HashMap, - bitrate_avg_global: u32, - iat_avg_global: u64, - cc_errors_global: u32, - current_cc_errors_global: u32, -} - fn is_cea_608(itu_t_t35_data: &sei::user_data_registered_itu_t_t35::ItuTT35) -> bool { // In this example, we check if the ITU-T T.35 data matches the known format for CEA-608. // This is a simplified example and might need adjustment based on the actual data format. @@ -235,6 +213,22 @@ fn capnp_to_stream_data(bytes: &[u8]) -> capnp::Result { capture_iat: reader.get_capture_iat(), source_ip, source_port: reader.get_source_port() as i32, + + // System stats fields + total_memory: reader.get_total_memory(), + used_memory: reader.get_used_memory(), + total_swap: reader.get_total_swap(), + used_swap: reader.get_used_swap(), + cpu_usage: reader.get_cpu_usage(), + cpu_count: reader.get_cpu_count() as usize, + core_count: reader.get_core_count() as usize, + boot_time: reader.get_boot_time(), + load_avg_one: reader.get_load_avg_one(), + load_avg_five: reader.get_load_avg_five(), + load_avg_fifteen: reader.get_load_avg_fifteen(), + host_name: reader.get_host_name()?.to_string()?, + kernel_version: reader.get_kernel_version()?.to_string()?, + os_version: reader.get_os_version()?.to_string()?, }; Ok(stream_data) @@ -412,6 +406,22 @@ fn flatten_streams( format!("{}.source_port", prefix), json!(stream_data.source_port), ); + + // Add system stats fields to the flattened structure + flat_structure.insert(format!("{}.total_memory", prefix), json!(stream_data.total_memory)); + flat_structure.insert(format!("{}.used_memory", prefix), json!(stream_data.used_memory)); + flat_structure.insert(format!("{}.total_swap", prefix), json!(stream_data.total_swap)); + flat_structure.insert(format!("{}.used_swap", prefix), json!(stream_data.used_swap)); + flat_structure.insert(format!("{}.cpu_usage", prefix), json!(stream_data.cpu_usage)); + flat_structure.insert(format!("{}.cpu_count", prefix), json!(stream_data.cpu_count)); + flat_structure.insert(format!("{}.core_count", prefix), json!(stream_data.core_count)); + flat_structure.insert(format!("{}.boot_time", prefix), json!(stream_data.boot_time)); + flat_structure.insert(format!("{}.load_avg_one", prefix), json!(stream_data.load_avg_one)); + flat_structure.insert(format!("{}.load_avg_five", prefix), json!(stream_data.load_avg_five)); + flat_structure.insert(format!("{}.load_avg_fifteen", prefix), json!(stream_data.load_avg_fifteen)); + flat_structure.insert(format!("{}.host_name", prefix), json!(stream_data.host_name)); + flat_structure.insert(format!("{}.kernel_version", prefix), json!(stream_data.kernel_version)); + flat_structure.insert(format!("{}.os_version", prefix), json!(stream_data.os_version)); } flat_structure @@ -425,7 +435,7 @@ async fn produce_message( key: String, _stream_data_timestamp: i64, producer: FutureProducer, - admin_client: &AdminClient, + admin_client: &AdminClient ) { debug!("Service {} sending message", kafka_topic); let kafka_topic = kafka_topic.replace(":", "_").replace(".", "_"); @@ -1078,6 +1088,17 @@ async fn main() { .create() .expect("Failed to create Kafka producer"); + // OS and Network stats + let mut system_stats_json = if show_os_stats { + get_stats_as_json(StatsType::System).await + } else { + json!({}) + }; + + if system_stats_json != json!({}) { + info!("Startup System OS Stats:\n{:?}", system_stats_json); + } + loop { // check for packet count if packet_count > 0 && counter >= packet_count { @@ -1092,12 +1113,16 @@ async fn main() { // get first message let header_msg = packet_msg[0].clone(); - // OS and Network stats - let system_stats_json = if show_os_stats { - get_stats_as_json(StatsType::System).await - } else { - json!({}) - }; + if show_os_stats && dot_last_sent_stats.elapsed().as_secs() > 10 { + dot_last_sent_stats = Instant::now(); + + // OS and Network stats + system_stats_json = get_stats_as_json(StatsType::System).await; + + if show_os_stats && system_stats_json != json!({}) { + info!("System stats as JSON:\n{:?}", system_stats_json); + } + } // Deserialize the received message into StreamData match capnp_to_stream_data(&header_msg) { @@ -1216,7 +1241,7 @@ async fn main() { kafka_key.clone(), current_unix_timestamp_ms().unwrap_or(0) as i64, producer.clone(), - &admin_client, + &admin_client ); // Await the future for sending the message @@ -1295,12 +1320,6 @@ async fn main() { std::io::stdout().flush().unwrap(); } - if dot_last_sent_stats.elapsed().as_secs() > 10 { - dot_last_sent_stats = Instant::now(); - if show_os_stats && system_stats_json != json!({}) { - info!("System stats as JSON:\n{:?}", system_stats_json); - } - } counter += 1; } diff --git a/src/bin/probe.rs b/src/bin/probe.rs index d7845e9e..ad268a0c 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -22,10 +22,13 @@ use log::{debug, error, info}; use mpeg2ts_reader::demultiplex; use pcap::{Active, Capture, Device, PacketCodec}; use rscap::mpegts; +use rscap::system_stats::get_system_stats; use rscap::stream_data::{ identify_video_pid, is_mpegts_or_smpte2110, parse_and_store_pat, process_packet, - update_pid_map, Codec, PmtInfo, StreamData, Tr101290Errors, PAT_PID, + update_pid_map, Codec, PmtInfo, StreamData, Tr101290Errors, PAT_PID }; +#[cfg(feature = "gst")] +use rscap::stream_data::{generate_images, feed_mpegts_packets, get_image }; use rscap::{current_unix_timestamp_ms, hexdump}; use std::{ error::Error as StdError, @@ -47,6 +50,8 @@ use h264_reader::annexb::AnnexBReader; use h264_reader::nal::{pps, sei, slice, sps, Nal, RefNal, UnitType}; use h264_reader::push::NalInterest; use h264_reader::Context; +#[cfg(feature = "gst")] +use image::GenericImageView; //use rscap::videodecoder::VideoProcessor; use rscap::stream_data::{process_mpegts_packet, process_smpte2110_packet}; use tokio::task; @@ -204,6 +209,22 @@ fn stream_data_to_capnp(stream_data: &StreamData) -> capnp::Result 0 && packets_captured > packet_count { @@ -1521,11 +1552,52 @@ async fn rscap() { video_batch.push(stream_data_clone); } } + + // Store the video packet and stream type number + #[cfg(feature = "gst")] + if args.extract_images { + let stream_type_number = stream_data.stream_type_number; + if stream_type_number > 0 { + let video_packet = stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len].to_vec(); + feed_mpegts_packets(vec![video_packet]); + generate_images(stream_type_number); + } + } } } else { // TODO: Add SMPTE 2110 handling for line to frame conversion and other processing and analysis } + #[cfg(feature = "gst")] + if args.extract_images { + match get_image() { + Some(image_data) => { + // Process the image data here + // For example, you can save it to a file or perform further analysis + // ... + println!("Received an image with size: {} bytes", image_data.len()); + + // Attempt to decode the image to get its parameters + match image::load_from_memory(&image_data) { + Ok(img) => { + println!("Image dimensions: {:?}", img.dimensions()); + println!("Image color type: {:?}", img.color()); + + // Save the image data to a file named "image.jpg" + let mut file = File::create("image.jpg").expect("Failed to create file."); + file.write_all(&image_data).expect("Failed to write image data to file."); + + println!("Image saved as image.jpg"); + }, + Err(e) => println!("Failed to decode image data: {:?}", e), + } + } + None => { + // No images available, continue processing + } + } + } + // release the packet Arc so it can be reused if !send_raw_stream && stream_data.packet_len > 0 { stream_data.packet = Arc::new(Vec::new()); // Create a new Arc> for the next packet diff --git a/src/stream_data.rs b/src/stream_data.rs index c6863839..c9dac0ad 100644 --- a/src/stream_data.rs +++ b/src/stream_data.rs @@ -5,17 +5,120 @@ */ use crate::current_unix_timestamp_ms; +use crate::system_stats::get_system_stats; +use crate::system_stats::SystemStats; use ahash::AHashMap; +#[cfg(feature = "gst")] +use gstreamer as gst; +#[cfg(feature = "gst")] +use gstreamer::prelude::*; +#[cfg(feature = "gst")] +use gstreamer_app as gst_app; use lazy_static::lazy_static; use log::{debug, error, info}; use rtp::RtpReader; use rtp_rs as rtp; use serde::{Deserialize, Serialize}; +use std::sync::RwLock; use std::{fmt, sync::Arc, sync::Mutex}; // global variable to store the MpegTS PID Map (initially empty) lazy_static! { static ref PID_MAP: Mutex>> = Mutex::new(AHashMap::new()); + static ref MPEGTS_PACKETS: RwLock>> = RwLock::new(Vec::new()); + static ref IMAGE_CHANNEL: ( + crossbeam::channel::Sender>, + crossbeam::channel::Receiver> + ) = crossbeam::channel::bounded(1); +} + +#[cfg(feature = "gst")] +pub fn feed_mpegts_packets(packets: Vec>) { + let mut mpegts_packets = MPEGTS_PACKETS.write().unwrap(); + mpegts_packets.extend(packets); +} + +#[cfg(feature = "gst")] +pub fn generate_images(stream_type_number: u8) { + let sender = IMAGE_CHANNEL.0.clone(); + let packets = MPEGTS_PACKETS.read().unwrap().clone(); + + // copy packets into a new vector for the thread + let packets_clone = packets.iter().map(|x| x.clone()).collect::>>(); + std::thread::spawn(move || { + let mut frame_data = None; + + // Initialize GStreamer + gst::init().unwrap(); + + // Create a pipeline to extract video frames + let pipeline = match stream_type_number { + 0x02 => gst::parse_launch("appsrc name=src ! tsdemux ! mpeg2dec ! videoconvert ! appsink name=sink"), + 0x1B => gst::parse_launch("appsrc name=src ! tsdemux ! h264parse ! avdec_h264 ! videoconvert ! appsink name=sink"), + 0x24 => gst::parse_launch("appsrc name=src ! tsdemux ! h265parse ! avdec_h265 ! videoconvert ! appsink name=sink"), + _ => panic!("Unsupported video stream type {}", stream_type_number), + }.unwrap(); + + // Get references to the appsrc and appsink elements + let appsrc = pipeline + .clone() + .dynamic_cast::() + .unwrap() + .by_name("src") + .unwrap() + .downcast::() + .unwrap(); + let appsink = pipeline + .clone() + .dynamic_cast::() + .unwrap() + .by_name("sink") + .unwrap() + .downcast::() + .unwrap(); + + // Set the appsrc caps + let caps = gst::Caps::builder("video/mpegts") + .field("packetsize", 188) + .build(); + appsrc.set_caps(Some(&caps)); + + // Configure the appsink + appsink.set_caps(Some(&gst::Caps::new_empty_simple("video/x-raw"))); + + // Start the pipeline + pipeline.set_state(gst::State::Playing).unwrap(); + + // Push MPEG-TS packets to the appsrc + for packet in packets_clone.into_iter() { + let buffer = gst::Buffer::from_slice(packet); + appsrc.push_buffer(buffer).unwrap(); + } + appsrc.end_of_stream().unwrap(); + + // Retrieve the video frames from the appsink + while let Some(sample) = appsink.pull_sample().ok() { + if let Some(buffer) = sample.buffer() { + let map = buffer.map_readable().unwrap(); + let data = map.as_slice().to_vec(); + frame_data = Some(data); + break; + } + } + + // Stop the pipeline + pipeline.set_state(gst::State::Null).unwrap(); + + // Send the extracted image through the channel + if let Some(image_data) = frame_data { + sender.send(image_data).unwrap(); + } + }); +} + +#[cfg(feature = "gst")] +pub fn get_image() -> Option> { + IMAGE_CHANNEL.1.try_recv().ok() } // constant for PAT PID @@ -100,6 +203,21 @@ pub struct StreamData { pub capture_iat: u64, pub source_ip: String, pub source_port: i32, + // System stats + pub total_memory: u64, + pub used_memory: u64, + pub total_swap: u64, + pub used_swap: u64, + pub cpu_usage: f32, + pub cpu_count: usize, + pub core_count: usize, + pub boot_time: u64, + pub load_avg_one: f64, + pub load_avg_five: f64, + pub load_avg_fifteen: f64, + pub host_name: String, + pub kernel_version: String, + pub os_version: String, } impl Clone for StreamData { @@ -144,6 +262,21 @@ impl Clone for StreamData { capture_iat: self.capture_iat, source_ip: self.source_ip.clone(), source_port: self.source_port, + // System stats initialization + total_memory: self.total_memory, + used_memory: self.used_memory, + total_swap: self.total_swap, + used_swap: self.used_swap, + cpu_usage: self.cpu_usage, + cpu_count: self.cpu_count, + core_count: self.core_count, + boot_time: self.boot_time, + load_avg_one: self.load_avg_one, + load_avg_five: self.load_avg_five, + load_avg_fifteen: self.load_avg_fifteen, + host_name: self.host_name.clone(), + kernel_version: self.kernel_version.clone(), + os_version: self.os_version.clone(), } } } @@ -165,6 +298,7 @@ impl StreamData { capture_iat: u64, source_ip: String, source_port: i32, + system_stats: SystemStats, ) -> Self { // convert capture_timestamp to unix timestamp in milliseconds since epoch let last_arrival_time = capture_timestamp; @@ -210,6 +344,21 @@ impl StreamData { capture_iat, source_ip, source_port, + // Initialize system stats fields from the SystemStats instance + total_memory: system_stats.total_memory, + used_memory: system_stats.used_memory, + total_swap: system_stats.total_swap, + used_swap: system_stats.used_swap, + cpu_usage: system_stats.cpu_usage, + cpu_count: system_stats.cpu_count, + core_count: system_stats.core_count, + boot_time: system_stats.boot_time, + load_avg_one: system_stats.load_avg.one, + load_avg_five: system_stats.load_avg.five, + load_avg_fifteen: system_stats.load_avg.fifteen, + host_name: system_stats.host_name, + kernel_version: system_stats.kernel_version, + os_version: system_stats.os_version, } } // set RTP fields @@ -677,6 +826,8 @@ pub fn process_packet( debug!("ProcessPacket: New PID {} Found, adding to PID map.", pid); } else { // PMT packet not found yet, add the stream_data_packet to the pid_map + // OS and Network stats + let system_stats = get_system_stats(); let mut stream_data = Arc::new(StreamData::new( Arc::new(Vec::new()), // Ensure packet_data is Arc> 0, @@ -692,6 +843,7 @@ pub fn process_packet( stream_data_packet.capture_iat, stream_data_packet.source_ip.clone(), stream_data_packet.source_port, + system_stats, )); Arc::make_mut(&mut stream_data).update_stats(packet.len()); @@ -785,6 +937,7 @@ pub fn update_pid_map( let timestamp = current_unix_timestamp_ms().unwrap_or(0); if !pid_map.contains_key(&stream_pid) { + let system_stats = get_system_stats(); let mut stream_data = Arc::new(StreamData::new( Arc::new(Vec::new()), // Ensure packet_data is Arc> 0, @@ -800,6 +953,7 @@ pub fn update_pid_map( capture_iat, source_ip.clone(), source_port, + system_stats, )); // update stream_data stats Arc::make_mut(&mut stream_data).update_stats(pmt_packet.len()); @@ -983,6 +1137,7 @@ pub fn process_smpte2110_packet( let stream_type = payload_type.to_string(); // Create new StreamData instance + let system_stats = get_system_stats(); let mut stream_data = StreamData::new( packet_arc, rtp_payload_offset, @@ -998,6 +1153,7 @@ pub fn process_smpte2110_packet( capture_iat, source_ip.clone(), source_port, + system_stats, ); // Update StreamData stats and RTP fields @@ -1067,6 +1223,7 @@ pub fn process_mpegts_packet( let timestamp = extract_timestamp(chunk); let continuity_counter = chunk[3] & 0x0F; + let system_stats = get_system_stats(); let mut stream_data = StreamData::new( Arc::clone(&packet), start, @@ -1082,6 +1239,7 @@ pub fn process_mpegts_packet( capture_iat, source_ip.clone(), source_port, + system_stats, ); stream_data.update_stats(packet_size); streams.push(stream_data); diff --git a/src/system_stats.rs b/src/system_stats.rs index 96e1111b..a0582530 100644 --- a/src/system_stats.rs +++ b/src/system_stats.rs @@ -1,56 +1,48 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use std::sync::Mutex; +use std::sync::RwLock; use std::time::{Duration, Instant}; -use sysinfo::{NetworkExt, NetworksExt}; -use sysinfo::{ProcessorExt, System, SystemExt}; +use sysinfo::{NetworkExt, NetworksExt, ProcessorExt, System, SystemExt}; -static SYSTEM: Lazy> = Lazy::new(|| { - let mut system = System::new_all(); - system.refresh_all(); // Initial refresh - Mutex::new((system, Instant::now())) +static CACHED_STATS: Lazy> = Lazy::new(|| { + let stats = get_system_stats_internal(); + RwLock::new((stats, Instant::now())) }); -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct SystemStats { - total_memory: u64, - used_memory: u64, - total_swap: u64, - used_swap: u64, - cpu_usage: f32, - cpu_count: usize, - core_count: usize, - boot_time: u64, - load_avg: LoadAverage, - host_name: String, - kernel_version: String, - os_version: String, - network_stats: Vec, + pub total_memory: u64, + pub used_memory: u64, + pub total_swap: u64, + pub used_swap: u64, + pub cpu_usage: f32, + pub cpu_count: usize, + pub core_count: usize, + pub boot_time: u64, + pub load_avg: LoadAverage, + pub host_name: String, + pub kernel_version: String, + pub os_version: String, + pub network_stats: Vec, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct NetworkStats { name: String, received: u64, transmitted: u64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LoadAverage { - one: f64, - five: f64, - fifteen: f64, + pub one: f64, + pub five: f64, + pub fifteen: f64, } -pub fn get_system_stats() -> SystemStats { - let mut system_and_instant = SYSTEM.lock().unwrap(); - let (system, last_updated) = &mut *system_and_instant; - - // Only refresh if it's been more than a second since the last update - if last_updated.elapsed() > Duration::from_secs(1) { - system.refresh_all(); - *last_updated = Instant::now(); - } +fn get_system_stats_internal() -> SystemStats { + let mut system = System::new_all(); + system.refresh_all(); let host_name = system.host_name().unwrap_or_else(|| "Unknown".to_string()); let kernel_version = system @@ -67,6 +59,7 @@ pub fn get_system_stats() -> SystemStats { let cpu_count = system.processors().len(); let boot_time = system.boot_time(); let core_count = system.physical_core_count().unwrap_or_else(|| 0); + let networks = system.networks(); let network_stats = networks .iter() @@ -95,3 +88,17 @@ pub fn get_system_stats() -> SystemStats { network_stats, } } + +pub fn get_system_stats() -> SystemStats { + let cached_stats = CACHED_STATS.read().unwrap(); + let (stats, last_updated) = &*cached_stats; + + if last_updated.elapsed() > Duration::from_secs(60) { + drop(cached_stats); + let mut cached_stats_write = CACHED_STATS.write().unwrap(); + *cached_stats_write = (get_system_stats_internal(), Instant::now()); + cached_stats_write.0.clone() + } else { + stats.clone() + } +}