diff --git a/changelog.d/datadog_metrics_series_v2_default.enhancement.md b/changelog.d/datadog_metrics_series_v2_default.enhancement.md new file mode 100644 index 0000000000000..cd6d1545fd008 --- /dev/null +++ b/changelog.d/datadog_metrics_series_v2_default.enhancement.md @@ -0,0 +1,4 @@ +The `datadog_metrics` sink now defaults to the Datadog series v2 endpoint (`/api/v2/series`). +Set `VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API` to force the legacy v1 endpoint (`/api/v1/series`). + +authors: vladimir-dd diff --git a/regression/cases/statsd_to_datadog_metrics/experiment.yaml b/regression/cases/statsd_to_datadog_metrics/experiment.yaml new file mode 100644 index 0000000000000..7c0e069a9eddc --- /dev/null +++ b/regression/cases/statsd_to_datadog_metrics/experiment.yaml @@ -0,0 +1,10 @@ +optimization_goal: egress_throughput + +target: + name: vector + command: /usr/bin/vector + cpu_allotment: 6 + memory_allotment: 8GiB + + environment: + VECTOR_THREADS: 4 diff --git a/regression/cases/statsd_to_datadog_metrics/lading/lading.yaml b/regression/cases/statsd_to_datadog_metrics/lading/lading.yaml new file mode 100644 index 0000000000000..3494ed826fd6f --- /dev/null +++ b/regression/cases/statsd_to_datadog_metrics/lading/lading.yaml @@ -0,0 +1,16 @@ +generator: + - tcp: + seed: [2, 3, 5, 7, 11, 13, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137] + addr: "0.0.0.0:8125" + variant: + dogstatsd: {} + bytes_per_second: "500 Mb" + maximum_prebuild_cache_size_bytes: "256 Mb" + +blackhole: + - http: + binding_addr: "0.0.0.0:8080" + +target_metrics: + - prometheus: + uri: "http://127.0.0.1:9090/metrics" diff --git a/regression/cases/statsd_to_datadog_metrics/vector/vector.yaml b/regression/cases/statsd_to_datadog_metrics/vector/vector.yaml new file mode 100644 index 0000000000000..990d4d00c349f --- /dev/null +++ b/regression/cases/statsd_to_datadog_metrics/vector/vector.yaml @@ -0,0 +1,25 @@ +data_dir: "/var/lib/vector" 
+ +sources: + internal_metrics: + type: "internal_metrics" + + statsd: + type: "statsd" + address: "0.0.0.0:8125" + mode: "tcp" + +sinks: + prometheus: + type: "prometheus_exporter" + inputs: ["internal_metrics"] + address: "0.0.0.0:9090" + + datadog_metrics: + type: "datadog_metrics" + inputs: ["statsd"] + endpoint: "http://0.0.0.0:8080" + default_api_key: "DEADBEEF" + default_namespace: "vector" + healthcheck: + enabled: false diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs index 6d004e694e0ea..fe511b07a5ba2 100644 --- a/src/sinks/datadog/metrics/config.rs +++ b/src/sinks/datadog/metrics/config.rs @@ -25,14 +25,10 @@ use crate::{ #[derive(Clone, Copy, Debug, Default)] pub struct DatadogMetricsDefaultBatchSettings; -// This default is centered around "series" data, which should be the lion's share of what we -// process. Given that a single series, when encoded, is in the 150-300 byte range, we can fit a -// lot of these into a single request, something like 150-200K series. Simply to be a little more -// conservative, though, we use 100K here. This will also get a little more tricky when it comes to -// distributions and sketches, but we're going to have to implement incremental encoding to handle -// "we've exceeded our maximum payload size, split this batch" scenarios anyways. impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings { const MAX_EVENTS: Option = Some(100_000); + // No default byte cap here; the appropriate limit (v1: 60 MiB, v2: 5 MiB) is applied at + // sink build time based on the active series API version. const MAX_BYTES: Option = None; const TIMEOUT_SECS: f64 = 2.0; } @@ -41,6 +37,9 @@ pub(super) const SERIES_V1_PATH: &str = "/api/v1/series"; pub(super) const SERIES_V2_PATH: &str = "/api/v2/series"; pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches"; +// TODO: the series V1 endpoint support is considered deprecated and should be removed in a future release. 
+// At that time, once V1 support is removed, `SeriesApiVersion` stops being useful and can be removed as well. + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum SeriesApiVersion { V1, @@ -54,12 +53,18 @@ impl SeriesApiVersion { Self::V2 => SERIES_V2_PATH, } } - fn get_api_version() -> Self { + fn get_api_version_backwards_compatible() -> Self { static API_VERSION: OnceLock = OnceLock::new(); *API_VERSION.get_or_init(|| { - match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") { - Ok(_) => Self::V2, - Err(_) => Self::V1, + if std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API").is_ok() { + warn!( + "VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API is deprecated and has no effect. \ + The v2 series endpoint is now the default — you can safely remove this variable." + ); + } + match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API") { + Ok(_) => Self::V1, + Err(_) => Self::V2, } }) } @@ -96,12 +101,11 @@ impl DatadogMetricsEndpoint { // Creates an instance of the `Series` variant with the default API version. pub fn series() -> Self { - Self::Series(SeriesApiVersion::get_api_version()) + Self::Series(SeriesApiVersion::get_api_version_backwards_compatible()) } pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits { // from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics - let (uncompressed, compressed) = match self { // Sketches use the same payload size limits as v1 series DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) @@ -222,7 +226,12 @@ impl DatadogMetricsConfig { ) -> crate::Result { let base_uri = self.get_base_agent_endpoint(dd_common); - let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?; + // TODO: the V1 endpoint support is considered deprecated and should be removed in a future release. + // At that time, `get_api_version_backwards_compatible()` should be replaced with a static use of V2. 
+ let series_endpoint = build_uri( + &base_uri, + SeriesApiVersion::get_api_version_backwards_compatible().get_path(), + )?; let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?; Ok(DatadogMetricsEndpointConfiguration::new( @@ -253,7 +262,18 @@ impl DatadogMetricsConfig { dd_common: &DatadogCommonConfig, client: HttpClient, ) -> crate::Result { - let batcher_settings = self.batch.into_batcher_settings()?; + let mut batcher_settings = self.batch.into_batcher_settings()?; + // Cap the batcher to the endpoint's uncompressed payload limit when the user has not set + // an explicit max_bytes. This ensures each batch fits in a single HTTP request without + // splitting, which avoids request amplification and the associated memory overhead. + // Different endpoints have very different limits (v2 series: 5 MiB, v1/sketches: 60 MiB), + // so the cap must be applied dynamically rather than hardcoded as a default. + if batcher_settings.size_limit == usize::MAX { + let series_version = SeriesApiVersion::get_api_version_backwards_compatible(); + batcher_settings.size_limit = DatadogMetricsEndpoint::Series(series_version) + .payload_limits() + .uncompressed; + } // TODO: revisit our concurrency and batching defaults let request_limits = self.request.into_settings(); @@ -302,4 +322,60 @@ mod tests { fn generate_config() { crate::test_util::test_generate_config::(); } + + // When the user leaves max_bytes unset, the default batch config produces size_limit == + // usize::MAX, which is the sentinel that build_sink replaces with the endpoint limit. + #[test] + fn batcher_default_produces_no_byte_limit() { + let config = BatchConfig::::default(); + let settings = config.into_batcher_settings().unwrap(); + assert_eq!(settings.size_limit, usize::MAX); + } + + // An explicit user-supplied max_bytes must not be clobbered by the endpoint-limit override. 
+ #[test] + fn batcher_user_max_bytes_is_preserved() { + let mut config = BatchConfig::::default(); + config.max_bytes = Some(1_000_000); + let settings = config.into_batcher_settings().unwrap(); + assert_eq!(settings.size_limit, 1_000_000); + } + + // Simulate the override in build_sink: when size_limit is usize::MAX the endpoint's + // uncompressed payload limit should be applied. + #[test] + fn batcher_size_limit_override_uses_v2_endpoint_limit() { + let mut settings = BatchConfig::::default() + .into_batcher_settings() + .unwrap(); + if settings.size_limit == usize::MAX { + settings.size_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) + .payload_limits() + .uncompressed; + } + assert_eq!( + settings.size_limit, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) + .payload_limits() + .uncompressed, + ); + } + + #[test] + fn batcher_size_limit_override_uses_v1_endpoint_limit() { + let mut settings = BatchConfig::::default() + .into_batcher_settings() + .unwrap(); + if settings.size_limit == usize::MAX { + settings.size_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) + .payload_limits() + .uncompressed; + } + assert_eq!( + settings.size_limit, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) + .payload_limits() + .uncompressed, + ); + } } diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 617e69c1e5e6b..9f0b71c87ba98 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -1582,6 +1582,77 @@ mod tests { assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1))); } + #[test] + fn default_payload_limits_are_endpoint_aware() { + let v1 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V1).payload_limits(); + assert_eq!(v1.uncompressed, 62_914_560); + assert_eq!(v1.compressed, 3_200_000); + + let v2 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2).payload_limits(); + assert_eq!(v2.uncompressed, 5_242_880); + assert_eq!(v2.compressed, 
512_000); + + let sketches = DatadogMetricsEndpoint::Sketches.payload_limits(); + assert_eq!(sketches.uncompressed, 62_914_560); + assert_eq!(sketches.compressed, 3_200_000); + } + + #[test] + fn v2_series_default_limits_split_large_batches() { + // Simulate a large send and validate that default V2 limits split payloads into multiple + // requests, while still making forward progress each pass. + let mut pending = vec![get_simple_counter(); 120_000]; + let mut encoded_batches = 0; + let mut encoded_metrics = 0; + + while !pending.is_empty() { + let mut encoder = DatadogMetricsEncoder::new( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), + None, + ) + .expect("default payload size limits should be valid"); + + let mut next_pending = Vec::new(); + let mut hit_limit = false; + for metric in pending.drain(..) { + match encoder.try_encode(metric.clone()) { + Ok(None) => {} + Ok(Some(returned_metric)) => { + hit_limit = true; + next_pending.push(returned_metric); + } + Err(error) => panic!("unexpected encoding error: {error}"), + } + } + + let finish_result = encoder.finish(); + assert!(finish_result.is_ok()); + let (_payload, processed) = finish_result.unwrap(); + assert!( + !processed.is_empty(), + "encoder should always make progress for a non-empty batch" + ); + + encoded_metrics += processed.len(); + encoded_batches += 1; + + if hit_limit { + assert!( + !next_pending.is_empty(), + "hitting limits should leave metrics to process in the next batch" + ); + } + + pending = next_pending; + } + + assert_eq!(encoded_metrics, 120_000); + assert!( + encoded_batches > 1, + "expected multiple batches for V2 default limits" + ); + } + #[test] fn encode_series_breaks_out_when_limit_reached_uncompressed() { // We manually create the encoder with an arbitrarily low "uncompressed" limit but high