Skip to content

Commit

Permalink
Refactor JPEG handling in agent
Browse files Browse the repository at this point in the history
Removes the need for an intermediate file by reading JPEG data from
ffmpeg via stdout.

Also makes way for MJPEG endpoint.
  • Loading branch information
DanNixon committed Jan 22, 2024
1 parent efc41c8 commit acc9436
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 77 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ serde_json = "1.0.111"
tempfile = "3.9.0"
thiserror = "1.0.56"
tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "process"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
toml = "0.8"
tower-http = { version = "0.5.1", features = ["fs"] }
tracing = "0.1"
Expand Down
4 changes: 3 additions & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ edition.workspace = true
[dependencies]
axum.workspace = true
byte-unit.workspace = true
bytes.workspace = true
clap.workspace = true
futures.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
nix.workspace = true
regex.workspace = true
satori-common.workspace = true
serde.workspace = true
serde_with.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tower-http.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
Expand Down
51 changes: 34 additions & 17 deletions agent/src/ffmpeg/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,55 @@
use crate::config::Config;
use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder};
use futures::StreamExt;
use nix::{
sys::signal::{self, Signal},
unistd::{self, Pid},
};
use std::{
path::{Path, PathBuf},
process::Stdio,
sync::{Arc, Mutex},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::broadcast,
task::JoinHandle,
};
use tracing::{debug, info, warn};
use tokio_util::codec::FramedRead;
use tracing::{debug, error, info, warn};

const HLS_PLAYLIST_FILENAME: &str = "stream.m3u8";

pub(crate) struct Streamer {
config: Config,
frame_file: PathBuf,
terminate: Arc<Mutex<bool>>,
ffmpeg_pid: Arc<Mutex<Option<Pid>>>,
handle: Option<JoinHandle<()>>,
jpeg_tx: broadcast::Sender<bytes::Bytes>,
}

