Skip to content

Commit

Permalink
Add MJPEG endpoint in agent
Browse files Browse the repository at this point in the history
  • Loading branch information
DanNixon committed Jan 23, 2024
1 parent acc9436 commit eaff144
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ serde_json = "1.0.111"
tempfile = "3.9.0"
thiserror = "1.0.56"
tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "process"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
toml = "0.8"
tower-http = { version = "0.5.1", features = ["fs"] }
Expand Down
1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ satori-common.workspace = true
serde.workspace = true
serde_with.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tower-http.workspace = true
tracing.workspace = true
Expand Down
15 changes: 5 additions & 10 deletions agent/src/ffmpeg/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder};
use bytes::Bytes;
use futures::StreamExt;
use nix::{
sys::signal::{self, Signal},
Expand All @@ -11,7 +12,7 @@ use std::{
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::broadcast,
sync::broadcast::Sender,
task::JoinHandle,
};
use tokio_util::codec::FramedRead;
Expand All @@ -24,26 +25,20 @@ pub(crate) struct Streamer {
terminate: Arc<Mutex<bool>>,
ffmpeg_pid: Arc<Mutex<Option<Pid>>>,
handle: Option<JoinHandle<()>>,
jpeg_tx: broadcast::Sender<bytes::Bytes>,
jpeg_tx: Sender<Bytes>,
}

impl Streamer {
pub(crate) fn new(config: Config) -> Self {
let (tx, _) = broadcast::channel(8);

pub(crate) fn new(config: Config, jpeg_tx: Sender<Bytes>) -> Self {
Self {
config,
terminate: Arc::new(Mutex::new(false)),
ffmpeg_pid: Default::default(),
handle: None,
jpeg_tx: tx,
jpeg_tx,
}
}

pub(crate) fn jpeg_subscribe(&self) -> broadcast::Receiver<bytes::Bytes> {
self.jpeg_tx.subscribe()
}

#[tracing::instrument(skip_all)]
pub(crate) async fn start(&mut self) {
let config = self.config.clone();
Expand Down
59 changes: 41 additions & 18 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ mod jpeg_frame_decoder;
mod utils;

use axum::{
body::Body,
http::header,
response::{Html, IntoResponse},
routing::get,
Router,
};
use bytes::Bytes;
use bytes::{BufMut, Bytes};
use clap::Parser;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::{
Expand All @@ -19,7 +20,8 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{net::TcpListener, task::JoinSet};
use tokio::net::TcpListener;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::services::ServeDir;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -85,27 +87,30 @@ async fn main() {
// Create video output directory
fs::create_dir_all(&config.video_directory).expect("should be able to create output directory");

// Channel for JPEG frames
let (jpeg_tx, mut jpeg_rx) = tokio::sync::broadcast::channel(8);

// Start streamer
let mut streamer = ffmpeg::Streamer::new(config.clone());
let mut streamer = ffmpeg::Streamer::new(config.clone(), jpeg_tx);
streamer.start().await;

let mut tasks = JoinSet::<()>::new();

// Configure HTTP server listener
let listener = TcpListener::bind(&cli.http_server_address)
.await
.unwrap_or_else(|_| panic!("tcp listener should bind to {}", cli.http_server_address));

// Configure HTTP server endpoints
let frame_image = SharedImageData::default();
let (jpeg_multipart_tx, _) = tokio::sync::broadcast::channel::<Bytes>(8);

// Configure HTTP server routes
let app = {
let frame_image = frame_image.clone();
let jpeg_multipart_tx = jpeg_multipart_tx.clone();

Router::new()
.route("/player", get(Html(include_str!("player.html"))))
.route(
"/frame.jpg",
"/jpeg",
get(move || async move {
match frame_image.lock().unwrap().as_ref() {
Some(image) => {
Expand All @@ -115,34 +120,49 @@ async fn main() {
}
}),
)
.route(
"/mjpeg",
get(move || async move {
let stream = BroadcastStream::new(jpeg_multipart_tx.subscribe());
let body = Body::from_stream(stream);

(
[(
header::CONTENT_TYPE,
"multipart/x-mixed-replace; boundary=frame",
)],
body,
)
.into_response()
}),
)
.nest_service("/", ServeDir::new(config.video_directory.clone()))
};

// Start HTTP server
info!("Starting HTTP server on {}", cli.http_server_address);
tasks.spawn(async move {
let server_handle = tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

let mut metrics_interval = tokio::time::interval(Duration::from_secs(30));
let mut jpeg_rx = streamer.jpeg_subscribe();
loop {
tokio::select! {
Ok(image) = jpeg_rx.recv() => {
let mut body = bytes::BytesMut::new();
body.put_slice(b"--frame\r\n");
body.put_slice(format!("{}: image/jpeg\r\n", header::CONTENT_TYPE).as_bytes());
body.put_slice(format!("{}: {}\r\n", header::CONTENT_LENGTH, image.len()).as_bytes());
body.put_slice(b"\r\n");
body.put_slice(&image);
let _ = jpeg_multipart_tx.send(body.into());

frame_image.lock().unwrap().replace(image);
}
_ = metrics_interval.tick() => {
update_segment_count_metric(&config);
update_disk_usage_metric(&config);
}
task = tasks.join_next() => {
match task {
None => tokio::time::sleep(Duration::from_secs(5)).await,
Some(task) => {
info!("Task exits: {:?}", task);
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Exiting");
break;
Expand All @@ -153,7 +173,10 @@ async fn main() {
// Stop streamer
streamer.stop().await;

tasks.shutdown().await;
// Stop server
info!("Stopping HTTP server");
server_handle.abort();
let _ = server_handle.await;
}

#[tracing::instrument(skip_all)]
Expand Down

0 comments on commit eaff144

Please sign in to comment.