Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
support custom writers for file target
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Apr 6, 2024
1 parent 6bfb4bb commit 52e6c63
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 48 deletions.
7 changes: 6 additions & 1 deletion targets/file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ name = "emit_file"
version = "0.0.0"
edition = "2021"

[features]
default = ["default_writer"]
default_writer = ["emit/sval", "sval_json"]

[dependencies.emit]
path = "../../"
default-features = false
features = ["std", "sval"]
features = ["std"]

[dependencies.sval]
version = "2"

[dependencies.sval_json]
version = "2"
features = ["std"]
optional = true

[dependencies.emit_batcher]
path = "../../batcher"
Expand Down
1 change: 1 addition & 0 deletions targets/file/src/internal_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ metrics!(InternalMetrics {
file_write_failed: Counter,
file_delete: Counter,
file_delete_failed: Counter,
file_format_failed: Counter,
});
203 changes: 157 additions & 46 deletions targets/file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,42 @@ use std::{
fs::{self, File},
io::{self, Write},
mem,
ops::ControlFlow,
path::{Path, PathBuf},
sync::Arc,
thread,
};

use emit::well_known::{KEY_MSG, KEY_TPL, KEY_TS, KEY_TS_START};
use emit_batcher::BatchError;
use internal_metrics::InternalMetrics;

pub type Error = Box<dyn std::error::Error + Send + Sync>;

#[cfg(feature = "default_writer")]
pub fn set(file_set: impl AsRef<Path>) -> FileSetBuilder {
FileSetBuilder::new(file_set.as_ref())
}

pub fn set_with_writer(
file_set: impl AsRef<Path>,
writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+ Send
+ Sync
+ 'static,
) -> FileSetBuilder {
FileSetBuilder::new_with_writer(file_set.as_ref(), writer)
}

