Skip to content

Commit

Permalink
don't setup gstreamer if we are not extracting images
Browse files Browse the repository at this point in the history
  • Loading branch information
ltn-chriskennedy committed Apr 20, 2024
1 parent aaa6b87 commit 206f46e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license-file = "LICENSE"
homepage = "https://github.com/groovybits/rscap"
repository = "https://github.com/groovybits/rscap"
authors = ["Chris Kennedy"]
version = "0.5.112"
version = "0.5.113"
edition = "2021"

[lib]
Expand Down
5 changes: 4 additions & 1 deletion scripts/probe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ else
exit 1
fi

EXTRACT_IMAGES_ARG=
EXTRACT_IMAGES_ARG=--extract-images

echo "Using $PROBE_BIN"

$PROBE_BIN -V
Expand All @@ -64,10 +67,10 @@ sudo GST_PLUGIN_PATH=$GST_PLUGIN_PATH \
GST_DEBUG=$GST_DEBUG_LEVEL \
RUST_BACKTRACE=$BACKTRACE \
$VALGRIND $NUMACTL $PROBE_BIN \
$EXTRACT_IMAGES_ARG \
--source-device $SOURCE_DEVICE \
--source-ip $SOURCE_IP \
--source-port $SOURCE_PORT \
--kafka-broker $KAFKA_BROKER \
--kafka-topic "rsprobe" \
--extract-images \
$@
2 changes: 1 addition & 1 deletion specs/rsprobe.spec
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: rsprobe
Version: 0.5.112
Version: 0.5.113
Release: 1%{?dist}
Summary: MpegTS Stream Analysis Probe with Kafka and GStreamer
License: MIT
Expand Down
100 changes: 58 additions & 42 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ fn init_pcap(
#[derive(Parser, Debug)]
#[clap(
author = "Chris Kennedy",
version = "0.5.112",
version = "0.5.113",
about = "MpegTS Stream Analysis Probe with Kafka and GStreamer"
)]
struct Args {
Expand Down Expand Up @@ -650,10 +650,6 @@ struct Args {
#[clap(long, env = "VIDEO_BUFFER_SIZE", default_value_t = 100000)]
video_buffer_size: usize,

/// Chunk Buffer size = Size of the buffer for the chunked packet bundles to the main processing thread
#[clap(long, env = "CHUNK_BUFFER_SIZE", default_value_t = 100000)]
chunk_buffer_size: usize,

/// Kafka Broker
#[clap(long, env = "KAFKA_BROKER", default_value = "")]
kafka_broker: String,
Expand Down Expand Up @@ -1422,52 +1418,70 @@ async fn rsprobe(running: Arc<AtomicBool>) {
#[cfg(feature = "gst")]
let (image_sender, mut image_receiver) = mpsc::channel(args.image_buffer_size);

// Initialize GStreamer
#[cfg(feature = "gst")]
gstreamer::init().expect("Failed to initialize GStreamer");

// Initialize the pipeline
#[cfg(feature = "gst")]
let (pipeline, appsrc, appsink) = match initialize_pipeline(
&args.input_codec,
args.image_height,
args.gst_queue_buffers,
!args.scale_images_after_gstreamer,
&args.image_framerate,
) {
Ok((pipeline, appsrc, appsink)) => (pipeline, appsrc, appsink),
Err(err) => {
eprintln!("Failed to initialize the pipeline: {}", err);
return;
let (pipeline, appsrc, appsink) = if args.extract_images {
match initialize_pipeline(
&args.input_codec,
args.image_height,
args.gst_queue_buffers,
!args.scale_images_after_gstreamer,
&args.image_framerate,
) {
Ok((pipeline, appsrc, appsink)) => (pipeline, appsrc, appsink),
Err(err) => {
eprintln!("Failed to initialize the pipeline: {}", err);
return;
}
}
} else {
(
gstreamer::Pipeline::new(),
gstreamer_app::AppSrc::builder().build(),
gstreamer_app::AppSink::builder().build(),
)
};

// Start the pipeline
#[cfg(feature = "gst")]
match pipeline.set_state(gst::State::Playing) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Playing: {}", err);
return;
if args.extract_images {
match pipeline.set_state(gst::State::Playing) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Playing: {}", err);
return;
}
}
}

// Spawn separate tasks for processing video packets and pulling images
#[cfg(feature = "gst")]
process_video_packets(
appsrc,
video_packet_receiver,
running_gstreamer_process.clone(),
);
if args.extract_images {
process_video_packets(
appsrc,
video_packet_receiver,
running_gstreamer_process.clone(),
);
}
#[cfg(feature = "gst")]
pull_images(
appsink,
/*Arc::new(Mutex::new(image_sender)),*/
image_sender,
args.save_images,
args.image_sample_rate_ns,
args.image_height,
args.filmstrip_length,
args.jpeg_quality,
args.image_frame_increment,
running_gstreamer_pull,
);
if args.extract_images {
pull_images(
appsink,
/*Arc::new(Mutex::new(image_sender)),*/
image_sender,
args.save_images,
args.image_sample_rate_ns,
args.image_height,
args.filmstrip_length,
args.jpeg_quality,
args.image_frame_increment,
running_gstreamer_pull,
);
}

// Watch file thread and sender/receiver for log file input
let (watch_file_sender, watch_file_receiver) = channel();
Expand Down Expand Up @@ -1783,10 +1797,12 @@ async fn rsprobe(running: Arc<AtomicBool>) {

// Stop the pipeline when done
#[cfg(feature = "gst")]
match pipeline.set_state(gst::State::Null) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Null: {}", err);
if args.extract_images {
match pipeline.set_state(gst::State::Null) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Null: {}", err);
}
}
}

Expand Down

0 comments on commit 206f46e

Please sign in to comment.