diff --git a/Cargo.lock b/Cargo.lock index d7e75e1b..a4398a78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2298,7 +2298,9 @@ version = "0.3.0" dependencies = [ "axum", "byte-unit", + "bytes", "clap", + "futures", "metrics", "metrics-exporter-prometheus", "nix", @@ -2306,8 +2308,8 @@ dependencies = [ "satori-common", "serde", "serde_with", - "tempfile", "tokio", + "tokio-util", "tower-http", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 8ed1309c..39b9f87d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ serde_json = "1.0.111" tempfile = "3.9.0" thiserror = "1.0.56" tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "process"] } +tokio-util = { version = "0.7.10", features = ["codec"] } toml = "0.8" tower-http = { version = "0.5.1", features = ["fs"] } tracing = "0.1" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 3932939f..6ed54834 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -7,7 +7,9 @@ edition.workspace = true [dependencies] axum.workspace = true byte-unit.workspace = true +bytes.workspace = true clap.workspace = true +futures.workspace = true metrics.workspace = true metrics-exporter-prometheus.workspace = true nix.workspace = true @@ -15,8 +17,8 @@ regex.workspace = true satori-common.workspace = true serde.workspace = true serde_with.workspace = true -tempfile.workspace = true tokio.workspace = true +tokio-util.workspace = true tower-http.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/agent/src/ffmpeg/streamer.rs b/agent/src/ffmpeg/streamer.rs index 77580a8e..73686a5b 100644 --- a/agent/src/ffmpeg/streamer.rs +++ b/agent/src/ffmpeg/streamer.rs @@ -1,47 +1,55 @@ -use crate::config::Config; +use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder}; +use futures::StreamExt; use nix::{ sys::signal::{self, Signal}, unistd::{self, Pid}, }; use std::{ - path::{Path, PathBuf}, process::Stdio, sync::{Arc, Mutex}, }; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, + sync::broadcast, task::JoinHandle, }; -use tracing::{debug, info, warn}; +use tokio_util::codec::FramedRead; +use tracing::{debug, error, info, warn}; const HLS_PLAYLIST_FILENAME: &str = "stream.m3u8"; pub(crate) struct Streamer { config: Config, - frame_file: PathBuf, terminate: Arc>, ffmpeg_pid: Arc>>, handle: Option>, + jpeg_tx: broadcast::Sender, } impl Streamer { - pub(crate) fn new(config: Config, frame_file: &Path) -> Self { + pub(crate) fn new(config: Config) -> Self { + let (tx, _) = broadcast::channel(8); + Self { config, - frame_file: frame_file.to_owned(), terminate: Arc::new(Mutex::new(false)), ffmpeg_pid: Default::default(), handle: None, + jpeg_tx: tx, } } + pub(crate) fn jpeg_subscribe(&self) -> broadcast::Receiver { + self.jpeg_tx.subscribe() + } + #[tracing::instrument(skip_all)] pub(crate) async fn start(&mut self) { let config = self.config.clone(); - let frame_file = self.frame_file.clone(); let ffmpeg_pid = self.ffmpeg_pid.clone(); let terminate = self.terminate.clone(); + let jpeg_tx = self.jpeg_tx.clone(); self.handle = Some(tokio::spawn(async move { loop { @@ -79,9 +87,11 @@ impl Streamer { // Output preview frames as JPEG .arg("-vf") .arg("fps=1") + .arg("-f") + .arg("image2") .arg("-update") .arg("1") - .arg(&frame_file) + .arg("pipe:1") // Do nothing with stdin .stdin(Stdio::null()) // Capture stdout and stderr @@ -113,26 +123,33 @@ impl Streamer { metrics::counter!(crate::METRIC_FFMPEG_INVOCATIONS, 1); let stdout = ffmpeg_process.stdout.take().unwrap(); - let stderr = ffmpeg_process.stderr.take().unwrap(); + let mut stdout_frame = FramedRead::new(stdout, JpegFrameDecoder); - let mut stdout_reader = BufReader::new(stdout).lines(); + let stderr = ffmpeg_process.stderr.take().unwrap(); let mut stderr_reader = BufReader::new(stderr).lines(); loop { tokio::select! { - // Output stdout to log with prefix - line = stdout_reader.next_line() => { - match line { - Ok(Some(line)) => info!("ffmpeg stdout: {line}"), - Err(_) => break, - _ => (), + // Handle JPEG data via stdout + Some(frame) = stdout_frame.next() => { + match frame { + Ok(frame) => { + debug!("Got JPEG frame ({} bytes)", frame.len()); + if let Err(e ) = jpeg_tx.send(frame) { + error!("JPEG frame channel error: {}", e); + } + } + Err(e) => error!("ffmpeg stdout frame errror: {:?}", e), } } // Output stderr to log with prefix line = stderr_reader.next_line() => { match line { Ok(Some(line)) => info!("ffmpeg stderr: {line}"), - Err(_) => break, + Err(e) => { + warn!("ffmpeg stderr closed: {e}"); + break; + }, _ => (), } } diff --git a/agent/src/jpeg_frame_decoder.rs b/agent/src/jpeg_frame_decoder.rs new file mode 100644 index 00000000..ea9197c9 --- /dev/null +++ b/agent/src/jpeg_frame_decoder.rs @@ -0,0 +1,98 @@ +use bytes::{Buf, Bytes, BytesMut}; +use tokio_util::codec::Decoder; + +const JPEG_EOI_LENGTH: usize = 2; + +#[derive(Default)] +pub(crate) struct JpegFrameDecoder; + +impl Decoder for JpegFrameDecoder { + type Item = Bytes; + type Error = std::io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if let Some(idx) = find_first_jpeg_eoi(buf) { + let image_buf = buf.copy_to_bytes(idx + JPEG_EOI_LENGTH); + Ok(Some(image_buf)) + } else { + Ok(None) + } + } +} + +fn find_first_jpeg_eoi(bytes: &BytesMut) -> Option { + if bytes.len() < JPEG_EOI_LENGTH { + None + } else { + (0..bytes.len() - 1).find(|&i| bytes[i] == 0xFF && bytes[i + 1] == 0xD9) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn decoder_none() { + let mut decoder = JpegFrameDecoder; + + let data = [0xFF, 0xD8, 1, 1, 1]; + let mut data = BytesMut::from(&data[..]); + + let res = decoder.decode(&mut data); + assert!(res.unwrap().is_none()); + } + + #[test] + fn decoder_one() { + let mut decoder = JpegFrameDecoder; + + let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9]; + let mut data = BytesMut::from(&data[..]); + + let res = decoder.decode(&mut data); + let res = res.unwrap().unwrap(); + assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]); + } + + #[test] + fn decoder_multiple() { + let mut decoder = JpegFrameDecoder; + + let data = [ + 0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9, + ]; + let mut data = BytesMut::from(&data[..]); + + let res = decoder.decode(&mut data); + let res = res.unwrap().unwrap(); + assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]); + + let res = decoder.decode(&mut data); + let res = res.unwrap().unwrap(); + assert_eq!(res, [0xFFu8, 0xD8, 2, 2, 2, 0xFF, 0xD9][..]); + } + + #[test] + fn find_first_jpeg_eoi_none() { + let data = [0xFF, 0xD8, 1, 1, 1]; + let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); + assert_eq!(pos, None) + } + + #[test] + fn find_first_jpeg_eoi_single() { + let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9]; + let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); + assert_eq!(pos, Some(5)) + } + + #[test] + fn find_first_jpeg_eoi_multiple() { + let data = [ + 0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9, + ]; + let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); + assert_eq!(pos, Some(5)) + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 2ecf5a32..998dcd73 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,17 +1,34 @@ mod config; mod ffmpeg; -mod server; +mod jpeg_frame_decoder; mod utils; +use axum::{ + http::header, + response::{Html, IntoResponse}, + routing::get, + Router, +}; +use bytes::Bytes; use clap::Parser; use metrics_exporter_prometheus::PrometheusBuilder; -use std::{fs, net::SocketAddr, path::PathBuf, time::Duration}; +use std::{ + fs, + net::SocketAddr, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::{net::TcpListener, task::JoinSet}; +use tower_http::services::ServeDir; use tracing::{debug, info, warn}; const METRIC_DISK_USAGE: &str = "satori_agent_disk_usage"; const METRIC_FFMPEG_INVOCATIONS: &str = "satori_agent_ffmpeg_invocations"; const METRIC_SEGMENTS: &str = "satori_agent_segments"; +type SharedImageData = Arc>>; + /// Run the camera agent. /// /// Handles restreaming a single camera as HLS with history. @@ -68,33 +85,64 @@ async fn main() { // Create video output directory fs::create_dir_all(&config.video_directory).expect("should be able to create output directory"); - // Generate a random filename for the frame JPEG - let frame_file = tempfile::Builder::new() - .prefix("satori-") - .suffix(".jpg") - .tempfile() - .unwrap(); - - // Start HTTP server - let mut server = server::Server::new( - cli.http_server_address, - config.clone(), - frame_file.path().to_owned(), - ) - .await; - // Start streamer - let mut streamer = ffmpeg::Streamer::new(config.clone(), frame_file.path()); + let mut streamer = ffmpeg::Streamer::new(config.clone()); streamer.start().await; - let mut metrics_interval = tokio::time::interval(Duration::from_secs(30)); + let mut tasks = JoinSet::<()>::new(); + // Configure HTTP server listener + let listener = TcpListener::bind(&cli.http_server_address) + .await + .unwrap_or_else(|_| panic!("tcp listener should bind to {}", cli.http_server_address)); + + let frame_image = SharedImageData::default(); + + // Configure HTTP server routes + let app = { + let frame_image = frame_image.clone(); + + Router::new() + .route("/player", get(Html(include_str!("player.html")))) + .route( + "/frame.jpg", + get(move || async move { + match frame_image.lock().unwrap().as_ref() { + Some(image) => { + ([(header::CONTENT_TYPE, "image/jpeg")], image.clone()).into_response() + } + None => axum::http::StatusCode::NOT_FOUND.into_response(), + } + }), + ) + .nest_service("/", ServeDir::new(config.video_directory.clone())) + }; + + // Start HTTP server + info!("Starting HTTP server on {}", cli.http_server_address); + tasks.spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + let mut metrics_interval = tokio::time::interval(Duration::from_secs(30)); + let mut jpeg_rx = streamer.jpeg_subscribe(); loop { tokio::select! { + Ok(image) = jpeg_rx.recv() => { + frame_image.lock().unwrap().replace(image); + } _ = metrics_interval.tick() => { update_segment_count_metric(&config); update_disk_usage_metric(&config); } + task = tasks.join_next() => { + match task { + None => tokio::time::sleep(Duration::from_secs(5)).await, + Some(task) => { + info!("Task exits: {:?}", task); + } + } + } _ = tokio::signal::ctrl_c() => { info!("Exiting"); break; @@ -105,8 +153,7 @@ async fn main() { // Stop streamer streamer.stop().await; - // Stop HTTP server - server.stop().await; + tasks.shutdown().await; } #[tracing::instrument(skip_all)] diff --git a/agent/src/server.rs b/agent/src/server.rs deleted file mode 100644 index 8140b7eb..00000000 --- a/agent/src/server.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::config::Config; -use axum::{response::Html, routing::get, Router}; -use std::{net::SocketAddr, path::PathBuf}; -use tokio::{net::TcpListener, task::JoinHandle}; -use tower_http::services::{ServeDir, ServeFile}; -use tracing::info; - -pub(super) struct Server { - handle: Option>, -} - -impl Server { - pub(super) async fn new(address: SocketAddr, config: Config, frame_file: PathBuf) -> Self { - let listener = TcpListener::bind(&address) - .await - .unwrap_or_else(|_| panic!("tcp listener should bind to {address}")); - - let app = Router::new() - .route("/player", get(Html(include_str!("player.html")))) - .nest_service("/frame.jpg", ServeFile::new(frame_file)) - .nest_service("/", ServeDir::new(config.video_directory)); - - info!("Starting HTTP server on {}", address); - let handle = Some(tokio::spawn(async move { - axum::serve(listener, app).await.unwrap(); - })); - - Self { handle } - } - - pub(super) async fn stop(&mut self) { - info!("Stopping HTTP server"); - let handle = self.handle.take().unwrap(); - handle.abort(); - let _ = handle.await; - } -}