Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The `datadog_metrics` sink now defaults to the Datadog series v2 endpoint (`/api/v2/series`).
Set `VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API` to force the legacy v1 endpoint (`/api/v1/series`).

authors: vladimir-dd
10 changes: 10 additions & 0 deletions regression/cases/statsd_to_datadog_metrics/experiment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Regression experiment for the statsd -> datadog_metrics pipeline.
# The harness optimizes for (i.e. reports regressions against) egress throughput.
optimization_goal: egress_throughput

target:
  name: vector
  command: /usr/bin/vector
  # Resource envelope granted to the target under test.
  cpu_allotment: 6
  memory_allotment: 8GiB

environment:
  VECTOR_THREADS: 4
16 changes: 16 additions & 0 deletions regression/cases/statsd_to_datadog_metrics/lading/lading.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
generator:
  # Emit DogStatsD-formatted metrics over TCP into Vector's statsd source.
  - tcp:
      # Fixed seed keeps the generated load deterministic across runs.
      seed: [2, 3, 5, 7, 11, 13, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137]
      addr: "0.0.0.0:8125"
      variant:
        dogstatsd: {}
      bytes_per_second: "500 Mb"
      maximum_prebuild_cache_size_bytes: "256 Mb"

blackhole:
  # Accept and discard the sink's outbound HTTP traffic.
  - http:
      binding_addr: "0.0.0.0:8080"

target_metrics:
  # Scrape Vector's internal metrics from its Prometheus exporter.
  - prometheus:
      uri: "http://127.0.0.1:9090/metrics"
25 changes: 25 additions & 0 deletions regression/cases/statsd_to_datadog_metrics/vector/vector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
data_dir: "/var/lib/vector"

sources:
  # Vector's own telemetry, exposed for the harness to scrape.
  internal_metrics:
    type: "internal_metrics"

  # Ingest the load generator's DogStatsD traffic over TCP.
  statsd:
    type: "statsd"
    address: "0.0.0.0:8125"
    mode: "tcp"

sinks:
  prometheus:
    type: "prometheus_exporter"
    inputs: ["internal_metrics"]
    address: "0.0.0.0:9090"

  # Points at the lading HTTP blackhole rather than the real Datadog intake.
  datadog_metrics:
    type: "datadog_metrics"
    inputs: ["statsd"]
    endpoint: "http://0.0.0.0:8080"
    default_api_key: "DEADBEEF"
    default_namespace: "vector"
    healthcheck:
      enabled: false
104 changes: 90 additions & 14 deletions src/sinks/datadog/metrics/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@ use crate::{
/// Marker type carrying the default batch settings for the `datadog_metrics`
/// sink; the actual values live in its `SinkBatchSettings` impl.
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogMetricsDefaultBatchSettings;

// This default is centered around "series" data, which should be the lion's share of what we
// process. Given that a single series, when encoded, is in the 150-300 byte range, we can fit a
// lot of these into a single request, something like 150-200K series. Simply to be a little more
// conservative, though, we use 100K here. This will also get a little more tricky when it comes to
// distributions and sketches, but we're going to have to implement incremental encoding to handle
// "we've exceeded our maximum payload size, split this batch" scenarios anyways.
// Default batching for the Datadog metrics sink: cap the event count, but
// leave the byte cap unset at config time.
impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(100_000);
// No default byte cap here; the appropriate limit (v1: 60 MiB, v2: 5 MiB) is applied at
// sink build time based on the active series API version.
const MAX_BYTES: Option<usize> = None;
const TIMEOUT_SECS: f64 = 2.0;
}
Expand All @@ -41,6 +37,9 @@ pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";

