From 5248f5444dc73293cc68a8055d708fb1d6c2ccca Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Sun, 21 Jan 2024 18:47:05 +0000 Subject: [PATCH] tidy --- agent/src/ffmpeg/streamer.rs | 5 +- agent/src/jpeg_frame_decoder.rs | 101 +++++++++++++++++++++++++++++++ agent/src/main.rs | 12 +++- agent/src/utils.rs | 102 -------------------------------- 4 files changed, 113 insertions(+), 107 deletions(-) create mode 100644 agent/src/jpeg_frame_decoder.rs diff --git a/agent/src/ffmpeg/streamer.rs b/agent/src/ffmpeg/streamer.rs index 7cde90b1..c6388d0a 100644 --- a/agent/src/ffmpeg/streamer.rs +++ b/agent/src/ffmpeg/streamer.rs @@ -1,4 +1,4 @@ -use crate::config::Config; +use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder}; use futures::StreamExt; use nix::{ sys::signal::{self, Signal}, @@ -126,8 +126,7 @@ impl Streamer { metrics::counter!(crate::METRIC_FFMPEG_INVOCATIONS, 1); let stdout = ffmpeg_process.stdout.take().unwrap(); - let mut stdout_frame = - FramedRead::new(stdout, crate::utils::JpegFrameDecoder::default()); + let mut stdout_frame = FramedRead::new(stdout, JpegFrameDecoder::default()); let stderr = ffmpeg_process.stderr.take().unwrap(); let mut stderr_reader = BufReader::new(stderr).lines(); diff --git a/agent/src/jpeg_frame_decoder.rs b/agent/src/jpeg_frame_decoder.rs new file mode 100644 index 00000000..9ad143f9 --- /dev/null +++ b/agent/src/jpeg_frame_decoder.rs @@ -0,0 +1,101 @@ +use bytes::{Buf, Bytes, BytesMut}; +use tokio_util::codec::Decoder; + +pub(crate) struct JpegFrameDecoder; + +impl JpegFrameDecoder { + pub(crate) fn default() -> Self { + Self {} + } +} + +impl Decoder for JpegFrameDecoder { + type Item = Bytes; + type Error = std::io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + if let Some(idx) = find_first_jpeg_eoi(buf) { + let image_buf = buf.copy_to_bytes(idx + 2); + Ok(Some(image_buf)) + } else { + Ok(None) + } + } +} + +fn find_first_jpeg_eoi(bytes: &BytesMut) -> Option { + if bytes.len() < 2 { + return None; + } + + (0..bytes.len() - 1).find(|&i| bytes[i] == 0xFF && bytes[i + 1] == 0xD9) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn decoder_none() { + let mut decoder = JpegFrameDecoder::default(); + + let data = [0xFF, 0xD8, 1, 1, 1]; + let mut data = BytesMut::from(&data[..]); + + let res = decoder.decode(&mut data); + assert!(res.unwrap().is_none()); + } + + #[test] + fn decoder_one() { + let mut decoder = JpegFrameDecoder::default(); + + let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9]; + let mut data = BytesMut::from(&data[..]); + + let res = decoder.decode(&mut data); + let res = res.unwrap().unwrap(); + assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]); + } + + #[test] + fn decoder_multiple() { + let mut decoder = JpegFrameDecoder::default(); + + let data = [ + 0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9, + ]; + let mut data = BytesMut::from(&data[..]); + + let res = decoder.decode(&mut data); + let res = res.unwrap().unwrap(); + assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]); + + let res = decoder.decode(&mut data); + let res = res.unwrap().unwrap(); + assert_eq!(res, [0xFFu8, 0xD8, 2, 2, 2, 0xFF, 0xD9][..]); + } + + #[test] + fn find_first_jpeg_eoi_none() { + let data = [0xFF, 0xD8, 1, 1, 1]; + let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); + assert_eq!(pos, None) + } + + #[test] + fn find_first_jpeg_eoi_single() { + let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9]; + let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); + assert_eq!(pos, Some(5)) + } + + #[test] + fn find_first_jpeg_eoi_multiple() { + let data = [ + 0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9, + ]; + let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); + assert_eq!(pos, Some(5)) + } +} diff --git a/agent/src/main.rs b/agent/src/main.rs index 2c0f3dc6..998dcd73 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,5 +1,6 @@ mod config; mod ffmpeg; +mod jpeg_frame_decoder; mod utils; use axum::{ @@ -8,9 +9,16 @@ use axum::{ routing::get, Router, }; +use bytes::Bytes; use clap::Parser; use metrics_exporter_prometheus::PrometheusBuilder; -use std::{fs, net::SocketAddr, path::PathBuf, time::Duration}; +use std::{ + fs, + net::SocketAddr, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; use tokio::{net::TcpListener, task::JoinSet}; use tower_http::services::ServeDir; use tracing::{debug, info, warn}; @@ -19,7 +27,7 @@ const METRIC_DISK_USAGE: &str = "satori_agent_disk_usage"; const METRIC_FFMPEG_INVOCATIONS: &str = "satori_agent_ffmpeg_invocations"; const METRIC_SEGMENTS: &str = "satori_agent_segments"; -type SharedImageData = std::sync::Arc>>; +type SharedImageData = Arc>>; /// Run the camera agent. /// diff --git a/agent/src/utils.rs b/agent/src/utils.rs index 8668570f..a12cb1d2 100644 --- a/agent/src/utils.rs +++ b/agent/src/utils.rs @@ -19,105 +19,3 @@ where Ok(Byte::from_bytes(result)) } - -use bytes::{Buf, Bytes, BytesMut}; -use tokio_util::codec::Decoder; - -pub(crate) struct JpegFrameDecoder; - -impl JpegFrameDecoder { - pub(crate) fn default() -> Self { - Self {} - } -} - -impl Decoder for JpegFrameDecoder { - type Item = Bytes; - type Error = std::io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - if let Some(idx) = find_first_jpeg_eoi(buf) { - let image_buf = buf.copy_to_bytes(idx + 2); - Ok(Some(image_buf)) - } else { - Ok(None) - } - } -} - -fn find_first_jpeg_eoi(bytes: &BytesMut) -> Option { - if bytes.len() < 2 { - return None; - } - - (0..bytes.len() - 1).find(|&i| bytes[i] == 0xFF && bytes[i + 1] == 0xD9) -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn decoder_none() { - let mut decoder = JpegFrameDecoder::default(); - - let data = [0xFF, 0xD8, 1, 1, 1]; - let mut data = BytesMut::from(&data[..]); - - let res = decoder.decode(&mut data); - assert!(res.unwrap().is_none()); - } - - #[test] - fn decoder_one() { - let mut decoder = JpegFrameDecoder::default(); - - let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9]; - let mut data = BytesMut::from(&data[..]); - - let res = decoder.decode(&mut data); - let res = res.unwrap().unwrap(); - assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]); - } - - #[test] - fn decoder_multiple() { - let mut decoder = JpegFrameDecoder::default(); - - let data = [ - 0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9, - ]; - let mut data = BytesMut::from(&data[..]); - - let res = decoder.decode(&mut data); - let res = res.unwrap().unwrap(); - assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]); - - let res = decoder.decode(&mut data); - let res = res.unwrap().unwrap(); - assert_eq!(res, [0xFFu8, 0xD8, 2, 2, 2, 0xFF, 0xD9][..]); - } - - #[test] - fn find_first_jpeg_eoi_none() { - let data = [0xFF, 0xD8, 1, 1, 1]; - let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); - assert_eq!(pos, None) - } - - #[test] - fn find_first_jpeg_eoi_single() { - let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9]; - let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); - assert_eq!(pos, Some(5)) - } - - #[test] - fn find_first_jpeg_eoi_multiple() { - let data = [ - 0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9, - ]; - let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..])); - assert_eq!(pos, Some(5)) - } -}