Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Trace SDK] Send resource once to processor and exporter, and not with every span #1830

Merged
merged 18 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,16 @@
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,

Check warning on line 310 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L310

Added line #L310 was not covered by tests
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

let req = ExportTraceServiceRequest {
resource_spans: spans.into_iter().map(Into::into).collect(),
};
let resource_spans = spans
.into_iter()
.map(|log_event| (log_event, resource).into())
lalitb marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<_>>();

let req = ExportTraceServiceRequest { resource_spans };

Check warning on line 319 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L314-L319

Added lines #L314 - L319 were not covered by tests
match self.protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch) {
let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {

Check warning on line 24 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L24

Added line #L24 was not covered by tests
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down Expand Up @@ -66,4 +66,8 @@
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}

Check warning on line 72 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L70-L72

Added lines #L70 - L72 were not covered by tests
}
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::BoxInterceptor;
pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

Expand Down
21 changes: 18 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +46,7 @@ impl TonicTracesClient {
client,
interceptor,
}),
resource: Default::default(),
}
}
}
Expand All @@ -66,14 +70,21 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| (log_data, &self.resource))
lalitb marked this conversation as resolved.
Show resolved Hide resolved
.map(Into::into)
.collect()
};

Box::pin(async move {
client
.export(Request::from_parts(
metadata,
extensions,
ExportTraceServiceRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
},
ExportTraceServiceRequest { resource_spans },
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -85,4 +96,8 @@ impl SpanExporter for TonicTracesClient {
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,8 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
}
}
2 changes: 1 addition & 1 deletion opentelemetry-proto/src/proto/opentelemetry-proto
lalitb marked this conversation as resolved.
Show resolved Hide resolved
14 changes: 5 additions & 9 deletions opentelemetry-proto/src/transform/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod tonic {
use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status};
use crate::transform::common::{
to_nanos,
tonic::{resource_attributes, Attributes},
tonic::{Attributes, ResourceAttributesWithSchema},
};
use opentelemetry::trace;
use opentelemetry::trace::{Link, SpanId, SpanKind};
Expand Down Expand Up @@ -45,19 +45,15 @@ pub mod tonic {
}
}

impl From<SpanData> for ResourceSpans {
fn from(source_span: SpanData) -> Self {
impl From<(SpanData, &ResourceAttributesWithSchema)> for ResourceSpans {
fn from((source_span, resource): (SpanData, &ResourceAttributesWithSchema)) -> Self {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(&source_span.resource).0,
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
schema_url: source_span
.resource
.schema_url()
.map(|url| url.to_string())
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_spans: vec![ScopeSpans {
schema_url: source_span
.instrumentation_lib
Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@
asynchronously, it should clone the log data to ensure it can be safely processed without
lifetime issues.

- **Breaking** [#1830](https://github.com/open-telemetry/opentelemetry-rust/pull/1830/files) [Traces SDK] Improves
performance by sending Resource information to processors (and exporters) once, instead of sending with every log. If you are an author
of Processor, Exporter, the following are *BREAKING* changes.
- Implement `set_resource` method in your custom SpanProcessor, which invokes exporter's `set_resource`.
- Implement `set_resource` method in your custom SpanExporter. This method should save the resource object
in original or serialized format, to be merged with every span event during export.
- `SpanData` doesn't have the resource attributes. The `SpanExporter::export()` method needs to merge it
with the earlier preserved resource before export.

- **Breaking** [1836](https://github.com/open-telemetry/opentelemetry-rust/pull/1836) `SpanProcessor::shutdown` now takes an immutable reference to self. Any reference can call shutdown on the processor. After the first call to `shutdown` the processor will not process any new spans.

## v0.23.0
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ fn get_span_data() -> Vec<SpanData> {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub trait SpanExporter: Send + Sync + Debug {
fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> {
Box::pin(async { Ok(()) })
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
Expand Down Expand Up @@ -92,8 +95,6 @@ pub struct SpanData {
pub links: crate::trace::SpanLinks,
/// Span status
pub status: Status,
/// Resource contains attributes representing an entity that produced this span.
pub resource: Cow<'static, Resource>,
/// Instrumentation library that produced this span
pub instrumentation_lib: crate::InstrumentationLibrary,
}
8 changes: 8 additions & 0 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use futures_util::future::BoxFuture;
use opentelemetry::trace::{TraceError, TraceResult};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -51,6 +52,7 @@ use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct InMemorySpanExporter {
spans: Arc<Mutex<Vec<SpanData>>>,
resource: Arc<Mutex<Resource>>,
}

impl Default for InMemorySpanExporter {
Expand Down Expand Up @@ -85,6 +87,7 @@ impl InMemorySpanExporterBuilder {
pub fn build(&self) -> InMemorySpanExporter {
InMemorySpanExporter {
spans: Arc::new(Mutex::new(Vec::new())),
resource: Arc::new(Mutex::new(Resource::default())),
}
}
}
Expand Down Expand Up @@ -142,4 +145,9 @@ impl SpanExporter for InMemorySpanExporter {
fn shutdown(&mut self) {
self.reset()
}

fn set_resource(&mut self, resource: &Resource) {
let mut res_guard = self.resource.lock().expect("Resource lock poisoned");
*res_guard = resource.clone();
lalitb marked this conversation as resolved.
Show resolved Hide resolved
}
}
4 changes: 1 addition & 3 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::{Config, SpanEvents, SpanLinks},
trace::{SpanEvents, SpanLinks},
InstrumentationLibrary,
};
use futures_util::future::BoxFuture;
Expand All @@ -14,7 +14,6 @@ use opentelemetry::trace::{
use std::fmt::{Display, Formatter};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
SpanData {
span_context: SpanContext::new(
TraceId::from_u128(1),
Expand All @@ -33,7 +32,6 @@ pub fn new_test_export_span_data() -> SpanData {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: config.resource,
instrumentation_lib: InstrumentationLibrary::default(),
}
}
Expand Down
17 changes: 13 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@
}
}

// Create a new vector to hold the modified processors
let mut processors = self.processors;

// Set the resource for each processor
for p in &mut processors {
p.set_resource(config.resource.as_ref());
}

TracerProvider {
inner: Arc::new(TracerProviderInner {
processors: self.processors,
config,
}),
inner: Arc::new(TracerProviderInner { processors, config }),
}
}
}
Expand Down Expand Up @@ -267,6 +272,10 @@
fn shutdown(&self) -> TraceResult<()> {
self.force_flush()
}

fn set_resource(&mut self, _: &Resource) {
lalitb marked this conversation as resolved.
Show resolved Hide resolved
unimplemented!()

Check warning on line 277 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L276-L277

Added lines #L276 - L277 were not covered by tests
}
}

#[test]
Expand Down
10 changes: 2 additions & 8 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
//! is possible to change its name, set its `Attributes`, and add `Links` and `Events`.
//! These cannot be changed after the `Span`'s end time has been set.
use crate::trace::SpanLimits;
use crate::Resource;
use opentelemetry::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status};
use opentelemetry::KeyValue;
use std::borrow::Cow;
Expand Down Expand Up @@ -77,11 +76,11 @@ impl Span {
/// overhead.
pub fn exported_data(&self) -> Option<crate::export::trace::SpanData> {
let (span_context, tracer) = (self.span_context.clone(), &self.tracer);
let resource = self.tracer.provider()?.config().resource.clone();
//let resource = self.tracer.provider()?.config().resource.clone();
lalitb marked this conversation as resolved.
Show resolved Hide resolved

self.data
.as_ref()
.map(|data| build_export_data(data.clone(), span_context, resource, tracer))
.map(|data| build_export_data(data.clone(), span_context, tracer))
}
}

Expand Down Expand Up @@ -225,17 +224,14 @@ impl Span {
processor.on_end(build_export_data(
data,
self.span_context.clone(),
provider.config().resource.clone(),
&self.tracer,
));
}
processors => {
let config = provider.config();
for processor in processors {
processor.on_end(build_export_data(
data.clone(),
self.span_context.clone(),
config.resource.clone(),
&self.tracer,
));
}
Expand All @@ -254,7 +250,6 @@ impl Drop for Span {
fn build_export_data(
data: SpanData,
span_context: SpanContext,
resource: Cow<'static, Resource>,
tracer: &crate::trace::Tracer,
) -> crate::export::trace::SpanData {
crate::export::trace::SpanData {
Expand All @@ -269,7 +264,6 @@ fn build_export_data(
events: data.events,
links: data.links,
status: data.status,
resource,
instrumentation_lib: tracer.instrumentation_library().clone(),
}
}
Expand Down
Loading
Loading