Skip to content

Commit

Permalink
Support LogProcessors chaining through mutable reference (open-teleme…
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed May 23, 2024
1 parent 241c674 commit fd93c32
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 54 deletions.
6 changes: 3 additions & 3 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
| noop_layer_disabled | 12 ns |
| noop_layer_enabled | 25 ns |
| ot_layer_disabled | 19 ns |
| ot_layer_enabled | 588 ns |
| ot_layer_enabled | 446 ns |
*/

use async_trait::async_trait;
Expand All @@ -33,7 +33,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&mut self, _: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
LogResult::Ok(())
}

Expand All @@ -54,7 +54,7 @@ impl NoopProcessor {
}

impl LogProcessor for NoopProcessor {
fn emit(&self, _: LogData) {
fn emit(&self, _: &mut LogData) {
// no-op
}

Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
let client = self
.client
.lock()
Expand All @@ -19,7 +19,13 @@ impl LogExporter for OtlpHttpClient {
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl TonicLogsClient {

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
let (m, e, _) = inner
Expand All @@ -65,9 +65,11 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

// TODO: Avoid cloning here.
let resource_logs = {
batch
.into_iter()
.map(|log_data_cow| (log_data_cow.into_owned()))
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl LogExporter {

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
async fn export<'a>(
&mut self,
batch: Vec<std::borrow::Cow<'a, LogData>>,
) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}

Expand Down
12 changes: 12 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
logger provider.
- Removed dependency on `ordered-float`.

- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
Update `LogProcessor::emit() method to take mutable reference to LogData. This is breaking
change for LogProcessor developers. If the processor needs to invoke the exporter
asynchronously, it should clone the data to ensure it can be safely processed without
lifetime issues. Any changes made to the log data before cloning in this method will be
reflected in the next log processor in the chain, as well as to the exporter.
- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
Update `LogExporter::export() method to accept a batch of log data, which can be either a
reference or owned `LogData`. If the exporter needs to process the log data
asynchronously, it should clone the log data to ensure it can be safely processed without
lifetime issues.

## v0.23.0

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! run with `$ cargo bench --bench log --features=logs -- --exact <test_name>` to run specific test for logs
//! So to run test named "full-log-with-attributes/with-context" you would run `$ cargo bench --bench log --features=logs -- --exact full-log-with-attributes/with-context`
//! To run all tests for logs you would run `$ cargo bench --bench log --features=logs`
//!
use std::collections::HashMap;
use std::time::SystemTime;

Expand All @@ -19,7 +24,7 @@ struct VoidExporter;

#[async_trait]
impl LogExporter for VoidExporter {
async fn export(&mut self, _batch: Vec<LogData>) -> LogResult<()> {
async fn export<'a>(&mut self, _batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
LogResult::Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use opentelemetry::{
logs::{LogError, LogResult},
InstrumentationLibrary,
};
use std::borrow::Cow;
use std::fmt::Debug;

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogData`].
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
#[cfg(feature = "logs_level_enabled")]
Expand Down
29 changes: 15 additions & 14 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,21 @@ impl opentelemetry::logs::Logger for Logger {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
});
let mut log_record = record;
if let Some(ref trace_context) = trace_context {
log_record.trace_context = Some(trace_context.clone());
}
if log_record.observed_timestamp.is_none() {
log_record.observed_timestamp = Some(SystemTime::now());
}

let mut data = LogData {
record: log_record,
instrumentation: self.instrumentation_library().clone(),
};

for p in processors {
let mut cloned_record = record.clone();
if let Some(ref trace_context) = trace_context {
cloned_record.trace_context = Some(trace_context.clone());
}
if cloned_record.observed_timestamp.is_none() {
cloned_record.observed_timestamp = Some(SystemTime::now());
}
let data = LogData {
record: cloned_record,
instrumentation: self.instrumentation_library().clone(),
};
p.emit(data);
p.emit(&mut data);
}
}

Expand Down Expand Up @@ -326,7 +327,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: LogData) {
fn emit(&self, _data: &mut LogData) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -561,7 +562,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: LogData) {
fn emit(&self, _data: &mut LogData) {
// nothing to do.
}

Expand Down
Loading

0 comments on commit fd93c32

Please sign in to comment.