Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Logs SDK] Send resource once to processor and exporter, and not for every event. #1636

Merged
merged 34 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
57fa974
initial commit
lalitb Mar 21, 2024
63fdf18
Merge branch 'main' into set-resource-optimize
lalitb Mar 25, 2024
597a2af
more changes
lalitb Mar 27, 2024
b6a16ac
Merge branch 'main' into set-resource-optimize
lalitb Mar 27, 2024
732b9d6
lint error
lalitb Mar 27, 2024
c18499a
fix lint
lalitb Mar 27, 2024
00c0eaa
lint errors, rename LogEvent to LogData ; and LogData to LogDataWithR…
lalitb Mar 27, 2024
5361996
optimize otlp
lalitb Mar 28, 2024
a0dc96e
leftover comments
lalitb Mar 28, 2024
36d9900
propagate resource
lalitb Mar 28, 2024
7ecf106
update changelog
lalitb Mar 28, 2024
f031e29
fix doc error
lalitb Mar 28, 2024
112a053
add tests
lalitb Mar 28, 2024
becd779
Remove todo
lalitb Mar 28, 2024
aaa3c89
add empty implementation for set_resource for processor and exporter
lalitb Mar 28, 2024
71a4efd
Update opentelemetry-sdk/CHANGELOG.md
lalitb Mar 28, 2024
ac4b788
Merge branch 'main' into set-resource-optimize
lalitb Mar 28, 2024
e4d5a70
leftover commit
lalitb Mar 28, 2024
2e5a2d9
Merge branch 'main' into set-resource-optimize
lalitb Mar 29, 2024
8f14674
Merge branch 'main' into set-resource-optimize
lalitb Mar 29, 2024
de7dc51
remove redundant set_resource definitions
lalitb Mar 29, 2024
0634469
avoid log data cloning
lalitb Apr 2, 2024
18645cf
Merge branch 'main' into set-resource-optimize
lalitb Apr 4, 2024
92e4864
Merge branch 'main' into set-resource-optimize
cijothomas Apr 8, 2024
38e1bf4
Merge branch 'main' into set-resource-optimize
lalitb Apr 16, 2024
f7786e1
Merge branch 'main' into set-resource-optimize
lalitb Apr 22, 2024
b220635
remove single processor optimization
lalitb Apr 25, 2024
abc7507
Merge branch 'main' into set-resource-optimize
lalitb Apr 26, 2024
98c11b1
resolve merge conflict
lalitb Apr 26, 2024
2ce3e0f
review comment - remove set_resource overwrite method
lalitb Apr 26, 2024
0aef2fc
fix merge conflict
lalitb Apr 26, 2024
dfecf91
Merge branch 'main' into set-resource-optimize
lalitb Apr 30, 2024
896f672
fix merge conflict
lalitb Apr 30, 2024
3576450
Merge branch 'main' into set-resource-optimize
cijothomas Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = build_body(batch)?;
let (body, content_type) = { build_body(batch, &self.resource)? };

Check warning on line 22 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L22

Added line #L22 was not covered by tests
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down Expand Up @@ -50,24 +50,36 @@
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}

Check warning on line 56 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L54-L56

Added lines #L54 - L56 were not covered by tests
}

#[cfg(feature = "http-proto")]
fn build_body(logs: Vec<LogData>) -> LogResult<(Vec<u8>, &'static str)> {
fn build_body(
logs: Vec<LogData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> LogResult<(Vec<u8>, &'static str)> {

Check warning on line 63 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L60-L63

Added lines #L60 - L63 were not covered by tests
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use prost::Message;
let resource_logs = logs
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();

Check warning on line 69 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L66-L69

Added lines #L66 - L69 were not covered by tests

let req = ExportLogsServiceRequest {
resource_logs: logs.into_iter().map(Into::into).collect(),
};
let req = ExportLogsServiceRequest { resource_logs };

Check warning on line 71 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L71

Added line #L71 was not covered by tests
let mut buf = vec![];
req.encode(&mut buf).map_err(crate::Error::from)?;

Ok((buf, "application/x-protobuf"))
}

#[cfg(not(feature = "http-proto"))]
fn build_body(logs: Vec<LogData>) -> LogResult<(Vec<u8>, &'static str)> {
fn build_body(
logs: Vec<LogData>,
resource: &opentelemetry_sdk::Resource,
) -> LogResult<(Vec<u8>, &'static str)> {
Err(LogsError::Other(
"No http protocol configured. Enable one via `http-proto`".into(),
))
Expand Down
5 changes: 5 additions & 0 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
};
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
Expand Down Expand Up @@ -255,6 +256,9 @@
collector_endpoint: Uri,
headers: HashMap<HeaderName, HeaderValue>,
_timeout: Duration,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

impl OtlpHttpClient {
Expand All @@ -270,6 +274,7 @@
collector_endpoint,
headers,
_timeout: timeout,
resource: ResourceAttributesWithSchema::default(),

Check warning on line 277 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L277

Added line #L277 was not covered by tests
}
}
}
Expand Down
23 changes: 18 additions & 5 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use core::fmt;

use async_trait::async_trait;
use core::fmt;
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
Expand All @@ -12,6 +11,9 @@

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +45,7 @@
client,
interceptor,
}),
resource: Default::default(),

Check warning on line 48 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L48

Added line #L48 was not covered by tests
}
}
}
Expand All @@ -62,13 +65,19 @@
None => return Err(LogError::Other("exporter is already shut down".into())),
};

let resource_logs = {
batch
.into_iter()
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};

Check warning on line 75 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L68-L75

