Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle restarting ffmpeg in agent #52

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ regex.workspace = true
satori-common.workspace = true
semver.workspace = true
serde.workspace = true
serde_with.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["process"] }
tower-http.workspace = true
20 changes: 18 additions & 2 deletions agent/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use crate::ffmpeg::StreamerConfig;
use byte_unit::Byte;
use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
use std::{
fs,
path::{Path, PathBuf},
time::Duration,
};
use url::Url;

#[serde_as]
#[derive(Clone, Deserialize)]
pub(crate) struct Config {
pub(crate) video_directory: PathBuf,

pub(crate) stream: StreamerConfig,
pub(crate) stream: StreamConfig,

#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) ffmpeg_restart_delay: Duration,
}

impl Config {
@@ -37,3 +43,13 @@ where

Ok(Byte::from_bytes(result))
}

#[derive(Clone, Deserialize)]
pub(crate) struct StreamConfig {
pub(crate) url: Url,

pub(crate) ffmpeg_input_args: Vec<String>,

pub(crate) hls_segment_time: i32,
pub(crate) hls_retained_segment_count: i32,
}
13 changes: 0 additions & 13 deletions agent/src/ffmpeg/mod.rs
Original file line number Diff line number Diff line change
@@ -3,16 +3,3 @@ pub(crate) use self::streamer::Streamer;

mod version;
pub(crate) use self::version::get_ffmpeg_version;

use serde::Deserialize;
use url::Url;

