diff --git a/targets/file/Cargo.toml b/targets/file/Cargo.toml
index 94d1058..72ade41 100644
--- a/targets/file/Cargo.toml
+++ b/targets/file/Cargo.toml
@@ -3,10 +3,14 @@ name = "emit_file"
 version = "0.0.0"
 edition = "2021"
 
+[features]
+default = ["default_writer"]
+default_writer = ["emit/sval", "sval_json"]
+
 [dependencies.emit]
 path = "../../"
 default-features = false
-features = ["std", "sval"]
+features = ["std"]
 
 [dependencies.sval]
 version = "2"
@@ -14,6 +18,7 @@ version = "2"
 [dependencies.sval_json]
 version = "2"
 features = ["std"]
+optional = true
 
 [dependencies.emit_batcher]
 path = "../../batcher"
diff --git a/targets/file/src/internal_metrics.rs b/targets/file/src/internal_metrics.rs
index b2f2b35..27b2030 100644
--- a/targets/file/src/internal_metrics.rs
+++ b/targets/file/src/internal_metrics.rs
@@ -54,4 +54,5 @@ metrics!(InternalMetrics {
     file_write_failed: Counter,
     file_delete: Counter,
     file_delete_failed: Counter,
+    file_format_failed: Counter,
 });
diff --git a/targets/file/src/lib.rs b/targets/file/src/lib.rs
index 330288a..bccfdb1 100644
--- a/targets/file/src/lib.rs
+++ b/targets/file/src/lib.rs
@@ -6,28 +6,42 @@ use std::{
     fs::{self, File},
     io::{self, Write},
     mem,
-    ops::ControlFlow,
     path::{Path, PathBuf},
     sync::Arc,
     thread,
 };
 
-use emit::well_known::{KEY_MSG, KEY_TPL, KEY_TS, KEY_TS_START};
 use emit_batcher::BatchError;
 use internal_metrics::InternalMetrics;
 
 pub type Error = Box<dyn std::error::Error + Send + Sync>;
 
+#[cfg(feature = "default_writer")]
 pub fn set(file_set: impl AsRef<Path>) -> FileSetBuilder {
     FileSetBuilder::new(file_set.as_ref())
 }
 
+pub fn set_with_writer(
+    file_set: impl AsRef<Path>,
+    writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+        + Send
+        + Sync
+        + 'static,
+) -> FileSetBuilder {
+    FileSetBuilder::new_with_writer(file_set.as_ref(), writer)
+}
+
 pub struct FileSetBuilder {
     file_set: PathBuf,
     roll_by: RollBy,
     max_files: usize,
     max_file_size_bytes: usize,
     reuse_files: bool,
+    writer: Box<
+        dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+            + Send
+            + Sync,
+    >,
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -37,14 +51,31 @@ enum RollBy {
     Minute,
 }
 
+const DEFAULT_ROLL_BY: RollBy = RollBy::Hour;
+const DEFAULT_MAX_FILES: usize = 32;
+const DEFAULT_MAX_FILE_SIZE_BYTES: usize = 1024 * 1024 * 1024; // 1GiB
+const DEFAULT_REUSE_FILES: bool = false;
+
 impl FileSetBuilder {
+    #[cfg(feature = "default_writer")]
     pub fn new(file_set: impl Into<PathBuf>) -> Self {
+        Self::new_with_writer(file_set, default_writer)
+    }
+
+    pub fn new_with_writer(
+        file_set: impl Into<PathBuf>,
+        writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+            + Send
+            + Sync
+            + 'static,
+    ) -> Self {
         FileSetBuilder {
             file_set: file_set.into(),
-            roll_by: RollBy::Hour,
-            max_files: 32,
-            max_file_size_bytes: 1024 * 1024 * 1024, // 1GiB
-            reuse_files: false,
+            roll_by: DEFAULT_ROLL_BY,
+            max_files: DEFAULT_MAX_FILES,
+            max_file_size_bytes: DEFAULT_MAX_FILE_SIZE_BYTES,
+            reuse_files: DEFAULT_REUSE_FILES,
+            writer: Box::new(writer),
         }
     }
 
@@ -78,6 +109,17 @@ impl FileSetBuilder {
         self
     }
 
+    pub fn writer(
+        mut self,
+        writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+            + Send
+            + Sync
+            + 'static,
+    ) -> Self {
+        self.writer = Box::new(writer);
+        self
+    }
+
     pub fn spawn(self) -> Result<FileSet, Error> {
         let (dir, file_prefix, file_ext) = dir_prefix_ext(self.file_set)?;
 
@@ -103,6 +145,7 @@ impl FileSetBuilder {
         Ok(FileSet {
             sender,
             metrics,
+            writer: self.writer,
             _handle: handle,
         })
     }
@@ -111,6 +154,11 @@ impl FileSetBuilder {
 pub struct FileSet {
     sender: emit_batcher::Sender<EventBatch>,
     metrics: Arc<InternalMetrics>,
+    writer: Box<
+        dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+            + Send
+            + Sync,
+    >,
     _handle: thread::JoinHandle<()>,
 }
 
@@ -127,10 +175,21 @@ impl FileSet {
 
 impl emit::Emitter for FileSet {
     fn emit<P: emit::Props>(&self, evt: &emit::Event<P>) {
-        if let Ok(mut s) = sval_json::stream_to_string(EventValue(evt)) {
-            s.push('\n');
-            s.shrink_to_fit();
-            self.sender.send(s);
+        let mut buf = FileBuf::new();
+
+        match (self.writer)(&mut buf, &evt.erase()) {
+            Ok(()) => {
+                buf.push(b'\n');
+                self.sender.send(buf.into_boxed_slice());
+            }
+            Err(err) => {
+                self.metrics.file_format_failed.increment();
+
+                emit::warn!(
+                    rt: emit::runtime::internal(),
+                    "failed to format file event payload: {err}",
+                )
+            }
         }
     }
 
@@ -148,59 +207,111 @@ impl emit::Emitter for FileSet {
     }
 }
 
-struct EventValue<'a, P>(&'a emit::Event<'a, P>);
+pub struct FileBuf(Vec<u8>);
 
-impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
-    fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result {
-        stream.record_begin(None, None, None, None)?;
+impl FileBuf {
+    fn new() -> Self {
+        FileBuf(Vec::new())
+    }
 
-        if let Some(extent) = self.0.extent() {
-            let range = extent.as_range();
+    pub fn push(&mut self, byte: u8) {
+        self.0.push(byte)
+    }
 
-            if range.end != range.start {
-                stream.record_value_begin(None, &sval::Label::new(KEY_TS_START))?;
-                sval::stream_display(&mut *stream, &range.start)?;
-                stream.record_value_end(None, &sval::Label::new(KEY_TS_START))?;
-            }
+    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
+        self.0.extend_from_slice(bytes)
+    }
 
-            stream.record_value_begin(None, &sval::Label::new(KEY_TS))?;
-            sval::stream_display(&mut *stream, &range.end)?;
-            stream.record_value_end(None, &sval::Label::new(KEY_TS))?;
-        }
+    fn into_boxed_slice(self) -> Box<[u8]> {
+        self.0.into_boxed_slice()
+    }
+}
+
+impl io::Write for FileBuf {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.0.write(buf)
+    }
+
+    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+        self.0.write_all(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.0.flush()
+    }
+}
+
+#[cfg(feature = "default_writer")]
+fn default_writer(
+    buf: &mut FileBuf,
+    evt: &emit::Event<&dyn emit::props::ErasedProps>,
+) -> io::Result<()> {
+    use std::ops::ControlFlow;
+
+    use emit::well_known::{KEY_MSG, KEY_TPL, KEY_TS, KEY_TS_START};
 
-        stream.record_value_begin(None, &sval::Label::new(KEY_MSG))?;
-        sval::stream_display(&mut *stream, self.0.msg())?;
-        stream.record_value_end(None, &sval::Label::new(KEY_MSG))?;
+    struct EventValue<'a, P>(&'a emit::Event<'a, P>);
 
-        stream.record_value_begin(None, &sval::Label::new(KEY_TPL))?;
-        sval::stream_display(&mut *stream, self.0.tpl())?;
-        stream.record_value_end(None, &sval::Label::new(KEY_TPL))?;
+    impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
+        fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(
+            &'sval self,
+            stream: &mut S,
+        ) -> sval::Result {
+            stream.record_begin(None, None, None, None)?;
 
-        self.0.props().for_each(|k, v| {
-            match (|| {
-                stream.record_value_begin(None, &sval::Label::new_computed(k.get()))?;
-                stream.value_computed(&v)?;
-                stream.record_value_end(None, &sval::Label::new_computed(k.get()))?;
+            if let Some(extent) = self.0.extent() {
+                let range = extent.as_range();
 
-                Ok::<(), sval::Error>(())
-            })() {
-                Ok(()) => ControlFlow::Continue(()),
-                Err(_) => ControlFlow::Break(()),
+                if range.end != range.start {
+                    stream.record_value_begin(None, &sval::Label::new(KEY_TS_START))?;
+                    sval::stream_display(&mut *stream, &range.start)?;
+                    stream.record_value_end(None, &sval::Label::new(KEY_TS_START))?;
+                }
+
+                stream.record_value_begin(None, &sval::Label::new(KEY_TS))?;
+                sval::stream_display(&mut *stream, &range.end)?;
+                stream.record_value_end(None, &sval::Label::new(KEY_TS))?;
             }
-        });
-        stream.record_end(None, None, None)
+            stream.record_value_begin(None, &sval::Label::new(KEY_MSG))?;
+            sval::stream_display(&mut *stream, self.0.msg())?;
+            stream.record_value_end(None, &sval::Label::new(KEY_MSG))?;
+
+            stream.record_value_begin(None, &sval::Label::new(KEY_TPL))?;
+            sval::stream_display(&mut *stream, self.0.tpl())?;
+            stream.record_value_end(None, &sval::Label::new(KEY_TPL))?;
+
+            self.0.props().for_each(|k, v| {
+                match (|| {
+                    stream.record_value_begin(None, &sval::Label::new_computed(k.get()))?;
+                    stream.value_computed(&v)?;
+                    stream.record_value_end(None, &sval::Label::new_computed(k.get()))?;
+
+                    Ok::<(), sval::Error>(())
+                })() {
+                    Ok(()) => ControlFlow::Continue(()),
+                    Err(_) => ControlFlow::Break(()),
+                }
+            });
+
+            stream.record_end(None, None, None)
+        }
     }
+
+    sval_json::stream_to_io_write(buf, EventValue(evt))
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+
+    Ok(())
 }
 
 struct EventBatch {
-    bufs: Vec<String>,
+    bufs: Vec<Box<[u8]>>,
     remaining_bytes: usize,
     index: usize,
 }
 
 impl emit_batcher::Channel for EventBatch {
-    type Item = String;
+    type Item = Box<[u8]>;
 
     fn new() -> Self {
         EventBatch {
@@ -225,7 +336,7 @@ impl emit_batcher::Channel for EventBatch {
 }
 
 impl EventBatch {
-    fn current(&self) -> Option<&str> {
+    fn current(&self) -> Option<&[u8]> {
         self.bufs.get(self.index).map(|buf| &**buf)
     }
 
@@ -395,7 +506,7 @@ impl Worker {
         let written_bytes = batch.remaining_bytes;
 
         while let Some(buf) = batch.current() {
-            if let Err(err) = file.write_event(buf.as_bytes()) {
+            if let Err(err) = file.write_event(buf) {
                 self.metrics.file_write_failed.increment();
 
                 span.complete(|extent| {
diff --git a/targets/otlp/src/client.rs b/targets/otlp/src/client.rs
index ca1ad0a..49dde8f 100644
--- a/targets/otlp/src/client.rs
+++ b/targets/otlp/src/client.rs
@@ -670,6 +670,11 @@ impl OtlpBuilder {
         emit_batcher::tokio::spawn(receiver, move |batch: Channel| {
             let client = client.clone();
 
+            /*
+            NOTE: There is possible degenerate behavior here where one signal blocks the others;
+            the logs endpoint is flaky and fails a lot, so traces also get backed up
+            waiting for retries of the logs to succeed.
+            */
             async move {
                 let Channel {
                     otlp_logs,
diff --git a/targets/term/Cargo.toml b/targets/term/Cargo.toml
index 2e4f6c8..7980fd9 100644
--- a/targets/term/Cargo.toml
+++ b/targets/term/Cargo.toml
@@ -3,20 +3,28 @@ name = "emit_term"
 version = "0.0.0"
 edition = "2021"
 
+[features]
+default = ["default_writer"]
+default_writer = ["emit/sval", "termcolor", "time", "sval", "sval_fmt"]
+
 [dependencies.emit]
 path = "../../"
 default-features = false
-features = ["std", "sval"]
+features = ["std"]
 
 [dependencies.sval]
 version = "2"
+optional = true
 
 [dependencies.sval_fmt]
 version = "2"
+optional = true
 
 [dependencies.time]
 version = "0.3"
 features = ["local-offset"]
+optional = true
 
 [dependencies.termcolor]
 version = "1"
+optional = true
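
Usage sketch (reviewer note, not part of the diff): the snippet below shows how the new custom-writer hook on the file target might be consumed. `set_with_writer`, `FileBuf`, `spawn`, and the trailing-newline behavior are taken from this change; the `./logs/app.txt` path, the one-line message format, and the exact setup wiring are illustrative assumptions.

```rust
use std::io::Write as _;

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Replace the default JSON payload with a plain-text line per event.
    // The closure receives a `FileBuf` (which implements `io::Write`) and the
    // erased event, and decides exactly what bytes end up in the file.
    let files = emit_file::set_with_writer("./logs/app.txt", |buf, evt| {
        write!(buf, "{}", evt.msg())?;
        Ok(())
    })
    .spawn()?;

    // `files` implements `emit::Emitter`, so it can be installed through the
    // usual emit setup (for example `emit::setup().emit_to(files).init()`).
    let _ = files;

    Ok(())
}
```

Note that the target appends the trailing `\n` itself once the writer returns `Ok(())`, and a writer error is counted against the new `file_format_failed` metric instead of being sent to the background worker.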