Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The `opentelemetry` source now supports independent configuration of OTLP decoding for logs, metrics, and traces. This allows more granular
control over which signal types are decoded, while maintaining backward compatibility with the existing boolean configuration.

## Simple boolean form (applies to all signals)

```yaml
use_otlp_decoding: true # All signals preserve OTLP format
# or
use_otlp_decoding: false # All signals use Vector native format (default)
```

## Per-signal configuration

```yaml
use_otlp_decoding:
logs: false # Convert to Vector native format
metrics: false # Convert to Vector native format
traces: true # Preserve OTLP format
```

authors: pront
121 changes: 109 additions & 12 deletions src/sources/opentelemetry/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,65 @@ pub const LOGS: &str = "logs";
pub const METRICS: &str = "metrics";
pub const TRACES: &str = "traces";

/// Configuration for OTLP decoding behavior.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct OtlpDecodingConfig {
/// Whether to use OTLP decoding for logs.
///
/// When `true`, logs preserve their OTLP format.
/// When `false` (default), logs are converted to Vector's native format.
#[serde(default)]
pub logs: bool,

/// Whether to use OTLP decoding for metrics.
///
/// When `true`, metrics preserve their OTLP format but are processed as logs.
/// When `false` (default), metrics are converted to Vector's native metric format.
#[serde(default)]
pub metrics: bool,

/// Whether to use OTLP decoding for traces.
///
/// When `true`, traces preserve their OTLP format.
/// When `false` (default), traces are converted to Vector's native format.
#[serde(default)]
pub traces: bool,
}

impl From<bool> for OtlpDecodingConfig {
/// Converts a boolean value to an OtlpDecodingConfig.
///
/// This provides backward compatibility with the previous boolean configuration.
/// - `true` enables OTLP decoding for all signals
/// - `false` disables OTLP decoding for all signals (uses Vector native format)
fn from(value: bool) -> Self {
Self {
logs: value,
metrics: value,
traces: value,
}
}
}

impl OtlpDecodingConfig {
/// Returns true if any signal is configured to use OTLP decoding.
pub const fn any_enabled(&self) -> bool {
self.logs || self.metrics || self.traces
}

/// Returns true if all signals are configured to use OTLP decoding.
pub const fn all_enabled(&self) -> bool {
self.logs && self.metrics && self.traces
}

/// Returns true if signals have mixed configuration (some enabled, some disabled).
pub const fn is_mixed(&self) -> bool {
self.any_enabled() && !self.all_enabled()
}
}

/// Configuration for the `opentelemetry` source.
#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
#[derive(Clone, Debug)]
Expand All @@ -67,14 +126,36 @@ pub struct OpentelemetryConfig {
#[serde(default)]
pub log_namespace: Option<bool>,

/// Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly.
/// Configuration for OTLP decoding behavior.
///
/// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format.
/// This means that components that work on metrics, will not be compatible with this output.
/// However, these events can be forwarded directly to a downstream OTEL collector.
#[configurable(derived)]
#[serde(default)]
pub use_otlp_decoding: bool,
/// This configuration controls how OpenTelemetry Protocol (OTLP) data is decoded for each
/// signal type (logs, metrics, traces). When a signal is configured to use OTLP decoding, the raw OTLP format is
/// preserved, allowing the data to be forwarded to downstream OTLP collectors without transformation.
/// Otherwise, the signal is converted to Vector's native event format.
///
/// Simple boolean form:
///
/// ```yaml
/// use_otlp_decoding: true # All signals preserve OTLP format
/// # or
/// use_otlp_decoding: false # All signals use Vector native format (default)
/// ```
///
/// Per-signal configuration:
///
/// ```yaml
/// use_otlp_decoding:
/// logs: false # Convert to Vector native format
/// metrics: false # Convert to Vector native format
/// traces: true # Preserve OTLP format
/// ```
///
/// **Note:** When OTLP decoding is enabled for metrics:
/// - Metrics are parsed as logs while preserving the OTLP format
/// - Vector's metric transforms will NOT be compatible with this output
/// - The events can be forwarded directly (passthrough) to a downstream OTLP collector
#[serde(default, deserialize_with = "bool_or_struct")]
pub use_otlp_decoding: OtlpDecodingConfig,
}

/// Configuration for the `opentelemetry` gRPC server.
Expand Down Expand Up @@ -152,18 +233,24 @@ impl GenerateConfig for OpentelemetryConfig {
http: example_http_config(),
acknowledgements: Default::default(),
log_namespace: None,
use_otlp_decoding: false,
use_otlp_decoding: OtlpDecodingConfig::default(),
})
.unwrap()
}
}

impl OpentelemetryConfig {
fn get_signal_deserializer(
pub(crate) fn get_signal_deserializer(
&self,
signal_type: OtlpSignalType,
) -> vector_common::Result<Option<OtlpDeserializer>> {
if self.use_otlp_decoding {
let should_use_otlp = match signal_type {
OtlpSignalType::Logs => self.use_otlp_decoding.logs,
OtlpSignalType::Metrics => self.use_otlp_decoding.metrics,
OtlpSignalType::Traces => self.use_otlp_decoding.traces,
};

if should_use_otlp {
Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
signal_type,
]))))
Expand All @@ -183,6 +270,16 @@ impl SourceConfig for OpentelemetryConfig {

let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;

// Log info message when using mixed OTLP decoding formats
if self.use_otlp_decoding.is_mixed() {
info!(
message = "Signals with OTLP decoding enabled will preserve raw format; others will use Vector native format.",
logs_otlp = self.use_otlp_decoding.logs,
metrics_otlp = self.use_otlp_decoding.metrics,
traces_otlp = self.use_otlp_decoding.traces,
);
}

let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
Expand Down Expand Up @@ -352,13 +449,13 @@ impl SourceConfig for OpentelemetryConfig {
}
};

let logs_output = if self.use_otlp_decoding {
let logs_output = if self.use_otlp_decoding.logs {
SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(LOGS)
} else {
SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS)
};

let metrics_output = if self.use_otlp_decoding {
let metrics_output = if self.use_otlp_decoding.metrics {
SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
} else {
SourceOutput::new_metrics().with_port(METRICS)
Expand Down
6 changes: 3 additions & 3 deletions src/sources/opentelemetry/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn receive_logs_legacy_namespace() {
},
acknowledgements: Default::default(),
log_namespace: Default::default(),
use_otlp_decoding: false,
use_otlp_decoding: false.into(),
};

let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string());
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn receive_trace() {
},
acknowledgements: Default::default(),
log_namespace: Default::default(),
use_otlp_decoding: false,
use_otlp_decoding: false.into(),
};

let (sender, trace_output, _) = new_source(EventStatus::Delivered, TRACES.to_string());
Expand Down Expand Up @@ -266,7 +266,7 @@ async fn receive_metric() {
},
acknowledgements: Default::default(),
log_namespace: Default::default(),
use_otlp_decoding: false,
use_otlp_decoding: false.into(),
};

let (sender, metrics_output, _) = new_source(EventStatus::Delivered, METRICS.to_string());
Expand Down
Loading
Loading