diff --git a/changelog.d/24455_otel_source_per_signal_decoding.enhancement.md b/changelog.d/24455_otel_source_per_signal_decoding.enhancement.md new file mode 100644 index 0000000000000..79db8934df463 --- /dev/null +++ b/changelog.d/24455_otel_source_per_signal_decoding.enhancement.md @@ -0,0 +1,21 @@ +The `opentelemetry` source now supports independent configuration of OTLP decoding for logs, metrics, and traces. This allows more granular +control over which signal types are decoded, while maintaining backward compatibility with the existing boolean configuration. + +## Simple boolean form (applies to all signals) + +```yaml +use_otlp_decoding: true # All signals preserve OTLP format +# or +use_otlp_decoding: false # All signals use Vector native format (default) +``` + +## Per-signal configuration + +```yaml +use_otlp_decoding: + logs: false # Convert to Vector native format + metrics: false # Convert to Vector native format + traces: true # Preserve OTLP format +``` + +authors: pront diff --git a/src/sources/opentelemetry/config.rs b/src/sources/opentelemetry/config.rs index 8c93b93e55921..c7eed8f301b50 100644 --- a/src/sources/opentelemetry/config.rs +++ b/src/sources/opentelemetry/config.rs @@ -47,6 +47,65 @@ pub const LOGS: &str = "logs"; pub const METRICS: &str = "metrics"; pub const TRACES: &str = "traces"; +/// Configuration for OTLP decoding behavior. +#[configurable_component] +#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct OtlpDecodingConfig { + /// Whether to use OTLP decoding for logs. + /// + /// When `true`, logs preserve their OTLP format. + /// When `false` (default), logs are converted to Vector's native format. + #[serde(default)] + pub logs: bool, + + /// Whether to use OTLP decoding for metrics. + /// + /// When `true`, metrics preserve their OTLP format but are processed as logs. + /// When `false` (default), metrics are converted to Vector's native metric format. + #[serde(default)] + pub metrics: bool, + + /// Whether to use OTLP decoding for traces. + /// + /// When `true`, traces preserve their OTLP format. + /// When `false` (default), traces are converted to Vector's native format. + #[serde(default)] + pub traces: bool, +} + +impl From for OtlpDecodingConfig { + /// Converts a boolean value to an OtlpDecodingConfig. + /// + /// This provides backward compatibility with the previous boolean configuration. + /// - `true` enables OTLP decoding for all signals + /// - `false` disables OTLP decoding for all signals (uses Vector native format) + fn from(value: bool) -> Self { + Self { + logs: value, + metrics: value, + traces: value, + } + } +} + +impl OtlpDecodingConfig { + /// Returns true if any signal is configured to use OTLP decoding. + pub const fn any_enabled(&self) -> bool { + self.logs || self.metrics || self.traces + } + + /// Returns true if all signals are configured to use OTLP decoding. + pub const fn all_enabled(&self) -> bool { + self.logs && self.metrics && self.traces + } + + /// Returns true if signals have mixed configuration (some enabled, some disabled). + pub const fn is_mixed(&self) -> bool { + self.any_enabled() && !self.all_enabled() + } +} + /// Configuration for the `opentelemetry` source. #[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))] #[derive(Clone, Debug)] @@ -67,14 +126,36 @@ pub struct OpentelemetryConfig { #[serde(default)] pub log_namespace: Option, - /// Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly. + /// Configuration for OTLP decoding behavior. /// - /// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format. - /// This means that components that work on metrics, will not be compatible with this output. - /// However, these events can be forwarded directly to a downstream OTEL collector. - #[configurable(derived)] - #[serde(default)] - pub use_otlp_decoding: bool, + /// This configuration controls how OpenTelemetry Protocol (OTLP) data is decoded for each + /// signal type (logs, metrics, traces). When a signal is configured to use OTLP decoding, the raw OTLP format is + /// preserved, allowing the data to be forwarded to downstream OTLP collectors without transformation. + /// Otherwise, the signal is converted to Vector's native event format. + /// + /// Simple boolean form: + /// + /// ```yaml + /// use_otlp_decoding: true # All signals preserve OTLP format + /// # or + /// use_otlp_decoding: false # All signals use Vector native format (default) + /// ``` + /// + /// Per-signal configuration: + /// + /// ```yaml + /// use_otlp_decoding: + /// logs: false # Convert to Vector native format + /// metrics: false # Convert to Vector native format + /// traces: true # Preserve OTLP format + /// ``` + /// + /// **Note:** When OTLP decoding is enabled for metrics: + /// - Metrics are parsed as logs while preserving the OTLP format + /// - Vector's metric transforms will NOT be compatible with this output + /// - The events can be forwarded directly (passthrough) to a downstream OTLP collector + #[serde(default, deserialize_with = "bool_or_struct")] + pub use_otlp_decoding: OtlpDecodingConfig, } /// Configuration for the `opentelemetry` gRPC server. @@ -152,18 +233,24 @@ impl GenerateConfig for OpentelemetryConfig { http: example_http_config(), acknowledgements: Default::default(), log_namespace: None, - use_otlp_decoding: false, + use_otlp_decoding: OtlpDecodingConfig::default(), }) .unwrap() } } impl OpentelemetryConfig { - fn get_signal_deserializer( + pub(crate) fn get_signal_deserializer( &self, signal_type: OtlpSignalType, ) -> vector_common::Result> { - if self.use_otlp_decoding { + let should_use_otlp = match signal_type { + OtlpSignalType::Logs => self.use_otlp_decoding.logs, + OtlpSignalType::Metrics => self.use_otlp_decoding.metrics, + OtlpSignalType::Traces => self.use_otlp_decoding.traces, + }; + + if should_use_otlp { Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([ signal_type, ])))) @@ -183,6 +270,16 @@ impl SourceConfig for OpentelemetryConfig { let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?; + // Log info message when using mixed OTLP decoding formats + if self.use_otlp_decoding.is_mixed() { + info!( + message = "Signals with OTLP decoding enabled will preserve raw format; others will use Vector native format.", + logs_otlp = self.use_otlp_decoding.logs, + metrics_otlp = self.use_otlp_decoding.metrics, + traces_otlp = self.use_otlp_decoding.traces, + ); + } + let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?; let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?; let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?; @@ -352,13 +449,13 @@ impl SourceConfig for OpentelemetryConfig { } }; - let logs_output = if self.use_otlp_decoding { + let logs_output = if self.use_otlp_decoding.logs { SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(LOGS) } else { SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS) }; - let metrics_output = if self.use_otlp_decoding { + let metrics_output = if self.use_otlp_decoding.metrics { SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS) } else { SourceOutput::new_metrics().with_port(METRICS) diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index c712b08d7741f..00d642f41e47e 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -62,7 +62,7 @@ async fn receive_logs_legacy_namespace() { }, acknowledgements: Default::default(), log_namespace: Default::default(), - use_otlp_decoding: false, + use_otlp_decoding: false.into(), }; let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); @@ -161,7 +161,7 @@ async fn receive_trace() { }, acknowledgements: Default::default(), log_namespace: Default::default(), - use_otlp_decoding: false, + use_otlp_decoding: false.into(), }; let (sender, trace_output, _) = new_source(EventStatus::Delivered, TRACES.to_string()); @@ -266,7 +266,7 @@ async fn receive_metric() { }, acknowledgements: Default::default(), log_namespace: Default::default(), - use_otlp_decoding: false, + use_otlp_decoding: false.into(), }; let (sender, metrics_output, _) = new_source(EventStatus::Delivered, METRICS.to_string()); diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 466196ded223d..de5781756dba1 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1087,7 +1087,7 @@ fn get_source_config_with_headers( }, acknowledgements: Default::default(), log_namespace: Default::default(), - use_otlp_decoding, + use_otlp_decoding: use_otlp_decoding.into(), } } @@ -1263,7 +1263,7 @@ pub async fn build_otlp_test_env( }, acknowledgements: Default::default(), log_namespace, - use_otlp_decoding: false, + use_otlp_decoding: false.into(), }; let (sender, output, _) = new_source(EventStatus::Delivered, event_name.to_string()); @@ -1342,7 +1342,7 @@ async fn http_logs_use_otlp_decoding_emits_metric() { }, acknowledgements: Default::default(), log_namespace: None, - use_otlp_decoding: true, + use_otlp_decoding: true.into(), }; let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); @@ -1409,3 +1409,279 @@ async fn http_logs_use_otlp_decoding_emits_metric() { _ => panic!("component_received_events_total should be a counter"), } } + +#[cfg(test)] +mod otlp_decoding_config_tests { + use crate::config::{DataType, LogNamespace, SourceConfig}; + use crate::sources::opentelemetry::config::{ + GrpcConfig, HttpConfig, OpentelemetryConfig, OtlpDecodingConfig, + }; + use vector_lib::codecs::decoding::OtlpSignalType; + + #[test] + fn test_otlp_decoding_mixed_configurations() { + // Test single signal enabled + let config = OtlpDecodingConfig { + logs: false, + metrics: false, + traces: true, + }; + assert!(config.any_enabled()); + assert!(!config.all_enabled()); + assert!(config.is_mixed()); + + // Test two signals enabled + let config = OtlpDecodingConfig { + logs: true, + metrics: false, + traces: true, + }; + assert!(config.any_enabled()); + assert!(!config.all_enabled()); + assert!(config.is_mixed()); + + // Test different single signal + let config = OtlpDecodingConfig { + logs: true, + metrics: false, + traces: false, + }; + assert!(config.any_enabled()); + assert!(!config.all_enabled()); + assert!(config.is_mixed()); + } + + #[test] + fn test_otlp_decoding_from_bool() { + // Test direct From trait implementation + let config_true = OtlpDecodingConfig::from(true); + assert!(config_true.logs); + assert!(config_true.metrics); + assert!(config_true.traces); + assert!(config_true.all_enabled()); + assert!(!config_true.is_mixed()); + + let config_false = OtlpDecodingConfig::from(false); + assert!(!config_false.logs); + assert!(!config_false.metrics); + assert!(!config_false.traces); + assert!(!config_false.any_enabled()); + assert!(!config_false.is_mixed()); + + // Test TOML deserialization (which uses From under the hood) + let config: OpentelemetryConfig = toml::from_str( + r#" + use_otlp_decoding = true + + [grpc] + address = "0.0.0.0:4317" + + [http] + address = "0.0.0.0:4318" + "#, + ) + .unwrap(); + assert!(config.use_otlp_decoding.logs); + assert!(config.use_otlp_decoding.metrics); + assert!(config.use_otlp_decoding.traces); + + let config: OpentelemetryConfig = toml::from_str( + r#" + use_otlp_decoding = false + + [grpc] + address = "0.0.0.0:4317" + + [http] + address = "0.0.0.0:4318" + "#, + ) + .unwrap(); + assert!(!config.use_otlp_decoding.logs); + assert!(!config.use_otlp_decoding.metrics); + assert!(!config.use_otlp_decoding.traces); + } + + #[test] + fn test_otlp_decoding_deserialization_from_struct() { + // Test deserializing from a struct with all fields + let config: OpentelemetryConfig = toml::from_str( + r#" + [grpc] + address = "0.0.0.0:4317" + + [http] + address = "0.0.0.0:4318" + + [use_otlp_decoding] + logs = false + metrics = false + traces = true + "#, + ) + .unwrap(); + assert!(!config.use_otlp_decoding.logs); + assert!(!config.use_otlp_decoding.metrics); + assert!(config.use_otlp_decoding.traces); + + // Test deserializing from a struct with partial fields (using defaults) + let config: OpentelemetryConfig = toml::from_str( + r#" + [grpc] + address = "0.0.0.0:4317" + + [http] + address = "0.0.0.0:4318" + + [use_otlp_decoding] + traces = true + "#, + ) + .unwrap(); + assert!(!config.use_otlp_decoding.logs); // default false + assert!(!config.use_otlp_decoding.metrics); // default false + assert!(config.use_otlp_decoding.traces); + } + + #[test] + fn test_otlp_decoding_default_when_not_specified() { + // Test that when use_otlp_decoding is not specified, it uses defaults (all false) + let config: OpentelemetryConfig = toml::from_str( + r#" + [grpc] + address = "0.0.0.0:4317" + + [http] + address = "0.0.0.0:4318" + "#, + ) + .unwrap(); + assert!(!config.use_otlp_decoding.logs); + assert!(!config.use_otlp_decoding.metrics); + assert!(!config.use_otlp_decoding.traces); + } + + #[tokio::test] + async fn test_get_signal_deserializer_per_signal() { + let config_all_true = OpentelemetryConfig { + grpc: GrpcConfig { + address: "0.0.0.0:4317".parse().unwrap(), + tls: None, + }, + http: HttpConfig { + address: "0.0.0.0:4318".parse().unwrap(), + tls: None, + keepalive: Default::default(), + headers: vec![], + }, + acknowledgements: Default::default(), + log_namespace: None, + use_otlp_decoding: OtlpDecodingConfig { + logs: true, + metrics: true, + traces: true, + }, + }; + + // All should return Some deserializer + assert!( + config_all_true + .get_signal_deserializer(OtlpSignalType::Logs) + .unwrap() + .is_some() + ); + assert!( + config_all_true + .get_signal_deserializer(OtlpSignalType::Metrics) + .unwrap() + .is_some() + ); + assert!( + config_all_true + .get_signal_deserializer(OtlpSignalType::Traces) + .unwrap() + .is_some() + ); + + let config_mixed = OpentelemetryConfig { + grpc: GrpcConfig { + address: "0.0.0.0:4317".parse().unwrap(), + tls: None, + }, + http: HttpConfig { + address: "0.0.0.0:4318".parse().unwrap(), + tls: None, + keepalive: Default::default(), + headers: vec![], + }, + acknowledgements: Default::default(), + log_namespace: None, + use_otlp_decoding: OtlpDecodingConfig { + logs: false, + metrics: false, + traces: true, + }, + }; + + // Only traces should return Some deserializer + assert!( + config_mixed + .get_signal_deserializer(OtlpSignalType::Logs) + .unwrap() + .is_none() + ); + assert!( + config_mixed + .get_signal_deserializer(OtlpSignalType::Metrics) + .unwrap() + .is_none() + ); + assert!( + config_mixed + .get_signal_deserializer(OtlpSignalType::Traces) + .unwrap() + .is_some() + ); + } + + #[test] + fn test_outputs_configuration_per_signal() { + let config_mixed = OpentelemetryConfig { + grpc: GrpcConfig { + address: "0.0.0.0:4317".parse().unwrap(), + tls: None, + }, + http: HttpConfig { + address: "0.0.0.0:4318".parse().unwrap(), + tls: None, + keepalive: Default::default(), + headers: vec![], + }, + acknowledgements: Default::default(), + log_namespace: None, + use_otlp_decoding: OtlpDecodingConfig { + logs: false, + metrics: true, + traces: true, + }, + }; + + let outputs = config_mixed.outputs(LogNamespace::Legacy); + assert_eq!(outputs.len(), 3); + + // Verify logs output (native format) + let logs_output = &outputs[0]; + assert_eq!(logs_output.port.as_deref(), Some("logs")); + assert_eq!(logs_output.ty, DataType::Log); + + // Verify metrics output (OTLP format, logs data type) + let metrics_output = &outputs[1]; + assert_eq!(metrics_output.port.as_deref(), Some("metrics")); + assert_eq!(metrics_output.ty, DataType::Log); // Should be Log when OTLP decoding is enabled + + // Verify traces output (OTLP format, traces data type) + let traces_output = &outputs[2]; + assert_eq!(traces_output.port.as_deref(), Some("traces")); + assert_eq!(traces_output.ty, DataType::Trace); // Should always be Trace regardless of OTLP decoding + } +} diff --git a/website/cue/reference/components/sources/generated/opentelemetry.cue b/website/cue/reference/components/sources/generated/opentelemetry.cue index 4791cc458c8cf..d3b498506a323 100644 --- a/website/cue/reference/components/sources/generated/opentelemetry.cue +++ b/website/cue/reference/components/sources/generated/opentelemetry.cue @@ -327,13 +327,67 @@ generated: components: sources: opentelemetry: configuration: { } use_otlp_decoding: { description: """ - Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly. - - One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format. - This means that components that work on metrics, will not be compatible with this output. - However, these events can be forwarded directly to a downstream OTEL collector. + Configuration for OTLP decoding behavior. + + This configuration controls how OpenTelemetry Protocol (OTLP) data is decoded for each + signal type (logs, metrics, traces). When a signal is configured to use OTLP decoding, the raw OTLP format is + preserved, allowing the data to be forwarded to downstream OTLP collectors without transformation. + Otherwise, the signal is converted to Vector's native event format. + + Simple boolean form: + + ```yaml + use_otlp_decoding: true # All signals preserve OTLP format + # or + use_otlp_decoding: false # All signals use Vector native format (default) + ``` + + Per-signal configuration: + + ```yaml + use_otlp_decoding: + logs: false # Convert to Vector native format + metrics: false # Convert to Vector native format + traces: true # Preserve OTLP format + ``` + + **Note:** When OTLP decoding is enabled for metrics: + - Metrics are parsed as logs while preserving the OTLP format + - Vector's metric transforms will NOT be compatible with this output + - The events can be forwarded directly (passthrough) to a downstream OTLP collector """ required: false - type: bool: default: false + type: object: options: { + logs: { + description: """ + Whether to use OTLP decoding for logs. + + When `true`, logs preserve their OTLP format. + When `false` (default), logs are converted to Vector's native format. + """ + required: false + type: bool: default: false + } + metrics: { + description: """ + Whether to use OTLP decoding for metrics. + + When `true`, metrics preserve their OTLP format but are processed as logs. + When `false` (default), metrics are converted to Vector's native metric format. + """ + required: false + type: bool: default: false + } + traces: { + description: """ + Whether to use OTLP decoding for traces. + + When `true`, traces preserve their OTLP format. + When `false` (default), traces are converted to Vector's native format. + """ + required: false + type: bool: default: false + } + } } }