Skip to content

Commit

Permalink
initial ffilogger
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Nov 15, 2023
1 parent 4a0d511 commit 32ffd5e
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 3 deletions.
30 changes: 28 additions & 2 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,17 @@ message FfiEvent {
UpdateLocalMetadataCallback update_local_metadata = 12;
UpdateLocalNameCallback update_local_name = 13;
GetStatsCallback get_stats = 14;
LogBatch logs = 15;
}
}

// Setup the callback where the foreign language can receive events
// and responses to asynchronous requests
message InitializeRequest { uint64 event_callback_ptr = 1; }
message InitializeRequest {
uint64 event_callback_ptr = 1;
bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord
uint32 max_log_batch_size = 3; // Max number of log records in a single LogBatch
}
message InitializeResponse {}

// Stop all rooms synchronously (Do we need async here?).
Expand All @@ -172,5 +177,26 @@ message DisposeCallback {
uint64 async_id = 1;
}

// TODO(theomonnom): Debug messages (Print handles, forward logs).
enum LogLevel {
LOG_ERROR = 0;
LOG_WARN = 1;
LOG_INFO = 2;
LOG_DEBUG = 3;
LOG_TRACE = 4;
}

message LogRecord {
LogLevel level = 1;
string target = 2; // e.g "livekit", "libwebrtc", "tokio-tungstenite", etc...
optional string module_path = 3;
optional string file = 4;
optional uint32 line = 5;
string message = 6;
}

message LogBatch {
repeated LogRecord records = 1;
}

// TODO(theomonnom): Debug messages (Print handles).

68 changes: 67 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3155,7 +3155,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -3191,6 +3191,8 @@ pub mod ffi_event {
UpdateLocalName(super::UpdateLocalNameCallback),
#[prost(message, tag="14")]
GetStats(super::GetStatsCallback),
#[prost(message, tag="15")]
Logs(super::LogBatch),
}
}
/// Setup the callback where the foreign language can receive events
Expand All @@ -3200,6 +3202,12 @@ pub mod ffi_event {
pub struct InitializeRequest {
#[prost(uint64, tag="1")]
pub event_callback_ptr: u64,
/// When true, the FfiServer will forward logs using LogRecord
#[prost(bool, tag="2")]
pub capture_logs: bool,
/// Max number of log records in a single LogBatch
#[prost(uint32, tag="3")]
pub max_log_batch_size: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3227,4 +3235,62 @@ pub struct DisposeCallback {
#[prost(uint64, tag="1")]
pub async_id: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LogRecord {
#[prost(enumeration="LogLevel", tag="1")]
pub level: i32,
/// e.g "livekit", "libwebrtc", "tokio-tungstenite", etc...
#[prost(string, tag="2")]
pub target: ::prost::alloc::string::String,
#[prost(string, optional, tag="3")]
pub module_path: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag="4")]
pub file: ::core::option::Option<::prost::alloc::string::String>,
#[prost(uint32, optional, tag="5")]
pub line: ::core::option::Option<u32>,
#[prost(string, tag="6")]
pub message: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LogBatch {
#[prost(message, repeated, tag="1")]
pub records: ::prost::alloc::vec::Vec<LogRecord>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum LogLevel {
LogError = 0,
LogWarn = 1,
LogInfo = 2,
LogDebug = 3,
LogTrace = 4,
}
impl LogLevel {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
LogLevel::LogError => "LOG_ERROR",
LogLevel::LogWarn => "LOG_WARN",
LogLevel::LogInfo => "LOG_INFO",
LogLevel::LogDebug => "LOG_DEBUG",
LogLevel::LogTrace => "LOG_TRACE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LOG_ERROR" => Some(Self::LogError),
"LOG_WARN" => Some(Self::LogWarn),
"LOG_INFO" => Some(Self::LogInfo),
"LOG_DEBUG" => Some(Self::LogDebug),
"LOG_TRACE" => Some(Self::LogTrace),
_ => None,
}
}
}
// @@protoc_insertion_point(module)
114 changes: 114 additions & 0 deletions livekit-ffi/src/server/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::proto;
use crate::server::FfiServer;
use log::{self, Log};
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

enum LogMsg {
Log(proto::LogRecord),
Flush(oneshot::Sender<()>),
}

pub struct FfiLogger {
server: &'static FfiServer,
log_tx: mpsc::UnboundedSender<LogMsg>,
}

impl FfiLogger {
pub fn new(server: &'static FfiServer, max_batch_size: u32) -> Self {
let (log_tx, log_rx) = mpsc::unbounded_channel();
server
.async_runtime
.spawn(log_task(server, max_batch_size, log_rx));
FfiLogger { server, log_tx }
}
}

async fn log_task(
server: &'static FfiServer,
max_batch_size: u32,
mut rx: mpsc::UnboundedReceiver<LogMsg>,
) {
async fn flush(server: &'static FfiServer, batch: &mut Vec<proto::LogRecord>) {
let _ = server
.send_event(proto::ffi_event::Message::Logs(proto::LogBatch {
records: batch.clone(), // Avoid clone here?
}))
.await;
batch.clear();
}

let mut batch = Vec::with_capacity(max_batch_size as usize);
let mut interval = tokio::time::interval(FLUSH_INTERVAL);

loop {
tokio::select! {
msg = rx.recv() => {
if let Some(msg) = msg {

match msg {
LogMsg::Log(record) => {
batch.push(record);
}
LogMsg::Flush(tx) => {
flush(server, &mut batch).await;
let _ = tx.send(());
}
}

} else {
flush(server, &mut batch).await;
break; // FfiLogger dropped
}
},
_ = interval.tick() => {
flush(server, &mut batch).await;
}
}
}
}

impl Log for FfiLogger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
true // The ffi client decides what to log (FfiLogger is just forwarding)
}

fn log(&self, record: &log::Record) {
self.log_tx.send(LogMsg::Log(record.into())).unwrap();
}

fn flush(&self) {
let (tx, mut rx) = oneshot::channel();
self.log_tx.send(LogMsg::Flush(tx)).unwrap();
let _ = self.server.async_runtime.block_on(rx); // should we block?
}
}

impl From<&log::Record<'_>> for proto::LogRecord {
fn from(record: &log::Record) -> Self {
proto::LogRecord {
level: proto::LogLevel::from(record.level()).into(),
target: record.target().to_string(),
module_path: record.module_path().map(|s| s.to_string()),
file: record.file().map(|s| s.to_string()),
line: record.line(),
message: record.args().to_string(), // Display trait
}
}
}

impl From<log::Level> for proto::LogLevel {
fn from(level: log::Level) -> Self {
match level {
log::Level::Error => Self::LogError,
log::Level::Warn => Self::LogWarn,
log::Level::Info => Self::LogInfo,
log::Level::Debug => Self::LogDebug,
log::Level::Trace => Self::LogTrace,
}
}
}
2 changes: 2 additions & 0 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::Duration;

pub mod audio_source;
pub mod audio_stream;
pub mod logger;
pub mod requests;
pub mod room;
pub mod video_source;
Expand Down Expand Up @@ -133,6 +134,7 @@ impl FfiServer {
.as_ref()
.map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn))?;

// TODO(theomonnom): Don't reallocate
let message = proto::FfiEvent {
message: Some(message),
}
Expand Down

0 comments on commit 32ffd5e

Please sign in to comment.