Skip to content

Commit

Permalink
ref: Kafka topic configuration (#3282)
Browse files Browse the repository at this point in the history
As part of the kafka control plane project, Kafka config for all Sentry
services should be automatically generated from a single source of truth
in the ops/sentry-kafka-schemas repos.

The system knows about the default topic name + any override value per
environment if relevant, but is not aware of the Relay specific values
like "events" and "metrics".

This is aligned with the configuration format of Sentry, Snuba and
super-big-consumers.
  • Loading branch information
lynnagara authored Mar 20, 2024
1 parent a21a082 commit 4cb86c9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Apply rate limits to span metrics. ([#3255](https://github.com/getsentry/relay/pull/3255))
- Extract metrics from transaction spans. ([#3273](https://github.com/getsentry/relay/pull/3273))
- Implement volume metric stats. ([#3281](https://github.com/getsentry/relay/pull/3281))
- Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282))

## 24.3.0

Expand Down
58 changes: 45 additions & 13 deletions relay-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,38 +88,47 @@ impl KafkaTopic {
#[serde(default)]
pub struct TopicAssignments {
/// Simple events topic name.
#[serde(alias = "ingest-events")]
pub events: TopicAssignment,
/// Events with attachments topic name.
#[serde(alias = "ingest-attachments")]
pub attachments: TopicAssignment,
/// Transaction events topic name.
#[serde(alias = "ingest-transactions")]
pub transactions: TopicAssignment,
/// Outcomes topic name.
pub outcomes: TopicAssignment,
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`.
#[serde(alias = "outcomes-billing")]
pub outcomes_billing: Option<TopicAssignment>,
/// Session health topic name.
#[serde(alias = "ingest-sessions")]
pub sessions: TopicAssignment,
/// Default topic name for all aggregate metrics. Specialized topics for session-based and
/// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each.
pub metrics: TopicAssignment,
/// Topic name for metrics extracted from sessions. Defaults to the assignment of `metrics`.
pub metrics_sessions: Option<TopicAssignment>,
/// Topic name for metrics extracted from sessions, aka release health.
#[serde(alias = "metrics", alias = "ingest-metrics")]
pub metrics_sessions: TopicAssignment,
/// Topic name for all other kinds of metrics. Defaults to the assignment of `metrics`.
#[serde(alias = "metrics_transactions")]
#[serde(alias = "metrics_transactions", alias = "ingest-generic-metrics")]
pub metrics_generic: TopicAssignment,
/// Stacktrace topic name
pub profiles: TopicAssignment,
/// Replay Events topic name.
#[serde(alias = "ingest-replay-events")]
pub replay_events: TopicAssignment,
/// Recordings topic name.
#[serde(alias = "ingest-replay-recordings")]
pub replay_recordings: TopicAssignment,
/// Monitor check-ins.
#[serde(alias = "ingest-monitors")]
pub monitors: TopicAssignment,
/// Standalone spans without a transaction.
#[serde(alias = "snuba-spans")]
pub spans: TopicAssignment,
/// Summary for metrics collected during a span.
#[serde(alias = "snuba-metrics-summaries")]
pub metrics_summaries: TopicAssignment,
/// COGS measurements.
#[serde(alias = "shared-resources-usage")]
pub cogs: TopicAssignment,
}

Expand All @@ -134,7 +143,7 @@ impl TopicAssignments {
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics),
KafkaTopic::MetricsSessions => &self.metrics_sessions,
KafkaTopic::MetricsGeneric => &self.metrics_generic,
KafkaTopic::Profiles => &self.profiles,
KafkaTopic::ReplayEvents => &self.replay_events,
Expand All @@ -156,8 +165,7 @@ impl Default for TopicAssignments {
outcomes: "outcomes".to_owned().into(),
outcomes_billing: None,
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
metrics_sessions: None,
metrics_sessions: "ingest-metrics".to_owned().into(),
metrics_generic: "ingest-performance-metrics".to_owned().into(),
profiles: "profiles".to_owned().into(),
replay_events: "ingest-replay-events".to_owned().into(),
Expand Down Expand Up @@ -344,11 +352,11 @@ mod tests {
#[test]
fn test_kafka_config() {
let yaml = r#"
events: "ingest-events-kafka-topic"
ingest-events: "ingest-events-kafka-topic"
profiles:
name: "ingest-profiles"
config: "profiles"
metrics:
ingest-metrics:
shards: 65000
mapping:
0:
Expand All @@ -360,6 +368,7 @@ metrics:
45000:
name: "ingest-metrics-3"
config: "metrics_3"
transactions: "ingest-transactions-kafka-topic"
"#;

let def_config = vec![KafkaConfigParam {
Expand Down Expand Up @@ -398,7 +407,8 @@ metrics:
let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
let events = topics.events;
let profiles = topics.profiles;
let metrics = topics.metrics;
let metrics = topics.metrics_sessions;
let transactions = topics.transactions;

assert!(matches!(events, TopicAssignment::Primary(_)));
assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
Expand All @@ -412,7 +422,29 @@ metrics:
let events_config = events
.kafka_config(&def_config, &second_config)
.expect("Kafka config for events topic");
assert!(matches!(events_config, KafkaConfig::Single { .. }));
assert!(matches!(
events_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-events-kafka-topic",
..
}
}
));

// Legacy keys are still supported
let transactions_config = transactions
.kafka_config(&def_config, &second_config)
.expect("Kafka config for transactions topic");
assert!(matches!(
transactions_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-transactions-kafka-topic",
..
}
}
));

let (shards, mapping) =
if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics {
Expand Down

0 comments on commit 4cb86c9

Please sign in to comment.