Skip to content

Commit

Permalink
fallback to env_logger
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Nov 15, 2023
1 parent 32ffd5e commit d9769bb
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 54 deletions.
1 change: 0 additions & 1 deletion livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ message FfiEvent {
message InitializeRequest {
uint64 event_callback_ptr = 1;
bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord
uint32 max_log_batch_size = 3; // Max number of log records in a single LogBatch
}
message InitializeResponse {}

Expand Down
122 changes: 74 additions & 48 deletions livekit-ffi/src/server/logger.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,86 @@
use crate::proto;
use crate::server::FfiServer;
use env_logger;
use log::{self, Log};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
pub const BATCH_SIZE: usize = 16;

/// Logger that forward logs to the FfiClient when capture_logs is enabled
/// Otherwise fallback to the env_logger
pub struct FfiLogger {
server: &'static FfiServer,
log_tx: mpsc::UnboundedSender<LogMsg>,
capture_logs: AtomicBool,
env_logger: env_logger::Logger,
}

enum LogMsg {
Log(proto::LogRecord),
Flush(oneshot::Sender<()>),
}

pub struct FfiLogger {
server: &'static FfiServer,
log_tx: mpsc::UnboundedSender<LogMsg>,
impl FfiLogger {
pub fn new(server: &'static FfiServer, capture_logs: bool) -> Self {
let (log_tx, log_rx) = mpsc::unbounded_channel();
server.async_runtime.spawn(log_forward_task(server, log_rx));

let env_logger = env_logger::Builder::from_default_env().build();
FfiLogger {
server,
log_tx,
capture_logs: AtomicBool::new(capture_logs),
env_logger,
}
}
}

impl FfiLogger {
pub fn new(server: &'static FfiServer, max_batch_size: u32) -> Self {
let (log_tx, log_rx) = mpsc::unbounded_channel();
server
.async_runtime
.spawn(log_task(server, max_batch_size, log_rx));
FfiLogger { server, log_tx }
pub fn capture_logs(&self) -> bool {
self.capture_logs.load(Ordering::Acquire)
}

pub fn set_capture_logs(&self, capture: bool) {
self.capture_logs.store(capture, Ordering::Release);
}
}

async fn log_task(
server: &'static FfiServer,
max_batch_size: u32,
mut rx: mpsc::UnboundedReceiver<LogMsg>,
) {
impl Log for FfiLogger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
if !self.capture_logs() {
return self.env_logger.enabled(metadata);
}

true // The ffi client decides what to log (FfiLogger is just forwarding)
}

fn log(&self, record: &log::Record) {
if !self.capture_logs() {
return self.env_logger.log(record);
}

self.log_tx.send(LogMsg::Log(record.into())).unwrap();
}

fn flush(&self) {
if !self.capture_logs() {
return self.env_logger.flush();
}

let (tx, mut rx) = oneshot::channel();
self.log_tx.send(LogMsg::Flush(tx)).unwrap();
let _ = self.server.async_runtime.block_on(rx); // should we block?
}
}

async fn log_forward_task(server: &'static FfiServer, mut rx: mpsc::UnboundedReceiver<LogMsg>) {
async fn flush(server: &'static FfiServer, batch: &mut Vec<proto::LogRecord>) {
if batch.is_empty() {
return;
}
let _ = server
.send_event(proto::ffi_event::Message::Logs(proto::LogBatch {
records: batch.clone(), // Avoid clone here?
Expand All @@ -42,49 +89,28 @@ async fn log_task(
batch.clear();
}

let mut batch = Vec::with_capacity(max_batch_size as usize);
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut interval = tokio::time::interval(FLUSH_INTERVAL);

loop {
tokio::select! {
msg = rx.recv() => {
if let Some(msg) = msg {

match msg {
LogMsg::Log(record) => {
batch.push(record);
}
LogMsg::Flush(tx) => {
flush(server, &mut batch).await;
let _ = tx.send(());
}
Some(msg) = rx.recv() => {
match msg {
LogMsg::Log(record) => {
batch.push(record);
}
LogMsg::Flush(tx) => {
flush(server, &mut batch).await;
let _ = tx.send(());
}

} else {
flush(server, &mut batch).await;
break; // FfiLogger dropped
}
},
},
_ = interval.tick() => {
flush(server, &mut batch).await;
}
}
}
}

impl Log for FfiLogger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
true // The ffi client decides what to log (FfiLogger is just forwarding)
}

fn log(&self, record: &log::Record) {
self.log_tx.send(LogMsg::Log(record.into())).unwrap();
}

fn flush(&self) {
let (tx, mut rx) = oneshot::channel();
self.log_tx.send(LogMsg::Flush(tx)).unwrap();
let _ = self.server.async_runtime.block_on(rx); // should we block?
flush(server, &mut batch).await;
}
}

Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct FfiServer {

impl Default for FfiServer {
fn default() -> Self {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// Initialize the FfiLogger

#[cfg(feature = "tracing")]
console_subscriber::init();
Expand Down
13 changes: 9 additions & 4 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use super::room::{FfiParticipant, FfiPublication, FfiTrack};
use super::{
audio_source, audio_stream, room, video_source, video_stream, FfiConfig, FfiError, FfiResult,
FfiServer,
audio_source, audio_stream, logger::FfiLogger, room, video_source, video_stream, FfiConfig,
FfiError, FfiResult, FfiServer,
};
use crate::proto;
use livekit::prelude::*;
Expand All @@ -36,13 +36,18 @@ fn on_initialize(
return Err(FfiError::AlreadyInitialized);
}

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION"));

// # SAFETY: The foreign language is responsible for ensuring that the callback function is valid
*server.config.lock() = Some(FfiConfig {
callback_fn: unsafe { std::mem::transmute(init.event_callback_ptr as usize) },
});

if init.capture_logs {
let ffi_logger = FfiLogger::new(server, init.max_log_batch_size);
let _ = log::set_boxed_logger(Box::new(ffi_logger));
}

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION"));

Ok(proto::InitializeResponse::default())
}

Expand Down

0 comments on commit d9769bb

Please sign in to comment.