Skip to content

Commit

Permalink
perf(spans): Prevent span extraction when quota is active (#4097)
Browse files Browse the repository at this point in the history
Track spans inside a transaction item in
`EnvelopeSummary::span_quantity`, and skip their extraction if the
`span_indexed` category is rate limited.

This should reduce load on redis.

NOTE: With this PR, Relay will produce negative outcomes for spans in
_any_ transaction, even for projects that do not have span extraction
enabled (AM1). This should be fine, the product will simply ignore these
outcomes.
  • Loading branch information
jjbayer authored Oct 4, 2024
1 parent 4b15aa6 commit 771dd0f
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- Use custom wildcard matching instead of regular expressions. ([#4073](https://github.com/getsentry/relay/pull/4073))
- Allowlist the SentryUptimeBot user-agent. ([#4068](https://github.com/getsentry/relay/pull/4068))
- Feature flags of graduated features are now hard-coded in Relay so they can be removed from Sentry. ([#4076](https://github.com/getsentry/relay/pull/4076), [#4080](https://github.com/getsentry/relay/pull/4080))
- Prevent span extraction when quota is active to reduce load on redis. ([#4097](https://github.com/getsentry/relay/pull/4097))

## 24.9.0

Expand Down
1 change: 1 addition & 0 deletions relay-base-schema/src/data_category.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl DataCategory {
"span" => Self::Span,
"monitor_seat" => Self::MonitorSeat,
"feedback" => Self::UserReportV2,
"user_report_v2" => Self::UserReportV2,
"metric_bucket" => Self::MetricBucket,
"span_indexed" => Self::SpanIndexed,
"profile_duration" => Self::ProfileDuration,
Expand Down
22 changes: 22 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use smallvec::SmallVec;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::extractors::{PartialMeta, RequestMeta};
use crate::utils::SeqCount;

pub const CONTENT_TYPE: &str = "application/x-sentry-envelope";

Expand Down Expand Up @@ -871,6 +872,27 @@ impl Item {
self.headers.other.insert(name.into(), value.into())
}

/// Counts how many spans are contained in a transaction payload.
///
/// The transaction itself represents a span as well, so this function returns
/// `len(event.spans) + 1`.
///
/// Returns zero if
/// - the item is not a transaction,
/// - the spans have already been extracted (in which case they are represented elsewhere).
pub fn count_nested_spans(&self) -> usize {
#[derive(Debug, Deserialize)]
struct PartialEvent {
spans: SeqCount,
}

if self.ty() != &ItemType::Transaction || self.spans_extracted() {
return 0;
}

serde_json::from_slice::<PartialEvent>(&self.payload()).map_or(0, |event| event.spans.0 + 1)
}

/// Determines whether the given item creates an event.
///
/// This is only true for literal events and crash report attachments.
Expand Down
17 changes: 2 additions & 15 deletions relay-server/src/metrics_extraction/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ impl Extractable for Span {
/// If this is a transaction event with spans, metrics will also be extracted from the spans.
pub fn extract_metrics(
event: &mut Event,
spans_extracted: bool,
config: CombinedMetricExtractionConfig<'_>,
max_tag_value_size: usize,
span_extraction_sample_rate: Option<f32>,
) -> Vec<Bucket> {
let mut metrics = generic::extract_metrics(event, config);
// If spans were already extracted for an event, we rely on span processing to extract metrics.
if !spans_extracted && sample(span_extraction_sample_rate.unwrap_or(1.0)) {
if sample(span_extraction_sample_rate.unwrap_or(1.0)) {
extract_span_metrics_for_event(event, config, max_tag_value_size, &mut metrics);
}

Expand Down Expand Up @@ -1202,7 +1200,6 @@ mod tests {

extract_metrics(
event.value_mut().as_mut().unwrap(),
false,
combined_config(features, None).combined(),
200,
None,
Expand Down Expand Up @@ -1413,7 +1410,6 @@ mod tests {

let metrics = extract_metrics(
event.value_mut().as_mut().unwrap(),
false,
combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(),
200,
None,
Expand Down Expand Up @@ -1470,7 +1466,6 @@ mod tests {

let metrics = extract_metrics(
event.value_mut().as_mut().unwrap(),
false,
combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(),
200,
None,
Expand Down Expand Up @@ -1502,7 +1497,6 @@ mod tests {

let metrics = extract_metrics(
event.value_mut().as_mut().unwrap(),
false,
combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(),
200,
None,
Expand Down Expand Up @@ -1765,7 +1759,6 @@ mod tests {

let metrics = extract_metrics(
event.value_mut().as_mut().unwrap(),
false,
combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(),
200,
None,
Expand Down Expand Up @@ -1906,13 +1899,7 @@ mod tests {
);
let config = binding.combined();

let _ = extract_metrics(
event.value_mut().as_mut().unwrap(),
false,
config,
200,
None,
);
let _ = extract_metrics(event.value_mut().as_mut().unwrap(), config, 200, None);

insta::assert_debug_snapshot!(&event.value().unwrap()._metrics_summary);
insta::assert_debug_snapshot!(
Expand Down
8 changes: 0 additions & 8 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,8 @@ impl EnvelopeBufferService {
services: &Services,
envelopes_tx_permit: Permit<'a, DequeuedEnvelope>,
) -> Result<Duration, EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService: peeking the buffer");

let sleep = match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService: peek returned empty");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty"
Expand Down Expand Up @@ -403,11 +400,7 @@ impl Service for EnvelopeBufferService {
let mut shutdown = Controller::shutdown_handle();

relay_log::info!("EnvelopeBufferService: starting");
let mut iteration = 0;
loop {
iteration += 1;
relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}");

let used_capacity = self.services.envelopes_tx.max_capacity()
- self.services.envelopes_tx.capacity();
relay_statsd::metric!(
Expand Down Expand Up @@ -446,7 +439,6 @@ impl Service for EnvelopeBufferService {
}
}
Ok(()) = global_config_rx.changed() => {
relay_log::trace!("EnvelopeBufferService: received global config");
sleep = Duration::ZERO;
}
else => break,
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,6 @@ impl EnvelopeProcessorService {

let metrics = crate::metrics_extraction::event::extract_metrics(
event,
state.spans_extracted,
combined_config,
self.inner
.config
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use relay_event_normalization::{
GeoIpLookup, ModelCosts, SchemaProcessor, TimestampProcessor, TransactionNameRule,
TrimmingProcessor,
};
use relay_event_schema::processor::{process_value, ProcessingState};
use relay_event_schema::processor::{process_value, ProcessingAction, ProcessingState};
use relay_event_schema::protocol::{
BrowserContext, IpAddr, Measurement, Measurements, Span, SpanData,
};
Expand Down Expand Up @@ -96,9 +96,9 @@ pub fn process(
if let Err(e) = normalize(&mut annotated_span, normalize_span_config.clone()) {
relay_log::debug!("failed to normalize span: {}", e);
return ItemAction::Drop(Outcome::Invalid(match e {
ProcessingError::InvalidTransaction | ProcessingError::InvalidTimestamp => {
DiscardReason::InvalidSpan
}
ProcessingError::ProcessingFailed(ProcessingAction::InvalidTransaction(_))
| ProcessingError::InvalidTransaction
| ProcessingError::InvalidTimestamp => DiscardReason::InvalidSpan,
_ => DiscardReason::Internal,
}));
};
Expand Down
159 changes: 79 additions & 80 deletions relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ use relay_system::{Addr, BroadcastChannel};
use serde::{Deserialize, Serialize};
use tokio::time::Instant;

use crate::envelope::ItemType;
use crate::services::metrics::{Aggregator, MergeBuckets};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics};
use crate::services::project::state::ExpiryState;
use crate::services::project_cache::{
CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate,
};
use crate::utils::{Enforcement, SeqCount};

use crate::statsd::RelayCounters;
use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff};
Expand Down Expand Up @@ -557,19 +555,9 @@ impl Project {
Ok(current_limits.check_with_quotas(quotas, item_scoping))
});

let (mut enforcement, mut rate_limits) =
let (enforcement, mut rate_limits) =
envelope_limiter.compute(envelope.envelope_mut(), &scoping)?;

let check_nested_spans = state
.as_ref()
.is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));

// If we can extract spans from the event, we want to try and count the number of nested
// spans to correctly emit negative outcomes in case the transaction itself is dropped.
if check_nested_spans {
sync_spans_to_enforcement(&envelope, &mut enforcement);
}

enforcement.apply_with_outcomes(&mut envelope);

envelope.update();
Expand Down Expand Up @@ -598,52 +586,9 @@ impl Project {
}
}

/// Adds category limits for the nested spans inside a transaction.
///
/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted
/// as top-level spans, thus if we limited a transaction, we want to count and emit negative
/// outcomes for each of the spans nested inside that transaction.
fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
if !enforcement.is_event_active() {
return;
}

let spans_count = count_nested_spans(envelope);
if spans_count == 0 {
return;
}

if enforcement.event.is_active() {
enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
}

if enforcement.event_indexed.is_active() {
enforcement.spans_indexed = enforcement
.event_indexed
.clone_for(DataCategory::SpanIndexed, spans_count);
}
}

/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope).
fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
#[derive(Debug, Deserialize)]
struct PartialEvent {
spans: SeqCount,
}

envelope
.envelope()
.items()
.find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
.and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
// We do + 1, since we count the transaction itself because it will be extracted
// as a span and counted during the slow path of rate limiting.
.map_or(0, |event| event.spans.0 + 1)
}

#[cfg(test)]
mod tests {
use crate::envelope::{ContentType, Envelope, Item};
use crate::envelope::{ContentType, Envelope, Item, ItemType};
use crate::extractors::RequestMeta;
use crate::services::processor::ProcessingGroup;
use relay_base_schema::project::ProjectId;
Expand Down Expand Up @@ -775,27 +720,7 @@ mod tests {
RequestMeta::new(dsn)
}

#[test]
fn test_track_nested_spans_outcomes() {
let mut project = create_project(Some(json!({
"features": [
"organizations:indexed-spans-extraction"
],
"quotas": [{
"id": "foo",
"categories": ["transaction"],
"window": 3600,
"limit": 0,
"reasonCode": "foo",
}]
})));

let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

let mut transaction = Item::new(ItemType::Transaction);
transaction.set_payload(
ContentType::Json,
r#"{
const EVENT_WITH_SPANS: &str = r#"{
"event_id": "52df9022835246eeb317dbd739ccd059",
"type": "transaction",
"transaction": "I have a stale timestamp, but I'm recent!",
Expand All @@ -821,8 +746,27 @@ mod tests {
"trace_id": "ff62a8b040f340bda5d830223def1d81"
}
]
}"#,
);
}"#;

#[test]
fn test_track_nested_spans_outcomes() {
let mut project = create_project(Some(json!({
"features": [
"organizations:indexed-spans-extraction"
],
"quotas": [{
"id": "foo",
"categories": ["transaction"],
"window": 3600,
"limit": 0,
"reasonCode": "foo",
}]
})));

let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

let mut transaction = Item::new(ItemType::Transaction);
transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS);

envelope.add_item(transaction);

Expand Down Expand Up @@ -852,4 +796,59 @@ mod tests {
assert_eq!(outcome.quantity, expected_quantity);
}
}

#[test]
fn test_track_nested_spans_outcomes_span_quota() {
let mut project = create_project(Some(json!({
"features": [
"organizations:indexed-spans-extraction"
],
"quotas": [{
"id": "foo",
"categories": ["span_indexed"],
"window": 3600,
"limit": 0,
"reasonCode": "foo",
}]
})));

let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());

let mut transaction = Item::new(ItemType::Transaction);
transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS);

envelope.add_item(transaction);

let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
let (test_store, _) = Addr::custom();

let managed_envelope = ManagedEnvelope::new(
envelope,
outcome_aggregator.clone(),
test_store,
ProcessingGroup::Transaction,
);

let CheckedEnvelope {
envelope,
rate_limits: _,
} = project.check_envelope(managed_envelope).unwrap();
let envelope = envelope.unwrap();
let transaction_item = envelope
.envelope()
.items()
.find(|i| *i.ty() == ItemType::Transaction)
.unwrap();
assert!(transaction_item.spans_extracted());

drop(outcome_aggregator);

let expected = [(DataCategory::SpanIndexed, 3)];

for (expected_category, expected_quantity) in expected {
let outcome = outcome_aggregator_rx.blocking_recv().unwrap();
assert_eq!(outcome.category, expected_category);
assert_eq!(outcome.quantity, expected_quantity);
}
}
}
Loading

0 comments on commit 771dd0f

Please sign in to comment.