Skip to content

Commit

Permalink
tidy
Browse files Browse the repository at this point in the history
  • Loading branch information
DanNixon committed Jan 21, 2024
1 parent 8b09ea4 commit 5248f54
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 107 deletions.
5 changes: 2 additions & 3 deletions agent/src/ffmpeg/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::Config;
use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder};
use futures::StreamExt;
use nix::{
sys::signal::{self, Signal},
Expand Down Expand Up @@ -126,8 +126,7 @@ impl Streamer {
metrics::counter!(crate::METRIC_FFMPEG_INVOCATIONS, 1);

let stdout = ffmpeg_process.stdout.take().unwrap();
let mut stdout_frame =
FramedRead::new(stdout, crate::utils::JpegFrameDecoder::default());
let mut stdout_frame = FramedRead::new(stdout, JpegFrameDecoder::default());

let stderr = ffmpeg_process.stderr.take().unwrap();
let mut stderr_reader = BufReader::new(stderr).lines();
Expand Down
101 changes: 101 additions & 0 deletions agent/src/jpeg_frame_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use bytes::{Buf, Bytes, BytesMut};
use tokio_util::codec::Decoder;

pub(crate) struct JpegFrameDecoder;

impl JpegFrameDecoder {
pub(crate) fn default() -> Self {
Self {}
}
}

impl Decoder for JpegFrameDecoder {
type Item = Bytes;
type Error = std::io::Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if let Some(idx) = find_first_jpeg_eoi(buf) {
let image_buf = buf.copy_to_bytes(idx + 2);
Ok(Some(image_buf))
} else {
Ok(None)
}
}
}

fn find_first_jpeg_eoi(bytes: &BytesMut) -> Option<usize> {
if bytes.len() < 2 {
return None;
}

(0..bytes.len() - 1).find(|&i| bytes[i] == 0xFF && bytes[i + 1] == 0xD9)
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn decoder_none() {
let mut decoder = JpegFrameDecoder::default();

let data = [0xFF, 0xD8, 1, 1, 1];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
assert!(res.unwrap().is_none());
}

#[test]
fn decoder_one() {
let mut decoder = JpegFrameDecoder::default();

let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]);
}

#[test]
fn decoder_multiple() {
let mut decoder = JpegFrameDecoder::default();

let data = [
0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9,
];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 2, 2, 2, 0xFF, 0xD9][..]);
}

#[test]
fn find_first_jpeg_eoi_none() {
let data = [0xFF, 0xD8, 1, 1, 1];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, None)
}

#[test]
fn find_first_jpeg_eoi_single() {
let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, Some(5))
}

#[test]
fn find_first_jpeg_eoi_multiple() {
let data = [
0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9,
];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, Some(5))
}
}
12 changes: 10 additions & 2 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod config;
mod ffmpeg;
mod jpeg_frame_decoder;
mod utils;

use axum::{
Expand All @@ -8,9 +9,16 @@ use axum::{
routing::get,
Router,
};
use bytes::Bytes;
use clap::Parser;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::{fs, net::SocketAddr, path::PathBuf, time::Duration};
use std::{
fs,
net::SocketAddr,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{net::TcpListener, task::JoinSet};
use tower_http::services::ServeDir;
use tracing::{debug, info, warn};
Expand All @@ -19,7 +27,7 @@ const METRIC_DISK_USAGE: &str = "satori_agent_disk_usage";
const METRIC_FFMPEG_INVOCATIONS: &str = "satori_agent_ffmpeg_invocations";
const METRIC_SEGMENTS: &str = "satori_agent_segments";

type SharedImageData = std::sync::Arc<std::sync::Mutex<Option<bytes::Bytes>>>;
type SharedImageData = Arc<Mutex<Option<Bytes>>>;

/// Run the camera agent.
///
Expand Down
102 changes: 0 additions & 102 deletions agent/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,105 +19,3 @@ where

Ok(Byte::from_bytes(result))
}

use bytes::{Buf, Bytes, BytesMut};
use tokio_util::codec::Decoder;

pub(crate) struct JpegFrameDecoder;

impl JpegFrameDecoder {
pub(crate) fn default() -> Self {
Self {}
}
}

impl Decoder for JpegFrameDecoder {
type Item = Bytes;
type Error = std::io::Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if let Some(idx) = find_first_jpeg_eoi(buf) {
let image_buf = buf.copy_to_bytes(idx + 2);
Ok(Some(image_buf))
} else {
Ok(None)
}
}
}

fn find_first_jpeg_eoi(bytes: &BytesMut) -> Option<usize> {
if bytes.len() < 2 {
return None;
}

(0..bytes.len() - 1).find(|&i| bytes[i] == 0xFF && bytes[i + 1] == 0xD9)
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn decoder_none() {
let mut decoder = JpegFrameDecoder::default();

let data = [0xFF, 0xD8, 1, 1, 1];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
assert!(res.unwrap().is_none());
}

#[test]
fn decoder_one() {
let mut decoder = JpegFrameDecoder::default();

let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]);
}

#[test]
fn decoder_multiple() {
let mut decoder = JpegFrameDecoder::default();

let data = [
0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9,
];
let mut data = BytesMut::from(&data[..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 1, 1, 1, 0xFF, 0xD9][..]);

let res = decoder.decode(&mut data);
let res = res.unwrap().unwrap();
assert_eq!(res, [0xFFu8, 0xD8, 2, 2, 2, 0xFF, 0xD9][..]);
}

#[test]
fn find_first_jpeg_eoi_none() {
let data = [0xFF, 0xD8, 1, 1, 1];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, None)
}

#[test]
fn find_first_jpeg_eoi_single() {
let data = [0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, Some(5))
}

#[test]
fn find_first_jpeg_eoi_multiple() {
let data = [
0xFF, 0xD8, 1, 1, 1, 0xFF, 0xD9, 0xFF, 0xD8, 2, 2, 2, 0xFF, 0xD9,
];
let pos = find_first_jpeg_eoi(&BytesMut::from(&data[..]));
assert_eq!(pos, Some(5))
}
}

0 comments on commit 5248f54

Please sign in to comment.