Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Trace SDK] Send resource once to processor and exporter, and not with every span #1830

Merged
merged 18 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,18 @@ impl OtlpHttpClient {
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

let req = ExportTraceServiceRequest {
resource_spans: spans.into_iter().map(Into::into).collect(),
use opentelemetry_proto::tonic::{
collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans,
};

let resource_spans = spans
.into_iter()
.map(|span| ResourceSpans::new(span, resource))
.collect::<Vec<_>>();

let req = ExportTraceServiceRequest { resource_spans };
match self.protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient {
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch) {
let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down Expand Up @@ -66,4 +66,8 @@ impl SpanExporter for OtlpHttpClient {
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::BoxInterceptor;
pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

Expand Down
21 changes: 18 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ use opentelemetry::trace::TraceError;
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +47,7 @@ impl TonicTracesClient {
client,
interceptor,
}),
resource: Default::default(),
}
}
}
Expand All @@ -66,14 +71,20 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| ResourceSpans::new(log_data, &self.resource))
.collect()
};

Box::pin(async move {
client
.export(Request::from_parts(
metadata,
extensions,
ExportTraceServiceRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
},
ExportTraceServiceRequest { resource_spans },
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -85,4 +96,8 @@ impl SpanExporter for TonicTracesClient {
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,8 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
}
}
14 changes: 5 additions & 9 deletions opentelemetry-proto/src/transform/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod tonic {
use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status};
use crate::transform::common::{
to_nanos,
tonic::{resource_attributes, Attributes},
tonic::{Attributes, ResourceAttributesWithSchema},
};
use opentelemetry::trace;
use opentelemetry::trace::{Link, SpanId, SpanKind};
Expand Down Expand Up @@ -45,19 +45,15 @@ pub mod tonic {
}
}

impl From<SpanData> for ResourceSpans {
fn from(source_span: SpanData) -> Self {
impl ResourceSpans {
pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(&source_span.resource).0,
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
schema_url: source_span
.resource
.schema_url()
.map(|url| url.to_string())
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_spans: vec![ScopeSpans {
schema_url: source_span
.instrumentation_lib
Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@
asynchronously, it should clone the log data to ensure it can be safely processed without
lifetime issues.

- **Breaking** [#1830](https://github.com/open-telemetry/opentelemetry-rust/pull/1830/files) [Traces SDK] Improves
performance by sending Resource information to processors (and exporters) once, instead of sending with every log. If you are an author
of Processor, Exporter, the following are *BREAKING* changes.
- Implement `set_resource` method in your custom SpanProcessor, which invokes exporter's `set_resource`.
- Implement `set_resource` method in your custom SpanExporter. This method should save the resource object
in original or serialized format, to be merged with every span event during export.
- `SpanData` doesn't have the resource attributes. The `SpanExporter::export()` method needs to merge it
with the earlier preserved resource before export.

- **Breaking** [1836](https://github.com/open-telemetry/opentelemetry-rust/pull/1836) `SpanProcessor::shutdown` now takes an immutable reference to self. Any reference can call shutdown on the processor. After the first call to `shutdown` the processor will not process any new spans.

## v0.23.0
Expand Down
3 changes: 0 additions & 3 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Runtime;
Expand All @@ -34,7 +32,6 @@ fn get_span_data() -> Vec<SpanData> {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub trait SpanExporter: Send + Sync + Debug {
fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> {
Box::pin(async { Ok(()) })
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
Expand Down Expand Up @@ -92,8 +95,6 @@ pub struct SpanData {
pub links: crate::trace::SpanLinks,
/// Span status
pub status: Status,
/// Resource contains attributes representing an entity that produced this span.
pub resource: Cow<'static, Resource>,
/// Instrumentation library that produced this span
pub instrumentation_lib: crate::InstrumentationLibrary,
}
10 changes: 10 additions & 0 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use futures_util::future::BoxFuture;
use opentelemetry::trace::{TraceError, TraceResult};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -51,6 +52,7 @@ use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct InMemorySpanExporter {
spans: Arc<Mutex<Vec<SpanData>>>,
resource: Arc<Mutex<Resource>>,
}

impl Default for InMemorySpanExporter {
Expand Down Expand Up @@ -85,6 +87,7 @@ impl InMemorySpanExporterBuilder {
pub fn build(&self) -> InMemorySpanExporter {
InMemorySpanExporter {
spans: Arc::new(Mutex::new(Vec::new())),
resource: Arc::new(Mutex::new(Resource::default())),
}
}
}
Expand Down Expand Up @@ -142,4 +145,11 @@ impl SpanExporter for InMemorySpanExporter {
fn shutdown(&mut self) {
self.reset()
}

fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_guard| *res_guard = resource.clone())
.expect("Resource lock poisoned");
}
}
4 changes: 1 addition & 3 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::{Config, SpanEvents, SpanLinks},
trace::{SpanEvents, SpanLinks},
InstrumentationLibrary,
};
use futures_util::future::BoxFuture;
Expand All @@ -14,7 +14,6 @@ use opentelemetry::trace::{
use std::fmt::{Display, Formatter};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
SpanData {
span_context: SpanContext::new(
TraceId::from_u128(1),
Expand All @@ -33,7 +32,6 @@ pub fn new_test_export_span_data() -> SpanData {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: config.resource,
instrumentation_lib: InstrumentationLibrary::default(),
}
}
Expand Down
13 changes: 9 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@ impl Builder {
}
}

// Create a new vector to hold the modified processors
let mut processors = self.processors;

// Set the resource for each processor
for p in &mut processors {
p.set_resource(config.resource.as_ref());
}

TracerProvider {
inner: Arc::new(TracerProviderInner {
processors: self.processors,
config,
}),
inner: Arc::new(TracerProviderInner { processors, config }),
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
//! is possible to change its name, set its `Attributes`, and add `Links` and `Events`.
//! These cannot be changed after the `Span`'s end time has been set.
use crate::trace::SpanLimits;
use crate::Resource;
use opentelemetry::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status};
use opentelemetry::KeyValue;
use std::borrow::Cow;
Expand Down Expand Up @@ -77,11 +76,10 @@ impl Span {
/// overhead.
pub fn exported_data(&self) -> Option<crate::export::trace::SpanData> {
let (span_context, tracer) = (self.span_context.clone(), &self.tracer);
let resource = self.tracer.provider()?.config().resource.clone();

self.data
.as_ref()
.map(|data| build_export_data(data.clone(), span_context, resource, tracer))
.map(|data| build_export_data(data.clone(), span_context, tracer))
}
}

Expand Down Expand Up @@ -225,17 +223,14 @@ impl Span {
processor.on_end(build_export_data(
data,
self.span_context.clone(),
provider.config().resource.clone(),
&self.tracer,
));
}
processors => {
let config = provider.config();
for processor in processors {
processor.on_end(build_export_data(
data.clone(),
self.span_context.clone(),
config.resource.clone(),
&self.tracer,
));
}
Expand All @@ -254,7 +249,6 @@ impl Drop for Span {
fn build_export_data(
data: SpanData,
span_context: SpanContext,
resource: Cow<'static, Resource>,
tracer: &crate::trace::Tracer,
) -> crate::export::trace::SpanData {
crate::export::trace::SpanData {
Expand All @@ -269,7 +263,6 @@ fn build_export_data(
events: data.events,
links: data.links,
status: data.status,
resource,
instrumentation_lib: tracer.instrumentation_library().clone(),
}
}
Expand Down
Loading
Loading