pub struct FileSetBuilder {
file_set: PathBuf,
roll_by: RollBy,
max_files: usize,
max_file_size_bytes: usize,
reuse_files: bool,
writer: Box<
dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+ Send
+ Sync,
>,
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -37,14 +51,31 @@ enum RollBy {
Minute,
}

const DEFAULT_ROLL_BY: RollBy = RollBy::Hour;
const DEFUALT_MAX_FILES: usize = 32;
const DEFAULT_MAX_FILE_SIZE_BYTES: usize = 1024 * 1024 * 1024; // 1GiB
const DEFAULT_REUSE_FILES: bool = false;

impl FileSetBuilder {
#[cfg(feature = "default_writer")]
pub fn new(file_set: impl Into<PathBuf>) -> Self {
Self::new_with_writer(file_set, default_writer)
}

pub fn new_with_writer(
file_set: impl Into<PathBuf>,
writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+ Send
+ Sync
+ 'static,
) -> Self {
FileSetBuilder {
file_set: file_set.into(),
roll_by: RollBy::Hour,
max_files: 32,
max_file_size_bytes: 1024 * 1024 * 1024, // 1GiB
reuse_files: false,
roll_by: DEFAULT_ROLL_BY,
max_files: DEFUALT_MAX_FILES,
max_file_size_bytes: DEFAULT_MAX_FILE_SIZE_BYTES,
reuse_files: DEFAULT_REUSE_FILES,
writer: Box::new(writer),
}
}

Expand Down Expand Up @@ -78,6 +109,17 @@ impl FileSetBuilder {
self
}

pub fn writer(
mut self,
writer: impl Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+ Send
+ Sync
+ 'static,
) -> Self {
self.writer = Box::new(writer);
self
}

pub fn spawn(self) -> Result<FileSet, Error> {
let (dir, file_prefix, file_ext) = dir_prefix_ext(self.file_set)?;

Expand All @@ -103,6 +145,7 @@ impl FileSetBuilder {
Ok(FileSet {
sender,
metrics,
writer: self.writer,
_handle: handle,
})
}
Expand All @@ -111,6 +154,11 @@ impl FileSetBuilder {
pub struct FileSet {
sender: emit_batcher::Sender<EventBatch>,
metrics: Arc<InternalMetrics>,
writer: Box<
dyn Fn(&mut FileBuf, &emit::Event<&dyn emit::props::ErasedProps>) -> io::Result<()>
+ Send
+ Sync,
>,
_handle: thread::JoinHandle<()>,
}

Expand All @@ -127,10 +175,21 @@ impl FileSet {

impl emit::Emitter for FileSet {
fn emit<P: emit::Props>(&self, evt: &emit::Event<P>) {
if let Ok(mut s) = sval_json::stream_to_string(EventValue(evt)) {
s.push('\n');
s.shrink_to_fit();
self.sender.send(s);
let mut buf = FileBuf::new();

match (self.writer)(&mut buf, &evt.erase()) {
Ok(()) => {
buf.push(b'\n');
self.sender.send(buf.into_boxed_slice());
}
Err(err) => {
self.metrics.file_format_failed.increment();

emit::warn!(
rt: emit::runtime::internal(),
"failed to format file event payload: {err}",
)
}
}
}

Expand All @@ -148,59 +207,111 @@ impl emit::Emitter for FileSet {
}
}

struct EventValue<'a, P>(&'a emit::Event<'a, P>);
pub struct FileBuf(Vec<u8>);

impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result {
stream.record_begin(None, None, None, None)?;
impl FileBuf {
fn new() -> Self {
FileBuf(Vec::new())
}

if let Some(extent) = self.0.extent() {
let range = extent.as_range();
pub fn push(&mut self, byte: u8) {
self.0.push(byte)
}

if range.end != range.start {
stream.record_value_begin(None, &sval::Label::new(KEY_TS_START))?;
sval::stream_display(&mut *stream, &range.start)?;
stream.record_value_end(None, &sval::Label::new(KEY_TS_START))?;
}
pub fn extend_from_slice(&mut self, bytes: &[u8]) {
self.0.extend_from_slice(bytes)
}

stream.record_value_begin(None, &sval::Label::new(KEY_TS))?;
sval::stream_display(&mut *stream, &range.end)?;
stream.record_value_end(None, &sval::Label::new(KEY_TS))?;
}
fn into_boxed_slice(self) -> Box<[u8]> {
self.0.into_boxed_slice()
}
}

impl io::Write for FileBuf {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.0.write_all(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}

#[cfg(feature = "default_writer")]
fn default_writer(
buf: &mut FileBuf,
evt: &emit::Event<&dyn emit::props::ErasedProps>,
) -> io::Result<()> {
use std::ops::ControlFlow;

use emit::well_known::{KEY_MSG, KEY_TPL, KEY_TS, KEY_TS_START};

stream.record_value_begin(None, &sval::Label::new(KEY_MSG))?;
sval::stream_display(&mut *stream, self.0.msg())?;
stream.record_value_end(None, &sval::Label::new(KEY_MSG))?;
struct EventValue<'a, P>(&'a emit::Event<'a, P>);

stream.record_value_begin(None, &sval::Label::new(KEY_TPL))?;
sval::stream_display(&mut *stream, self.0.tpl())?;
stream.record_value_end(None, &sval::Label::new(KEY_TPL))?;
impl<'a, P: emit::Props> sval::Value for EventValue<'a, P> {
fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(
&'sval self,
stream: &mut S,
) -> sval::Result {
stream.record_begin(None, None, None, None)?;

self.0.props().for_each(|k, v| {
match (|| {
stream.record_value_begin(None, &sval::Label::new_computed(k.get()))?;
stream.value_computed(&v)?;
stream.record_value_end(None, &sval::Label::new_computed(k.get()))?;
if let Some(extent) = self.0.extent() {
let range = extent.as_range();

Ok::<(), sval::Error>(())
})() {
Ok(()) => ControlFlow::Continue(()),
Err(_) => ControlFlow::Break(()),
if range.end != range.start {
stream.record_value_begin(None, &sval::Label::new(KEY_TS_START))?;
sval::stream_display(&mut *stream, &range.start)?;
stream.record_value_end(None, &sval::Label::new(KEY_TS_START))?;
}

stream.record_value_begin(None, &sval::Label::new(KEY_TS))?;
sval::stream_display(&mut *stream, &range.end)?;
stream.record_value_end(None, &sval::Label::new(KEY_TS))?;
}
});

stream.record_end(None, None, None)
stream.record_value_begin(None, &sval::Label::new(KEY_MSG))?;
sval::stream_display(&mut *stream, self.0.msg())?;
stream.record_value_end(None, &sval::Label::new(KEY_MSG))?;

stream.record_value_begin(None, &sval::Label::new(KEY_TPL))?;
sval::stream_display(&mut *stream, self.0.tpl())?;
stream.record_value_end(None, &sval::Label::new(KEY_TPL))?;

self.0.props().for_each(|k, v| {
match (|| {
stream.record_value_begin(None, &sval::Label::new_computed(k.get()))?;
stream.value_computed(&v)?;
stream.record_value_end(None, &sval::Label::new_computed(k.get()))?;

Ok::<(), sval::Error>(())
})() {
Ok(()) => ControlFlow::Continue(()),
Err(_) => ControlFlow::Break(()),
}
});

stream.record_end(None, None, None)
}
}

sval_json::stream_to_io_write(buf, EventValue(evt))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(())
}

struct EventBatch {
bufs: Vec<String>,
bufs: Vec<Box<[u8]>>,
remaining_bytes: usize,
index: usize,
}

impl emit_batcher::Channel for EventBatch {
type Item = String;
type Item = Box<[u8]>;

fn new() -> Self {
EventBatch {
Expand All @@ -225,7 +336,7 @@ impl emit_batcher::Channel for EventBatch {
}

impl EventBatch {
fn current(&self) -> Option<&str> {
fn current(&self) -> Option<&[u8]> {
self.bufs.get(self.index).map(|buf| &**buf)
}

Expand Down Expand Up @@ -395,7 +506,7 @@ impl Worker {
let written_bytes = batch.remaining_bytes;

while let Some(buf) = batch.current() {
if let Err(err) = file.write_event(buf.as_bytes()) {
if let Err(err) = file.write_event(buf) {
self.metrics.file_write_failed.increment();

span.complete(|extent| {
Expand Down
5 changes: 5 additions & 0 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,11 @@ impl OtlpBuilder {
emit_batcher::tokio::spawn(receiver, move |batch: Channel<PreEncoded>| {
let client = client.clone();

/*
NOTE: Possible degenerate behavior here where one signal blocks others;
the logs endpoint is flaky and fails a lot, so it means traces also get
backed up waiting for retries of logs to succeed.
*/
async move {
let Channel {
otlp_logs,
Expand Down
10 changes: 9 additions & 1 deletion targets/term/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ name = "emit_term"
version = "0.0.0"
edition = "2021"

[features]
default = ["default_writer"]
default_writer = ["emit/sval", "termcolor", "time", "sval", "sval_fmt"]

[dependencies.emit]
path = "../../"
default-features = false
features = ["std", "sval"]
features = ["std"]

[dependencies.sval]
version = "2"
optional = true

[dependencies.sval_fmt]
version = "2"
optional = true

[dependencies.time]
version = "0.3"
features = ["local-offset"]
optional = true

[dependencies.termcolor]
version = "1"
optional = true

0 comments on commit 52e6c63

Please sign in to comment.