// TODO: series v1 endpoint support is deprecated and should be removed in a future release.
// Once v1 support is removed, `SeriesApiVersion` stops being useful and can be removed as well.

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SeriesApiVersion {
V1,
Expand All @@ -54,12 +53,18 @@ impl SeriesApiVersion {
Self::V2 => SERIES_V2_PATH,
}
}
fn get_api_version() -> Self {
fn get_api_version_backwards_compatible() -> Self {
static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
*API_VERSION.get_or_init(|| {
match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
Ok(_) => Self::V2,
Err(_) => Self::V1,
if std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API").is_ok() {
warn!(
"VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API is deprecated and has no effect. \
The v2 series endpoint is now the default — you can safely remove this variable."
);
}
match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API") {
Ok(_) => Self::V1,
Err(_) => Self::V2,
}
})
}
Expand Down Expand Up @@ -96,12 +101,11 @@ impl DatadogMetricsEndpoint {

// Creates an instance of the `Series` variant with the default API version.
pub fn series() -> Self {
Self::Series(SeriesApiVersion::get_api_version())
Self::Series(SeriesApiVersion::get_api_version_backwards_compatible())
}

pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
// from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics

let (uncompressed, compressed) = match self {
// Sketches use the same payload size limits as v1 series
DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
Expand Down Expand Up @@ -222,7 +226,12 @@ impl DatadogMetricsConfig {
) -> crate::Result<DatadogMetricsEndpointConfiguration> {
let base_uri = self.get_base_agent_endpoint(dd_common);

let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
// TODO: the V1 endpoint support is considered deprecated and should be removed in a future release.
// At that time, the get_api_version_backwards_compatible() should be replaced with statically using the v2.
let series_endpoint = build_uri(
&base_uri,
SeriesApiVersion::get_api_version_backwards_compatible().get_path(),
)?;
let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;

Ok(DatadogMetricsEndpointConfiguration::new(
Expand Down Expand Up @@ -253,7 +262,18 @@ impl DatadogMetricsConfig {
dd_common: &DatadogCommonConfig,
client: HttpClient,
) -> crate::Result<VectorSink> {
let batcher_settings = self.batch.into_batcher_settings()?;
let mut batcher_settings = self.batch.into_batcher_settings()?;
// Cap the batcher to the endpoint's uncompressed payload limit when the user has not set
// an explicit max_bytes. This ensures each batch fits in a single HTTP request without
// splitting, which avoids request amplification and the associated memory overhead.
// Different endpoints have very different limits (v2 series: 5 MiB, v1/sketches: 60 MiB),
// so the cap must be applied dynamically rather than hardcoded as a default.
if batcher_settings.size_limit == usize::MAX {
let series_version = SeriesApiVersion::get_api_version_backwards_compatible();
batcher_settings.size_limit = DatadogMetricsEndpoint::Series(series_version)
.payload_limits()
Comment on lines +272 to +274

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Derive batch cap from each metric endpoint

When batch.max_bytes is unset, this assigns a single global cap from the selected series API version, which is 5 MiB by default on v2. The sink then reuses that same batch_settings for both Series and Sketches partitions (batched_partitioned(... batch_settings.as_byte_size_config()) in src/sinks/datadog/metrics/sink.rs), so sketch/distribution traffic is also constrained to 5 MiB even though Sketches allows 60 MiB. In sketch-heavy workloads this can cause many more requests and lower throughput/efficiency than before.

Useful? React with 👍 / 👎.

.uncompressed;
}

// TODO: revisit our concurrency and batching defaults
let request_limits = self.request.into_settings();
Expand Down Expand Up @@ -302,4 +322,60 @@ mod tests {
fn generate_config() {
// Round-trip the sink's example configuration through the config machinery
// to ensure the generated defaults parse back cleanly.
crate::test_util::test_generate_config::<DatadogMetricsConfig>();
}

// Leaving max_bytes unset must yield a size_limit of usize::MAX — the
// sentinel that build_sink later swaps for the endpoint's payload limit.
#[test]
fn batcher_default_produces_no_byte_limit() {
    let batcher = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default()
        .into_batcher_settings()
        .unwrap();
    assert_eq!(batcher.size_limit, usize::MAX);
}

// A user-supplied max_bytes must survive untouched; the endpoint-limit
// override applies only to the usize::MAX sentinel.
#[test]
fn batcher_user_max_bytes_is_preserved() {
    let mut batch = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default();
    batch.max_bytes = Some(1_000_000);
    let batcher = batch.into_batcher_settings().unwrap();
    assert_eq!(batcher.size_limit, 1_000_000);
}

// Simulate the override in build_sink: when size_limit is the usize::MAX
// sentinel, the endpoint's uncompressed payload limit should be applied.
//
// Note: the original version of this test asserted equality against the same
// expression it had just assigned, which passes vacuously; we instead pin the
// precondition and the concrete expected limit.
#[test]
fn batcher_size_limit_override_uses_v2_endpoint_limit() {
    let mut settings = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default()
        .into_batcher_settings()
        .unwrap();
    // The default must produce the sentinel; otherwise the override (and this
    // test) would never trigger.
    assert_eq!(settings.size_limit, usize::MAX);

    // Mirror build_sink's override.
    settings.size_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2)
        .payload_limits()
        .uncompressed;

    // 5 MiB — the v2 series uncompressed intake limit.
    assert_eq!(settings.size_limit, 5_242_880);
}

// Same as the v2 case, but for the legacy v1 endpoint's 60 MiB limit.
//
// Note: the original version asserted equality against the same expression it
// had just assigned, which passes vacuously; we instead pin the precondition
// and the concrete expected limit.
#[test]
fn batcher_size_limit_override_uses_v1_endpoint_limit() {
    let mut settings = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default()
        .into_batcher_settings()
        .unwrap();
    // The default must produce the sentinel; otherwise the override (and this
    // test) would never trigger.
    assert_eq!(settings.size_limit, usize::MAX);

    // Mirror build_sink's override.
    settings.size_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
        .payload_limits()
        .uncompressed;

    // 60 MiB — the v1 series uncompressed intake limit.
    assert_eq!(settings.size_limit, 62_914_560);
}
}
71 changes: 71 additions & 0 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,77 @@ mod tests {
assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
}

#[test]
fn default_payload_limits_are_endpoint_aware() {
    // Each entry pins (endpoint, expected uncompressed, expected compressed);
    // sketches share the v1 series limits.
    let cases = [
        (
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
            62_914_560,
            3_200_000,
        ),
        (
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
            5_242_880,
            512_000,
        ),
        (DatadogMetricsEndpoint::Sketches, 62_914_560, 3_200_000),
    ];

    for (endpoint, uncompressed, compressed) in cases {
        let limits = endpoint.payload_limits();
        assert_eq!(limits.uncompressed, uncompressed);
        assert_eq!(limits.compressed, compressed);
    }
}

#[test]
fn v2_series_default_limits_split_large_batches() {
    // Simulate a large send and validate that default V2 limits split payloads into multiple
    // requests, while still making forward progress each pass.
    let mut pending = vec![get_simple_counter(); 120_000];
    let mut encoded_batches = 0;
    let mut encoded_metrics = 0;

    while !pending.is_empty() {
        let mut encoder = DatadogMetricsEncoder::new(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
            None,
        )
        .expect("default payload size limits should be valid");

        let mut next_pending = Vec::new();
        let mut hit_limit = false;
        for metric in pending.drain(..) {
            // Note: the metric is moved into the encoder; on overflow it is
            // handed back via Ok(Some(..)), so no clone is needed here.
            match encoder.try_encode(metric) {
                // Metric fit into the current payload.
                Ok(None) => {}
                // Payload is full; carry the rejected metric into the next batch.
                Ok(Some(returned_metric)) => {
                    hit_limit = true;
                    next_pending.push(returned_metric);
                }
                Err(error) => panic!("unexpected encoding error: {error}"),
            }
        }

        let finish_result = encoder.finish();
        assert!(finish_result.is_ok());
        let (_payload, processed) = finish_result.unwrap();
        assert!(
            !processed.is_empty(),
            "encoder should always make progress for a non-empty batch"
        );

        encoded_metrics += processed.len();
        encoded_batches += 1;

        if hit_limit {
            assert!(
                !next_pending.is_empty(),
                "hitting limits should leave metrics to process in the next batch"
            );
        }

        pending = next_pending;
    }

    // Every metric was encoded exactly once, across more than one request.
    assert_eq!(encoded_metrics, 120_000);
    assert!(
        encoded_batches > 1,
        "expected multiple batches for V2 default limits"
    );
}

#[test]
fn encode_series_breaks_out_when_limit_reached_uncompressed() {
// We manually create the encoder with an arbitrarily low "uncompressed" limit but high
Expand Down
Loading