From b1040aaf893e93a84aaacd81979399d6a5989a8d Mon Sep 17 00:00:00 2001 From: bryn Date: Thu, 3 Oct 2024 13:36:14 +0100 Subject: [PATCH] Add `experimental_datadog_agent_sampling` This mode will change the behaviour of the router for tracing in the following ways: * Spans are never dropped, instead they are converted to RecordOnly. * Spans that are sent to otlp and datadog exporters will always look like they have been sampled. * The `sampling.priority` attribute is populated on spans. * `psr` is populated in trace state. * `m` is populated in trace state. --- ...datadog_upstream_sampling_decision_test.md | 48 + ...nfiguration__tests__schema_generation.snap | 6 + apollo-router/src/plugins/telemetry/config.rs | 30 +- apollo-router/src/plugins/telemetry/mod.rs | 110 ++- .../src/plugins/telemetry/otel/layer.rs | 4 +- .../src/plugins/telemetry/otel/tracer.rs | 9 +- ....field_instrumentation_sampler.router.yaml | 11 + .../tracing/datadog/agent_sampling.rs | 376 ++++++++ .../tracing/{datadog.rs => datadog/mod.rs} | 36 +- .../tracing/datadog/span_processor.rs | 133 +++ .../datadog_exporter/exporter/model/v05.rs | 21 +- .../telemetry/tracing/datadog_exporter/mod.rs | 185 ++-- .../src/plugins/telemetry/tracing/mod.rs | 8 + .../src/plugins/telemetry/tracing/otlp.rs | 27 +- apollo-router/tests/common.rs | 59 +- apollo-router/tests/integration/mod.rs | 9 + .../tests/integration/telemetry/datadog.rs | 418 ++++++++- .../telemetry/fixtures/datadog.router.yaml | 1 + ...atadog_agent_sampling_disabled.router.yaml | 23 + .../datadog_default_span_names.router.yaml | 1 + .../datadog_no_parent_sampler.router.yaml | 28 + .../fixtures/datadog_no_sample.router.yaml | 1 + .../datadog_override_span_names.router.yaml | 1 + ...tadog_override_span_names_late.router.yaml | 1 + ...tadog_resource_mapping_default.router.yaml | 1 + ...adog_resource_mapping_override.router.yaml | 1 + .../telemetry/fixtures/otlp.router.yaml | 14 +- .../otlp_datadog_agent_no_sample.router.yaml | 42 + .../otlp_datadog_agent_sample.router.yaml | 42 + ...datadog_agent_sample_no_sample.router.yaml | 42 + .../otlp_datadog_propagation.router.yaml | 39 + ...p_datadog_propagation_no_agent.router.yaml | 38 + ..._propagation_no_parent_sampler.router.yaml | 40 + ...request_with_zipkin_propagator.router.yaml | 40 + .../otlp_no_parent_sampler.router.yaml | 25 + .../tests/integration/telemetry/jaeger.rs | 8 +- .../tests/integration/telemetry/otlp.rs | 876 +++++++++++++++--- .../telemetry/exporters/tracing/datadog.mdx | 65 +- 38 files changed, 2488 insertions(+), 331 deletions(-) create mode 100644 .changesets/fix_bryn_datadog_upstream_sampling_decision_test.md create mode 100644 apollo-router/src/plugins/telemetry/testdata/config.field_instrumentation_sampler.router.yaml create mode 100644 apollo-router/src/plugins/telemetry/tracing/datadog/agent_sampling.rs rename apollo-router/src/plugins/telemetry/tracing/{datadog.rs => datadog/mod.rs} (93%) create mode 100644 apollo-router/src/plugins/telemetry/tracing/datadog/span_processor.rs create mode 100644 apollo-router/tests/integration/telemetry/fixtures/datadog_agent_sampling_disabled.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/datadog_no_parent_sampler.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_no_sample.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample_no_sample.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_agent.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_parent_sampler.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_request_with_zipkin_propagator.router.yaml create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_no_parent_sampler.router.yaml diff --git a/.changesets/fix_bryn_datadog_upstream_sampling_decision_test.md b/.changesets/fix_bryn_datadog_upstream_sampling_decision_test.md new file mode 100644 index 00000000000..bb6447a66bf --- /dev/null +++ b/.changesets/fix_bryn_datadog_upstream_sampling_decision_test.md @@ -0,0 +1,48 @@ +### Respect x-datadog-sampling-priority ([PR #6017](https://github.com/apollographql/router/pull/6017)) + +This PR consists of two fixes: +#### Datadog priority sampling resolution is not lost. + +Previously a `x-datadog-sampling-priority` of `-1` would be converted to `0` for downstream requests and `2` would be converted to `1`. + +#### The sampler option in the `telemetry.exporters.tracing.common.sampler` is not datadog aware. + +To get accurate APM metrics all spans must be sent to the datadog agent with a `psr` or `sampling.priority` attribute set appropriately to record the sampling decision. + +`preview_datadog_agent_sampling` option in the router.yaml enables this behavior and should be used when exporting to the datadog agent via OTLP or datadog native. + +```yaml +telemetry: + exporters: + tracing: + common: + # Only 10 percent of spans will be forwarded from the Datadog agent to Datadog. Experiment to find a value that is good for you! + sampler: 0.1 + # Send all spans to the Datadog agent. + preview_datadog_agent_sampling: true + + # Example OTLP exporter configuration + otlp: + enabled: true + # Optional batch processor setting, this will enable the batch processor to send concurrent requests in a high load scenario. + batch_processor: + max_concurrent_exports: 100 + + # Example Datadog native exporter configuration + datadog: + enabled: true + + # Optional batch processor setting, this will enable the batch processor to send concurrent requests in a high load scenario. + batch_processor: + max_concurrent_exports: 100 +``` + +By using these options, you can decrease your Datadog bill as you will only be sending a percentage of spans from the Datadog agent to datadog. + +> [!IMPORTANT] +> Users must enable `preview_datadog_agent_sampling` to get accurate APM metrics. + +> [!IMPORTANT] +> Sending all spans to the datadog agent may require that you tweak the `batch_processor` settings in your exporter config. This applies to both OTLP and the Datadog native exporter. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/6017 diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 4eba0206d0a..af25ac3158a 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -7306,6 +7306,12 @@ expression: "&schema" "description": "Whether to use parent based sampling", "type": "boolean" }, + "preview_datadog_agent_sampling": { + "default": null, + "description": "Use datadog agent sampling. This means that all spans will be sent to the Datadog agent and the `sampling.priority` attribute will be used to control if the span will then be sent to Datadog", + "nullable": true, + "type": "boolean" + }, "resource": { "additionalProperties": { "$ref": "#/definitions/AttributeValue", diff --git a/apollo-router/src/plugins/telemetry/config.rs b/apollo-router/src/plugins/telemetry/config.rs index 4c9be011356..8dc84e85c03 100644 --- a/apollo-router/src/plugins/telemetry/config.rs +++ b/apollo-router/src/plugins/telemetry/config.rs @@ -24,6 +24,7 @@ use super::*; use crate::plugin::serde::deserialize_option_header_name; use crate::plugins::telemetry::metrics; use crate::plugins::telemetry::resource::ConfigResource; +use crate::plugins::telemetry::tracing::datadog::DatadogAgentSampling; use crate::Configuration; #[derive(thiserror::Error, Debug)] @@ -347,6 +348,9 @@ pub(crate) struct TracingCommon { pub(crate) service_namespace: Option, /// The sampler, always_on, always_off or a decimal between 0.0 and 1.0 pub(crate) sampler: SamplerOption, + /// Use datadog agent sampling. This means that all spans will be sent to the Datadog agent + /// and the `sampling.priority` attribute will be used to control if the span will then be sent to Datadog + pub(crate) preview_datadog_agent_sampling: Option, /// Whether to use parent based sampling pub(crate) parent_based_sampler: bool, /// The maximum events per span before discarding @@ -401,6 +405,7 @@ impl Default for TracingCommon { service_name: Default::default(), service_namespace: Default::default(), sampler: default_sampler(), + preview_datadog_agent_sampling: None, parent_based_sampler: default_parent_based_sampler(), max_events_per_span: default_max_events_per_span(), max_attributes_per_span: default_max_attributes_per_span(), @@ -668,8 +673,15 @@ impl From<&TracingCommon> for opentelemetry::sdk::trace::Config { if config.parent_based_sampler { sampler = parent_based(sampler); } + if config.preview_datadog_agent_sampling.unwrap_or_default() { + common = common.with_sampler(DatadogAgentSampling::new( + sampler, + config.parent_based_sampler, + )); + } else { + common = common.with_sampler(sampler); + } - common = common.with_sampler(sampler); common = common.with_max_events_per_span(config.max_events_per_span); common = common.with_max_attributes_per_span(config.max_attributes_per_span); common = common.with_max_links_per_span(config.max_links_per_span); @@ -688,6 +700,22 @@ fn parent_based(sampler: opentelemetry::sdk::trace::Sampler) -> opentelemetry::s impl Conf { pub(crate) fn calculate_field_level_instrumentation_ratio(&self) -> Result { + // Because when datadog is enabled the global sampling is overriden to always_on + if self + .exporters + .tracing + .common + .preview_datadog_agent_sampling + .unwrap_or_default() + { + let field_ratio = match &self.apollo.field_level_instrumentation_sampler { + SamplerOption::TraceIdRatioBased(ratio) => *ratio, + SamplerOption::Always(Sampler::AlwaysOn) => 1.0, + SamplerOption::Always(Sampler::AlwaysOff) => 0.0, + }; + + return Ok(field_ratio); + } Ok( match ( &self.exporters.tracing.common.sampler, diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 1478261be50..fa6fa6494f8 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -286,6 +286,20 @@ impl Plugin for Telemetry { .expect("otel error handler lock poisoned, fatal"); let mut config = init.config; + // This code would have enabled datadog agent sampling by default, but for now we will leave it as opt-in. + // If the datadog exporter is enabled then enable the agent sampler. + // If users are using otlp export then they will need to set this explicitly in their config. + // + // if config.exporters.tracing.datadog.enabled() + // && config + // .exporters + // .tracing + // .common + // .preview_datadog_agent_sampling + // .is_none() + // { + // config.exporters.tracing.common.preview_datadog_agent_sampling = Some(true); + // } config.instrumentation.spans.update_defaults(); config.instrumentation.instruments.update_defaults(); config.exporters.logging.validate()?; @@ -866,7 +880,21 @@ impl Telemetry { // Only apply things if we were executing in the context of a vanilla the Apollo executable. // Users that are rolling their own routers will need to set up telemetry themselves. if let Some(hot_tracer) = OPENTELEMETRY_TRACER_HANDLE.get() { - otel::layer::configure(&self.sampling_filter_ratio); + // If the datadog agent sampling is enabled, then we cannot presample the spans + // Therefore we set presampling to always on and let the regular sampler do the work. + // Effectively, we are disabling the presampling. + if self + .config + .exporters + .tracing + .common + .preview_datadog_agent_sampling + .unwrap_or_default() + { + otel::layer::configure(&SamplerOption::Always(Sampler::AlwaysOn)); + } else { + otel::layer::configure(&self.sampling_filter_ratio); + } // The reason that this has to happen here is that we are interacting with global state. // If we do this logic during plugin init then if a subsequent plugin fails to init then we @@ -889,7 +917,8 @@ impl Telemetry { Self::checked_global_tracer_shutdown(last_provider); - opentelemetry::global::set_text_map_propagator(Self::create_propagator(&self.config)); + let propagator = Self::create_propagator(&self.config); + opentelemetry::global::set_text_map_propagator(propagator); } activation.reload_metrics(); @@ -934,9 +963,6 @@ impl Telemetry { if propagation.zipkin || tracing.zipkin.enabled { propagators.push(Box::::default()); } - if propagation.datadog || tracing.datadog.enabled() { - propagators.push(Box::::default()); - } if propagation.aws_xray { propagators.push(Box::::default()); } @@ -946,6 +972,9 @@ impl Telemetry { propagation.request.format.clone(), ))); } + if propagation.datadog || tracing.datadog.enabled() { + propagators.push(Box::::default()); + } TextMapCompositePropagator::new(propagators) } @@ -957,9 +986,14 @@ impl Telemetry { let spans_config = &config.instrumentation.spans; let mut common = tracing_config.common.clone(); let mut sampler = common.sampler.clone(); - // set it to AlwaysOn: it is now done in the SamplingFilter, so whatever is sent to an exporter - // should be accepted - common.sampler = SamplerOption::Always(Sampler::AlwaysOn); + + // To enable pre-sampling to work we need to disable regular sampling. + // This is because the pre-sampler will sample the spans before they sent to the regular sampler + // If the datadog agent sampling is enabled, then we cannot pre-sample the spans because even if the sampling decision is made to drop + // DatadogAgentSampler will modify the decision to RecordAndSample and instead use the sampling.priority attribute to decide if the span should be sampled or not. + if !common.preview_datadog_agent_sampling.unwrap_or_default() { + common.sampler = SamplerOption::Always(Sampler::AlwaysOn); + } let mut builder = opentelemetry::sdk::trace::TracerProvider::builder().with_config((&common).into()); @@ -2130,6 +2164,8 @@ mod tests { use std::collections::HashMap; use std::fmt::Debug; use std::ops::DerefMut; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -2187,6 +2223,7 @@ mod tests { use crate::plugins::demand_control::COST_STRATEGY_KEY; use crate::plugins::telemetry::config::TraceIdFormat; use crate::plugins::telemetry::handle_error_internal; + use crate::plugins::telemetry::EnableSubgraphFtv1; use crate::services::router::body::get_body_bytes; use crate::services::RouterRequest; use crate::services::RouterResponse; @@ -2832,6 +2869,63 @@ mod tests { .await; } + #[tokio::test] + async fn test_field_instrumentation_sampler_with_preview_datadog_agent_sampling() { + let plugin = create_plugin_with_config(include_str!( + "testdata/config.field_instrumentation_sampler.router.yaml" + )) + .await; + + let ftv1_counter = Arc::new(AtomicUsize::new(0)); + let ftv1_counter_cloned = ftv1_counter.clone(); + + let mut mock_request_service = MockSupergraphService::new(); + mock_request_service + .expect_call() + .times(10) + .returning(move |req: SupergraphRequest| { + if req + .context + .extensions() + .with_lock(|lock| lock.contains_key::()) + { + ftv1_counter_cloned.fetch_add(1, Ordering::Relaxed); + } + Ok(SupergraphResponse::fake_builder() + .context(req.context) + .status_code(StatusCode::OK) + .header("content-type", "application/json") + .data(json!({"errors": [{"message": "nope"}]})) + .build() + .unwrap()) + }); + let mut request_supergraph_service = + plugin.supergraph_service(BoxService::new(mock_request_service)); + + for _ in 0..10 { + let supergraph_req = SupergraphRequest::fake_builder() + .header("x-custom", "TEST") + .header("conditional-custom", "X") + .header("custom-length", "55") + .header("content-length", "55") + .header("content-type", "application/graphql") + .query("Query test { me {name} }") + .operation_name("test".to_string()); + let _router_response = request_supergraph_service + .ready() + .await + .unwrap() + .call(supergraph_req.build().unwrap()) + .await + .unwrap() + .next_response() + .await + .unwrap(); + } + // It should be 100% because when we set preview_datadog_agent_sampling, we only take the value of field_level_instrumentation_sampler + assert_eq!(ftv1_counter.load(Ordering::Relaxed), 10); + } + #[tokio::test] async fn test_subgraph_metrics_ok() { async { diff --git a/apollo-router/src/plugins/telemetry/otel/layer.rs b/apollo-router/src/plugins/telemetry/otel/layer.rs index 866bf50a350..86415d2b4dc 100644 --- a/apollo-router/src/plugins/telemetry/otel/layer.rs +++ b/apollo-router/src/plugins/telemetry/otel/layer.rs @@ -677,13 +677,13 @@ pub(crate) fn configure(sampler: &SamplerOption) { }, }; - SPAN_SAMPLING_RATE.store(f64::to_bits(ratio), Ordering::Relaxed); + SPAN_SAMPLING_RATE.store(f64::to_bits(ratio), Ordering::SeqCst); } impl OpenTelemetryLayer { fn sample(&self) -> bool { let s: f64 = thread_rng().gen_range(0.0..=1.0); - s <= f64::from_bits(SPAN_SAMPLING_RATE.load(Ordering::Relaxed)) + s <= f64::from_bits(SPAN_SAMPLING_RATE.load(Ordering::SeqCst)) } } diff --git a/apollo-router/src/plugins/telemetry/otel/tracer.rs b/apollo-router/src/plugins/telemetry/otel/tracer.rs index 463fd8cb2ce..6b11bab9ad3 100644 --- a/apollo-router/src/plugins/telemetry/otel/tracer.rs +++ b/apollo-router/src/plugins/telemetry/otel/tracer.rs @@ -16,7 +16,6 @@ use opentelemetry_sdk::trace::Tracer as SdkTracer; use opentelemetry_sdk::trace::TracerProvider as SdkTracerProvider; use super::OtelData; -use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; /// An interface for authors of OpenTelemetry SDKs to build pre-sampled tracers. /// @@ -81,6 +80,7 @@ impl PreSampledTracer for SdkTracer { let parent_cx = &data.parent_cx; let builder = &mut data.builder; + // If we have a parent span that means we have a parent span coming from a propagator // Gather trace state let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider); @@ -159,12 +159,7 @@ fn process_sampling_result( decision: SamplingDecision::RecordAndSample, trace_state, .. - } => Some(( - trace_flags | TraceFlags::SAMPLED, - trace_state - .with_priority_sampling(true) - .with_measuring(true), - )), + } => Some((trace_flags | TraceFlags::SAMPLED, trace_state.clone())), } } diff --git a/apollo-router/src/plugins/telemetry/testdata/config.field_instrumentation_sampler.router.yaml b/apollo-router/src/plugins/telemetry/testdata/config.field_instrumentation_sampler.router.yaml new file mode 100644 index 00000000000..54f4167b22d --- /dev/null +++ b/apollo-router/src/plugins/telemetry/testdata/config.field_instrumentation_sampler.router.yaml @@ -0,0 +1,11 @@ +telemetry: + instrumentation: + spans: + mode: spec_compliant + apollo: + field_level_instrumentation_sampler: 1.0 + exporters: + tracing: + common: + preview_datadog_agent_sampling: true + sampler: 0.5 \ No newline at end of file diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog/agent_sampling.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/agent_sampling.rs new file mode 100644 index 00000000000..2fc04e94bd9 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/agent_sampling.rs @@ -0,0 +1,376 @@ +use opentelemetry_api::trace::Link; +use opentelemetry_api::trace::SamplingDecision; +use opentelemetry_api::trace::SamplingResult; +use opentelemetry_api::trace::SpanKind; +use opentelemetry_api::trace::TraceId; +use opentelemetry_api::Key; +use opentelemetry_api::KeyValue; +use opentelemetry_api::OrderMap; +use opentelemetry_api::Value; +use opentelemetry_sdk::trace::ShouldSample; + +use crate::plugins::telemetry::tracing::datadog_exporter::propagator::SamplingPriority; +use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; + +/// The Datadog Agent Sampler +/// +/// This sampler overrides the sampling decision to ensure that spans are recorded even if they were originally dropped. +/// It performs the following tasks: +/// 1. Ensures the appropriate trace state is set +/// 2. Adds the sampling.priority attribute to the span +/// +/// The sampler can be configured to use parent-based sampling for consistent trace sampling. +/// +#[derive(Debug, Clone)] +pub(crate) struct DatadogAgentSampling { + /// The underlying sampler used for initial sampling decisions + pub(crate) sampler: opentelemetry::sdk::trace::Sampler, + /// Flag to enable parent-based sampling for consistent trace sampling + pub(crate) parent_based_sampler: bool, +} +impl DatadogAgentSampling { + /// Creates a new DatadogAgentSampling instance + /// + /// # Arguments + /// * `sampler` - The underlying sampler to use for initial sampling decisions + /// * `parent_based_sampler` - Whether to use parent-based sampling for consistent trace sampling + pub(crate) fn new( + sampler: opentelemetry::sdk::trace::Sampler, + parent_based_sampler: bool, + ) -> Self { + Self { + sampler, + parent_based_sampler, + } + } +} + +impl ShouldSample for DatadogAgentSampling { + fn should_sample( + &self, + parent_context: Option<&opentelemetry_api::Context>, + trace_id: TraceId, + name: &str, + span_kind: &SpanKind, + attributes: &OrderMap, + links: &[Link], + ) -> SamplingResult { + let mut result = self.sampler.should_sample( + parent_context, + trace_id, + name, + span_kind, + attributes, + links, + ); + // Override the sampling decision to record and make sure that the trace state is set correctly + // if either parent sampling is disabled or it has not been populated by a propagator. + // The propagator gets first dibs on setting the trace state, so if it sets it, we don't override it unless we are not parent based. + match result.decision { + SamplingDecision::Drop | SamplingDecision::RecordOnly => { + result.decision = SamplingDecision::RecordOnly; + if !self.parent_based_sampler || result.trace_state.sampling_priority().is_none() { + result.trace_state = result + .trace_state + .with_priority_sampling(SamplingPriority::AutoReject) + } + } + SamplingDecision::RecordAndSample => { + if !self.parent_based_sampler || result.trace_state.sampling_priority().is_none() { + result.trace_state = result + .trace_state + .with_priority_sampling(SamplingPriority::AutoKeep) + } + } + } + + // We always want to measure + result.trace_state = result.trace_state.with_measuring(true); + // We always want to set the sampling.priority attribute in case we are communicating with the agent via otlp. + // Reverse engineered from https://github.com/DataDog/datadog-agent/blob/c692f62423f93988b008b669008f9199a5ad196b/pkg/trace/api/otlp.go#L502 + result.attributes.push(KeyValue::new( + "sampling.priority", + Value::I64( + result + .trace_state + .sampling_priority() + .expect("sampling priority") + .as_i64(), + ), + )); + result + } +} +#[cfg(test)] +mod tests { + use buildstructor::Builder; + use opentelemetry::sdk::trace::Sampler; + use opentelemetry::trace::TraceState; + use opentelemetry_api::trace::Link; + use opentelemetry_api::trace::SamplingDecision; + use opentelemetry_api::trace::SamplingResult; + use opentelemetry_api::trace::SpanContext; + use opentelemetry_api::trace::SpanId; + use opentelemetry_api::trace::SpanKind; + use opentelemetry_api::trace::TraceContextExt; + use opentelemetry_api::trace::TraceFlags; + use opentelemetry_api::trace::TraceId; + use opentelemetry_api::Context; + use opentelemetry_api::Key; + use opentelemetry_api::OrderMap; + use opentelemetry_api::Value; + use opentelemetry_sdk::trace::ShouldSample; + + use crate::plugins::telemetry::tracing::datadog::DatadogAgentSampling; + use crate::plugins::telemetry::tracing::datadog_exporter::propagator::SamplingPriority; + use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; + + #[derive(Debug, Clone, Builder)] + struct StubSampler { + decision: SamplingDecision, + } + + impl ShouldSample for StubSampler { + fn should_sample( + &self, + _parent_context: Option<&Context>, + _trace_id: TraceId, + _name: &str, + _span_kind: &SpanKind, + _attributes: &OrderMap, + _links: &[Link], + ) -> SamplingResult { + SamplingResult { + decision: self.decision.clone(), + attributes: Vec::new(), + trace_state: Default::default(), + } + } + } + + #[test] + fn test_should_sample_drop() { + // Test case where the sampling decision is Drop + let sampler = StubSampler::builder() + .decision(SamplingDecision::Drop) + .build(); + let datadog_sampler = + DatadogAgentSampling::new(Sampler::ParentBased(Box::new(sampler)), false); + + let result = datadog_sampler.should_sample( + None, + TraceId::from_u128(1), + "test_span", + &SpanKind::Internal, + &OrderMap::new(), + &[], + ); + + // Verify that the decision is RecordOnly (converted from Drop) + assert_eq!(result.decision, SamplingDecision::RecordOnly); + // Verify that the sampling priority is set to AutoReject + assert_eq!( + result.trace_state.sampling_priority(), + Some(SamplingPriority::AutoReject) + ); + // Verify that the sampling.priority attribute is set correctly + assert!(result + .attributes + .iter() + .any(|kv| kv.key.as_str() == "sampling.priority" + && kv.value == Value::I64(SamplingPriority::AutoReject.as_i64()))); + + // Verify that measuring is enabled + assert!(result.trace_state.measuring_enabled()); + } + + #[test] + fn test_should_sample_record_only() { + let sampler = StubSampler::builder() + .decision(SamplingDecision::RecordOnly) + .build(); + let datadog_sampler = + DatadogAgentSampling::new(Sampler::ParentBased(Box::new(sampler)), false); + + let result = datadog_sampler.should_sample( + None, + TraceId::from_u128(1), + "test_span", + &SpanKind::Internal, + &OrderMap::new(), + &[], + ); + + // Record only should remain as record only + assert_eq!(result.decision, SamplingDecision::RecordOnly); + + // Verify that the sampling priority is set to AutoReject so the trace won't be transmitted to Datadog + assert_eq!( + result.trace_state.sampling_priority(), + Some(SamplingPriority::AutoReject) + ); + assert!(result + .attributes + .iter() + .any(|kv| kv.key.as_str() == "sampling.priority" + && kv.value == Value::I64(SamplingPriority::AutoReject.as_i64()))); + + // Verify that measuring is enabled + assert!(result.trace_state.measuring_enabled()); + } + + #[test] + fn test_should_sample_record_and_sample() { + let sampler = StubSampler::builder() + .decision(SamplingDecision::RecordAndSample) + .build(); + let datadog_sampler = + DatadogAgentSampling::new(Sampler::ParentBased(Box::new(sampler)), false); + + let result = datadog_sampler.should_sample( + None, + TraceId::from_u128(1), + "test_span", + &SpanKind::Internal, + &OrderMap::new(), + &[], + ); + + // Record and sample should remain as record and sample + assert_eq!(result.decision, SamplingDecision::RecordAndSample); + + // Verify that the sampling priority is set to AutoKeep so the trace will be transmitted to Datadog + assert_eq!( + result.trace_state.sampling_priority(), + Some(SamplingPriority::AutoKeep) + ); + assert!(result + .attributes + .iter() + .any(|kv| kv.key.as_str() == "sampling.priority" + && kv.value == Value::I64(SamplingPriority::AutoKeep.as_i64()))); + + // Verify that measuring is enabled + assert!(result.trace_state.measuring_enabled()); + } + + #[test] + fn test_should_sample_with_parent_based_sampler() { + let sampler = StubSampler::builder() + .decision(SamplingDecision::RecordAndSample) + .build(); + + let datadog_sampler = + DatadogAgentSampling::new(Sampler::ParentBased(Box::new(sampler)), true); + + let result = datadog_sampler.should_sample( + Some(&Context::new()), + TraceId::from_u128(1), + "test_span", + &SpanKind::Internal, + &OrderMap::new(), + &[], + ); + + // Record and sample should remain as record and sample + assert_eq!(result.decision, SamplingDecision::RecordAndSample); + + // Verify that the sampling priority is set to AutoKeep so the trace will be transmitted to Datadog + assert_eq!( + result.trace_state.sampling_priority(), + Some(SamplingPriority::AutoKeep) + ); + assert!(result + .attributes + .iter() + .any(|kv| kv.key.as_str() == "sampling.priority" + && kv.value == Value::I64(SamplingPriority::AutoKeep.as_i64()))); + + // Verify that measuring is enabled + assert!(result.trace_state.measuring_enabled()); + } + + #[test] + fn test_trace_state_already_populated_record_and_sample() { + let sampler = StubSampler::builder() + .decision(SamplingDecision::RecordAndSample) + .build(); + + let datadog_sampler = + DatadogAgentSampling::new(Sampler::ParentBased(Box::new(sampler)), true); + + let result = datadog_sampler.should_sample( + Some(&Context::new().with_remote_span_context(SpanContext::new( + TraceId::from_u128(1), + SpanId::from_u64(1), + TraceFlags::SAMPLED, + true, + TraceState::default().with_priority_sampling(SamplingPriority::UserReject), + ))), + TraceId::from_u128(1), + "test_span", + &SpanKind::Internal, + &OrderMap::new(), + &[], + ); + + // Record and sample should remain as record and sample + assert_eq!(result.decision, SamplingDecision::RecordAndSample); + + // Verify that the sampling priority is not overridden + assert_eq!( + result.trace_state.sampling_priority(), + Some(SamplingPriority::UserReject) + ); + assert!(result + .attributes + .iter() + .any(|kv| kv.key.as_str() == "sampling.priority" + && kv.value == Value::I64(SamplingPriority::UserReject.as_i64()))); + + // Verify that measuring is enabled + assert!(result.trace_state.measuring_enabled()); + } + + #[test] + fn test_trace_state_already_populated_record_drop() { + let sampler = StubSampler::builder() + .decision(SamplingDecision::Drop) + .build(); + + let datadog_sampler = + DatadogAgentSampling::new(Sampler::ParentBased(Box::new(sampler)), true); + + let result = datadog_sampler.should_sample( + Some(&Context::new().with_remote_span_context(SpanContext::new( + TraceId::from_u128(1), + SpanId::from_u64(1), + TraceFlags::default(), + true, + TraceState::default().with_priority_sampling(SamplingPriority::UserReject), + ))), + TraceId::from_u128(1), + "test_span", + &SpanKind::Internal, + &OrderMap::new(), + &[], + ); + + // Drop is converted to RecordOnly + assert_eq!(result.decision, SamplingDecision::RecordOnly); + + // Verify that the sampling priority is not overridden + assert_eq!( + result.trace_state.sampling_priority(), + Some(SamplingPriority::UserReject) + ); + assert!(result + .attributes + .iter() + .any(|kv| kv.key.as_str() == "sampling.priority" + && kv.value == Value::I64(SamplingPriority::UserReject.as_i64()))); + + // Verify that measuring is enabled + assert!(result.trace_state.measuring_enabled()); + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs similarity index 93% rename from apollo-router/src/plugins/telemetry/tracing/datadog.rs rename to apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs index 4574b529ff0..66fc09f108b 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs @@ -1,15 +1,18 @@ //! Configuration for datadog tracing. +mod agent_sampling; +mod span_processor; + use std::fmt::Debug; use std::fmt::Formatter; use std::time::Duration; +pub(crate) use agent_sampling::DatadogAgentSampling; use ahash::HashMap; use ahash::HashMapExt; use futures::future::BoxFuture; use http::Uri; use opentelemetry::sdk; -use opentelemetry::sdk::trace::BatchSpanProcessor; use opentelemetry::sdk::trace::Builder; use opentelemetry::Value; use opentelemetry_api::trace::SpanContext; @@ -23,6 +26,7 @@ use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use opentelemetry_semantic_conventions::resource::SERVICE_VERSION; use schemars::JsonSchema; use serde::Deserialize; +pub(crate) use span_processor::DatadogSpanProcessor; use tower::BoxError; use crate::plugins::telemetry::config::GenericWith; @@ -210,18 +214,24 @@ impl TracingConfigurator for Config { let mut span_metrics = default_span_metrics(); span_metrics.extend(self.span_metrics.clone()); - Ok(builder.with_span_processor( - BatchSpanProcessor::builder( - ExporterWrapper { - delegate: exporter, - span_metrics, - }, - opentelemetry::runtime::Tokio, - ) - .with_batch_config(self.batch_processor.clone().into()) - .build() - .filtered(), - )) + let batch_processor = opentelemetry::sdk::trace::BatchSpanProcessor::builder( + ExporterWrapper { + delegate: exporter, + span_metrics, + }, + opentelemetry::runtime::Tokio, + ) + .with_batch_config(self.batch_processor.clone().into()) + .build() + .filtered(); + + Ok( + if trace.preview_datadog_agent_sampling.unwrap_or_default() { + builder.with_span_processor(batch_processor.always_sampled()) + } else { + builder.with_span_processor(batch_processor) + }, + ) } } diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog/span_processor.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/span_processor.rs new file mode 100644 index 00000000000..7c879c310a8 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/span_processor.rs @@ -0,0 +1,133 @@ +use opentelemetry_api::trace::SpanContext; +use opentelemetry_api::trace::TraceResult; +use opentelemetry_api::Context; +use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::trace::Span; +use opentelemetry_sdk::trace::SpanProcessor; + +/// When using the Datadog agent we need spans to always be exported. However, the batch span processor will only export spans that are sampled. +/// This wrapper will override the trace flags to always sample. +/// THe datadog exporter itself will look at the `sampling.priority` trace context attribute to determine if the span should be sampled. +#[derive(Debug)] +pub(crate) struct DatadogSpanProcessor { + delegate: T, +} + +impl DatadogSpanProcessor { + pub(crate) fn new(delegate: T) -> Self { + Self { delegate } + } +} + +impl SpanProcessor for DatadogSpanProcessor { + fn on_start(&self, span: &mut Span, cx: &Context) { + self.delegate.on_start(span, cx) + } + + fn on_end(&self, mut span: SpanData) { + // Note that the trace state for measuring and sampling priority is handled in the AgentSampler + // The only purpose of this span processor is to ensure that a span can pass through a batch processor. + let new_trace_flags = span.span_context.trace_flags().with_sampled(true); + span.span_context = SpanContext::new( + span.span_context.trace_id(), + span.span_context.span_id(), + new_trace_flags, + span.span_context.is_remote(), + span.span_context.trace_state().clone(), + ); + self.delegate.on_end(span) + } + + fn force_flush(&self) -> TraceResult<()> { + self.delegate.force_flush() + } + + fn shutdown(&mut self) -> TraceResult<()> { + self.delegate.shutdown() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::Mutex; + use std::time::SystemTime; + + use opentelemetry_api::trace::SpanId; + use opentelemetry_api::trace::SpanKind; + use opentelemetry_api::trace::TraceFlags; + use opentelemetry_api::trace::TraceId; + use opentelemetry_api::Context; + use opentelemetry_sdk::trace::EvictedHashMap; + use opentelemetry_sdk::trace::EvictedQueue; + use opentelemetry_sdk::trace::SpanProcessor; + + use super::*; + + #[derive(Debug, Clone)] + struct MockSpanProcessor { + spans: Arc>>, + } + + impl MockSpanProcessor { + fn new() -> Self { + Self { + spans: Default::default(), + } + } + } + + impl SpanProcessor for MockSpanProcessor { + fn on_start(&self, _span: &mut Span, _cx: &Context) {} + + fn on_end(&self, span: SpanData) { + self.spans.lock().unwrap().push(span); + } + + fn force_flush(&self) -> TraceResult<()> { + Ok(()) + } + + fn shutdown(&mut self) -> TraceResult<()> { + Ok(()) + } + } + + #[test] + fn test_on_end_updates_trace_flags() { + let mock_processor = MockSpanProcessor::new(); + let processor = DatadogSpanProcessor::new(mock_processor.clone()); + let span_context = SpanContext::new( + TraceId::from_u128(1), + SpanId::from_u64(1), + TraceFlags::default(), + false, + Default::default(), + ); + let span_data = SpanData { + span_context, + parent_span_id: SpanId::from_u64(1), + span_kind: SpanKind::Client, + name: Default::default(), + start_time: SystemTime::now(), + end_time: SystemTime::now(), + attributes: EvictedHashMap::new(32, 32), + events: EvictedQueue::new(32), + links: EvictedQueue::new(32), + status: Default::default(), + resource: Default::default(), + instrumentation_lib: Default::default(), + }; + + processor.on_end(span_data.clone()); + + // Verify that the trace flags are updated to sampled + let updated_trace_flags = span_data.span_context.trace_flags().with_sampled(true); + let stored_spans = mock_processor.spans.lock().unwrap(); + assert_eq!(stored_spans.len(), 1); + assert_eq!( + stored_spans[0].span_context.trace_flags(), + updated_trace_flags + ); + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs index fd1590966e8..e11bc9ed785 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/exporter/model/v05.rs @@ -8,6 +8,7 @@ use super::unified_tags::UnifiedTags; use crate::plugins::telemetry::tracing::datadog_exporter::exporter::intern::StringInterner; use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::DD_MEASURED_KEY; use crate::plugins::telemetry::tracing::datadog_exporter::exporter::model::SAMPLING_PRIORITY_KEY; +use crate::plugins::telemetry::tracing::datadog_exporter::propagator::SamplingPriority; use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; use crate::plugins::telemetry::tracing::datadog_exporter::Error; use crate::plugins::telemetry::tracing::datadog_exporter::ModelConfig; @@ -129,10 +130,22 @@ fn write_unified_tag<'a>( } fn get_sampling_priority(span: &SpanData) -> f64 { - if span.span_context.trace_state().priority_sampling_enabled() { - 1.0 - } else { - 0.0 + match span + .span_context + .trace_state() + .sampling_priority() + .unwrap_or_else(|| { + // Datadog sampling has not been set, revert to traceflags + if span.span_context.trace_flags().is_sampled() { + SamplingPriority::AutoKeep + } else { + SamplingPriority::AutoReject + } + }) { + SamplingPriority::UserReject => -1.0, + SamplingPriority::AutoReject => 0.0, + SamplingPriority::AutoKeep => 1.0, + SamplingPriority::UserKeep => 2.0, } } diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs index 1c586d48c84..97f547da810 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog_exporter/mod.rs @@ -158,6 +158,8 @@ pub use propagator::DatadogTraceState; pub use propagator::DatadogTraceStateBuilder; pub(crate) mod propagator { + use std::fmt::Display; + use once_cell::sync::Lazy; use opentelemetry::propagation::text_map_propagator::FieldIter; use opentelemetry::propagation::Extractor; @@ -176,7 +178,7 @@ pub(crate) mod propagator { const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority"; const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02); - const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr"; + pub(crate) const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr"; pub(crate) const TRACE_STATE_MEASURE: &str = "m"; pub(crate) const TRACE_STATE_TRUE_VALUE: &str = "1"; pub(crate) const TRACE_STATE_FALSE_VALUE: &str = "0"; @@ -191,8 +193,8 @@ pub(crate) mod propagator { #[derive(Default)] pub struct DatadogTraceStateBuilder { - priority_sampling: bool, - measuring: bool, + sampling_priority: SamplingPriority, + measuring: Option, } fn boolean_to_trace_state_flag(value: bool) -> &'static str { @@ -209,33 +211,39 @@ pub(crate) mod propagator { #[allow(clippy::needless_update)] impl DatadogTraceStateBuilder { - pub fn with_priority_sampling(self, enabled: bool) -> Self { + pub fn with_priority_sampling(self, sampling_priority: SamplingPriority) -> Self { Self { - priority_sampling: enabled, + sampling_priority, ..self } } pub fn with_measuring(self, enabled: bool) -> Self { Self { - measuring: enabled, + measuring: Some(enabled), ..self } } pub fn build(self) -> TraceState { - let values = [ - ( - TRACE_STATE_MEASURE, - boolean_to_trace_state_flag(self.measuring), - ), - ( + if let Some(measuring) = self.measuring { + let values = [ + (TRACE_STATE_MEASURE, boolean_to_trace_state_flag(measuring)), + ( + TRACE_STATE_PRIORITY_SAMPLING, + &self.sampling_priority.to_string(), + ), + ]; + + TraceState::from_key_value(values).unwrap_or_default() + } else { + let values = [( TRACE_STATE_PRIORITY_SAMPLING, - boolean_to_trace_state_flag(self.priority_sampling), - ), - ]; + &self.sampling_priority.to_string(), + )]; - TraceState::from_key_value(values).unwrap_or_default() + TraceState::from_key_value(values).unwrap_or_default() + } } } @@ -244,9 +252,9 @@ pub(crate) mod propagator { fn measuring_enabled(&self) -> bool; - fn with_priority_sampling(&self, enabled: bool) -> TraceState; + fn with_priority_sampling(&self, sampling_priority: SamplingPriority) -> TraceState; - fn priority_sampling_enabled(&self) -> bool; + fn sampling_priority(&self) -> Option; } impl DatadogTraceState for TraceState { @@ -261,30 +269,77 @@ pub(crate) mod propagator { .unwrap_or_default() } - fn with_priority_sampling(&self, enabled: bool) -> TraceState { - self.insert( - TRACE_STATE_PRIORITY_SAMPLING, - boolean_to_trace_state_flag(enabled), - ) - .unwrap_or_else(|_err| self.clone()) + fn with_priority_sampling(&self, sampling_priority: SamplingPriority) -> TraceState { + self.insert(TRACE_STATE_PRIORITY_SAMPLING, sampling_priority.to_string()) + .unwrap_or_else(|_err| self.clone()) } - fn priority_sampling_enabled(&self) -> bool { - self.get(TRACE_STATE_PRIORITY_SAMPLING) - .map(trace_flag_to_boolean) - .unwrap_or_default() + fn sampling_priority(&self) -> Option { + self.get(TRACE_STATE_PRIORITY_SAMPLING).map(|value| { + SamplingPriority::try_from(value).unwrap_or(SamplingPriority::AutoReject) + }) } } - enum SamplingPriority { + #[derive(Default, Debug, Eq, PartialEq)] + pub(crate) enum SamplingPriority { UserReject = -1, + #[default] AutoReject = 0, AutoKeep = 1, UserKeep = 2, } + impl SamplingPriority { + pub(crate) fn as_i64(&self) -> i64 { + match self { + SamplingPriority::UserReject => -1, + SamplingPriority::AutoReject => 0, + SamplingPriority::AutoKeep => 1, + SamplingPriority::UserKeep => 2, + } + } + } + + impl Display for SamplingPriority { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let value = match self { + SamplingPriority::UserReject => -1, + SamplingPriority::AutoReject => 0, + SamplingPriority::AutoKeep => 1, + SamplingPriority::UserKeep => 2, + }; + write!(f, "{}", value) + } + } + + impl SamplingPriority { + pub fn as_str(&self) -> &'static str { + match self { + SamplingPriority::UserReject => "-1", + SamplingPriority::AutoReject => "0", + SamplingPriority::AutoKeep => "1", + SamplingPriority::UserKeep => "2", + } + } + } + + impl TryFrom<&str> for SamplingPriority { + type Error = ExtractError; + + fn try_from(value: &str) -> Result { + match value { + "-1" => Ok(SamplingPriority::UserReject), + "0" => Ok(SamplingPriority::AutoReject), + "1" => Ok(SamplingPriority::AutoKeep), + "2" => Ok(SamplingPriority::UserKeep), + _ => Err(ExtractError::SamplingPriority), + } + } + } + #[derive(Debug)] - enum ExtractError { + pub(crate) enum ExtractError { TraceId, SpanId, SamplingPriority, @@ -311,16 +366,7 @@ pub(crate) mod propagator { } fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) { - if trace_flags & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED { - (TraceState::default(), trace_flags) - } else { - ( - DatadogTraceStateBuilder::default() - .with_priority_sampling(trace_flags.is_sampled()) - .build(), - TraceFlags::SAMPLED, - ) - } + (TraceState::default(), trace_flags) } impl DatadogPropagator { @@ -343,23 +389,6 @@ pub(crate) mod propagator { .map_err(|_| ExtractError::SpanId) } - fn extract_sampling_priority( - &self, - sampling_priority: &str, - ) -> Result { - let i = sampling_priority - .parse::() - .map_err(|_| ExtractError::SamplingPriority)?; - - match i { - -1 => Ok(SamplingPriority::UserReject), - 0 => Ok(SamplingPriority::AutoReject), - 1 => Ok(SamplingPriority::AutoKeep), - 2 => Ok(SamplingPriority::UserKeep), - _ => Err(ExtractError::SamplingPriority), - } - } - fn extract_span_context( &self, extractor: &dyn Extractor, @@ -371,11 +400,11 @@ pub(crate) mod propagator { let span_id = self .extract_span_id(extractor.get(DATADOG_PARENT_ID_HEADER).unwrap_or("")) .unwrap_or(SpanId::INVALID); - let sampling_priority = self.extract_sampling_priority( - extractor - .get(DATADOG_SAMPLING_PRIORITY_HEADER) - .unwrap_or(""), - ); + let sampling_priority = extractor + .get(DATADOG_SAMPLING_PRIORITY_HEADER) + .unwrap_or("") + .try_into(); + let sampled = match sampling_priority { Ok(SamplingPriority::UserReject) | Ok(SamplingPriority::AutoReject) => { TraceFlags::default() @@ -387,7 +416,10 @@ pub(crate) mod propagator { Err(_) => TRACE_FLAG_DEFERRED, }; - let (trace_state, trace_flags) = create_trace_state_and_flags(sampled); + let (mut trace_state, trace_flags) = create_trace_state_and_flags(sampled); + if let Ok(sampling_priority) = sampling_priority { + trace_state = trace_state.with_priority_sampling(sampling_priority); + } Ok(SpanContext::new( trace_id, @@ -399,14 +431,6 @@ pub(crate) mod propagator { } } - fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority { - if span_context.trace_state().priority_sampling_enabled() { - SamplingPriority::AutoKeep - } else { - SamplingPriority::AutoReject - } - } - impl TextMapPropagator for DatadogPropagator { fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) { let span = cx.span(); @@ -422,8 +446,11 @@ pub(crate) mod propagator { ); if span_context.trace_flags() & TRACE_FLAG_DEFERRED != TRACE_FLAG_DEFERRED { - let sampling_priority = get_sampling_priority(span_context); - + // The sampling priority + let sampling_priority = span_context + .trace_state() + .sampling_priority() + .unwrap_or_default(); injector.set( DATADOG_SAMPLING_PRIORITY_HEADER, (sampling_priority as i32).to_string(), @@ -460,8 +487,10 @@ pub(crate) mod propagator { (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), - (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())), - (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "-1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::UserReject).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::AutoReject).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::AutoKeep).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "2")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::UserKeep).build())), ] } @@ -473,8 +502,10 @@ pub(crate) mod propagator { (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())), (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())), (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())), - (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())), - (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "-1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::UserReject).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::AutoReject).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::AutoKeep).build())), + (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "2")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(SamplingPriority::UserKeep).build())), ] } diff --git a/apollo-router/src/plugins/telemetry/tracing/mod.rs b/apollo-router/src/plugins/telemetry/tracing/mod.rs index 0172f3e0941..d2dc62b1381 100644 --- a/apollo-router/src/plugins/telemetry/tracing/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/mod.rs @@ -18,6 +18,7 @@ use tower::BoxError; use super::config_new::spans::Spans; use super::formatters::APOLLO_PRIVATE_PREFIX; use crate::plugins::telemetry::config::TracingCommon; +use crate::plugins::telemetry::tracing::datadog::DatadogSpanProcessor; pub(crate) mod apollo; pub(crate) mod apollo_telemetry; @@ -91,6 +92,7 @@ where Self: Sized + SpanProcessor, { fn filtered(self) -> ApolloFilterSpanProcessor; + fn always_sampled(self) -> DatadogSpanProcessor; } impl SpanProcessorExt for T @@ -100,6 +102,12 @@ where fn filtered(self) -> ApolloFilterSpanProcessor { ApolloFilterSpanProcessor { delegate: self } } + + /// This span processor will always send spans to the exporter even if they are not sampled. This is useful for the datadog agent which + /// uses spans for metrics. + fn always_sampled(self) -> DatadogSpanProcessor { + DatadogSpanProcessor::new(self) + } } /// Batch processor configuration diff --git a/apollo-router/src/plugins/telemetry/tracing/otlp.rs b/apollo-router/src/plugins/telemetry/tracing/otlp.rs index be294427f2a..9a61075e5fd 100644 --- a/apollo-router/src/plugins/telemetry/tracing/otlp.rs +++ b/apollo-router/src/plugins/telemetry/tracing/otlp.rs @@ -20,20 +20,23 @@ impl TracingConfigurator for super::super::otlp::Config { fn apply( &self, builder: Builder, - _common: &TracingCommon, + common: &TracingCommon, _spans_config: &Spans, ) -> Result { - tracing::info!("Configuring Otlp tracing: {}", self.batch_processor); let exporter: SpanExporterBuilder = self.exporter(TelemetryDataKind::Traces)?; - - Ok(builder.with_span_processor( - BatchSpanProcessor::builder( - exporter.build_span_exporter()?, - opentelemetry::runtime::Tokio, - ) - .with_batch_config(self.batch_processor.clone().into()) - .build() - .filtered(), - )) + let batch_span_processor = BatchSpanProcessor::builder( + exporter.build_span_exporter()?, + opentelemetry::runtime::Tokio, + ) + .with_batch_config(self.batch_processor.clone().into()) + .build() + .filtered(); + Ok( + if common.preview_datadog_agent_sampling.unwrap_or_default() { + builder.with_span_processor(batch_span_processor.always_sampled()) + } else { + builder.with_span_processor(batch_span_processor) + }, + ) } } diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index 826a377e042..3c222ba3d67 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -4,6 +4,7 @@ use std::net::SocketAddr; use std::net::TcpListener; use std::path::PathBuf; use std::process::Stdio; +use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -18,6 +19,7 @@ use fred::types::Scanner; use futures::StreamExt; use http::header::ACCEPT; use http::header::CONTENT_TYPE; +use http::HeaderName; use http::HeaderValue; use mediatype::names::BOUNDARY; use mediatype::names::FORM_DATA; @@ -33,6 +35,7 @@ use opentelemetry::sdk::trace::TracerProvider; use opentelemetry::sdk::Resource; use opentelemetry::testing::trace::NoopSpanExporter; use opentelemetry::trace::TraceContextExt; +use opentelemetry_api::trace::SpanContext; use opentelemetry_api::trace::TraceId; use opentelemetry_api::trace::TracerProvider as OtherTracerProvider; use opentelemetry_api::Context; @@ -126,7 +129,7 @@ impl Respond for TracedResponder { pub enum Telemetry { Jaeger, Otlp { - endpoint: String, + endpoint: Option, }, Datadog, Zipkin, @@ -156,7 +159,9 @@ impl Telemetry { .build(), ) .build(), - Telemetry::Otlp { endpoint } => TracerProvider::builder() + Telemetry::Otlp { + endpoint: Some(endpoint), + } => TracerProvider::builder() .with_config(config) .with_span_processor( BatchSpanProcessor::builder( @@ -201,7 +206,7 @@ impl Telemetry { .build(), ) .build(), - Telemetry::None => TracerProvider::builder() + Telemetry::None | Telemetry::Otlp { endpoint: None } => TracerProvider::builder() .with_config(config) .with_simple_exporter(NoopSpanExporter::default()) .build(), @@ -258,7 +263,29 @@ impl Telemetry { } Telemetry::Datadog => { let propagator = opentelemetry_datadog::DatadogPropagator::new(); - propagator.extract(&headers) + let mut context = propagator.extract(&headers); + // We're going to override the sampled so that we can test sampling priority + if let Some(psr) = headers.get("x-datadog-sampling-priority") { + let state = context + .span() + .span_context() + .trace_state() + .insert("psr", psr.to_string()) + .expect("psr"); + context = context.with_remote_span_context(SpanContext::new( + context.span().span_context().trace_id(), + context.span().span_context().span_id(), + context + .span() + .span_context() + .trace_flags() + .with_sampled(true), + true, + state, + )); + } + + context } Telemetry::Otlp { .. } => { let propagator = opentelemetry::sdk::propagation::TraceContextPropagator::default(); @@ -568,7 +595,7 @@ impl IntegrationTest { async move { let client = reqwest::Client::new(); - let mut builder = client + let builder = client .post(url) .header( CONTENT_TYPE, @@ -579,14 +606,19 @@ impl IntegrationTest { .header("x-my-header", "test") .header("head", "test"); + let mut request = builder.json(&query).build().unwrap(); + telemetry.inject_context(&mut request); + if let Some(headers) = headers { for (name, value) in headers { - builder = builder.header(name, value); + request.headers_mut().remove(&name); + request.headers_mut().append( + HeaderName::from_str(&name).expect("header was invalid"), + value.try_into().expect("header was invalid"), + ); } } - let mut request = builder.json(&query).build().unwrap(); - telemetry.inject_context(&mut request); request.headers_mut().remove(ACCEPT); match client.execute(request).await { Ok(response) => (span_id, response), @@ -605,6 +637,7 @@ impl IntegrationTest { pub fn execute_untraced_query( &self, query: &Value, + headers: Option>, ) -> impl std::future::Future { assert!( self.router.is_some(), @@ -626,6 +659,16 @@ impl IntegrationTest { .unwrap(); request.headers_mut().remove(ACCEPT); + if let Some(headers) = headers { + for (name, value) in headers { + request.headers_mut().remove(&name); + request.headers_mut().append( + HeaderName::from_str(&name).expect("header was invalid"), + value.try_into().expect("header was invalid"), + ); + } + } + match client.execute(request).await { Ok(response) => ( TraceId::from_hex( diff --git a/apollo-router/tests/integration/mod.rs b/apollo-router/tests/integration/mod.rs index c383b5348f0..06b77f688f9 100644 --- a/apollo-router/tests/integration/mod.rs +++ b/apollo-router/tests/integration/mod.rs @@ -39,3 +39,12 @@ impl ValueExt for Value { self.as_str().map(|s| s.to_string()) } } + +impl ValueExt for &Value { + fn select_path<'a>(&'a self, path: &str) -> Result, BoxError> { + Ok(Selector::new().str_path(path)?.value(self).select()?) + } + fn as_string(&self) -> Option { + self.as_str().map(|s| s.to_string()) + } +} diff --git a/apollo-router/tests/integration/telemetry/datadog.rs b/apollo-router/tests/integration/telemetry/datadog.rs index 6aed76ff6da..39757ee3895 100644 --- a/apollo-router/tests/integration/telemetry/datadog.rs +++ b/apollo-router/tests/integration/telemetry/datadog.rs @@ -2,17 +2,18 @@ extern crate core; use std::collections::HashMap; use std::collections::HashSet; -use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use anyhow::anyhow; +use opentelemetry_api::trace::SpanContext; use opentelemetry_api::trace::TraceContextExt; use opentelemetry_api::trace::TraceId; +use opentelemetry_api::Context; use serde_json::json; use serde_json::Value; use tower::BoxError; -use tracing::Span; -use tracing_opentelemetry::OpenTelemetrySpanExt; use wiremock::ResponseTemplate; use crate::integration::common::graph_os_enabled; @@ -28,6 +29,9 @@ struct TraceSpec { span_names: HashSet<&'static str>, measured_spans: HashSet<&'static str>, unmeasured_spans: HashSet<&'static str>, + priority_sampled: Option<&'static str>, + // Not the metrics but the otel attribute + no_priority_sampled_attribute: Option, } #[tokio::test(flavor = "multi_thread")] @@ -35,8 +39,8 @@ async fn test_no_sample() -> Result<(), BoxError> { if !graph_os_enabled() { return Ok(()); } - let subgraph_was_sampled = std::sync::Arc::new(AtomicBool::new(false)); - let subgraph_was_sampled_callback = subgraph_was_sampled.clone(); + let context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let context_clone = context.clone(); let mut router = IntegrationTest::builder() .telemetry(Telemetry::Datadog) .config(include_str!("fixtures/datadog_no_sample.router.yaml")) @@ -44,8 +48,10 @@ async fn test_no_sample() -> Result<(), BoxError> { json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), )) .subgraph_callback(Box::new(move || { - let sampled = Span::current().context().span().span_context().is_sampled(); - subgraph_was_sampled_callback.store(sampled, std::sync::atomic::Ordering::SeqCst); + let context = Context::current(); + let span = context.span(); + let span_context = span.span_context(); + *context_clone.lock().expect("poisoned") = Some(span_context.clone()); })) .build() .await; @@ -54,14 +60,318 @@ async fn test_no_sample() -> Result<(), BoxError> { router.assert_started().await; let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); - let (_id, result) = router.execute_untraced_query(&query).await; + let (_id, result) = router.execute_untraced_query(&query, None).await; router.graceful_shutdown().await; assert!(result.status().is_success()); - assert!(!subgraph_was_sampled.load(std::sync::atomic::Ordering::SeqCst)); + let context = context + .lock() + .expect("poisoned") + .as_ref() + .expect("state") + .clone(); + assert!(context.is_sampled()); + assert_eq!(context.trace_state().get("psr"), Some("0")); Ok(()) } +// We want to check we're able to override the behavior of preview_datadog_agent_sampling configuration even if we set a datadog exporter +#[tokio::test(flavor = "multi_thread")] +async fn test_sampling_datadog_agent_disabled() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + let context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let context_clone = context.clone(); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Datadog) + .config(include_str!( + "fixtures/datadog_agent_sampling_disabled.router.yaml" + )) + .responder(ResponseTemplate::new(200).set_body_json( + json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), + )) + .subgraph_callback(Box::new(move || { + let context = Context::current(); + let span = context.span(); + let span_context = span.span_context(); + *context_clone.lock().expect("poisoned") = Some(span_context.clone()); + })) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, result) = router.execute_untraced_query(&query, None).await; + router.graceful_shutdown().await; + assert!(result.status().is_success()); + let _context = context + .lock() + .expect("poisoned") + .as_ref() + .expect("state") + .clone(); + + tokio::time::sleep(Duration::from_secs(2)).await; + TraceSpec::builder() + .services([].into()) + .build() + .validate_trace(id) + .await?; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_priority_sampling_propagated() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + let context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let context_clone = context.clone(); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Datadog) + .config(include_str!("fixtures/datadog.router.yaml")) + .responder(ResponseTemplate::new(200).set_body_json( + json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), + )) + .subgraph_callback(Box::new(move || { + let context = Context::current(); + let span = context.span(); + let span_context = span.span_context(); + *context_clone.lock().expect("poisoned") = Some(span_context.clone()); + })) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // Parent based sampling. psr MUST be populated with the value that we pass in. + test_psr( + &context, + &mut router, + Some("-1"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("-1") + .build(), + ) + .await?; + test_psr( + &context, + &mut router, + Some("0"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("0") + .build(), + ) + .await?; + test_psr( + &context, + &mut router, + Some("1"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + test_psr( + &context, + &mut router, + Some("2"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("2") + .build(), + ) + .await?; + + // No psr was passed in the router is free to set it. This will be 1 as we are going to sample here. + test_psr( + &context, + &mut router, + None, + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + + router.graceful_shutdown().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_priority_sampling_propagated_otel_request() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + let context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let context_clone = context.clone(); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { endpoint: None }) + .config(include_str!("fixtures/datadog.router.yaml")) + .responder(ResponseTemplate::new(200).set_body_json( + json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), + )) + .subgraph_callback(Box::new(move || { + let context = Context::current(); + let span = context.span(); + let span_context = span.span_context(); + *context_clone.lock().expect("poisoned") = Some(span_context.clone()); + })) + .build() + .await; + + router.start().await; + router.assert_started().await; + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, result) = router.execute_query(&query).await; + assert_eq!( + result + .headers() + .get("apollo-custom-trace-id") + .unwrap() + .to_str() + .unwrap(), + id.to_datadog() + ); + TraceSpec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build() + .validate_trace(id) + .await?; + + router.graceful_shutdown().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_priority_sampling_no_parent_propagated() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + let context = std::sync::Arc::new(std::sync::Mutex::new(None)); + let context_clone = context.clone(); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Datadog) + .config(include_str!( + "fixtures/datadog_no_parent_sampler.router.yaml" + )) + .responder(ResponseTemplate::new(200).set_body_json( + json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), + )) + .subgraph_callback(Box::new(move || { + let context = Context::current(); + let span = context.span(); + let span_context = span.span_context(); + *context_clone.lock().expect("poisoned") = Some(span_context.clone()); + })) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // The router will ignore the upstream PSR as parent based sampling is disabled. + test_psr( + &context, + &mut router, + Some("-1"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + test_psr( + &context, + &mut router, + Some("0"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + test_psr( + &context, + &mut router, + Some("1"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + test_psr( + &context, + &mut router, + Some("2"), + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + + test_psr( + &context, + &mut router, + None, + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build(), + ) + .await?; + + router.graceful_shutdown().await; + + Ok(()) +} + +async fn test_psr( + context: &Arc>>, + router: &mut IntegrationTest, + psr: Option<&str>, + trace_spec: TraceSpec, +) -> Result<(), BoxError> { + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let headers = if let Some(psr) = psr { + vec![("x-datadog-sampling-priority".to_string(), psr.to_string())] + } else { + vec![] + }; + let (id, result) = router + .execute_query_with_headers(&query, headers.into_iter().collect()) + .await; + + assert!(result.status().is_success()); + let context = context + .lock() + .expect("poisoned") + .as_ref() + .expect("state") + .clone(); + + assert_eq!( + context.trace_state().get("psr"), + trace_spec.priority_sampled + ); + trace_spec.validate_trace(id).await?; + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_default_span_names() -> Result<(), BoxError> { if !graph_os_enabled() { @@ -506,7 +816,7 @@ impl TraceSpec { async fn validate_trace(&self, id: TraceId) -> Result<(), BoxError> { let datadog_id = id.to_datadog(); let url = format!("http://localhost:8126/test/traces?trace_ids={datadog_id}"); - for _ in 0..10 { + for _ in 0..20 { if self.find_valid_trace(&url).await.is_ok() { return Ok(()); } @@ -533,11 +843,12 @@ impl TraceSpec { tracing::debug!("{}", serde_json::to_string_pretty(&trace)?); self.verify_trace_participants(&trace)?; self.verify_spans_present(&trace)?; - self.validate_measured_spans(&trace)?; + self.verify_measured_spans(&trace)?; self.verify_operation_name(&trace)?; self.verify_priority_sampled(&trace)?; + self.verify_priority_sampled_attribute(&trace)?; self.verify_version(&trace)?; - self.validate_span_kinds(&trace)?; + self.verify_span_kinds(&trace)?; Ok(()) } @@ -556,7 +867,7 @@ impl TraceSpec { Ok(()) } - fn validate_measured_spans(&self, trace: &Value) -> Result<(), BoxError> { + fn verify_measured_spans(&self, trace: &Value) -> Result<(), BoxError> { for expected in &self.measured_spans { assert!( self.measured_span(trace, expected)?, @@ -591,11 +902,13 @@ impl TraceSpec { .unwrap_or_default()) } - fn validate_span_kinds(&self, trace: &Value) -> Result<(), BoxError> { + fn verify_span_kinds(&self, trace: &Value) -> Result<(), BoxError> { // Validate that the span.kind has been propagated. We can just do this for a selection of spans. - self.validate_span_kind(trace, "router", "server")?; - self.validate_span_kind(trace, "supergraph", "internal")?; - self.validate_span_kind(trace, "http_request", "client")?; + if self.services.contains("router") { + self.validate_span_kind(trace, "router", "server")?; + self.validate_span_kind(trace, "supergraph", "internal")?; + self.validate_span_kind(trace, "http_request", "client")?; + } Ok(()) } @@ -652,19 +965,24 @@ impl TraceSpec { trace.select_path(&format!("$..[?(@.name == '{}')].meta.['span.kind']", name))?; let binding = binding1.first().or(binding2.first()); - assert!( - binding.is_some(), - "span.kind missing or incorrect {}, {}", - name, - trace - ); - assert_eq!( - binding - .expect("expected binding") - .as_str() - .expect("expected string"), - kind - ); + if binding.is_none() { + return Err(BoxError::from(format!( + "span.kind missing or incorrect {}, {}", + name, trace + ))); + } + + let binding = binding + .expect("expected binding") + .as_str() + .expect("expected string"); + if binding != kind { + return Err(BoxError::from(format!( + "span.kind mismatch, expected {} got {}", + kind, binding + ))); + } + Ok(()) } @@ -685,17 +1003,35 @@ impl TraceSpec { } fn verify_priority_sampled(&self, trace: &Value) -> Result<(), BoxError> { - let binding = trace.select_path("$.._sampling_priority_v1")?; - let sampling_priority = binding.first(); - // having this priority set to 1.0 everytime is not a problem as we're doing pre sampling in the full telemetry stack - // So basically if the trace was not sampled it wouldn't get to this stage and so nothing would be sent - assert_eq!( - sampling_priority - .expect("sampling priority expected") - .as_f64() - .expect("sampling priority must be a number"), - 1.0 - ); + if let Some(psr) = self.priority_sampled { + let binding = + trace.select_path("$..[?(@.service=='router')].metrics._sampling_priority_v1")?; + if binding.is_empty() { + return Err(BoxError::from("missing sampling priority")); + } + for sampling_priority in binding { + assert_eq!( + sampling_priority + .as_f64() + .expect("psr not string") + .to_string(), + psr + ); + } + } + Ok(()) + } + + fn verify_priority_sampled_attribute(&self, trace: &Value) -> Result<(), BoxError> { + if self.no_priority_sampled_attribute.unwrap_or_default() { + let binding = + trace.select_path("$..[?(@.service=='router')].meta['sampling.priority']")?; + if binding.is_empty() { + return Ok(()); + } else { + return Err(BoxError::from("sampling priority attribute exists")); + } + } Ok(()) } } diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml index d6ecc666079..0f0f50dd788 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog.router.yaml @@ -13,6 +13,7 @@ telemetry: resource: env: local1 service.version: router_version_override + preview_datadog_agent_sampling: true datadog: enabled: true batch_processor: diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_agent_sampling_disabled.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_agent_sampling_disabled.router.yaml new file mode 100644 index 00000000000..49b1528c94f --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_agent_sampling_disabled.router.yaml @@ -0,0 +1,23 @@ +telemetry: + apollo: + field_level_instrumentation_sampler: always_off + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + # NOT always_off to allow us to test a sampling probability of zero + sampler: 0.0 + preview_datadog_agent_sampling: false + datadog: + enabled: true + batch_processor: + scheduled_delay: 100ms + fixed_span_names: false + enable_span_mapping: false + instrumentation: + spans: + mode: spec_compliant + diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_default_span_names.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_default_span_names.router.yaml index 67c2c070e60..e874c00fabe 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_default_span_names.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_default_span_names.router.yaml @@ -7,6 +7,7 @@ telemetry: format: datadog common: service_name: router + preview_datadog_agent_sampling: true datadog: enabled: true batch_processor: diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_no_parent_sampler.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_no_parent_sampler.router.yaml new file mode 100644 index 00000000000..2e9c634dd90 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_no_parent_sampler.router.yaml @@ -0,0 +1,28 @@ +telemetry: + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + format: datadog + propagation: + trace_context: true + jaeger: true + common: + service_name: router + parent_based_sampler: false + resource: + env: local1 + service.version: router_version_override + preview_datadog_agent_sampling: true + datadog: + enabled: true + batch_processor: + scheduled_delay: 100ms + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_no_sample.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_no_sample.router.yaml index d89d1043462..19af041c560 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_no_sample.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_no_sample.router.yaml @@ -11,6 +11,7 @@ telemetry: service_name: router # NOT always_off to allow us to test a sampling probability of zero sampler: 0.0 + preview_datadog_agent_sampling: true datadog: enabled: true batch_processor: diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names.router.yaml index 7d5e1ff2e18..bb793301d0a 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names.router.yaml @@ -7,6 +7,7 @@ telemetry: format: datadog common: service_name: router + preview_datadog_agent_sampling: true datadog: enabled: true # Span mapping will always override the span name as far as the test agent is concerned diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names_late.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names_late.router.yaml index dda383a784e..821662b5be8 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names_late.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_override_span_names_late.router.yaml @@ -7,6 +7,7 @@ telemetry: format: datadog common: service_name: router + preview_datadog_agent_sampling: true datadog: enabled: true # Span mapping will always override the span name as far as the test agent is concerned diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml index 96160b18316..0603e72c9cd 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_default.router.yaml @@ -7,6 +7,7 @@ telemetry: format: datadog common: service_name: router + preview_datadog_agent_sampling: true datadog: enabled: true enable_span_mapping: true diff --git a/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_override.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_override.router.yaml index a01c44fc61a..5eba22068b3 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_override.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/datadog_resource_mapping_override.router.yaml @@ -7,6 +7,7 @@ telemetry: format: datadog common: service_name: router + preview_datadog_agent_sampling: true datadog: enabled: true enable_span_mapping: true diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml index f4484786f42..aa56c66187b 100644 --- a/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp.router.yaml @@ -9,7 +9,7 @@ telemetry: otlp: enabled: true protocol: http - endpoint: /traces + endpoint: batch_processor: scheduled_delay: 10ms metrics: @@ -22,3 +22,15 @@ telemetry: batch_processor: scheduled_delay: 10ms + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_no_sample.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_no_sample.router.yaml new file mode 100644 index 00000000000..77529f500d9 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_no_sample.router.yaml @@ -0,0 +1,42 @@ +telemetry: + apollo: + field_level_instrumentation_sampler: always_off + exporters: + tracing: + propagation: + datadog: true + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + preview_datadog_agent_sampling: true + sampler: 0.0 + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample.router.yaml new file mode 100644 index 00000000000..6b1f32f71f5 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample.router.yaml @@ -0,0 +1,42 @@ +telemetry: + apollo: + field_level_instrumentation_sampler: always_off + exporters: + tracing: + propagation: + datadog: true + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + preview_datadog_agent_sampling: true + sampler: 1.0 + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample_no_sample.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample_no_sample.router.yaml new file mode 100644 index 00000000000..77529f500d9 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_agent_sample_no_sample.router.yaml @@ -0,0 +1,42 @@ +telemetry: + apollo: + field_level_instrumentation_sampler: always_off + exporters: + tracing: + propagation: + datadog: true + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + preview_datadog_agent_sampling: true + sampler: 0.0 + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation.router.yaml new file mode 100644 index 00000000000..7352f3d6202 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation.router.yaml @@ -0,0 +1,39 @@ +telemetry: + exporters: + tracing: + propagation: + datadog: true + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + preview_datadog_agent_sampling: true + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_agent.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_agent.router.yaml new file mode 100644 index 00000000000..08323073f31 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_agent.router.yaml @@ -0,0 +1,38 @@ +telemetry: + exporters: + tracing: + propagation: + datadog: true + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_parent_sampler.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_parent_sampler.router.yaml new file mode 100644 index 00000000000..7fd47f096be --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_propagation_no_parent_sampler.router.yaml @@ -0,0 +1,40 @@ +telemetry: + exporters: + tracing: + propagation: + datadog: true + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + parent_based_sampler: false + preview_datadog_agent_sampling: true + service_name: router + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_request_with_zipkin_propagator.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_request_with_zipkin_propagator.router.yaml new file mode 100644 index 00000000000..4e31e0d1d63 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_datadog_request_with_zipkin_propagator.router.yaml @@ -0,0 +1,40 @@ +telemetry: + apollo: + field_level_instrumentation_sampler: always_off + exporters: + tracing: + propagation: + zipkin: true + trace_context: true + common: + service_name: router + preview_datadog_agent_sampling: true + sampler: 1.0 + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true + + subgraph: + attributes: + otel.name: + subgraph_operation_name: string \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_no_parent_sampler.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_no_parent_sampler.router.yaml new file mode 100644 index 00000000000..5fdf22e0d6f --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_no_parent_sampler.router.yaml @@ -0,0 +1,25 @@ +telemetry: + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + parent_based_sampler: false + otlp: + enabled: true + protocol: http + endpoint: /traces + batch_processor: + scheduled_delay: 10ms + metrics: + common: + service_name: router + otlp: + enabled: true + endpoint: /metrics + protocol: http + batch_processor: + scheduled_delay: 10ms + diff --git a/apollo-router/tests/integration/telemetry/jaeger.rs b/apollo-router/tests/integration/telemetry/jaeger.rs index fcf59e4ef59..c9e79bd22ad 100644 --- a/apollo-router/tests/integration/telemetry/jaeger.rs +++ b/apollo-router/tests/integration/telemetry/jaeger.rs @@ -90,7 +90,7 @@ async fn test_local_root() -> Result<(), BoxError> { router.assert_started().await; let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); - let (id, result) = router.execute_untraced_query(&query).await; + let (id, result) = router.execute_untraced_query(&query, None).await; assert!(!result .headers() .get("apollo-custom-trace-id") @@ -121,7 +121,7 @@ async fn test_local_root_no_sample() -> Result<(), BoxError> { router.assert_started().await; let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); - let (_, response) = router.execute_untraced_query(&query).await; + let (_, response) = router.execute_untraced_query(&query, None).await; assert!(response.headers().get("apollo-custom-trace-id").is_some()); router.graceful_shutdown().await; @@ -141,7 +141,7 @@ async fn test_local_root_50_percent_sample() -> Result<(), BoxError> { let query = json!({"query":"query ExampleQuery {topProducts{name}}\n","variables":{}, "operationName": "ExampleQuery"}); for _ in 0..100 { - let (id, result) = router.execute_untraced_query(&query).await; + let (id, result) = router.execute_untraced_query(&query, None).await; if result.headers().get("apollo-custom-trace-id").is_some() && validate_trace( @@ -177,7 +177,7 @@ async fn test_no_telemetry() -> Result<(), BoxError> { router.assert_started().await; let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); - let (_, response) = router.execute_untraced_query(&query).await; + let (_, response) = router.execute_untraced_query(&query, None).await; assert!(response.headers().get("apollo-custom-trace-id").is_none()); router.graceful_shutdown().await; diff --git a/apollo-router/tests/integration/telemetry/otlp.rs b/apollo-router/tests/integration/telemetry/otlp.rs index 7eae04f5675..0ba9178cec9 100644 --- a/apollo-router/tests/integration/telemetry/otlp.rs +++ b/apollo-router/tests/integration/telemetry/otlp.rs @@ -1,10 +1,10 @@ extern crate core; +use std::collections::HashMap; use std::collections::HashSet; use std::time::Duration; use anyhow::anyhow; -use itertools::Itertools; use opentelemetry_api::trace::TraceId; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceResponse; @@ -18,37 +18,22 @@ use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; +use crate::integration::common::graph_os_enabled; use crate::integration::common::Telemetry; use crate::integration::IntegrationTest; use crate::integration::ValueExt; #[tokio::test(flavor = "multi_thread")] async fn test_basic() -> Result<(), BoxError> { - let mock_server = wiremock::MockServer::start().await; - Mock::given(method("POST")) - .and(path("/traces")) - .respond_with(ResponseTemplate::new(200).set_body_raw( - ExportTraceServiceResponse::default().encode_to_vec(), - "application/x-protobuf", - )) - .expect(1..) - .mount(&mock_server) - .await; - Mock::given(method("POST")) - .and(path("/metrics")) - .respond_with(ResponseTemplate::new(200).set_body_raw( - ExportMetricsServiceResponse::default().encode_to_vec(), - "application/x-protobuf", - )) - .expect(1..) - .mount(&mock_server) - .await; - + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; let config = include_str!("fixtures/otlp.router.yaml") .replace("", &mock_server.uri()); let mut router = IntegrationTest::builder() .telemetry(Telemetry::Otlp { - endpoint: format!("{}/traces", mock_server.uri()), + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), }) .config(&config) .build() @@ -65,15 +50,31 @@ async fn test_basic() -> Result<(), BoxError> { .get("apollo-custom-trace-id") .unwrap() .is_empty()); - validate_telemetry( - &mock_server, - id, - &query, - Some("ExampleQuery"), - &["client", "router", "subgraph"], - false, - ) - .await?; + Spec::builder() + .operation_name("ExampleQuery") + .services(["client", "router", "subgraph"].into()) + .span_names( + [ + "query_planning", + "client_request", + "ExampleQuery__products__0", + "fetch", + "execution", + "query ExampleQuery", + "subgraph server", + "parse_query", + "http_request", + ] + .into(), + ) + .build() + .validate_trace(id, &mock_server) + .await?; + Spec::builder() + .service("router") + .build() + .validate_metrics(&mock_server) + .await?; router.touch_config().await; router.assert_reloaded().await; } @@ -81,146 +82,745 @@ async fn test_basic() -> Result<(), BoxError> { Ok(()) } -async fn validate_telemetry( - mock_server: &MockServer, - _id: TraceId, - query: &Value, - operation_name: Option<&str>, - services: &[&'static str], - custom_span_instrumentation: bool, +#[tokio::test(flavor = "multi_thread")] +async fn test_otlp_request_with_datadog_propagator() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_propagation.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), + }) + .config(&config) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, _) = router.execute_query(&query).await; + Spec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build() + .validate_trace(id, &mock_server) + .await?; + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_otlp_request_with_datadog_propagator_no_agent() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_propagation_no_agent.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), + }) + .config(&config) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, _) = router.execute_query(&query).await; + Spec::builder() + .services(["client", "router", "subgraph"].into()) + .build() + .validate_trace(id, &mock_server) + .await?; + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_otlp_request_with_zipkin_trace_context_propagator_with_datadog( ) -> Result<(), BoxError> { - for _ in 0..10 { - let trace_valid = find_valid_trace( - mock_server, - query, - operation_name, - services, - custom_span_instrumentation, - ) + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_request_with_zipkin_propagator.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), + }) + .config(&config) + .build() .await; - let metrics_valid = find_valid_metrics(mock_server, query, operation_name, services).await; + router.start().await; + router.assert_started().await; - if metrics_valid.is_ok() && trace_valid.is_ok() { - return Ok(()); - } + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, _) = router.execute_query(&query).await; + + Spec::builder() + .services(["client", "router", "subgraph"].into()) + .priority_sampled("1") + .build() + .validate_trace(id, &mock_server) + .await?; + // ---------------------- zipkin propagator with unsampled trace + // Testing for an unsampled trace, so it should be sent to the otlp exporter with sampling priority set 0 + // But it shouldn't send the trace to subgraph as the trace is originally not sampled, the main goal is to measure it at the DD agent level + let id = TraceId::from_hex("80f198ee56343ba864fe8b2a57d3eff7").unwrap(); + let headers: HashMap = [ + ( + "X-B3-TraceId".to_string(), + "80f198ee56343ba864fe8b2a57d3eff7".to_string(), + ), + ( + "X-B3-ParentSpanId".to_string(), + "05e3ac9a4f6e3b90".to_string(), + ), + ("X-B3-SpanId".to_string(), "e457b5a2e4d86bd1".to_string()), + ("X-B3-Sampled".to_string(), "0".to_string()), + ] + .into(); + + let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await; + Spec::builder() + .services(["router"].into()) + .priority_sampled("0") + .build() + .validate_trace(id, &mock_server) + .await?; + // ---------------------- trace context propagation + // Testing for a trace containing the right tracestate with m and psr for DD and a sampled trace, so it should be sent to the otlp exporter with sampling priority set to 1 + // And it should also send the trace to subgraph as the trace is sampled + let id = TraceId::from_hex("0af7651916cd43dd8448eb211c80319c").unwrap(); + let headers: HashMap = [ + ( + "traceparent".to_string(), + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string(), + ), + ("tracestate".to_string(), "m=1,psr=1".to_string()), + ] + .into(); + + let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await; + Spec::builder() + .services(["router", "subgraph"].into()) + .priority_sampled("1") + .build() + .validate_trace(id, &mock_server) + .await?; + // ---------------------- + // Testing for a trace containing the right tracestate with m and psr for DD and an unsampled trace, so it should be sent to the otlp exporter with sampling priority set to 0 + // But it shouldn't send the trace to subgraph as the trace is originally not sampled, the main goal is to measure it at the DD agent level + let id = TraceId::from_hex("0af7651916cd43dd8448eb211c80319d").unwrap(); + let headers: HashMap = [ + ( + "traceparent".to_string(), + "00-0af7651916cd43dd8448eb211c80319d-b7ad6b7169203331-00".to_string(), + ), + ("tracestate".to_string(), "m=1,psr=0".to_string()), + ] + .into(); + + let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await; + Spec::builder() + .services(["router"].into()) + .priority_sampled("0") + .build() + .validate_trace(id, &mock_server) + .await?; + // ---------------------- + // Testing for a trace containing a tracestate m and psr with psr set to 1 for DD and an unsampled trace, so it should be sent to the otlp exporter with sampling priority set to 1 + // It should not send the trace to the subgraph as we didn't use the datadog propagator and therefore the trace will remain unsampled. + let id = TraceId::from_hex("0af7651916cd43dd8448eb211c80319e").unwrap(); + let headers: HashMap = [ + ( + "traceparent".to_string(), + "00-0af7651916cd43dd8448eb211c80319e-b7ad6b7169203331-00".to_string(), + ), + ("tracestate".to_string(), "m=1,psr=1".to_string()), + ] + .into(); + + let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await; + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build() + .validate_trace(id, &mock_server) + .await?; + + // Be careful if you add the same kind of test crafting your own trace id, make sure to increment the previous trace id by 1 if not you'll receive all the previous spans tested with the same trace id before + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_untraced_request_no_sample_datadog_agent() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_agent_no_sample.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder().config(&config).build().await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, _) = router.execute_untraced_query(&query, None).await; + Spec::builder() + .services(["router"].into()) + .priority_sampled("0") + .build() + .validate_trace(id, &mock_server) + .await?; + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_untraced_request_sample_datadog_agent() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_agent_sample.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder().config(&config).build().await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, _) = router.execute_untraced_query(&query, None).await; + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build() + .validate_trace(id, &mock_server) + .await?; + router.graceful_shutdown().await; + Ok(()) +} - tokio::time::sleep(Duration::from_millis(100)).await; +#[tokio::test(flavor = "multi_thread")] +async fn test_untraced_request_sample_datadog_agent_unsampled() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); } - find_valid_trace( - mock_server, - query, - operation_name, - services, - custom_span_instrumentation, + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_agent_sample_no_sample.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), + }) + .config(&config) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let (id, _) = router.execute_untraced_query(&query, None).await; + Spec::builder() + .services(["router"].into()) + .priority_sampled("0") + .build() + .validate_trace(id, &mock_server) + .await?; + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_priority_sampling_propagated() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_propagation.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + // We're using datadog propagation as this is what we are trying to test. + .telemetry(Telemetry::Datadog) + .config(config) + .responder(ResponseTemplate::new(200).set_body_json( + json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), + )) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // Parent based sampling. psr MUST be populated with the value that we pass in. + test_psr( + &mut router, + Some("-1"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("-1") + .build(), + &mock_server, + ) + .await?; + test_psr( + &mut router, + Some("0"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("0") + .build(), + &mock_server, + ) + .await?; + test_psr( + &mut router, + Some("1"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, + ) + .await?; + test_psr( + &mut router, + Some("2"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("2") + .build(), + &mock_server, + ) + .await?; + + // No psr was passed in the router is free to set it. This will be 1 as we are going to sample here. + test_psr( + &mut router, + None, + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, + ) + .await?; + + router.graceful_shutdown().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_priority_sampling_no_parent_propagated() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + let mock_server = mock_otlp_server().await; + let config = include_str!("fixtures/otlp_datadog_propagation_no_parent_sampler.router.yaml") + .replace("", &mock_server.uri()); + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Datadog) + .config(config) + .responder(ResponseTemplate::new(200).set_body_json( + json!({"data":{"topProducts":[{"name":"Table"},{"name":"Couch"},{"name":"Chair"}]}}), + )) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // The router will ignore the upstream PSR as parent based sampling is disabled. + test_psr( + &mut router, + Some("-1"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, + ) + .await?; + test_psr( + &mut router, + Some("0"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, + ) + .await?; + test_psr( + &mut router, + Some("1"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, + ) + .await?; + test_psr( + &mut router, + Some("2"), + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, ) .await?; - find_valid_metrics(mock_server, query, operation_name, services).await?; + + test_psr( + &mut router, + None, + Spec::builder() + .services(["router"].into()) + .priority_sampled("1") + .build(), + &mock_server, + ) + .await?; + + router.graceful_shutdown().await; Ok(()) } -async fn find_valid_trace( +async fn test_psr( + router: &mut IntegrationTest, + psr: Option<&str>, + trace_spec: Spec, mock_server: &MockServer, - _query: &Value, - _operation_name: Option<&str>, - services: &[&'static str], - _custom_span_instrumentation: bool, ) -> Result<(), BoxError> { - let requests = mock_server - .received_requests() - .await - .expect("Could not get otlp requests"); - - // A valid trace has: - // * A valid service name - // * All three services - // * The correct spans - // * All spans are parented - // * Required attributes of 'router' span has been set - let traces: Vec<_>= requests - .iter() - .filter_map(|r| { - if r.url.path().ends_with("/traces") { + let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}}); + let headers = if let Some(psr) = psr { + vec![("x-datadog-sampling-priority".to_string(), psr.to_string())] + } else { + vec![] + }; + let (id, result) = router + .execute_query_with_headers(&query, headers.into_iter().collect()) + .await; + + assert!(result.status().is_success()); + trace_spec.validate_trace(id, mock_server).await?; + Ok(()) +} + +#[derive(buildstructor::Builder)] +struct Spec { + operation_name: Option, + version: Option, + services: HashSet<&'static str>, + span_names: HashSet<&'static str>, + measured_spans: HashSet<&'static str>, + unmeasured_spans: HashSet<&'static str>, + priority_sampled: Option<&'static str>, +} + +impl Spec { + #[allow(clippy::too_many_arguments)] + async fn validate_trace(&self, id: TraceId, mock_server: &MockServer) -> Result<(), BoxError> { + for _ in 0..10 { + if self.find_valid_trace(id, mock_server).await.is_ok() { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + self.find_valid_trace(id, mock_server).await?; + Ok(()) + } + + async fn validate_metrics(&self, mock_server: &MockServer) -> Result<(), BoxError> { + for _ in 0..10 { + if self.find_valid_metrics(mock_server).await.is_ok() { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + self.find_valid_metrics(mock_server).await?; + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn find_valid_trace( + &self, + trace_id: TraceId, + mock_server: &MockServer, + ) -> Result<(), BoxError> { + // A valid trace has: + // * All three services + // * The correct spans + // * All spans are parented + // * Required attributes of 'router' span has been set + + let requests = mock_server.received_requests().await; + let trace= Value::Array(requests.unwrap_or_default().iter().filter(|r| r.url.path().ends_with("/traces")) + .filter_map(|r|{ match opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest::decode( bytes::Bytes::copy_from_slice(&r.body), ) { Ok(trace) => { match serde_json::to_value(trace) { - Ok(trace) => { Some(Ok(trace)) } - Err(e) => { - Some(Err(BoxError::from(format!("failed to decode trace: {}", e)))) + Ok(trace) => { + Some(trace) } + Err(_) => { + None } } } - Err(e) => { - Some(Err(BoxError::from(format!("failed to decode trace: {}", e)))) + Err(_) => { + None } } + }).filter(|t| { + + let datadog_trace_id = TraceId::from_u128(trace_id.to_datadog() as u128); + let trace_found1 = !t.select_path(&format!("$..[?(@.traceId == '{}')]", trace_id)).unwrap_or_default().is_empty(); + let trace_found2 = !t.select_path(&format!("$..[?(@.traceId == '{}')]", datadog_trace_id)).unwrap_or_default().is_empty(); + trace_found1 | trace_found2 + }).collect()); + + self.verify_services(&trace)?; + self.verify_spans_present(&trace)?; + self.verify_measured_spans(&trace)?; + self.verify_operation_name(&trace)?; + self.verify_priority_sampled(&trace)?; + self.verify_version(&trace)?; + self.verify_span_kinds(&trace)?; + + Ok(()) + } + + fn verify_version(&self, trace: &Value) -> Result<(), BoxError> { + if let Some(expected_version) = &self.version { + let binding = trace.select_path("$..version")?; + let version = binding.first(); + assert_eq!( + version + .expect("version expected") + .as_str() + .expect("version must be a string"), + expected_version + ); + } + Ok(()) + } + + fn verify_measured_spans(&self, trace: &Value) -> Result<(), BoxError> { + for expected in &self.measured_spans { + assert!( + self.measured_span(trace, expected)?, + "missing measured span {}", + expected + ); + } + for unexpected in &self.unmeasured_spans { + assert!( + !self.measured_span(trace, unexpected)?, + "unexpected measured span {}", + unexpected + ); + } + Ok(()) + } + + fn measured_span(&self, trace: &Value, name: &str) -> Result { + let binding1 = trace.select_path(&format!( + "$..[?(@.meta.['otel.original_name'] == '{}')].metrics.['_dd.measured']", + name + ))?; + let binding2 = trace.select_path(&format!( + "$..[?(@.name == '{}')].metrics.['_dd.measured']", + name + ))?; + Ok(binding1 + .first() + .or(binding2.first()) + .and_then(|v| v.as_f64()) + .map(|v| v == 1.0) + .unwrap_or_default()) + } + + fn verify_span_kinds(&self, trace: &Value) -> Result<(), BoxError> { + // Validate that the span.kind has been propagated. We can just do this for a selection of spans. + self.validate_span_kind(trace, "router", "server")?; + self.validate_span_kind(trace, "supergraph", "internal")?; + self.validate_span_kind(trace, "http_request", "client")?; + Ok(()) + } + + fn verify_services(&self, trace: &Value) -> Result<(), BoxError> { + let actual_services: HashSet = trace + .select_path("$..resource.attributes..[?(@.key == 'service.name')].value.stringValue")? + .into_iter() + .filter_map(|service| service.as_string()) + .collect(); + tracing::debug!("found services {:?}", actual_services); + let expected_services = self + .services + .iter() + .map(|s| s.to_string()) + .collect::>(); + if actual_services != expected_services { + return Err(BoxError::from(format!( + "incomplete traces, got {actual_services:?} expected {expected_services:?}" + ))); + } + Ok(()) + } + + fn verify_spans_present(&self, trace: &Value) -> Result<(), BoxError> { + let operation_names: HashSet = trace + .select_path("$..spans..name")? + .into_iter() + .filter_map(|span_name| span_name.as_string()) + .collect(); + let mut span_names: HashSet<&str> = self.span_names.clone(); + if self.services.contains("client") { + span_names.insert("client_request"); + } + tracing::debug!("found spans {:?}", operation_names); + let missing_operation_names: Vec<_> = span_names + .iter() + .filter(|o| !operation_names.contains(**o)) + .collect(); + if !missing_operation_names.is_empty() { + return Err(BoxError::from(format!( + "spans did not match, got {operation_names:?}, missing {missing_operation_names:?}" + ))); + } + Ok(()) + } + + fn validate_span_kind(&self, trace: &Value, name: &str, kind: &str) -> Result<(), BoxError> { + let kind = match kind { + "internal" => 1, + "client" => 3, + "server" => 2, + _ => panic!("unknown kind"), + }; + let binding1 = trace.select_path(&format!( + "$..spans..[?(@.kind == {})]..[?(@.key == 'otel.original_name')].value..[?(@ == '{}')]", + kind, name + ))?; + let binding2 = trace.select_path(&format!( + "$..spans..[?(@.kind == {} && @.name == '{}')]", + kind, name + ))?; + let binding = binding1.first().or(binding2.first()); + + if binding.is_none() { + return Err(BoxError::from(format!( + "span.kind missing or incorrect {}, {}", + name, kind + ))); + } + Ok(()) + } + + fn verify_operation_name(&self, trace: &Value) -> Result<(), BoxError> { + if let Some(expected_operation_name) = &self.operation_name { + let binding = + trace.select_path("$..[?(@.name == 'supergraph')]..[?(@.key == 'graphql.operation.name')].value.stringValue")?; + let operation_name = binding.first(); + assert_eq!( + operation_name + .expect("graphql.operation.name expected") + .as_str() + .expect("graphql.operation.name must be a string"), + expected_operation_name + ); + } + Ok(()) + } + + fn verify_priority_sampled(&self, trace: &Value) -> Result<(), BoxError> { + if let Some(psr) = self.priority_sampled { + let binding = trace.select_path( + "$..[?(@.name == 'execution')]..[?(@.key == 'sampling.priority')].value.intValue", + )?; + if binding.is_empty() { + return Err(BoxError::from("missing sampling priority")); } - else { - None + for sampling_priority in binding { + assert_eq!( + sampling_priority + .as_i64() + .expect("psr not an integer") + .to_string(), + psr + ); } - }) - .try_collect()?; - if !traces.is_empty() { - let json_trace = serde_json::Value::Array(traces); - verify_trace_participants(&json_trace, services)?; - + } else { + assert!(trace.select_path("$..[?(@.name == 'execution')]..[?(@.key == 'sampling.priority')].value.intValue")?.is_empty()) + } Ok(()) - } else { - Err(anyhow!("No traces received").into()) } -} -fn verify_trace_participants(trace: &Value, services: &[&'static str]) -> Result<(), BoxError> { - let actual_services: HashSet = trace - .select_path("$..resource.attributes[?(@.key=='service.name')].value.stringValue")? - .into_iter() - .filter_map(|service| service.as_string()) - .collect(); - tracing::debug!("found services {:?}", actual_services); - - let expected_services = services - .iter() - .map(|s| s.to_string()) - .collect::>(); - if actual_services != expected_services { - return Err(BoxError::from(format!( - "incomplete traces, got {actual_services:?} expected {expected_services:?}" - ))); + async fn find_valid_metrics(&self, mock_server: &MockServer) -> Result<(), BoxError> { + let requests = mock_server + .received_requests() + .await + .expect("Could not get otlp requests"); + if let Some(metrics) = requests.iter().find(|r| r.url.path().ends_with("/metrics")) { + let metrics = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(bytes::Bytes::copy_from_slice(&metrics.body))?; + let json_metrics = serde_json::to_value(metrics)?; + // For now just validate service name. + self.verify_services(&json_metrics)?; + + Ok(()) + } else { + Err(anyhow!("No metrics received").into()) + } } - Ok(()) } -fn validate_service_name(trace: Value) -> Result<(), BoxError> { - let service_name = - trace.select_path("$..resource.attributes[?(@.key=='service.name')].value.stringValue")?; - assert_eq!( - service_name.first(), - Some(&&Value::String("router".to_string())) - ); - Ok(()) +async fn mock_otlp_server() -> MockServer { + let mock_server = wiremock::MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/traces")) + .respond_with(ResponseTemplate::new(200).set_body_raw( + ExportTraceServiceResponse::default().encode_to_vec(), + "application/x-protobuf", + )) + .expect(1..) + .mount(&mock_server) + .await; + Mock::given(method("POST")) + .and(path("/metrics")) + .respond_with(ResponseTemplate::new(200).set_body_raw( + ExportMetricsServiceResponse::default().encode_to_vec(), + "application/x-protobuf", + )) + .expect(1..) + .mount(&mock_server) + .await; + mock_server } -async fn find_valid_metrics( - mock_server: &MockServer, - _query: &Value, - _operation_name: Option<&str>, - _services: &[&'static str], -) -> Result<(), BoxError> { - let requests = mock_server - .received_requests() - .await - .expect("Could not get otlp requests"); - if let Some(metrics) = requests.iter().find(|r| r.url.path().ends_with("/metrics")) { - let metrics = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(bytes::Bytes::copy_from_slice(&metrics.body))?; - let json_trace = serde_json::to_value(metrics)?; - // For now just validate service name. - validate_service_name(json_trace)?; - - Ok(()) - } else { - Err(anyhow!("No metrics received").into()) +pub(crate) trait DatadogId { + fn to_datadog(&self) -> u64; +} +impl DatadogId for TraceId { + fn to_datadog(&self) -> u64 { + let bytes = &self.to_bytes()[std::mem::size_of::()..std::mem::size_of::()]; + u64::from_be_bytes(bytes.try_into().unwrap()) } } diff --git a/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx b/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx index 0eea7691d93..e91b12508e2 100644 --- a/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx +++ b/docs/source/configuration/telemetry/exporters/tracing/datadog.mdx @@ -12,11 +12,16 @@ For general tracing configuration, refer to [Router Tracing Configuration](./ove ## OTLP configuration -To export traces to Datadog via OTLP, you must do the following: -- Configure the Datadog agent to accept OTLP traces. -- Configure the router to send traces to the Datadog agent. +OTLP is the [OpenTelemetry protocol](https://opentelemetry.io/docs/specs/otel/protocol/), and is the recommended protocol for transmitting telemetry, including traces, to Datadog. -To configure the Datadog agent, add OTLP configuration to your `datadog.yaml`. For example: +To setup traces to Datadog via OTLP, you must do the following: + +- Modify the default configuration of the Datadog Agent to accept OTLP traces submitted to it by the router. +- Configure the router to send traces to the configured Datadog Agent. + +### Datadog Agent configuration + +To configure the Datadog Agent, add OTLP configuration to your `datadog.yaml`. For example: ```yaml title="datadog.yaml" otlp_config: @@ -26,26 +31,42 @@ otlp_config: endpoint: :4317 ``` -To configure the router, enable the [OTLP exporter](./otlp) and set `endpoint: `. For example: +For additional Datadog Agent configuration details, review Datadog's [Enabling OTLP Ingestion on the Datadog Agent](https://docs.datadoghq.com/opentelemetry/interoperability/otlp_ingest_in_the_agent/?tab=host#enabling-otlp-ingestion-on-the-datadog-agent) documentation. + +### Router configuration + +To configure the router, enable the [OTLP exporter](./otlp) and set `endpoint: `. For example: ```yaml title="router.yaml" telemetry: exporters: tracing: + common: + # Only 10 percent of spans will be forwarded from the Datadog agent to Datadog. Experiment to find a value that is good for you! + sampler: 0.1 + otlp: enabled: true - # Optional endpoint, either 'default' or a URL (Defaults to http://127.0.0.1:4317) endpoint: "${env.DATADOG_AGENT_HOST}:4317" + # Optional batch processor setting, this will enable the batch processor to send concurrent requests in a high load scenario. + batch_processor: + max_concurrent_exports: 100 ``` -For more details about Datadog configuration, see [Datadog Agent configuration](https://docs.datadoghq.com/opentelemetry/otlp_ingest_in_the_agent/?tab=host). +Adjusting the `sampler` will allow you to control the sampling decisions that the router will make on its own and decrease the rate at which you sample, which can have a direct impact on your your Datadog bill. + + + +Depending on the volume of spans being created in a router instance, it will be necessary to adjust the `batch_processor` settings in your `exporter` config. If this is necessary, you will see warning messages from the router regarding the batch span processor. This applies to both OTLP and the Datadog native exporter. + + ### Enabling log correlation To enable Datadog log correlation, you must configure `dd.trace_id` to appear on the `router` span: - + ```yaml title="router.yaml" telemetry: instrumentation: @@ -72,10 +93,18 @@ The router can be configured to connect to either the native, default Datadog ag telemetry: exporters: tracing: - datadog: - enabled: true - # Optional endpoint, either 'default' or a URL (Defaults to http://127.0.0.1:8126) - endpoint: "http://${env.DATADOG_AGENT_HOST}:8126" + common: + # Only 10 percent of spans will be forwarded from the Datadog agent to Datadog. Experiment to find a value that is good for you! + sampler: 0.1 + + datadog: + enabled: true + # Optional endpoint, either 'default' or a URL (Defaults to http://127.0.0.1:8126) + endpoint: "http://${env.DATADOG_AGENT_HOST}:8126" + + # Optional batch processor setting, this will enable the batch processor to send concurrent requests in a high load scenario. + batch_processor: + max_concurrent_exports: 100 # Enable graphql.operation.name attribute on supergraph spans. instrumentation: @@ -86,6 +115,12 @@ telemetry: graphql.operation.name: true ``` + + +Depending on the volume of spans being created in a router instance, it will be necessary to adjust the `batch_processor` settings in your `exporter` config. This applies to both OTLP and the Datadog native exporter. + + + ### `enabled` Set to true to enable the Datadog exporter. Defaults to false. @@ -227,11 +262,11 @@ If you have introduced a new span in a custom build of the Router you can enable telemetry: exporters: tracing: - datadog: - batch_processor: + datadog: + batch_processor: max_export_batch_size: 512 max_concurrent_exports: 1 - max_export_timeout: 30s + max_export_timeout: 30s max_queue_size: 2048 scheduled_delay: 5s ```