#[derive(Clone, Deserialize)]
pub(crate) struct StreamerConfig {
pub(crate) url: Url,

pub(crate) ffmpeg_input_args: Vec<String>,

pub(crate) hls_segment_time: i32,
pub(crate) hls_retained_segment_count: i32,
}
204 changes: 105 additions & 99 deletions agent/src/ffmpeg/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::StreamerConfig;
use crate::Event;
use crate::config::Config;
use kagiyama::prometheus::metrics::gauge::Gauge;
use nix::{
sys::signal::{self, Signal},
unistd::{self, Pid},
@@ -9,129 +9,135 @@ use std::{
process::Stdio,
sync::{Arc, Mutex},
};
use tokio::{process::Command, sync::broadcast::Sender, task::JoinHandle};
use tracing::{debug, info};
use tokio::{process::Command, task::JoinHandle};
use tracing::{debug, info, warn};

const FFMPEG_EXIT_SIGNAL: Signal = Signal::SIGINT;

const PLAYLIST_FILENAME: &str = "stream.m3u8";

struct Inner {
events_tx: Sender<Event>,
config: StreamerConfig,
destination: PathBuf,
frame_file: PathBuf,

ffmpeg_pid: Mutex<Option<Pid>>,
}
const HLS_PLAYLIST_FILENAME: &str = "stream.m3u8";

pub(crate) struct Streamer {
inner: Arc<Inner>,
config: Config,
frame_file: PathBuf,
terminate: Arc<Mutex<bool>>,
ffmpeg_pid: Arc<Mutex<Option<Pid>>>,
ffmpeg_invocations_metric: Gauge,
}

impl Streamer {
pub(crate) fn new(
events_tx: Sender<Event>,
config: StreamerConfig,
destination: &Path,
frame_file: &Path,
) -> Self {
pub(crate) fn new(config: Config, frame_file: &Path, ffmpeg_invocations_metric: Gauge) -> Self {
Self {
inner: Arc::new(Inner {
events_tx,
config,
destination: destination.to_owned(),
frame_file: frame_file.to_owned(),
ffmpeg_pid: Default::default(),
}),
config,
frame_file: frame_file.to_owned(),
terminate: Arc::new(Mutex::new(false)),
ffmpeg_pid: Default::default(),
ffmpeg_invocations_metric,
}
}

#[allow(clippy::async_yields_async)]
#[tracing::instrument(skip_all)]
pub(crate) async fn start(&self) -> JoinHandle<()> {
let inner = self.inner.clone();
let config = self.config.clone();
let frame_file = self.frame_file.clone();
let ffmpeg_pid = self.ffmpeg_pid.clone();
let terminate = self.terminate.clone();
let ffmpeg_invocations_metric = self.ffmpeg_invocations_metric.clone();

tokio::spawn(async move {
// Start ffmpeg as a child process
let mut ffmpeg_process = unsafe {
Command::new("ffmpeg")
// Always overwrite files
.arg("-y")
// Stream config
.args(&inner.config.ffmpeg_input_args)
.arg("-i")
.arg(inner.config.url.to_string())
.arg("-c:v")
.arg("copy")
.arg("-c:a")
.arg("copy")
// HLS output stream
.arg("-f")
.arg("hls")
.arg("-hls_time")
.arg(inner.config.hls_segment_time.to_string())
.arg("-hls_list_size")
.arg(inner.config.hls_retained_segment_count.to_string())
.arg("-hls_flags")
.arg("append_list+delete_segments")
.arg("-hls_segment_filename")
.arg(
inner
.destination
.join(satori_common::SEGMENT_FILENAME_FORMAT),
)
.arg("-strftime")
.arg("1")
.arg(inner.destination.join(PLAYLIST_FILENAME))
// Output preview frames as JPEG
.arg("-vf")
.arg("fps=1")
.arg("-update")
.arg("1")
.arg(&inner.frame_file)
// Do nothing with stdin
.stdin(Stdio::null())
// Call setsid, required for correct exit signal handling
.pre_exec(|| {
unistd::setsid()?;
Ok(())
})
.spawn()
.expect("ffmpeg process should be started")
};
debug!("ffmpeg process: {:?}", ffmpeg_process);
loop {
// Start ffmpeg as a child process
let mut ffmpeg_process = unsafe {
Command::new("ffmpeg")
// Always overwrite files
.arg("-y")
// Stream config
.args(&config.stream.ffmpeg_input_args)
.arg("-i")
.arg(config.stream.url.to_string())
.arg("-c:v")
.arg("copy")
.arg("-c:a")
.arg("copy")
// HLS output stream
.arg("-f")
.arg("hls")
.arg("-hls_time")
.arg(config.stream.hls_segment_time.to_string())
.arg("-hls_list_size")
.arg(config.stream.hls_retained_segment_count.to_string())
.arg("-hls_flags")
.arg("append_list+delete_segments")
.arg("-hls_segment_filename")
.arg(
config
.video_directory
.join(satori_common::SEGMENT_FILENAME_FORMAT),
)
.arg("-strftime")
.arg("1")
.arg(config.video_directory.join(HLS_PLAYLIST_FILENAME))
// Output preview frames as JPEG
.arg("-vf")
.arg("fps=1")
.arg("-update")
.arg("1")
.arg(&frame_file)
// Do nothing with stdin
.stdin(Stdio::null())
// Call setsid, required for correct exit signal handling
.pre_exec(|| {
unistd::setsid()?;
Ok(())
})
.spawn()
.expect("ffmpeg process should be started")
};
debug!("ffmpeg process: {:?}", ffmpeg_process);

// Get and store the ffmpeg PID
*ffmpeg_pid
.lock()
.expect("ffmpeg PID lock acquire should not fail") = Some(Pid::from_raw(
ffmpeg_process
.id()
.expect("ffmpeg process should have a PID") as i32,
));
info!("ffmpeg PID: {:?}", ffmpeg_pid);

// Get and store the ffmpeg PID
*inner
.ffmpeg_pid
.lock()
.expect("ffmpeg PID lock acquire should not fail") = Some(Pid::from_raw(
ffmpeg_process
.id()
.expect("ffmpeg process should have a PID") as i32,
));
info!("ffmpeg PID: {:?}", inner.ffmpeg_pid);
// Increment ffmpeg invocation count
ffmpeg_invocations_metric.inc();

// Wait for ffmpeg process to exit
let result = ffmpeg_process.wait().await;
info!("ffmpeg exited, ok={}", result.is_ok());
*inner
.ffmpeg_pid
.lock()
.expect("ffmpeg PID lock acquire should not fail") = None;
// Wait for ffmpeg process to exit
let result = ffmpeg_process.wait().await;
info!("ffmpeg exited, ok={}", result.is_ok());
*ffmpeg_pid
.lock()
.expect("ffmpeg PID lock acquire should not fail") = None;

// Signal app shutdown
inner.events_tx.send(Event::Shutdown(Err(()))).unwrap();
let expected_shutdown = *terminate.lock().unwrap();
if expected_shutdown {
info!("Termination requested, not restarting ffmpeg");
break;
} else {
warn!(
"ffmpeg exited unexpectedly, restarting in {:?}",
config.ffmpeg_restart_delay
);
tokio::time::sleep(config.ffmpeg_restart_delay).await;
}
}
})
}

#[tracing::instrument(skip_all)]
pub(crate) async fn stop(&self) {
const FFMPEG_EXIT_SIGNAL: Signal = Signal::SIGINT;

// Set terminate flag to ensure ffmpeg is not restarted
*self.terminate.lock().unwrap() = true;

// Request ffmpeg to terminate
info!("Sending {} to ffmpeg process", FFMPEG_EXIT_SIGNAL);
if let Some(ffmpeg_pid) = *self
.inner
.ffmpeg_pid
.lock()
.expect("ffmpeg PID lock acquire should not fail")
Loading