From d9769bb3ca8e977b42096115111302c71206c116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?The=CC=81o=20Monnom?= Date: Wed, 15 Nov 2023 18:39:18 -0500 Subject: [PATCH] fallback to env_logger --- livekit-ffi/protocol/ffi.proto | 1 - livekit-ffi/src/server/logger.rs | 122 +++++++++++++++++------------ livekit-ffi/src/server/mod.rs | 2 +- livekit-ffi/src/server/requests.rs | 13 ++- 4 files changed, 84 insertions(+), 54 deletions(-) diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index c19f84dcd..3f97ff5d6 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -159,7 +159,6 @@ message FfiEvent { message InitializeRequest { uint64 event_callback_ptr = 1; bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord - uint32 max_log_batch_size = 3; // Max number of log records in a single LogBatch } message InitializeResponse {} diff --git a/livekit-ffi/src/server/logger.rs b/livekit-ffi/src/server/logger.rs index 73b1cadb6..02452c556 100644 --- a/livekit-ffi/src/server/logger.rs +++ b/livekit-ffi/src/server/logger.rs @@ -1,39 +1,86 @@ use crate::proto; use crate::server::FfiServer; +use env_logger; use log::{self, Log}; -use std::future::Future; -use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1); +pub const BATCH_SIZE: usize = 16; + +/// Logger that forward logs to the FfiClient when capture_logs is enabled +/// Otherwise fallback to the env_logger +pub struct FfiLogger { + server: &'static FfiServer, + log_tx: mpsc::UnboundedSender, + capture_logs: AtomicBool, + env_logger: env_logger::Logger, +} enum LogMsg { Log(proto::LogRecord), Flush(oneshot::Sender<()>), } -pub struct FfiLogger { - server: &'static FfiServer, - log_tx: mpsc::UnboundedSender, +impl FfiLogger { + pub fn new(server: &'static FfiServer, capture_logs: bool) -> Self { + let (log_tx, log_rx) = mpsc::unbounded_channel(); + server.async_runtime.spawn(log_forward_task(server, log_rx)); + + let env_logger = env_logger::Builder::from_default_env().build(); + FfiLogger { + server, + log_tx, + capture_logs: AtomicBool::new(capture_logs), + env_logger, + } + } } impl FfiLogger { - pub fn new(server: &'static FfiServer, max_batch_size: u32) -> Self { - let (log_tx, log_rx) = mpsc::unbounded_channel(); - server - .async_runtime - .spawn(log_task(server, max_batch_size, log_rx)); - FfiLogger { server, log_tx } + pub fn capture_logs(&self) -> bool { + self.capture_logs.load(Ordering::Acquire) + } + + pub fn set_capture_logs(&self, capture: bool) { + self.capture_logs.store(capture, Ordering::Release); } } -async fn log_task( - server: &'static FfiServer, - max_batch_size: u32, - mut rx: mpsc::UnboundedReceiver, -) { +impl Log for FfiLogger { + fn enabled(&self, metadata: &log::Metadata) -> bool { + if !self.capture_logs() { + return self.env_logger.enabled(metadata); + } + + true // The ffi client decides what to log (FfiLogger is just forwarding) + } + + fn log(&self, record: &log::Record) { + if !self.capture_logs() { + return self.env_logger.log(record); + } + + self.log_tx.send(LogMsg::Log(record.into())).unwrap(); + } + + fn flush(&self) { + if !self.capture_logs() { + return self.env_logger.flush(); + } + + let (tx, mut rx) = oneshot::channel(); + self.log_tx.send(LogMsg::Flush(tx)).unwrap(); + let _ = self.server.async_runtime.block_on(rx); // should we block? + } +} + +async fn log_forward_task(server: &'static FfiServer, mut rx: mpsc::UnboundedReceiver) { async fn flush(server: &'static FfiServer, batch: &mut Vec) { + if batch.is_empty() { + return; + } let _ = server .send_event(proto::ffi_event::Message::Logs(proto::LogBatch { records: batch.clone(), // Avoid clone here? @@ -42,49 +89,28 @@ async fn log_task( batch.clear(); } - let mut batch = Vec::with_capacity(max_batch_size as usize); + let mut batch = Vec::with_capacity(BATCH_SIZE); let mut interval = tokio::time::interval(FLUSH_INTERVAL); loop { tokio::select! { - msg = rx.recv() => { - if let Some(msg) = msg { - - match msg { - LogMsg::Log(record) => { - batch.push(record); - } - LogMsg::Flush(tx) => { - flush(server, &mut batch).await; - let _ = tx.send(()); - } + Some(msg) = rx.recv() => { + match msg { + LogMsg::Log(record) => { + batch.push(record); + } + LogMsg::Flush(tx) => { + flush(server, &mut batch).await; + let _ = tx.send(()); } - - } else { - flush(server, &mut batch).await; - break; // FfiLogger dropped } - }, + }, _ = interval.tick() => { flush(server, &mut batch).await; } } - } -} - -impl Log for FfiLogger { - fn enabled(&self, metadata: &log::Metadata) -> bool { - true // The ffi client decides what to log (FfiLogger is just forwarding) - } - - fn log(&self, record: &log::Record) { - self.log_tx.send(LogMsg::Log(record.into())).unwrap(); - } - fn flush(&self) { - let (tx, mut rx) = oneshot::channel(); - self.log_tx.send(LogMsg::Flush(tx)).unwrap(); - let _ = self.server.async_runtime.block_on(rx); // should we block? + flush(server, &mut batch).await; } } diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 7d4f54dc5..f97c26b71 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -70,7 +70,7 @@ pub struct FfiServer { impl Default for FfiServer { fn default() -> Self { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + // Initialize the FfiLogger #[cfg(feature = "tracing")] console_subscriber::init(); diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 538639f30..163897a04 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -14,8 +14,8 @@ use super::room::{FfiParticipant, FfiPublication, FfiTrack}; use super::{ - audio_source, audio_stream, room, video_source, video_stream, FfiConfig, FfiError, FfiResult, - FfiServer, + audio_source, audio_stream, logger::FfiLogger, room, video_source, video_stream, FfiConfig, + FfiError, FfiResult, FfiServer, }; use crate::proto; use livekit::prelude::*; @@ -36,13 +36,18 @@ fn on_initialize( return Err(FfiError::AlreadyInitialized); } - log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); - // # SAFETY: The foreign language is responsible for ensuring that the callback function is valid *server.config.lock() = Some(FfiConfig { callback_fn: unsafe { std::mem::transmute(init.event_callback_ptr as usize) }, }); + if init.capture_logs { + let ffi_logger = FfiLogger::new(server, init.max_log_batch_size); + let _ = log::set_boxed_logger(Box::new(ffi_logger)); + } + + log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); + Ok(proto::InitializeResponse::default()) }