Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MJPEG endpoint in agent #96

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ serde_json = "1.0.111"
tempfile = "3.9.0"
thiserror = "1.0.56"
tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "process"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
toml = "0.8"
tower-http = { version = "0.5.1", features = ["fs"] }
Expand Down
1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ satori-common.workspace = true
serde.workspace = true
serde_with.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tower-http.workspace = true
tracing.workspace = true
Expand Down
15 changes: 5 additions & 10 deletions agent/src/ffmpeg/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder};
use bytes::Bytes;
use futures::StreamExt;
use nix::{
sys::signal::{self, Signal},
Expand All @@ -11,7 +12,7 @@ use std::{
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::broadcast,
sync::broadcast::Sender,
task::JoinHandle,
};
use tokio_util::codec::FramedRead;
Expand All @@ -24,26 +25,20 @@ pub(crate) struct Streamer {
terminate: Arc<Mutex<bool>>,
ffmpeg_pid: Arc<Mutex<Option<Pid>>>,
handle: Option<JoinHandle<()>>,
jpeg_tx: broadcast::Sender<bytes::Bytes>,
jpeg_tx: Sender<Bytes>,
}

impl Streamer {
pub(crate) fn new(config: Config) -> Self {
let (tx, _) = broadcast::channel(8);

pub(crate) fn new(config: Config, jpeg_tx: Sender<Bytes>) -> Self {
Self {
config,
terminate: Arc::new(Mutex::new(false)),
ffmpeg_pid: Default::default(),
handle: None,
jpeg_tx: tx,
jpeg_tx,
}
}

pub(crate) fn jpeg_subscribe(&self) -> broadcast::Receiver<bytes::Bytes> {
self.jpeg_tx.subscribe()
}

#[tracing::instrument(skip_all)]
pub(crate) async fn start(&mut self) {
let config = self.config.clone();
Expand Down
59 changes: 41 additions & 18 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ mod jpeg_frame_decoder;
mod utils;

use axum::{
body::Body,
http::header,
response::{Html, IntoResponse},
routing::get,
Router,
};
use bytes::Bytes;
use bytes::{BufMut, Bytes};
use clap::Parser;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::{
Expand All @@ -19,7 +20,8 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{net::TcpListener, task::JoinSet};
use tokio::net::TcpListener;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::services::ServeDir;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -85,27 +87,30 @@ async fn main() {
// Create video output directory
fs::create_dir_all(&config.video_directory).expect("should be able to create output directory");

// Channel for JPEG frames
let (jpeg_tx, mut jpeg_rx) = tokio::sync::broadcast::channel(8);

// Start streamer
let mut streamer = ffmpeg::Streamer::new(config.clone());
let mut streamer = ffmpeg::Streamer::new(config.clone(), jpeg_tx);
streamer.start().await;

let mut tasks = JoinSet::<()>::new();

// Configure HTTP server listener
let listener = TcpListener::bind(&cli.http_server_address)
.await
.unwrap_or_else(|_| panic!("tcp listener should bind to {}", cli.http_server_address));

// Configure HTTP server endpoints
let frame_image = SharedImageData::default();
let (jpeg_multipart_tx, _) = tokio::sync::broadcast::channel::<Bytes>(8);

// Configure HTTP server routes
let app = {
let frame_image = frame_image.clone();
let jpeg_multipart_tx = jpeg_multipart_tx.clone();

Router::new()
.route("/player", get(Html(include_str!("player.html"))))
.route(
"/frame.jpg",
"/jpeg",
get(move || async move {
match frame_image.lock().unwrap().as_ref() {
Some(image) => {
Expand All @@ -115,34 +120,49 @@ async fn main() {
}
}),
)
.route(
"/mjpeg",
get(move || async move {
let stream = BroadcastStream::new(jpeg_multipart_tx.subscribe());
let body = Body::from_stream(stream);

(
[(
header::CONTENT_TYPE,
"multipart/x-mixed-replace; boundary=frame",
)],
body,
)
.into_response()
}),
)
.nest_service("/", ServeDir::new(config.video_directory.clone()))
};

// Start HTTP server
info!("Starting HTTP server on {}", cli.http_server_address);
tasks.spawn(async move {
let server_handle = tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

let mut metrics_interval = tokio::time::interval(Duration::from_secs(30));
let mut jpeg_rx = streamer.jpeg_subscribe();
loop {
tokio::select! {
Ok(image) = jpeg_rx.recv() => {
let mut body = bytes::BytesMut::new();
body.put_slice(b"--frame\r\n");
body.put_slice(format!("{}: image/jpeg\r\n", header::CONTENT_TYPE).as_bytes());
body.put_slice(format!("{}: {}\r\n", header::CONTENT_LENGTH, image.len()).as_bytes());
body.put_slice(b"\r\n");
body.put_slice(&image);
let _ = jpeg_multipart_tx.send(body.into());

frame_image.lock().unwrap().replace(image);
}
_ = metrics_interval.tick() => {
update_segment_count_metric(&config);
update_disk_usage_metric(&config);
}
task = tasks.join_next() => {
match task {
None => tokio::time::sleep(Duration::from_secs(5)).await,
Some(task) => {
info!("Task exits: {:?}", task);
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Exiting");
break;
Expand All @@ -153,7 +173,10 @@ async fn main() {
// Stop streamer
streamer.stop().await;

tasks.shutdown().await;
// Stop server
info!("Stopping HTTP server");
server_handle.abort();
let _ = server_handle.await;
}

#[tracing::instrument(skip_all)]
Expand Down