Skip to content

Commit 389c2a4

Browse files
committed
Improve ffmpeg stdout/stderr handling in agent
Output to log with sensible prefix rather than inheriting parent stdout/stderr and somewhat polluting output.
1 parent ee0ca9e commit 389c2a4

File tree

1 file changed

+50
-13
lines changed

1 file changed

+50
-13
lines changed

agent/src/ffmpeg/streamer.rs

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ use std::{
99
process::Stdio,
1010
sync::{Arc, Mutex},
1111
};
12-
use tokio::{process::Command, task::JoinHandle};
12+
use tokio::{
13+
io::{AsyncBufReadExt, BufReader},
14+
process::Command,
15+
task::JoinHandle,
16+
};
1317
use tracing::{debug, info, warn};
1418

1519
const HLS_PLAYLIST_FILENAME: &str = "stream.m3u8";
@@ -84,6 +88,9 @@ impl Streamer {
8488
.arg(&frame_file)
8589
// Do nothing with stdin
8690
.stdin(Stdio::null())
91+
// Capture stdout and stderr
92+
.stdout(Stdio::piped())
93+
.stderr(Stdio::piped())
8794
// Call setsid, required for correct exit signal handling
8895
.pre_exec(|| {
8996
unistd::setsid()?;
@@ -95,22 +102,52 @@ impl Streamer {
95102
debug!("ffmpeg process: {:?}", ffmpeg_process);
96103

97104
// Get and store the ffmpeg PID
98-
*ffmpeg_pid
99-
.lock()
100-
.expect("ffmpeg PID lock acquire should not fail") = Some(Pid::from_raw(
101-
ffmpeg_process
102-
.id()
103-
.expect("ffmpeg process should have a PID") as i32,
104-
));
105-
info!("ffmpeg PID: {:?}", ffmpeg_pid);
105+
*ffmpeg_pid.lock().unwrap() = {
106+
let pid = Pid::from_raw(
107+
ffmpeg_process
108+
.id()
109+
.expect("ffmpeg process should have a PID")
110+
as i32,
111+
);
112+
info!("ffmpeg PID: {:?}", pid);
113+
Some(pid)
114+
};
106115

107116
// Increment ffmpeg invocation count
108117
ffmpeg_invocations_metric.inc();
109118

110-
// Wait for ffmpeg process to exit
111-
let result = ffmpeg_process.wait().await;
112-
info!("ffmpeg exited, ok={}", result.is_ok());
113-
*ffmpeg_pid.lock().unwrap() = None;
119+
let stdout = ffmpeg_process.stdout.take().unwrap();
120+
let stderr = ffmpeg_process.stderr.take().unwrap();
121+
122+
let mut stdout_reader = BufReader::new(stdout).lines();
123+
let mut stderr_reader = BufReader::new(stderr).lines();
124+
125+
loop {
126+
tokio::select! {
127+
// Output stdout to log with prefix
128+
line = stdout_reader.next_line() => {
129+
match line {
130+
Ok(Some(line)) => info!("ffmpeg stdout: {line}"),
131+
Err(_) => break,
132+
_ => (),
133+
}
134+
}
135+
// Output stderr to log with prefix
136+
line = stderr_reader.next_line() => {
137+
match line {
138+
Ok(Some(line)) => info!("ffmpeg stderr: {line}"),
139+
Err(_) => break,
140+
_ => (),
141+
}
142+
}
143+
// Wait for ffmpeg process to exit
144+
result = ffmpeg_process.wait() => {
145+
info!("ffmpeg exited, ok={}", result.is_ok());
146+
*ffmpeg_pid.lock().unwrap() = None;
147+
break;
148+
}
149+
}
150+
}
114151

115152
let expected_shutdown = *terminate.lock().unwrap();
116153
if expected_shutdown {

0 commit comments

Comments
 (0)