impl Streamer {
pub(crate) fn new(config: Config, frame_file: &Path) -> Self {
pub(crate) fn new(config: Config) -> Self {
let (tx, _) = broadcast::channel(8);

Self {
config,
frame_file: frame_file.to_owned(),
terminate: Arc::new(Mutex::new(false)),
ffmpeg_pid: Default::default(),
handle: None,
jpeg_tx: tx,
}
}

pub(crate) fn jpeg_subscribe(&self) -> broadcast::Receiver<bytes::Bytes> {
self.jpeg_tx.subscribe()
}

#[tracing::instrument(skip_all)]
pub(crate) async fn start(&mut self) {
let config = self.config.clone();
let frame_file = self.frame_file.clone();
let ffmpeg_pid = self.ffmpeg_pid.clone();
let terminate = self.terminate.clone();
let jpeg_tx = self.jpeg_tx.clone();

self.handle = Some(tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -79,9 +87,11 @@ impl Streamer {
// Output preview frames as JPEG
.arg("-vf")
.arg("fps=1")
.arg("-f")
.arg("image2")
.arg("-update")
.arg("1")
.arg(&frame_file)
.arg("pipe:1")
// Do nothing with stdin
.stdin(Stdio::null())
// Capture stdout and stderr
Expand Down Expand Up @@ -113,26 +123,33 @@ impl Streamer {
metrics::counter!(crate::METRIC_FFMPEG_INVOCATIONS, 1);

let stdout = ffmpeg_process.stdout.take().unwrap();
let stderr = ffmpeg_process.stderr.take().unwrap();
let mut stdout_frame = FramedRead::new(stdout, JpegFrameDecoder);

let mut stdout_reader = BufReader::new(stdout).lines();
let stderr = ffmpeg_process.stderr.take().unwrap();
let mut stderr_reader = BufReader::new(stderr).lines();

loop {
tokio::select! {
// Output stdout to log with prefix
line = stdout_reader.next_line() => {
match line {
Ok(Some(line)) => info!("ffmpeg stdout: {line}"),
Err(_) => break,
_ => (),
// Handle JPEG data via stdout
Some(frame) = stdout_frame.next() => {
match frame {
Ok(frame) => {
debug!("Got JPEG frame ({} bytes)", frame.len());
if let Err(e ) = jpeg_tx.send(frame) {
error!("JPEG frame channel error: {}", e);
}
}
Err(e) => error!("ffmpeg stdout frame errror: {:?}", e),
}
}
// Output stderr to log with prefix
line = stderr_reader.next_line() => {
match line {
Ok(Some(line)) => info!("ffmpeg stderr: {line}"),
Err(_) => break,
Err(e) => {
warn!("ffmpeg stderr closed: {e}");
break;
},
_ => (),
}
}
Expand Down
98 changes: 98 additions & 0 deletions agent/src/jpeg_frame_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use bytes::{Buf, Bytes, BytesMut};
use tokio_util::codec::Decoder;

const JPEG_EOI_LENGTH: usize = 2;

#[derive(Default)]
pub(crate) struct JpegFrameDecoder;

impl Decoder for JpegFrameDecoder {
type Item = Bytes;
type Error = std::io::Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if let Some(idx) = find_first_jpeg_eoi(buf) {
let image_buf = buf.copy_to_bytes(idx + JPEG_EOI_LENGTH);
Ok(Some(image_buf))
} else {
Ok(None)
}
}
}

fn find_first_jpeg_eoi(bytes: &BytesMut) -> Option<usize> {
if bytes.len() < JPEG_EOI_LENGTH {
None
} else {
(0..bytes.len() - 1).find(|&i| bytes[i] == 0xFF && bytes[i + 1] == 0xD9)
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn decoder_none() {
let mut decoder = JpegFrameDecoder;

let data = [0xFF, 0xD8, 1, 1, 1];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
assert!(res.unwrap().is_none());
}

#[test]
fn decoder_one() {
let mut decoder = JpegFrameDecoder;

let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]);
}

#[test]
fn decoder_multiple() {
let mut decoder = JpegFrameDecoder;

let data = [
0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9,
];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 2, 2, 2, 0xFF, 0xD9][..]);
}

#[test]
fn find_first_jpeg_eoi_none() {
let data = [0xFF, 0xD8, 1, 1, 1];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, None)
}

#[test]
fn find_first_jpeg_eoi_single() {
let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, Some(5))
}

#[test]
fn find_first_jpeg_eoi_multiple() {
let data = [
0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9,
];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, Some(5))
}
}
89 changes: 68 additions & 21 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
mod config;
mod ffmpeg;
mod server;
mod jpeg_frame_decoder;
mod utils;

use axum::{
http::header,
response::{Html, IntoResponse},
routing::get,
Router,
};
use bytes::Bytes;
use clap::Parser;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::{fs, net::SocketAddr, path::PathBuf, time::Duration};
use std::{
fs,
net::SocketAddr,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{net::TcpListener, task::JoinSet};
use tower_http::services::ServeDir;
use tracing::{debug, info, warn};

const METRIC_DISK_USAGE: &str = "satori_agent_disk_usage";
const METRIC_FFMPEG_INVOCATIONS: &str = "satori_agent_ffmpeg_invocations";
const METRIC_SEGMENTS: &str = "satori_agent_segments";

type SharedImageData = Arc<Mutex<Option<Bytes>>>;

/// Run the camera agent.
///
/// Handles restreaming a single camera as HLS with history.
Expand Down Expand Up @@ -68,33 +85,64 @@ async fn main() {
// Create video output directory
fs::create_dir_all(&config.video_directory).expect("should be able to create output directory");

// Generate a random filename for the frame JPEG
let frame_file = tempfile::Builder::new()
.prefix("satori-")
.suffix(".jpg")
.tempfile()
.unwrap();

// Start HTTP server
let mut server = server::Server::new(
cli.http_server_address,
config.clone(),
frame_file.path().to_owned(),
)
.await;

// Start streamer
let mut streamer = ffmpeg::Streamer::new(config.clone(), frame_file.path());
let mut streamer = ffmpeg::Streamer::new(config.clone());
streamer.start().await;

let mut metrics_interval = tokio::time::interval(Duration::from_secs(30));
let mut tasks = JoinSet::<()>::new();

// Configure HTTP server listener
let listener = TcpListener::bind(&cli.http_server_address)
.await
.unwrap_or_else(|_| panic!("tcp listener should bind to {}", cli.http_server_address));

let frame_image = SharedImageData::default();

// Configure HTTP server routes
let app = {
let frame_image = frame_image.clone();

Router::new()
.route("/player", get(Html(include_str!("player.html"))))
.route(
"/frame.jpg",
get(move || async move {
match frame_image.lock().unwrap().as_ref() {
Some(image) => {
([(header::CONTENT_TYPE, "image/jpeg")], image.clone()).into_response()
}
None => axum::http::StatusCode::NOT_FOUND.into_response(),
}
}),
)
.nest_service("/", ServeDir::new(config.video_directory.clone()))
};

// Start HTTP server
info!("Starting HTTP server on {}", cli.http_server_address);
tasks.spawn(async move {
axum::serve(listener, app).await.unwrap();
});

let mut metrics_interval = tokio::time::interval(Duration::from_secs(30));
let mut jpeg_rx = streamer.jpeg_subscribe();
loop {
tokio::select! {
Ok(image) = jpeg_rx.recv() => {
frame_image.lock().unwrap().replace(image);
}
_ = metrics_interval.tick() => {
update_segment_count_metric(&config);
update_disk_usage_metric(&config);
}
task = tasks.join_next() => {
match task {
None => tokio::time::sleep(Duration::from_secs(5)).await,
Some(task) => {
info!("Task exits: {:?}", task);
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Exiting");
break;
Expand All @@ -105,8 +153,7 @@ async fn main() {
// Stop streamer
streamer.stop().await;

// Stop HTTP server
server.stop().await;
tasks.shutdown().await;
}

#[tracing::instrument(skip_all)]
Expand Down
Loading

0 comments on commit acc9436

Please sign in to comment.