From 9063bc6741e1471fd9098d5d6dd8f74d297b80de Mon Sep 17 00:00:00 2001 From: Chris Kennedy Date: Mon, 8 Apr 2024 17:38:14 -0700 Subject: [PATCH] Kafka images (#72) * send images as base64 into kafka json * use non-depracated base64 encoding * update version to v0.5.0 enough changes to increment. --------- Co-authored-by: Chris Kennedy --- Cargo.toml | 3 ++- src/bin/monitor.rs | 16 ++++++++++++++-- src/bin/probe.rs | 4 ++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3830dd1b..049d1501 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license-file = "LICENSE" homepage = "https://github.com/groovybits/wiki" repository = "https://github.com/groovybits/rscap" authors = ["Chris Kennedy"] -version = "0.4.1" +version = "0.5.0" edition = "2021" [lib] @@ -57,6 +57,7 @@ crossbeam = "0.8.4" image = "0.25.1" num_cpus = "1.16.0" threadpool = "1.8.1" +base64 = "0.22.0" [build-dependencies] capnpc = "0.18.0" diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index e2667a16..8bbc44ac 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -16,6 +16,7 @@ use async_zmq; //use chrono::TimeZone; use clap::Parser; use log::{debug, error, info}; +use base64::{Engine as _, engine::general_purpose}; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic}; use rdkafka::client::DefaultClientContext; use rdkafka::config::ClientConfig; @@ -514,7 +515,7 @@ async fn produce_message( #[derive(Parser, Debug)] #[clap( author = "Chris Kennedy", - version = "0.4.1", + version = "0.5.0", about = "RsCap Monitor for ZeroMQ input of MPEG-TS and SMPTE 2110 streams from remote probe." )] struct Args { @@ -1204,6 +1205,8 @@ async fn main() { hexdump(&data_msg_arc, 0, data_msg.len()); } + let mut base64_image = String::new(); + // Check if Decoding or if Demuxing if args.recv_raw_stream { // Initialize an Option to None @@ -1244,7 +1247,7 @@ async fn main() { // remove existing .jpg if given first let output_file_without_jpg = output_file.replace(".jpg", ""); if data_msg.len() > 0 && stream_data.has_image > 0 { - log::info!("Data msg is {} size", data_msg.len()); + log::debug!("Monitor: Jpeg image received: {} size", data_msg.len()); let output_file_incremental = format!( "{}_{:08}.jpg", output_file_without_jpg, @@ -1268,6 +1271,9 @@ async fn main() { } file.write_all(&data_msg).unwrap(); } + + // Encode the JPEG image as Base64 + base64_image = general_purpose::STANDARD.encode(&data_msg); } } @@ -1362,6 +1368,12 @@ async fn main() { flattened_data .insert("source_port".to_string(), serde_json::json!(source_port)); + // Insert the base64_image field into the flattened_data map + flattened_data.insert( + "base64_image".to_string(), + serde_json::json!(base64_image), + ); + // Convert the Map directly to a Value for serialization let combined_stats = serde_json::Value::Object(flattened_data); diff --git a/src/bin/probe.rs b/src/bin/probe.rs index 3975496f..5f7093a0 100644 --- a/src/bin/probe.rs +++ b/src/bin/probe.rs @@ -460,7 +460,7 @@ fn init_pcap( #[derive(Parser, Debug)] #[clap( author = "Chris Kennedy", - version = "0.4.1", + version = "0.5.0", about = "RsCap Probe for ZeroMQ output of MPEG-TS and SMPTE 2110 streams from pcap." )] struct Args { @@ -1662,7 +1662,7 @@ async fn rscap() { stream_data.packet_len = image_data.len(); // Process the received image data - log::info!("Received an image with size: {} bytes", image_data.len()); + log::debug!("Probe: Received a jpeg image with size: {} bytes", image_data.len()); } else { // zero out the packet data