Added lines #L68 - L75 were not covered by tests
client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest {
resource_logs: batch.into_iter().map(Into::into).collect(),
},
ExportLogsServiceRequest { resource_logs },

Check warning on line 80 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L80

Added line #L80 was not covered by tests
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -79,4 +88,8 @@
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}

Check warning on line 94 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L92-L94

Added lines #L92 - L94 were not covered by tests
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {
todo!("set_resource");

Check warning on line 108 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L107-L108

Added lines #L107 - L108 were not covered by tests
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Recommended configuration for an OTLP exporter pipeline.
Expand Down
19 changes: 18 additions & 1 deletion opentelemetry-proto/src/transform/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@
use opentelemetry::{Array, Value};
use std::borrow::Cow;

#[cfg(any(feature = "trace", feature = "logs"))]
#[derive(Debug, Default)]
pub struct ResourceAttributesWithSchema {
pub attributes: Attributes,
pub schema_url: Option<String>,
}

#[cfg(any(feature = "trace", feature = "logs"))]
impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema {
fn from(resource: &opentelemetry_sdk::Resource) -> Self {
ResourceAttributesWithSchema {
attributes: resource_attributes(resource),
schema_url: resource.schema_url().map(ToString::to_string),
}
}

Check warning on line 39 in opentelemetry-proto/src/transform/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/common.rs#L34-L39

Added lines #L34 - L39 were not covered by tests
}

#[cfg(any(feature = "trace", feature = "logs"))]
use opentelemetry_sdk::Resource;

Expand Down Expand Up @@ -52,7 +69,7 @@
}

/// Wrapper type for Vec<`KeyValue`>
#[derive(Default)]
#[derive(Default, Debug)]
pub struct Attributes(pub ::std::vec::Vec<crate::proto::tonic::common::v1::KeyValue>);

impl From<Vec<opentelemetry::KeyValue>> for Attributes {
Expand Down
26 changes: 17 additions & 9 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
resource::v1::Resource,
Attributes,
},
transform::common::{to_nanos, tonic::resource_attributes},
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};

Expand Down Expand Up @@ -110,18 +110,26 @@
}
}

impl From<opentelemetry_sdk::export::logs::LogData> for ResourceLogs {
fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self {
impl
From<(
opentelemetry_sdk::export::logs::LogData,
&ResourceAttributesWithSchema,
)> for ResourceLogs
{
fn from(
data: (
opentelemetry_sdk::export::logs::LogData,
&ResourceAttributesWithSchema,
),
) -> Self {
let (log_data, resource) = data;

Check warning on line 126 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L119-L126

Added lines #L119 - L126 were not covered by tests
ResourceLogs {
resource: Some(Resource {
attributes: resource_attributes(&log_data.resource).0,
attributes: resource.attributes.0.clone(),

Check warning on line 129 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L129

Added line #L129 was not covered by tests
dropped_attributes_count: 0,
}),
schema_url: log_data
.resource
.schema_url()
.map(Into::into)
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap(),

Check warning on line 132 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L132

Added line #L132 was not covered by tests
scope_logs: vec![ScopeLogs {
schema_url: log_data
.instrumentation
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
- **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and
`ProcessResourceDetector` resource detectors, use the
[`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead.
- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Send resource attributes once to
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
processor and exporter, and not for every event.
- Add `set_resource` method in LogProcessor an Exporter
lalitb marked this conversation as resolved.
Show resolved Hide resolved
- Propagate resource attributes to processor and exporter during LoggerProvider creation.

## v0.22.1

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ impl LogExporter for VoidExporter {
async fn export(&mut self, _batch: Vec<LogData>) -> LogResult<()> {
LogResult::Ok(())
}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {}
lalitb marked this conversation as resolved.
Show resolved Hide resolved
}

fn log_benchmark_group<F: Fn(&dyn Logger)>(c: &mut Criterion, name: &str, f: F) {
Expand Down
11 changes: 5 additions & 6 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use opentelemetry::{
logs::{LogError, LogRecord, LogResult},
InstrumentationLibrary,
};
use std::{borrow::Cow, fmt::Debug};
use std::fmt::Debug;

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
Expand All @@ -21,17 +21,16 @@ pub trait LogExporter: Send + Sync + Debug {
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
/// Set the resource for the exporter.
fn set_resource(&mut self, resource: &Resource);
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
}

/// `LogData` associates a [`LogRecord`] with a [`Resource`] and
/// [`InstrumentationLibrary`].
/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
/// Log record
pub record: LogRecord,
/// Resource for the emitter who produced this `LogData`.
pub resource: Cow<'static, Resource>,
/// Instrumentation details for the emitter who produced this `LogData`.
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
}

Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ impl Builder {

/// Create a new provider from this configuration.
pub fn build(self) -> LoggerProvider {
// invoke set_resource on all the processors
for processor in &self.processors {
processor.set_resource(&self.config.resource);
}
LoggerProvider {
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
Expand Down Expand Up @@ -200,7 +204,6 @@ impl opentelemetry::logs::Logger for Logger {
/// Emit a `LogRecord`.
fn emit(&self, record: LogRecord) {
let provider = self.provider();
let config = provider.config();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
cx.has_active_span()
Expand All @@ -213,7 +216,6 @@ impl opentelemetry::logs::Logger for Logger {
}
let data = LogData {
record,
resource: config.resource.clone(),
instrumentation: self.instrumentation_library().clone(),
};
p.emit(data);
Expand Down Expand Up @@ -363,5 +365,9 @@ mod tests {
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}

fn set_resource(&self, _resource: &crate::Resource) {
lalitb marked this conversation as resolved.
Show resolved Hide resolved
// nothing to do.
}
}
}
Loading
Loading