diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c5e062c51..00abc5b7d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - Apply rate limits to span metrics. ([#3255](https://github.com/getsentry/relay/pull/3255)) - Extract metrics from transaction spans. ([#3273](https://github.com/getsentry/relay/pull/3273)) - Implement volume metric stats. ([#3281](https://github.com/getsentry/relay/pull/3281)) +- Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282)) ## 24.3.0 diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 5feb3353d7..23bec97d57 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -88,38 +88,47 @@ impl KafkaTopic { #[serde(default)] pub struct TopicAssignments { /// Simple events topic name. + #[serde(alias = "ingest-events")] pub events: TopicAssignment, /// Events with attachments topic name. + #[serde(alias = "ingest-attachments")] pub attachments: TopicAssignment, /// Transaction events topic name. + #[serde(alias = "ingest-transactions")] pub transactions: TopicAssignment, /// Outcomes topic name. pub outcomes: TopicAssignment, /// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`. + #[serde(alias = "outcomes-billing")] pub outcomes_billing: Option, /// Session health topic name. + #[serde(alias = "ingest-sessions")] pub sessions: TopicAssignment, - /// Default topic name for all aggregate metrics. Specialized topics for session-based and - /// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each. - pub metrics: TopicAssignment, - /// Topic name for metrics extracted from sessions. Defaults to the assignment of `metrics`. - pub metrics_sessions: Option, + /// Topic name for metrics extracted from sessions, aka release health. + #[serde(alias = "metrics", alias = "ingest-metrics")] + pub metrics_sessions: TopicAssignment, /// Topic name for all other kinds of metrics. Defaults to the assignment of `metrics`. - #[serde(alias = "metrics_transactions")] + #[serde(alias = "metrics_transactions", alias = "ingest-generic-metrics")] pub metrics_generic: TopicAssignment, /// Stacktrace topic name pub profiles: TopicAssignment, /// Replay Events topic name. + #[serde(alias = "ingest-replay-events")] pub replay_events: TopicAssignment, /// Recordings topic name. + #[serde(alias = "ingest-replay-recordings")] pub replay_recordings: TopicAssignment, /// Monitor check-ins. + #[serde(alias = "ingest-monitors")] pub monitors: TopicAssignment, /// Standalone spans without a transaction. + #[serde(alias = "snuba-spans")] pub spans: TopicAssignment, /// Summary for metrics collected during a span. + #[serde(alias = "snuba-metrics-summaries")] pub metrics_summaries: TopicAssignment, /// COGS measurements. + #[serde(alias = "shared-resources-usage")] pub cogs: TopicAssignment, } @@ -134,7 +143,7 @@ impl TopicAssignments { KafkaTopic::Outcomes => &self.outcomes, KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes), KafkaTopic::Sessions => &self.sessions, - KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics), + KafkaTopic::MetricsSessions => &self.metrics_sessions, KafkaTopic::MetricsGeneric => &self.metrics_generic, KafkaTopic::Profiles => &self.profiles, KafkaTopic::ReplayEvents => &self.replay_events, @@ -156,8 +165,7 @@ impl Default for TopicAssignments { outcomes: "outcomes".to_owned().into(), outcomes_billing: None, sessions: "ingest-sessions".to_owned().into(), - metrics: "ingest-metrics".to_owned().into(), - metrics_sessions: None, + metrics_sessions: "ingest-metrics".to_owned().into(), metrics_generic: "ingest-performance-metrics".to_owned().into(), profiles: "profiles".to_owned().into(), replay_events: "ingest-replay-events".to_owned().into(), @@ -344,11 +352,11 @@ mod tests { #[test] fn test_kafka_config() { let yaml = r#" -events: "ingest-events-kafka-topic" +ingest-events: "ingest-events-kafka-topic" profiles: name: "ingest-profiles" config: "profiles" -metrics: +ingest-metrics: shards: 65000 mapping: 0: @@ -360,6 +368,7 @@ metrics: 45000: name: "ingest-metrics-3" config: "metrics_3" +transactions: "ingest-transactions-kafka-topic" "#; let def_config = vec![KafkaConfigParam { @@ -398,7 +407,8 @@ metrics: let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap(); let events = topics.events; let profiles = topics.profiles; - let metrics = topics.metrics; + let metrics = topics.metrics_sessions; + let transactions = topics.transactions; assert!(matches!(events, TopicAssignment::Primary(_))); assert!(matches!(profiles, TopicAssignment::Secondary { .. })); @@ -412,7 +422,29 @@ metrics: let events_config = events .kafka_config(&def_config, &second_config) .expect("Kafka config for events topic"); - assert!(matches!(events_config, KafkaConfig::Single { .. })); + assert!(matches!( + events_config, + KafkaConfig::Single { + params: KafkaParams { + topic_name: "ingest-events-kafka-topic", + .. + } + } + )); + + // Legacy keys are still supported + let transactions_config = transactions + .kafka_config(&def_config, &second_config) + .expect("Kafka config for transactions topic"); + assert!(matches!( + transactions_config, + KafkaConfig::Single { + params: KafkaParams { + topic_name: "ingest-transactions-kafka-topic", + .. + } + } + )); let (shards, mapping) = if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics {