Skip to content

Commit

Permalink
Kafka images (#72)
Browse files Browse the repository at this point in the history
* send images as base64 into kafka json

* use non-depracated base64 encoding

* update version to v0.5.0

enough changes to increment.

---------

Co-authored-by: Chris Kennedy <chris.kennedy@ltnglobal.com>
  • Loading branch information
groovybits and ltn-chriskennedy committed Apr 9, 2024
1 parent 3460e64 commit 9063bc6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license-file = "LICENSE"
homepage = "https://github.com/groovybits/wiki"
repository = "https://github.com/groovybits/rscap"
authors = ["Chris Kennedy"]
version = "0.4.1"
version = "0.5.0"
edition = "2021"

[lib]
Expand Down Expand Up @@ -57,6 +57,7 @@ crossbeam = "0.8.4"
image = "0.25.1"
num_cpus = "1.16.0"
threadpool = "1.8.1"
base64 = "0.22.0"

[build-dependencies]
capnpc = "0.18.0"
16 changes: 14 additions & 2 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use async_zmq;
//use chrono::TimeZone;
use clap::Parser;
use log::{debug, error, info};
use base64::{Engine as _, engine::general_purpose};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
Expand Down Expand Up @@ -514,7 +515,7 @@ async fn produce_message(
#[derive(Parser, Debug)]
#[clap(
author = "Chris Kennedy",
version = "0.4.1",
version = "0.5.0",
about = "RsCap Monitor for ZeroMQ input of MPEG-TS and SMPTE 2110 streams from remote probe."
)]
struct Args {
Expand Down Expand Up @@ -1204,6 +1205,8 @@ async fn main() {
hexdump(&data_msg_arc, 0, data_msg.len());
}

let mut base64_image = String::new();

// Check if Decoding or if Demuxing
if args.recv_raw_stream {
// Initialize an Option<File> to None
Expand Down Expand Up @@ -1244,7 +1247,7 @@ async fn main() {
// remove existing .jpg if given first
let output_file_without_jpg = output_file.replace(".jpg", "");
if data_msg.len() > 0 && stream_data.has_image > 0 {
log::info!("Data msg is {} size", data_msg.len());
log::debug!("Monitor: Jpeg image received: {} size", data_msg.len());
let output_file_incremental = format!(
"{}_{:08}.jpg",
output_file_without_jpg,
Expand All @@ -1268,6 +1271,9 @@ async fn main() {
}
file.write_all(&data_msg).unwrap();
}

// Encode the JPEG image as Base64
base64_image = general_purpose::STANDARD.encode(&data_msg);
}
}

Expand Down Expand Up @@ -1362,6 +1368,12 @@ async fn main() {
flattened_data
.insert("source_port".to_string(), serde_json::json!(source_port));

// Insert the base64_image field into the flattened_data map
flattened_data.insert(
"base64_image".to_string(),
serde_json::json!(base64_image),
);

// Convert the Map directly to a Value for serialization
let combined_stats = serde_json::Value::Object(flattened_data);

Expand Down
4 changes: 2 additions & 2 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ fn init_pcap(
#[derive(Parser, Debug)]
#[clap(
author = "Chris Kennedy",
version = "0.4.1",
version = "0.5.0",
about = "RsCap Probe for ZeroMQ output of MPEG-TS and SMPTE 2110 streams from pcap."
)]
struct Args {
Expand Down Expand Up @@ -1662,7 +1662,7 @@ async fn rscap() {
stream_data.packet_len = image_data.len();

// Process the received image data
log::info!("Received an image with size: {} bytes", image_data.len());
log::debug!("Probe: Received a jpeg image with size: {} bytes", image_data.len());

} else {
// zero out the packet data
Expand Down

0 comments on commit 9063bc6

Please sign in to comment.