diff --git a/.gitignore b/.gitignore index de12a6cc..fd61b0da 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ src/bin/probe.rs.main src/bin/probe.rs.staging .env src/stream_data_capnp.rs +images/*.jpg +build diff --git a/Cargo.toml b/Cargo.toml index 45ac489c..3830dd1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,8 @@ gstreamer-app = { version = "0.20", optional = true } gstreamer-video = { version = "0.20", optional = true } crossbeam = "0.8.4" image = "0.25.1" +num_cpus = "1.16.0" +threadpool = "1.8.1" [build-dependencies] capnpc = "0.18.0" diff --git a/images/.keep b/images/.keep new file mode 100644 index 00000000..e69de29b diff --git a/schema/stream_data.capnp b/schema/stream_data.capnp index ed5a522e..f327a4f4 100644 --- a/schema/stream_data.capnp +++ b/schema/stream_data.capnp @@ -51,4 +51,5 @@ struct StreamDataCapnp { hostName @47 :Text; kernelVersion @48 :Text; osVersion @49 :Text; + hasImage @50 :UInt8; } diff --git a/scripts/compile.sh b/scripts/compile.sh index f0382be9..6efbdd44 100755 --- a/scripts/compile.sh +++ b/scripts/compile.sh @@ -3,6 +3,13 @@ # Exit immediately if a command exits with a non-zero status. set -e +FEATURES= +if [ "$1" != "" ]; then + FEATURES="--features $1" +fi + +BUILD=release-with-debug + # Function to prompt for installation prompt_install() { while true; do @@ -64,9 +71,13 @@ if [ "$OS" = "Linux" ]; then # Build with SCL echo "Building project (CentOS 7)..." - run_with_scl cargo build - run_with_scl cargo build --release - run_with_scl cargo build --profile=release-with-debug + if [ "$BUILD" = "release" ]; then + run_with_scl cargo build $FEATURES --release + elif [ "$BUILD" = "release-with-debug" ]; then + run_with_scl cargo build $FEATURES --profile=release-with-debug + else + run_with_scl cargo build $FEATURES + fi fi fi # Add elif blocks here for other specific Linux distributions @@ -75,17 +86,25 @@ elif [ "$OS" = "Darwin" ]; then # macOS specific setup # Build on macOS echo "Building project (macOS)..." - cargo build - cargo build --release - cargo build --profile=release-with-debug + if [ "$BUILD" = "release" ]; then + cargo build $FEATURES --profile=release-with-debug + elif [ "$BUILD" == "release-with-debug" ]; then + cargo build $FEATURES --release + else + cargo build $FEATURES + fi else echo "Generic Unix-like OS detected." # Generic Unix/Linux setup # Build for generic Unix/Linux echo "Building project..." - cargo build - cargo build --release - cargo build --profile=release-with-debug + if [ "$BUILD" = "release" ]; then + cargo build $FEATURES --release + elif [ "$BUILD" == "release-with-debug" ]; then + cargo build $FEATURES --profile=release-with-debug + else + cargo build $FEATURES + fi fi echo "Build completed successfully." diff --git a/scripts/install_gstreamer.sh b/scripts/install_gstreamer.sh index 88af9070..67fe9ae2 100755 --- a/scripts/install_gstreamer.sh +++ b/scripts/install_gstreamer.sh @@ -6,6 +6,12 @@ run_with_scl() { scl enable devtoolset-11 -- "$@" } +BUILD_DIR=$(pwd)/build +if [ ! -d $BUILD_DIR ]; then + mkdir -p $BUILD_DIR +fi +cd $BUILD_DIR + # Define versions for dependencies and GStreamer GLIB_VERSION=2.56.4 ORC_VERSION=0.4.31 @@ -42,196 +48,256 @@ echo "------------------------------------------------------------" echo "Installing GStreamer and essential dependencies..." echo "------------------------------------------------------------" -echo "---" -echo "Installing libffi..." -echo "---" -# Download, compile, and install libffi -if [ ! -f libffi-$LIBFFI_VERSION.tar.gz ]; then - wget ftp://sourceware.org/pub/libffi/libffi-$LIBFFI_VERSION.tar.gz -fi -if [ ! -d libffi-$LIBFFI_VERSION ]; then - tar xf libffi-$LIBFFI_VERSION.tar.gz +# Install libFFI +if [ ! -f "libffi-installed.done" ] ; then + echo "---" + echo "Installing libffi..." + echo "---" + # Download, compile, and install libffi + if [ ! -f libffi-$LIBFFI_VERSION.tar.gz ]; then + wget ftp://sourceware.org/pub/libffi/libffi-$LIBFFI_VERSION.tar.gz + fi + if [ ! -d libffi-$LIBFFI_VERSION ]; then + tar xf libffi-$LIBFFI_VERSION.tar.gz + fi + cd libffi-$LIBFFI_VERSION + run_with_scl ./configure --prefix=$PREFIX + run_with_scl make + sudo make install + cd .. fi -cd libffi-$LIBFFI_VERSION -run_with_scl ./configure --prefix=$PREFIX -run_with_scl make -sudo make install -cd .. +touch libffi-installed.done -echo "---" -echo "Installing ORC..." -echo "---" -# Download, compile, and install ORC -if [ ! -f orc-$ORC_VERSION.tar.xz ]; then - wget https://gstreamer.freedesktop.org/src/orc/orc-$ORC_VERSION.tar.xz +# Install ORC +if [ ! -f "orc-installed.done" ] ; then + echo "---" + echo "Installing ORC..." + echo "---" + # Download, compile, and install ORC + if [ ! -f orc-$ORC_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/orc/orc-$ORC_VERSION.tar.xz + fi + if [ ! -d orc-$ORC_VERSION ]; then + tar xf orc-$ORC_VERSION.tar.xz + fi + cd orc-$ORC_VERSION + run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE + run_with_scl ninja -C _build + sudo /usr/local/bin/ninja -C _build install + cd .. fi -if [ ! -d orc-$ORC_VERSION ]; then - tar xf orc-$ORC_VERSION.tar.xz -fi -cd orc-$ORC_VERSION -run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE -run_with_scl ninja -C _build -sudo /usr/local/bin/ninja -C _build install -cd .. +touch orc-installed.done -echo "---" -echo "Installing Gstreamer core..." -echo "---" -# Download, compile, and install GStreamer core -if [ ! -f gstreamer-$GST_VERSION.tar.xz ]; then - wget https://gstreamer.freedesktop.org/src/gstreamer/gstreamer-$GST_VERSION.tar.xz -fi -if [ ! -d gstreamer-$GST_VERSION ]; then - tar xf gstreamer-$GST_VERSION.tar.xz +# Install Gstreamer core +if [ ! -f "gstreamer-installed.done" ] ; then + echo "---" + echo "Installing Gstreamer core..." + echo "---" + # Download, compile, and install GStreamer core + if [ ! -f gstreamer-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gstreamer/gstreamer-$GST_VERSION.tar.xz + fi + if [ ! -d gstreamer-$GST_VERSION ]; then + tar xf gstreamer-$GST_VERSION.tar.xz + fi + cd gstreamer-$GST_VERSION + run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE + run_with_scl ninja -C _build + sudo /usr/local/bin/ninja -C _build install + cd .. fi -cd gstreamer-$GST_VERSION -run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE -run_with_scl ninja -C _build -sudo /usr/local/bin/ninja -C _build install -cd .. +touch gstreamer-installed.done -echo "---" -echo "Installing Gstreamer base..." -echo "---" -# Download, compile, and install gst-plugins-base -if [ ! -f gst-plugins-base-$GST_VERSION.tar.xz ]; then - wget https://gstreamer.freedesktop.org/src/gst-plugins-base/gst-plugins-base-$GST_VERSION.tar.xz +# Install GStreamer base plugins +if [ ! -f "gst-plugins-base-installed.done" ] ; then + echo "---" + echo "Installing Gstreamer base..." + echo "---" + # Download, compile, and install gst-plugins-base + if [ ! -f gst-plugins-base-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-plugins-base/gst-plugins-base-$GST_VERSION.tar.xz + fi + if [ ! -d gst-plugins-base-$GST_VERSION ]; then + tar xf gst-plugins-base-$GST_VERSION.tar.xz + fi + cd gst-plugins-base-$GST_VERSION + run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE + run_with_scl ninja -C _build + sudo /usr/local/bin/ninja -C _build install + cd .. fi -if [ ! -d gst-plugins-base-$GST_VERSION ]; then - tar xf gst-plugins-base-$GST_VERSION.tar.xz -fi -cd gst-plugins-base-$GST_VERSION -run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE -run_with_scl ninja -C _build -sudo /usr/local/bin/ninja -C _build install -cd .. +touch gst-plugins-base-installed.done # Install GStreamer bad plugins (includes tsdemux) -echo "---" -echo "Installing Gstreamer bad plugins..." -echo "---" -if [ ! -f gst-plugins-bad-$GST_VERSION.tar.xz ]; then - wget https://gstreamer.freedesktop.org/src/gst-plugins-bad/gst-plugins-bad-$GST_VERSION.tar.xz -fi -if [ ! -d gst-plugins-bad-$GST_VERSION ]; then - tar xf gst-plugins-bad-$GST_VERSION.tar.xz +if [ ! -f "gst-plugins-bad-installed.done" ] ; then + # Install GStreamer bad plugins (includes tsdemux) + echo "---" + echo "Installing Gstreamer bad plugins..." + echo "---" + if [ ! -f gst-plugins-bad-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-plugins-bad/gst-plugins-bad-$GST_VERSION.tar.xz + fi + if [ ! -d gst-plugins-bad-$GST_VERSION ]; then + tar xf gst-plugins-bad-$GST_VERSION.tar.xz + fi + cd gst-plugins-bad-$GST_VERSION + run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE + run_with_scl ninja -C _build + sudo /usr/local/bin/ninja -C _build install + cd .. fi -cd gst-plugins-bad-$GST_VERSION -run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE -run_with_scl ninja -C _build -sudo /usr/local/bin/ninja -C _build install -cd .. +touch gst-plugins-bad-installed.done echo "---" echo "Downloading and compiling NASM (Netwide Assembler)..." echo "---" -# Download -if [ ! -f nasm-$NASM_VERSION.tar.gz ]; then - wget https://www.nasm.us/pub/nasm/releasebuilds/$NASM_VERSION/nasm-$NASM_VERSION.tar.gz -fi -# Extract -if [ ! -d nasm-$NASM_VERSION ]; then - tar -xzf nasm-$NASM_VERSION.tar.gz -fi -cd nasm-$NASM_VERSION -# Compile and install -./autogen.sh -./configure --prefix=/usr/local -make -sudo make install -cd .. - -echo "---" -echo "Downloading and compiling libx264..." -echo "---" -echo "---" -echo "Cloning and compiling libx264..." -echo "---" -# Ensure git is installed -sudo yum install -y git -# Clone the repository -if [ ! -d "x264" ]; then - git clone https://code.videolan.org/videolan/x264.git +# Download and compile NASM +if [ ! -f "nasm-installed.done" ] ; then + if [ ! -f nasm-$NASM_VERSION.tar.gz ]; then + wget https://www.nasm.us/pub/nasm/releasebuilds/$NASM_VERSION/nasm-$NASM_VERSION.tar.gz + fi + # Extract + if [ ! -d nasm-$NASM_VERSION ]; then + tar -xzf nasm-$NASM_VERSION.tar.gz + fi + cd nasm-$NASM_VERSION + # Compile and install + ./autogen.sh + ./configure --prefix=/usr/local + make + sudo make install + cd .. fi -cd x264 +touch nasm-installed.done -# Compile -run_with_scl ./configure --prefix=$PREFIX --enable-shared --enable-static --enable-pic -run_with_scl make -sudo make install -sudo ldconfig -cd .. +# libx264 +if [ ! -f "x264-installed.done" ] ; then + echo "---" + echo "Downloading and compiling libx264..." + echo "---" + echo "---" + echo "Cloning and compiling libx264..." + echo "---" + # Ensure git is installed + sudo yum install -y git -echo "---" -echo "Cloning and compiling x265..." -echo "---" -# Ensure necessary tools are installed -sudo yum install -y cmake3 git + # Clone the repository + if [ ! -d "x264" ]; then + git clone https://code.videolan.org/videolan/x264.git + fi + cd x264 -# Clone the x265 repository if it doesn't already exist -if [ ! -d "x265" ]; then - git clone https://github.com/videolan/x265.git + # Compile + run_with_scl ./configure --prefix=$PREFIX --enable-shared --enable-static --enable-pic + run_with_scl make + sudo make install + sudo ldconfig + cd .. fi -cd x265 +touch x264-installed.done -# Create a build directory and navigate into it -mkdir -p build -cd build +# libx265 +if [ ! -f "x265-installed.done" ] ; then + echo "---" + echo "Cloning and compiling x265..." + echo "---" + # Ensure necessary tools are installed + sudo yum install -y cmake3 git -# Use cmake3 to configure the build, respecting the PREFIX variable for installation -run_with_scl cmake3 -G "Unix Makefiles" -DCMAKE_INSTALL_PREFIX=$PREFIX -DENABLE_SHARED:bool=on ../source + # Clone the x265 repository if it doesn't already exist + if [ ! -d "x265" ]; then + git clone https://github.com/videolan/x265.git + fi + cd x265 -# Compile and install -run_with_scl make -sudo make install -sudo ldconfig + # Create a build directory and navigate into it + mkdir -p build + cd build -# Navigate back to the initial directory -cd ../../.. + # Use cmake3 to configure the build, respecting the PREFIX variable for installation + run_with_scl cmake3 -G "Unix Makefiles" -DCMAKE_INSTALL_PREFIX=$PREFIX -DENABLE_SHARED:bool=on ../source -echo "---" -echo "Downloading and compiling FFmpeg..." -echo "---" -# Download -if [ ! -f ffmpeg-$FFMPEG_VERSION.tar.bz2 ]; then - wget http://ffmpeg.org/releases/ffmpeg-$FFMPEG_VERSION.tar.bz2 + # Compile and install + run_with_scl make + sudo make install + sudo ldconfig + + # Navigate back to the initial directory + cd ../../.. fi -# Extract -if [ ! -d ffmpeg-$FFMPEG_VERSION ]; then - tar xf ffmpeg-$FFMPEG_VERSION.tar.bz2 +touch x265-installed.done + +# FFmpeg +if [ ! -f "ffmpeg-installed.done" ] ; then + echo "---" + echo "Downloading and compiling FFmpeg..." + echo "---" + # Download + if [ ! -f ffmpeg-$FFMPEG_VERSION.tar.bz2 ]; then + wget http://ffmpeg.org/releases/ffmpeg-$FFMPEG_VERSION.tar.bz2 + fi + # Extract + if [ ! -d ffmpeg-$FFMPEG_VERSION ]; then + tar xf ffmpeg-$FFMPEG_VERSION.tar.bz2 + fi + # Compile + cd ffmpeg-$FFMPEG_VERSION + run_with_scl ./configure --prefix=$PREFIX \ + --enable-shared --enable-static \ + --enable-pic --enable-gpl --enable-libx264 \ + --enable-libx265 \ + --extra-cflags="-I$PREFIX/include" --extra-ldflags="-L$PREFIX/lib" + run_with_scl make + sudo make install + sudo ldconfig + cd .. fi -# Compile -cd ffmpeg-$FFMPEG_VERSION -run_with_scl ./configure --prefix=$PREFIX \ - --enable-shared --enable-static \ - --enable-pic --enable-gpl --enable-libx264 \ - --enable-libx265 \ - --extra-cflags="-I$PREFIX/include" --extra-ldflags="-L$PREFIX/lib" -run_with_scl make -sudo make install -sudo ldconfig -cd .. +touch ffmpeg-installed.done -echo "---" -echo "Installing Gstreamer libav plugins..." -echo "---" +# GStreamer libav plugins +if [ ! -f "gst-libav-installed.done" ] ; then + echo "---" + echo "Installing Gstreamer libav plugins..." + echo "---" -PWD=$(pwd) -echo "PWD: $PWD" + PWD=$(pwd) + echo "PWD: $PWD" -if [ ! -f gst-libav-$GST_VERSION.tar.xz ]; then - wget https://gstreamer.freedesktop.org/src/gst-libav/gst-libav-$GST_VERSION.tar.xz + if [ ! -f gst-libav-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-libav/gst-libav-$GST_VERSION.tar.xz + fi + if [ ! -d gst-libav-$GST_VERSION ]; then + tar xf gst-libav-$GST_VERSION.tar.xz + fi + cd gst-libav-$GST_VERSION + run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE --pkg-config-path=$PKG_CONFIG_PATH + run_with_scl ninja -C _build + sudo /usr/local/bin/ninja -C _build install + cd .. fi -if [ ! -d gst-libav-$GST_VERSION ]; then - tar xf gst-libav-$GST_VERSION.tar.xz +touch gst-libav-installed.done + +# GStreamer good plugins +if [ ! -f "gst-plugins-good-installed.done" ] ; then + echo "---" + echo "Installing GStreamer good plugins..." + echo "---" + # Download, compile, and install gst-plugins-good + if [ ! -f gst-plugins-good-$GST_VERSION.tar.xz ]; then + wget https://gstreamer.freedesktop.org/src/gst-plugins-good/gst-plugins-good-$GST_VERSION.tar.xz + fi + if [ ! -d gst-plugins-good-$GST_VERSION ]; then + tar xf gst-plugins-good-$GST_VERSION.tar.xz + fi + cd gst-plugins-good-$GST_VERSION + run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE + run_with_scl ninja -C _build + sudo /usr/local/bin/ninja -C _build install + cd .. fi -cd gst-libav-$GST_VERSION -run_with_scl meson _build --prefix=$PREFIX --buildtype=release --native-file $MESON_NATIVE_FILE --pkg-config-path=$PKG_CONFIG_PATH -run_with_scl ninja -C _build -sudo /usr/local/bin/ninja -C _build install -cd .. +touch gst-plugins-good-installed.done # Verify GStreamer installation echo "------------------------------------------------------------" diff --git a/scripts/monitor.sh b/scripts/monitor.sh index c801ca4a..c7f48b29 100755 --- a/scripts/monitor.sh +++ b/scripts/monitor.sh @@ -1,4 +1,9 @@ #!/bin/bash # -target/release/monitor \ - --send-to-kafka # --recv-raw-stream --output-file capture.ts +BUILD=release-with-debug +OUTPUT_FILE=images/test.jpg +KAFKA_BROKER=sun:9092 + +RUST_LOG=info target/$BUILD/monitor \ + --kafka-broker $KAFKA_BROKER \ + --output-file $OUTPUT_FILE $@ diff --git a/scripts/probe.sh b/scripts/probe.sh index 5b26c42f..e7965a43 100755 --- a/scripts/probe.sh +++ b/scripts/probe.sh @@ -1,12 +1,21 @@ #!/bin/bash # -sudo RUST_LOG=info \ - target/release/probe \ +BUILD=release-with-debug +LOGLEVEL=info +GST_DEBUG_LEVEL=1 +BACKTRACE=full +SOURCE_IP=224.0.0.200 +SOURCE_DEVICE=eth0 +SOURCE_PORT=10000 +TARGET_PORT=5556 +sudo GST_DEBUG=$GST_DEBUG_LEVEL RUST_BACKTRACE=$BACKTRACE RUST_LOG=$LOGLEVEL \ + target/$BUILD/probe \ --pcap-stats \ - --source-ip 224.0.0.200 \ - --source-port 10000 \ - --source-device eth0 \ + --source-ip $SOURCE_IP \ + --source-port $SOURCE_PORT \ + --source-device $SOURCE_DEVICE \ --target-ip 0.0.0.0 \ --send-null-packets \ - --target-port 5556 \ + --target-port $TARGET_PORT \ + --extract-images \ --zmq-batch-size 10000 $@ diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index 661630e0..e2667a16 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -229,6 +229,7 @@ fn capnp_to_stream_data(bytes: &[u8]) -> capnp::Result { host_name: reader.get_host_name()?.to_string()?, kernel_version: reader.get_kernel_version()?.to_string()?, os_version: reader.get_os_version()?.to_string()?, + has_image: reader.get_has_image(), }; Ok(stream_data) @@ -408,20 +409,66 @@ fn flatten_streams( ); // Add system stats fields to the flattened structure - flat_structure.insert(format!("{}.total_memory", prefix), json!(stream_data.total_memory)); - flat_structure.insert(format!("{}.used_memory", prefix), json!(stream_data.used_memory)); - flat_structure.insert(format!("{}.total_swap", prefix), json!(stream_data.total_swap)); - flat_structure.insert(format!("{}.used_swap", prefix), json!(stream_data.used_swap)); - flat_structure.insert(format!("{}.cpu_usage", prefix), json!(stream_data.cpu_usage)); - flat_structure.insert(format!("{}.cpu_count", prefix), json!(stream_data.cpu_count)); - flat_structure.insert(format!("{}.core_count", prefix), json!(stream_data.core_count)); - flat_structure.insert(format!("{}.boot_time", prefix), json!(stream_data.boot_time)); - flat_structure.insert(format!("{}.load_avg_one", prefix), json!(stream_data.load_avg_one)); - flat_structure.insert(format!("{}.load_avg_five", prefix), json!(stream_data.load_avg_five)); - flat_structure.insert(format!("{}.load_avg_fifteen", prefix), json!(stream_data.load_avg_fifteen)); - flat_structure.insert(format!("{}.host_name", prefix), json!(stream_data.host_name)); - flat_structure.insert(format!("{}.kernel_version", prefix), json!(stream_data.kernel_version)); - flat_structure.insert(format!("{}.os_version", prefix), json!(stream_data.os_version)); + flat_structure.insert( + format!("{}.total_memory", prefix), + json!(stream_data.total_memory), + ); + flat_structure.insert( + format!("{}.used_memory", prefix), + json!(stream_data.used_memory), + ); + flat_structure.insert( + format!("{}.total_swap", prefix), + json!(stream_data.total_swap), + ); + flat_structure.insert( + format!("{}.used_swap", prefix), + json!(stream_data.used_swap), + ); + flat_structure.insert( + format!("{}.cpu_usage", prefix), + json!(stream_data.cpu_usage), + ); + flat_structure.insert( + format!("{}.cpu_count", prefix), + json!(stream_data.cpu_count), + ); + flat_structure.insert( + format!("{}.core_count", prefix), + json!(stream_data.core_count), + ); + flat_structure.insert( + format!("{}.boot_time", prefix), + json!(stream_data.boot_time), + ); + flat_structure.insert( + format!("{}.load_avg_one", prefix), + json!(stream_data.load_avg_one), + ); + flat_structure.insert( + format!("{}.load_avg_five", prefix), + json!(stream_data.load_avg_five), + ); + flat_structure.insert( + format!("{}.load_avg_fifteen", prefix), + json!(stream_data.load_avg_fifteen), + ); + flat_structure.insert( + format!("{}.host_name", prefix), + json!(stream_data.host_name), + ); + flat_structure.insert( + format!("{}.kernel_version", prefix), + json!(stream_data.kernel_version), + ); + flat_structure.insert( + format!("{}.os_version", prefix), + json!(stream_data.os_version), + ); + flat_structure.insert( + format!("{}.has_image", prefix), + json!(stream_data.has_image), + ); } flat_structure @@ -435,7 +482,7 @@ async fn produce_message( key: String, _stream_data_timestamp: i64, producer: FutureProducer, - admin_client: &AdminClient + admin_client: &AdminClient, ) { debug!("Service {} sending message", kafka_topic); let kafka_topic = kafka_topic.replace(":", "_").replace(".", "_"); @@ -491,7 +538,7 @@ struct Args { #[clap(long, env = "SILENT", default_value_t = false)] silent: bool, - /// Sets if Raw Stream should be sent + /// Sets if Raw Stream will be received #[clap(long, env = "RECV_RAW_STREAM", default_value_t = false)] recv_raw_stream: bool, @@ -1064,13 +1111,6 @@ async fn main() { let mut total_bytes = 0; let mut counter = 0; - // Initialize an Option to None - let mut file = if !output_file.is_empty() { - Some(File::create(&output_file).unwrap()) - } else { - None - }; - let mut video_batch = Vec::new(); let mut dot_last_file_write = Instant::now(); @@ -1099,6 +1139,8 @@ async fn main() { info!("Startup System OS Stats:\n{:?}", system_stats_json); } + let mut output_file_counter = 0; + loop { // check for packet count if packet_count > 0 && counter >= packet_count { @@ -1117,7 +1159,7 @@ async fn main() { dot_last_sent_stats = Instant::now(); // OS and Network stats - system_stats_json = get_stats_as_json(StatsType::System).await; + system_stats_json = get_stats_as_json(StatsType::System).await; if show_os_stats && system_stats_json != json!({}) { info!("System stats as JSON:\n{:?}", system_stats_json); @@ -1127,6 +1169,108 @@ async fn main() { // Deserialize the received message into StreamData match capnp_to_stream_data(&header_msg) { Ok(stream_data) => { + // print the structure of the packet + log::debug!("MONITOR::PACKET:RECEIVE[{}] pid: {} stream_type: {} bitrate: {} bitrate_max: {} bitrate_min: {} bitrate_avg: {} iat: {} iat_max: {} iat_min: {} iat_avg: {} errors: {} continuity_counter: {} timestamp: {}", + counter + 1, + stream_data.pid, + stream_data.stream_type, + stream_data.bitrate, + stream_data.bitrate_max, + stream_data.bitrate_min, + stream_data.bitrate_avg, + stream_data.iat, + stream_data.iat_max, + stream_data.iat_min, + stream_data.iat_avg, + stream_data.error_count, + stream_data.continuity_counter, + stream_data.timestamp, + ); + + // get data message + let data_msg = packet_msg[1].clone(); + + // Process raw data packet + total_bytes += data_msg.len(); + debug!( + "Monitor: #{} Received {}/{} bytes", + counter, + data_msg.len(), + total_bytes + ); + + if debug_on { + let data_msg_arc = Arc::new(data_msg.to_vec()); + hexdump(&data_msg_arc, 0, data_msg.len()); + } + + // Check if Decoding or if Demuxing + if args.recv_raw_stream { + // Initialize an Option to None + let mut output_file_mut = if !output_file.is_empty() { + Some(File::create(&output_file).unwrap()) + } else { + None + }; + + if args.decode_video || args.mpegts_reader { + if video_batch.len() >= args.decode_video_batch_size { + dtx.send(video_batch).await.unwrap(); // Clone if necessary + video_batch = Vec::new(); + } else { + let mut stream_data_clone = stream_data.clone(); + stream_data_clone.packet_start = 0; + stream_data_clone.packet_len = data_msg.len(); + stream_data_clone.packet = Arc::new(data_msg.to_vec()); + video_batch.push(stream_data_clone); + } + } + + // Write to file if output_file is provided + if let Some(file) = output_file_mut.as_mut() { + output_file_counter += 1; + if !no_progress && dot_last_file_write.elapsed().as_secs() > 1 { + dot_last_file_write = Instant::now(); + print!("*"); + // flush stdout + std::io::stdout().flush().unwrap(); + } + file.write_all(&data_msg).unwrap(); + } + } else { + // change output_file_name_mut to contain an incrementing _00000000.jpg ending + // use output_file_counter to increment the file name + // example output_file_name_mut = "output_{:08}.jpg", output_file_counter + // remove existing .jpg if given first + let output_file_without_jpg = output_file.replace(".jpg", ""); + if data_msg.len() > 0 && stream_data.has_image > 0 { + log::info!("Data msg is {} size", data_msg.len()); + let output_file_incremental = format!( + "{}_{:08}.jpg", + output_file_without_jpg, + output_file_counter + ); + + let mut output_file_mut = if !output_file.is_empty() { + Some(File::create(&output_file_incremental).unwrap()) + } else { + None + }; + + // Write to file if output_file is provided + if let Some(file) = output_file_mut.as_mut() { + output_file_counter += 1; + if !no_progress && dot_last_file_write.elapsed().as_secs() > 1 { + dot_last_file_write = Instant::now(); + print!("*"); + // flush stdout + std::io::stdout().flush().unwrap(); + } + file.write_all(&data_msg).unwrap(); + } + } + } + let pid = stream_data.pid; { let mut stream_groupings = STREAM_GROUPINGS.write().unwrap(); @@ -1179,7 +1323,11 @@ async fn main() { let global_cc_errors_current = total_cc_errors_current; // avg IAT - let global_iat_avg = if stream_count > 0 { total_iat_avg as f64 / stream_count as f64 } else { 0.0 }; + let global_iat_avg = if stream_count > 0 { + total_iat_avg as f64 / stream_count as f64 + } else { + 0.0 + }; // Calculate global averages let global_bitrate_avg = if stream_count > 0 { @@ -1210,14 +1358,9 @@ async fn main() { "timestamp".to_string(), serde_json::json!(current_timestamp), ); - flattened_data.insert( - "source_ip".to_string(), - serde_json::json!(source_ip), - ); - flattened_data.insert( - "source_port".to_string(), - serde_json::json!(source_port), - ); + flattened_data.insert("source_ip".to_string(), serde_json::json!(source_ip)); + flattened_data + .insert("source_port".to_string(), serde_json::json!(source_port)); // Convert the Map directly to a Value for serialization let combined_stats = serde_json::Value::Object(flattened_data); @@ -1241,72 +1384,12 @@ async fn main() { kafka_key.clone(), current_unix_timestamp_ms().unwrap_or(0) as i64, producer.clone(), - &admin_client + &admin_client, ); // Await the future for sending the message future.await; } - - // print the structure of the packet - debug!("MONITOR::PACKET:RECEIVE[{}] pid: {} stream_type: {} bitrate: {} bitrate_max: {} bitrate_min: {} bitrate_avg: {} iat: {} iat_max: {} iat_min: {} iat_avg: {} errors: {} continuity_counter: {} timestamp: {}", - counter + 1, - stream_data.pid, - stream_data.stream_type, - stream_data.bitrate, - stream_data.bitrate_max, - stream_data.bitrate_min, - stream_data.bitrate_avg, - stream_data.iat, - stream_data.iat_max, - stream_data.iat_min, - stream_data.iat_avg, - stream_data.error_count, - stream_data.continuity_counter, - stream_data.timestamp, - ); - - // get data message - let data_msg = packet_msg[1].clone(); - - // Process raw data packet - total_bytes += data_msg.len(); - debug!( - "Monitor: #{} Received {}/{} bytes", - counter, - data_msg.len(), - total_bytes - ); - - if debug_on { - let data_msg_arc = Arc::new(data_msg.to_vec()); - hexdump(&data_msg_arc, 0, data_msg.len()); - } - - // Check if Decoding or if Demuxing - if args.decode_video || args.mpegts_reader { - if video_batch.len() >= args.decode_video_batch_size { - dtx.send(video_batch).await.unwrap(); // Clone if necessary - video_batch = Vec::new(); - } else { - let mut stream_data_clone = stream_data.clone(); - stream_data_clone.packet_start = 0; - stream_data_clone.packet_len = data_msg.len(); - stream_data_clone.packet = Arc::new(data_msg.to_vec()); - video_batch.push(stream_data_clone); - } - } - - // Write to file if output_file is provided - if let Some(file) = file.as_mut() { - if !no_progress && dot_last_file_write.elapsed().as_secs() > 1 { - dot_last_file_write = Instant::now(); - print!("*"); - // flush stdout - std::io::stdout().flush().unwrap(); - } - file.write_all(&data_msg).unwrap(); - } } Err(e) => { error!("Error deserializing message: {:?}", e); diff --git a/src/bin/probe.rs b/src/bin/probe.rs index ad268a0c..3975496f 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -18,17 +18,21 @@ use capsule::dpdk; use capsule::prelude::*; use clap::Parser; use futures::stream::StreamExt; +#[cfg(feature = "gst")] +use gstreamer as gst; +#[cfg(feature = "gst")] +use gstreamer::prelude::*; use log::{debug, error, info}; use mpeg2ts_reader::demultiplex; use pcap::{Active, Capture, Device, PacketCodec}; use rscap::mpegts; -use rscap::system_stats::get_system_stats; use rscap::stream_data::{ identify_video_pid, is_mpegts_or_smpte2110, parse_and_store_pat, process_packet, - update_pid_map, Codec, PmtInfo, StreamData, Tr101290Errors, PAT_PID + update_pid_map, Codec, PmtInfo, StreamData, Tr101290Errors, PAT_PID, }; #[cfg(feature = "gst")] -use rscap::stream_data::{generate_images, feed_mpegts_packets, get_image }; +use rscap::stream_data::{initialize_pipeline, process_video_packets, pull_images}; +use rscap::system_stats::get_system_stats; use rscap::{current_unix_timestamp_ms, hexdump}; use std::{ error::Error as StdError, @@ -50,8 +54,6 @@ use h264_reader::annexb::AnnexBReader; use h264_reader::nal::{pps, sei, slice, sps, Nal, RefNal, UnitType}; use h264_reader::push::NalInterest; use h264_reader::Context; -#[cfg(feature = "gst")] -use image::GenericImageView; //use rscap::videodecoder::VideoProcessor; use rscap::stream_data::{process_mpegts_packet, process_smpte2110_packet}; use tokio::task; @@ -225,6 +227,7 @@ fn stream_data_to_capnp(stream_data: &StreamData) -> capnp::Result 0 + && (args.send_raw_stream || args.extract_images) + { // Write to file if output_file is provided if let Some(file) = file.as_mut() { if !no_progress && dot_last_sent_ts.elapsed().as_secs() >= 1 { @@ -1394,6 +1402,38 @@ async fn rscap() { } }); + // Create channels for sending video packets and receiving images + #[cfg(feature = "gst")] + let (video_packet_sender, video_packet_receiver) = mpsc::channel(10000); + #[cfg(feature = "gst")] + let (image_sender, mut image_receiver) = mpsc::channel(10000); + + // Initialize the pipeline + #[cfg(feature = "gst")] + let (pipeline, appsrc, appsink) = match initialize_pipeline(0x1B) { + Ok((pipeline, appsrc, appsink)) => (pipeline, appsrc, appsink), + Err(err) => { + eprintln!("Failed to initialize the pipeline: {}", err); + return; + } + }; + + // Start the pipeline + #[cfg(feature = "gst")] + match pipeline.set_state(gst::State::Playing) { + Ok(_) => (), + Err(err) => { + eprintln!("Failed to set the pipeline state to Playing: {}", err); + return; + } + } + + // Spawn separate tasks for processing video packets and pulling images + #[cfg(feature = "gst")] + process_video_packets(appsrc, video_packet_receiver); + #[cfg(feature = "gst")] + pull_images(appsink, image_sender, args.save_images); + // Perform TR 101 290 checks let mut tr101290_errors = Tr101290Errors::new(); @@ -1414,7 +1454,9 @@ async fn rscap() { }; // OS and Network stats - let system_stats = get_system_stats(); + let system_stats = get_system_stats(); + + let mut video_stream_type = 0; info!("Startup System OS Stats:\n{:?}", system_stats); @@ -1448,7 +1490,16 @@ async fn rscap() { } let chunks = if is_mpegts { - process_mpegts_packet(payload_offset, packet, packet_size, start_time, timestamp, iat, args.source_ip.clone(), args.source_port) + process_mpegts_packet( + payload_offset, + packet, + packet_size, + start_time, + timestamp, + iat, + args.source_ip.clone(), + args.source_port, + ) } else { process_smpte2110_packet( payload_offset, @@ -1496,17 +1547,28 @@ async fn rscap() { if pid == pmt_info.pid { debug!("ProcessPacket: PMT packet detected with PID {}", pid); // Update PID_MAP with new stream types - update_pid_map(&packet_chunk, &pmt_info.packet, timestamp, iat, args.source_ip.clone(), args.source_port); + update_pid_map( + &packet_chunk, + &pmt_info.packet, + timestamp, + iat, + args.source_ip.clone(), + args.source_port, + ); // Identify the video PID (if not already identified) if let Some((new_pid, new_codec)) = identify_video_pid(&packet_chunk) { if video_pid.map_or(true, |vp| vp != new_pid) { video_pid = Some(new_pid); + let old_stream_type = video_stream_type; + video_stream_type = stream_data.stream_type_number; info!( - "STATUS::VIDEO_PID:CHANGE: to {}/{} from {}/{}", + "STATUS::VIDEO_PID:CHANGE: to {}/{}/{} from {}/{}/{}", new_pid, new_codec.clone(), + video_stream_type, video_pid.unwrap(), - video_codec.unwrap() + video_codec.unwrap(), + old_stream_type ); video_codec = Some(new_codec.clone()); // Reset video frame as the video stream has changed @@ -1527,6 +1589,22 @@ async fn rscap() { } } + if video_pid < Some(0x1FFF) + && video_pid > Some(0) + && stream_data.pid == video_pid.unwrap() + && video_stream_type != stream_data.stream_type_number + { + let old_stream_type = video_stream_type; + video_stream_type = stream_data.stream_type_number; + log::info!( + "STATUS::VIDEO_STREAM:FOUND: to {}/{} from {}/{}", + video_pid.unwrap(), + video_stream_type, + video_pid.unwrap(), + old_stream_type + ); + } + // Check for TR 101 290 errors process_packet( &mut stream_data, @@ -1552,54 +1630,53 @@ async fn rscap() { video_batch.push(stream_data_clone); } } + } - // Store the video packet and stream type number + // Process video packets + #[cfg(feature = "gst")] + if args.extract_images { #[cfg(feature = "gst")] - if args.extract_images { - let stream_type_number = stream_data.stream_type_number; - if stream_type_number > 0 { - let video_packet = stream_data.packet[stream_data.packet_start..stream_data.packet_start + stream_data.packet_len].to_vec(); - feed_mpegts_packets(vec![video_packet]); - generate_images(stream_type_number); + if video_stream_type > 0 { + let video_packet = Arc::new( + stream_data.packet[stream_data.packet_start + ..stream_data.packet_start + stream_data.packet_len] + .to_vec(), + ); + + // Send the video packet to the processing task + if let Err(_) = video_packet_sender + .try_send(Arc::try_unwrap(video_packet).unwrap_or_default()) + { + // If the channel is full, drop the packet + log::warn!("Video packet channel is full. Dropping packet."); } } - } - } else { - // TODO: Add SMPTE 2110 handling for line to frame conversion and other processing and analysis - } - #[cfg(feature = "gst")] - if args.extract_images { - match get_image() { - Some(image_data) => { - // Process the image data here - // For example, you can save it to a file or perform further analysis - // ... - println!("Received an image with size: {} bytes", image_data.len()); - - // Attempt to decode the image to get its parameters - match image::load_from_memory(&image_data) { - Ok(img) => { - println!("Image dimensions: {:?}", img.dimensions()); - println!("Image color type: {:?}", img.color()); - - // Save the image data to a file named "image.jpg" - let mut file = File::create("image.jpg").expect("Failed to create file."); - file.write_all(&image_data).expect("Failed to write image data to file."); - - println!("Image saved as image.jpg"); - }, - Err(e) => println!("Failed to decode image data: {:?}", e), - } - } - None => { - // No images available, continue processing + // Receive and process images + #[cfg(feature = "gst")] + if let Ok(image_data) = image_receiver.try_recv() { + // attach image to the stream_data.packet arc, clearing the current arc value + stream_data.packet = Arc::new(image_data.clone()); + stream_data.has_image = image_data.len() as u8; + stream_data.packet_start = 0; + stream_data.packet_len = image_data.len(); + + // Process the received image data + log::info!("Received an image with size: {} bytes", image_data.len()); + + } else { + // zero out the packet data + stream_data.packet_start = 0; + stream_data.packet_len = 0; + stream_data.packet = Arc::new(Vec::new()); } } + } else { + // TODO: Add SMPTE 2110 handling for line to frame conversion and other processing and analysis } - // release the packet Arc so it can be reused - if !send_raw_stream && stream_data.packet_len > 0 { + if !args.extract_images && !args.send_raw_stream && stream_data.packet_len > 0 { + // release the packet Arc so it can be reused stream_data.packet = Arc::new(Vec::new()); // Create a new Arc> for the next packet stream_data.packet_len = 0; stream_data.packet_start = 0; @@ -1608,7 +1685,7 @@ async fn rscap() { continue; } } - } else if send_raw_stream { + } else if !args.extract_images && args.send_raw_stream { // Skip null packets if !args.send_null_packets { if pid == 0x1FFF && is_mpegts { @@ -1647,6 +1724,15 @@ async fn rscap() { println!("\nSending stop signals to threads..."); + // Stop the pipeline when done + #[cfg(feature = "gst")] + match pipeline.set_state(gst::State::Null) { + Ok(_) => (), + Err(err) => { + eprintln!("Failed to set the pipeline state to Null: {}", err); + } + } + // Send ZMQ stop signal tx.send(Vec::new()).await.unwrap(); drop(tx); diff --git a/src/stream_data.rs b/src/stream_data.rs index c9dac0ad..12b473e1 100644 --- a/src/stream_data.rs +++ b/src/stream_data.rs @@ -9,116 +9,268 @@ use crate::system_stats::get_system_stats; use crate::system_stats::SystemStats; use ahash::AHashMap; #[cfg(feature = "gst")] +use gst_app::{AppSink, AppSrc}; +#[cfg(feature = "gst")] use gstreamer as gst; #[cfg(feature = "gst")] use gstreamer::prelude::*; #[cfg(feature = "gst")] use gstreamer_app as gst_app; +#[cfg(feature = "gst")] +use gstreamer_video::VideoFormat; +#[cfg(feature = "gst")] +use gstreamer_video::VideoInfo; +#[cfg(feature = "gst")] +use image::imageops::resize; +#[cfg(feature = "gst")] +use image::{ImageBuffer, Rgb}; use lazy_static::lazy_static; use log::{debug, error, info}; use rtp::RtpReader; use rtp_rs as rtp; use serde::{Deserialize, Serialize}; -use std::sync::RwLock; use std::{fmt, sync::Arc, sync::Mutex}; +#[cfg(feature = "gst")] +use tokio::sync::mpsc; -// global variable to store the MpegTS PID Map (initially empty) lazy_static! { static ref PID_MAP: Mutex>> = Mutex::new(AHashMap::new()); - static ref MPEGTS_PACKETS: RwLock>> = RwLock::new(Vec::new()); - static ref IMAGE_CHANNEL: ( - crossbeam::channel::Sender>, - crossbeam::channel::Receiver> - ) = crossbeam::channel::bounded(1); } #[cfg(feature = "gst")] -pub fn feed_mpegts_packets(packets: Vec>) { - let mut mpegts_packets = MPEGTS_PACKETS.write().unwrap(); - mpegts_packets.extend(packets); +fn create_pipeline(desc: &str) -> Result { + let pipeline = gst::parse_launch(desc)? + .downcast::() + .expect("Expected a gst::Pipeline"); + Ok(pipeline) } #[cfg(feature = "gst")] -pub fn generate_images(stream_type_number: u8) { - let sender = IMAGE_CHANNEL.0.clone(); - let packets = MPEGTS_PACKETS.read().unwrap().clone(); - - // copy packets into a new vector for the thread - let packets_clone = packets.iter().map(|x| x.clone()).collect::>>(); - std::thread::spawn(move || { - let mut frame_data = None; - - // Initialize GStreamer - gst::init().unwrap(); - - // Create a pipeline to extract video frames - let pipeline = match stream_type_number { - 0x02 => gst::parse_launch("appsrc name=src ! tsdemux ! mpeg2dec ! videoconvert ! appsink name=sink"), - 0x1B => gst::parse_launch("appsrc name=src ! tsdemux ! h264parse ! avdec_h264 ! videoconvert ! appsink name=sink"), - 0x24 => gst::parse_launch("appsrc name=src ! tsdemux ! h265parse ! avdec_h265 ! videoconvert ! appsink name=sink"), - _ => panic!("Unsupported video stream type {}", stream_type_number), - }.unwrap(); - - // Get references to the appsrc and appsink elements - let appsrc = pipeline - .clone() - .dynamic_cast::() - .unwrap() - .by_name("src") - .unwrap() - .downcast::() - .unwrap(); - let appsink = pipeline - .clone() - .dynamic_cast::() - .unwrap() - .by_name("sink") - .unwrap() - .downcast::() - .unwrap(); - - // Set the appsrc caps - let caps = gst::Caps::builder("video/mpegts") - .field("packetsize", 188) - .build(); - appsrc.set_caps(Some(&caps)); - - // Configure the appsink - appsink.set_caps(Some(&gst::Caps::new_empty_simple("video/x-raw"))); - - // Start the pipeline - pipeline.set_state(gst::State::Playing).unwrap(); - - // Push MPEG-TS packets to the appsrc - for packet in packets_clone.into_iter() { - let buffer = gst::Buffer::from_slice(packet); - appsrc.push_buffer(buffer).unwrap(); +pub fn initialize_pipeline( + stream_type_number: u8, +) -> Result<(gst::Pipeline, AppSrc, AppSink), anyhow::Error> { + // Initialize GStreamer + gst::init()?; + + // Create a pipeline to extract video frames + let pipeline = match stream_type_number { + 0x02 => create_pipeline( + "appsrc name=src ! tsdemux ! mpeg2dec ! videoconvert ! appsink name=sink", + )?, + 0x1B => create_pipeline( + "appsrc name=src ! tsdemux ! h264parse ! avdec_h264 ! videoconvert ! appsink name=sink", + )?, + 0x24 => create_pipeline( + "appsrc name=src ! tsdemux ! h265parse ! avdec_h265 ! videoconvert ! appsink name=sink", + )?, + _ => { + return Err(anyhow::anyhow!( + "Unsupported video stream type {}", + stream_type_number + )) } - appsrc.end_of_stream().unwrap(); - - // Retrieve the video frames from the appsink - while let Some(sample) = appsink.pull_sample().ok() { - if let Some(buffer) = sample.buffer() { - let map = buffer.map_readable().unwrap(); - let data = map.as_slice().to_vec(); - frame_data = Some(data); - break; + }; + + // Get references to the appsrc and appsink elements + let appsrc = pipeline + .clone() + .dynamic_cast::() + .unwrap() + .by_name("src") + .unwrap() + .downcast::() + .unwrap(); + + let appsink = pipeline + .clone() + .dynamic_cast::() + .unwrap() + .by_name("sink") + .unwrap() + .downcast::() + .unwrap(); + + // Set appsink to drop old buffers and only keep the most recent one + appsink.set_drop(true); + appsink.set_max_buffers(188); + + Ok((pipeline, appsrc, appsink)) +} + +#[cfg(feature = "gst")] +pub fn process_video_packets(appsrc: AppSrc, mut video_packet_receiver: mpsc::Receiver>) { + tokio::spawn(async move { + while let Some(packet) = video_packet_receiver.recv().await { + let buffer = gst::Buffer::from_slice(packet); + if let Err(err) = appsrc.push_buffer(buffer) { + eprintln!("Failed to push buffer to appsrc: {}", err); } } + }); +} + +#[cfg(feature = "gst")] +fn i420_to_rgb(width: usize, height: usize, i420_data: &[u8]) -> Vec { + let mut rgb_data = Vec::with_capacity(width * height * 3); + + let y_plane_size = width * height; + let uv_plane_size = y_plane_size / 4; + let u_offset = y_plane_size; + let v_offset = u_offset + uv_plane_size; + + for j in 0..height { + for i in 0..width { + let y = i420_data[j * width + i] as f32; + let u = i420_data[u_offset + (j / 2) * (width / 2) + (i / 2)] as f32; + let v = i420_data[v_offset + (j / 2) * (width / 2) + (i / 2)] as f32; + + let r = (y + 1.402 * (v - 128.0)).max(0.0).min(255.0); + let g = (y - 0.344136 * (u - 128.0) - 0.714136 * (v - 128.0)) + .max(0.0) + .min(255.0); + let b = (y + 1.772 * (u - 128.0)).max(0.0).min(255.0); + + rgb_data.push(r as u8); + rgb_data.push(g as u8); + rgb_data.push(b as u8); + } + } - // Stop the pipeline - pipeline.set_state(gst::State::Null).unwrap(); + rgb_data +} - // Send the extracted image through the channel - if let Some(image_data) = frame_data { - sender.send(image_data).unwrap(); +#[cfg(feature = "gst")] +fn i422_10le_to_rgb(width: usize, height: usize, i422_data: &[u8]) -> Vec { + let mut rgb_data = Vec::with_capacity(width * height * 3); + + let y_plane_size = width * height * 2; // Y plane uses 2 bytes per pixel + let u_plane_offset = y_plane_size; + let v_plane_offset = u_plane_offset + (width / 2) * height * 2; // U plane uses 2 bytes per value, half the width + + for j in 0..height { + for i in 0..width { + let y_index = j * width * 2 + i * 2; + let uv_horizontal_index = (i / 2) * 2; + let u_index = u_plane_offset + j * (width / 2) * 2 + uv_horizontal_index; + let v_index = v_plane_offset + j * (width / 2) * 2 + uv_horizontal_index; + + // Unpacking the 10-bit YUV values, shifting to the right by 6 to get the value in the lower bits + // and scaling to the full 8-bit range (0-255) + let y = (((i422_data[y_index] as u16) | ((i422_data[y_index + 1] as u16) << 8)) + & 0x03FF) as f32 + * 255.0 + / 1023.0; + let u = (((i422_data[u_index] as u16) | ((i422_data[u_index + 1] as u16) << 8)) + & 0x03FF) as f32 + * 255.0 + / 1023.0 + - 128.0; + let v = (((i422_data[v_index] as u16) | ((i422_data[v_index + 1] as u16) << 8)) + & 0x03FF) as f32 + * 255.0 + / 1023.0 + - 128.0; + + // Convert to RGB using the YUV to RGB conversion formula + let r = y + 1.402 * v; + let g = y - 0.344136 * u - 0.714136 * v; + let b = y + 1.772 * u; + + // Clamping the RGB values to the 0-255 range after conversion + let r = r.clamp(0.0, 255.0) as u8; + let g = g.clamp(0.0, 255.0) as u8; + let b = b.clamp(0.0, 255.0) as u8; + + rgb_data.push(r); + rgb_data.push(g); + rgb_data.push(b); } - }); + } + + rgb_data } #[cfg(feature = "gst")] -pub fn get_image() -> Option> { - IMAGE_CHANNEL.1.try_recv().ok() +pub fn pull_images(appsink: AppSink, image_sender: mpsc::Sender>, save_images: bool) { + tokio::spawn(async move { + let mut frame_count = 0; + + loop { + let sample = appsink.try_pull_sample(gst::ClockTime::ZERO); + if let Some(sample) = sample { + if let Some(buffer) = sample.buffer() { + let caps = sample.caps().expect("Sample without caps"); + let info = VideoInfo::from_caps(&caps).expect("Failed to parse caps"); + + // print entire videoinfo struct + log::debug!("Video Frame Info: {:?}", info); + + let width = info.width(); + let height = info.height(); + + let map = buffer.map_readable().unwrap(); + let data = if info.format() == VideoFormat::I420 { + i420_to_rgb(width as usize, height as usize, &map.as_slice()) + } else if info.format() == VideoFormat::I42210le { + i422_10le_to_rgb(width as usize, height as usize, &map.as_slice()) + } else { + map.as_slice().to_vec() + }; + + // Check if the data length matches the expected dimensions + let expected_length = width as usize * height as usize * 3; // Assuming RGB format + if data.len() == expected_length { + // Create an ImageBuffer from the received image data + if let Some(image) = + ImageBuffer::, _>::from_raw(width, height, data.clone()) + { + let scaled_height = 480; + let scaled_width = + (width as f32 / height as f32 * scaled_height as f32) as u32; + + let resized_image = resize( + &image, + scaled_width, + scaled_height, + image::imageops::FilterType::Triangle, + ); + + if save_images { + // Save the resized JPEG image + let filename = format!("images/frame_{:04}.jpg", frame_count); + if let Err(err) = resized_image.save(filename) { + log::error!("Failed to save image: {}", err); + } else { + frame_count += 1; + } + } + + // Convert the resized image to a JPEG vector + let mut jpeg_data = Vec::new(); + let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality( + &mut jpeg_data, + 80, + ); + + encoder + .encode_image(&resized_image) + .expect("JPEG encoding failed"); + + if let Err(err) = image_sender.send(jpeg_data).await { + log::error!("Failed to send image data through channel: {}", err); + // exit the loop if the receiver is gone + break; + } + } else { + log::error!("Failed to create ImageBuffer"); + } + } else { + log::error!("Received image data with unexpected length: {}", data.len()); + } + } + } + } + }); } // constant for PAT PID @@ -218,6 +370,7 @@ pub struct StreamData { pub host_name: String, pub kernel_version: String, pub os_version: String, + pub has_image: u8, } impl Clone for StreamData { @@ -277,6 +430,7 @@ impl Clone for StreamData { host_name: self.host_name.clone(), kernel_version: self.kernel_version.clone(), os_version: self.os_version.clone(), + has_image: self.has_image, } } } @@ -359,6 +513,7 @@ impl StreamData { host_name: system_stats.host_name, kernel_version: system_stats.kernel_version, os_version: system_stats.os_version, + has_image: 0, } } // set RTP fields @@ -416,22 +571,21 @@ impl StreamData { // check for continuity continuous increment and wrap around from 0 to 15 let previous_continuity_counter = self.continuity_counter; self.continuity_counter = continuity_counter & 0x0F; + // check if we incremented without loss - if self.continuity_counter != previous_continuity_counter + 1 + if self.continuity_counter != (previous_continuity_counter + 1) & 0x0F && self.continuity_counter != previous_continuity_counter { - // check if we wrapped around from 15 to 0 - if self.continuity_counter == 0 { - // check if previous value was 15 - if previous_continuity_counter == 15 { - // no loss - return; - } - } // increment the error count by the difference between the current and previous continuity counter - let error_count = (self.continuity_counter - previous_continuity_counter) as u32; + let error_count = if self.continuity_counter < previous_continuity_counter { + (self.continuity_counter + 16) - previous_continuity_counter + } else { + self.continuity_counter - previous_continuity_counter + } as u32; + self.error_count += 1; self.current_error_count = error_count; + error!( "Continuity Counter Error: PID: {} Previous: {} Current: {} Loss: {} Total Loss: {}", self.pid, previous_continuity_counter, self.continuity_counter, error_count, self.error_count @@ -440,6 +594,7 @@ impl StreamData { // reset the error count self.current_error_count = 0; } + self.continuity_counter = continuity_counter; } pub fn update_stats(&mut self, packet_size: usize) { @@ -788,11 +943,15 @@ pub fn process_packet( } // calculate uptime using the arrival time as SystemTime and start_time as u64 - let uptime = stream_data.capture_time - stream_data.start_time; + //let uptime = stream_data.capture_time - stream_data.start_time; // print out each field of structure debug!("STATUS::PACKET:MODIFY[{}] pid: {} stream_type: {} bitrate: {} bitrate_max: {} bitrate_min: {} bitrate_avg: {} iat: {} iat_max: {} iat_min: {} iat_avg: {} errors: {} continuity_counter: {} timestamp: {} uptime: {} packet_offset: {}, packet_len: {}", - stream_data.pid, stream_data.pid, stream_data.stream_type, stream_data.bitrate, stream_data.bitrate_max, stream_data.bitrate_min, stream_data.bitrate_avg, stream_data.iat, stream_data.iat_max, stream_data.iat_min, stream_data.iat_avg, stream_data.error_count, stream_data.continuity_counter, stream_data.timestamp, uptime, stream_data_packet.packet_start, stream_data_packet.packet_len); + stream_data.pid, stream_data.pid, stream_data.stream_type, + stream_data.bitrate, stream_data.bitrate_max, stream_data.bitrate_min, + stream_data.bitrate_avg, stream_data.iat, stream_data.iat_max, stream_data.iat_min, + stream_data.iat_avg, stream_data.error_count, stream_data.continuity_counter, + stream_data.timestamp, 0/*uptime*/, stream_data_packet.packet_start, stream_data_packet.packet_len); stream_data_packet.bitrate = stream_data.bitrate; stream_data_packet.bitrate_avg = stream_data.bitrate_avg;