From 90a0df1e974fc0d9db4b6717bdc21943d0e0cd74 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 4 Oct 2024 11:47:45 +0200 Subject: [PATCH] ref(quotas): Cache and enforce indexed quotas in processor. --- relay-quotas/src/rate_limit.rs | 65 +------ relay-server/src/services/processor.rs | 158 +++++++++++++----- .../services/processor/dynamic_sampling.rs | 3 + .../src/services/processor/profile.rs | 4 + relay-server/src/services/processor/report.rs | 5 + .../src/services/processor/span/processing.rs | 2 + relay-server/src/services/project.rs | 59 +------ relay-server/src/services/project_cache.rs | 4 + relay-server/src/utils/rate_limits.rs | 90 ++++++++-- 9 files changed, 218 insertions(+), 172 deletions(-) diff --git a/relay-quotas/src/rate_limit.rs b/relay-quotas/src/rate_limit.rs index b77c612952..c1162d340e 100644 --- a/relay-quotas/src/rate_limit.rs +++ b/relay-quotas/src/rate_limit.rs @@ -300,7 +300,11 @@ impl RateLimits { /// /// If no limits or quotas match, then the returned `RateLimits` instance evalutes `is_ok`. /// Otherwise, it contains rate limits that match the given scoping. - pub fn check_with_quotas(&self, quotas: &[Quota], scoping: ItemScoping<'_>) -> Self { + pub fn check_with_quotas<'a>( + &self, + quotas: impl IntoIterator, + scoping: ItemScoping<'_>, + ) -> Self { let mut applied_limits = Self::new(); for quota in quotas { @@ -396,13 +400,7 @@ impl<'a> IntoIterator for &'a RateLimits { /// Like [`RateLimits`], a collection of scoped rate limits but with all the checks /// necessary to cache the limits. /// -/// The data structure makes sure no expired rate limits are enforced as well -/// as removing any indexed rate limit. -/// -/// Cached rate limits don't enforce indexed rate limits because at the time of the check -/// the decision whether an envelope is sampled or not is not yet known. Additionally -/// even if the item is later dropped by dynamic sampling, it must still be around to extract metrics -/// and cannot be dropped too early. +/// The data structure makes sure no expired rate limits are enforced. #[derive(Debug, Default)] pub struct CachedRateLimits(RateLimits); @@ -415,13 +413,7 @@ impl CachedRateLimits { /// Adds a limit to this collection. /// /// See also: [`RateLimits::add`]. 
- pub fn add(&mut self, mut limit: RateLimit) { - if !limit.categories.is_empty() { - limit.categories.retain(|category| !category.is_indexed()); - if limit.categories.is_empty() { - return; - } - } + pub fn add(&mut self, limit: RateLimit) { self.0.add(limit); } @@ -1083,7 +1075,7 @@ mod tests { }); rate_limits1.add(RateLimit { - categories: smallvec![DataCategory::Transaction], + categories: smallvec![DataCategory::TransactionIndexed], scope: RateLimitScope::Organization(42), reason_code: None, retry_after: RetryAfter::from_secs(1), @@ -1114,7 +1106,7 @@ mod tests { ), RateLimit( categories: [ - transaction, + transaction_indexed, ], scope: Organization(42), reason_code: None, @@ -1166,43 +1158,4 @@ mod tests { ) "###); } - - #[test] - fn test_cached_rate_limits_indexed() { - let mut cached = CachedRateLimits::new(); - - cached.add(RateLimit { - categories: smallvec![DataCategory::Transaction, DataCategory::TransactionIndexed], - scope: RateLimitScope::Organization(42), - reason_code: None, - retry_after: RetryAfter::from_secs(5), - namespaces: smallvec![], - }); - - cached.add(RateLimit { - categories: smallvec![DataCategory::TransactionIndexed], - scope: RateLimitScope::Project(ProjectId::new(21)), - reason_code: None, - retry_after: RetryAfter::from_secs(5), - namespaces: smallvec![], - }); - - let rate_limits = cached.current_limits(); - - insta::assert_ron_snapshot!(rate_limits, @r###" - RateLimits( - limits: [ - RateLimit( - categories: [ - transaction, - ], - scope: Organization(42), - reason_code: None, - retry_after: RetryAfter(5), - namespaces: [], - ), - ], - ) - "###); - } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index d04a8e3310..35f16a1b5b 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -45,7 +45,7 @@ use smallvec::{smallvec, SmallVec}; #[cfg(feature = "processing")] use { crate::services::store::{Store, StoreEnvelope}, - crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction}, + crate::utils::{sample, ApplyLimits, Enforcement, EnvelopeLimiter, ItemAction}, itertools::Itertools, relay_cardinality::{ CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter, @@ -722,6 +722,10 @@ struct ProcessEnvelopeState<'a, Group> { /// The state of the project that this envelope belongs to. project_info: Arc, + /// Currently active cached rate limits of the project this envelope belongs to. + #[cfg_attr(not(feature = "processing"), expect(dead_code))] + rate_limits: RateLimits, + /// The config of this Relay instance. config: Arc, @@ -832,9 +836,15 @@ pub struct ProcessEnvelopeResponse { /// - Rate limiters and inbound filters on events in processing mode. #[derive(Debug)] pub struct ProcessEnvelope { + /// Envelope to process. pub envelope: ManagedEnvelope, + /// The project info. pub project_info: Arc, + /// Currently active cached rate limits for this project. + pub rate_limits: RateLimits, + /// Root sampling project info. pub sampling_project_info: Option>, + /// Sampling reservoir counters. 
pub reservoir_counters: ReservoirCounters, } @@ -1251,6 +1261,7 @@ impl EnvelopeProcessorService { mut managed_envelope: TypedEnvelope<Group>, project_id: ProjectId, project_info: Arc<ProjectInfo>, + rate_limits: RateLimits, sampling_project_info: Option<Arc<ProjectInfo>>, reservoir_counters: Arc>>, ) -> ProcessEnvelopeState { @@ -1291,6 +1302,7 @@ impl EnvelopeProcessorService { metrics: Metrics::default(), extracted_metrics, project_info, + rate_limits, config, sampling_project_info, project_id, @@ -1305,55 +1317,41 @@ impl EnvelopeProcessorService { &self, state: &mut ProcessEnvelopeState, ) -> Result<(), ProcessingError> { - let rate_limiter = match self.inner.rate_limiter.as_ref() { - Some(rate_limiter) => rate_limiter, - None => return Ok(()), - }; + self.enforce_cached_quotas(state)?; + self.enforce_consistent_quotas(state)?; + Ok(()) + } - let project_info = &state.project_info; + #[cfg(feature = "processing")] + fn enforce_cached_quotas( + &self, + state: &mut ProcessEnvelopeState, + ) -> Result<(), ProcessingError> { let global_config = self.inner.global_config.current(); - let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas()); - - if quotas.is_empty() { - return Ok(()); - } - - let event_category = state.event_category(); + let _ = RateLimiter::Cached.enforce(&global_config, state)?; - // When invoking the rate limiter, capture if the event item has been rate limited to also - // remove it from the processing state eventually. - let mut envelope_limiter = EnvelopeLimiter::new(|item_scope, quantity| { - rate_limiter.is_rate_limited(quotas, item_scope, quantity, false) - }); - - // Tell the envelope limiter about the event, since it has been removed from the Envelope at - // this stage in processing. - if let Some(category) = event_category { - envelope_limiter.assume_event(category); - } + Ok(()) + } - let scoping = state.managed_envelope.scoping(); - let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), { - envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)? - }); - let event_active = enforcement.is_event_active(); + #[cfg(feature = "processing")] + fn enforce_consistent_quotas( + &self, + state: &mut ProcessEnvelopeState, + ) -> Result<(), ProcessingError> { + let global_config = self.inner.global_config.current(); + let rate_limiter = match self.inner.rate_limiter.as_ref() { + Some(rate_limiter) => rate_limiter, + None => return Ok(()), + }; - // Use the same rate limits as used for the envelope on the metrics. - // Those rate limits should not be checked for expiry or similar to ensure a consistent - // limiting of envelope items and metrics. - state.extracted_metrics.apply_enforcement(&enforcement); - enforcement.apply_with_outcomes(&mut state.managed_envelope); - - if event_active { - state.remove_event(); - debug_assert!(state.envelope().is_empty()); - } + let limits = RateLimiter::Consistent(rate_limiter).enforce(&global_config, state)?; + // Update cached rate limits with the freshly computed ones. if !limits.is_empty() { - self.inner - .addrs - .project_cache - .send(UpdateRateLimits::new(scoping.project_key, limits)); + self.inner.addrs.project_cache.send(UpdateRateLimits::new( + state.managed_envelope.scoping().project_key, + limits, + )); } Ok(()) @@ -1748,6 +1746,13 @@ impl EnvelopeProcessorService { // Always extract metrics in processing Relays for sampled items.
self.extract_transaction_metrics(state, SamplingDecision::Keep, profile_id)?; + // Enforce cached quotas here, because it is relatively cheap and can prevent + // us from extracting spans when it's not necessary. + // + // But do it *after* metric extraction to make sure we do not drop due to an indexed + // quota and never extract any metrics. + self.enforce_cached_quotas(state)?; + if state .project_info .has_feature(Feature::ExtractSpansFromEvent) @@ -1881,6 +1886,7 @@ impl EnvelopeProcessorService { mut managed_envelope: ManagedEnvelope, project_id: ProjectId, project_info: Arc, + rate_limits: RateLimits, sampling_project_info: Option>, reservoir_counters: Arc>>, ) -> Result { @@ -1905,6 +1911,7 @@ impl EnvelopeProcessorService { managed_envelope, project_id, project_info, + rate_limits, sampling_project_info, reservoir_counters, ); @@ -1981,6 +1988,7 @@ impl EnvelopeProcessorService { let ProcessEnvelope { envelope: mut managed_envelope, project_info, + rate_limits, sampling_project_info, reservoir_counters, } = message; @@ -2029,6 +2037,7 @@ impl EnvelopeProcessorService { managed_envelope, project_id, project_info, + rate_limits, sampling_project_info, reservoir_counters, ) { @@ -2906,6 +2915,67 @@ impl Service for EnvelopeProcessorService { } } +#[cfg(feature = "processing")] +enum RateLimiter<'a> { + Cached, + Consistent(&'a RedisRateLimiter), +} + +#[cfg(feature = "processing")] +impl<'a> RateLimiter<'a> { + fn enforce( + &self, + global_config: &GlobalConfig, + state: &mut ProcessEnvelopeState, + ) -> Result { + if state.envelope().is_empty() { + return Ok(RateLimits::default()); + } + + let quotas = CombinedQuotas::new(global_config, state.project_info.get_quotas()); + if quotas.is_empty() { + return Ok(RateLimits::default()); + } + + let event_category = state.event_category(); + + // When invoking the rate limiter, capture if the event item has been rate limited to also + // remove it from the processing state eventually. + let mut envelope_limiter = + EnvelopeLimiter::new(ApplyLimits::All, |item_scope, quantity| match self { + RateLimiter::Cached => Ok(state.rate_limits.check_with_quotas(quotas, item_scope)), + RateLimiter::Consistent(rl) => Ok::<_, ProcessingError>( + rl.is_rate_limited(quotas, item_scope, quantity, false)?, + ), + }); + + // Tell the envelope limiter about the event, since it has been removed from the Envelope at + // this stage in processing. + if let Some(category) = event_category { + envelope_limiter.assume_event(category); + } + + let scoping = state.managed_envelope.scoping(); + let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), { + envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)? + }); + let event_active = enforcement.is_event_active(); + + // Use the same rate limits as used for the envelope on the metrics. + // Those rate limits should not be checked for expiry or similar to ensure a consistent + // limiting of envelope items and metrics. 
+ state.extracted_metrics.apply_enforcement(&enforcement); + enforcement.apply_with_outcomes(&mut state.managed_envelope); + + if event_active { + state.remove_event(); + debug_assert!(state.envelope().is_empty()); + } + + Ok(limits) + } +} + fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result { let envelope_body: Vec = match http_encoding { HttpEncoding::Identity => return Ok(body.clone()), @@ -3463,6 +3533,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_info), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -3525,6 +3596,7 @@ mod tests { let process_message = ProcessEnvelope { envelope: managed_envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index cb81de7efc..1267015444 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -298,6 +298,7 @@ mod tests { let message = ProcessEnvelope { envelope: ManagedEnvelope::new(envelope, outcome_aggregator, test_store, group), project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info, reservoir_counters: ReservoirCounters::default(), }; @@ -438,6 +439,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: config.clone(), project_info, + rate_limits: Default::default(), sampling_project_info: None, project_id: ProjectId::new(42), managed_envelope: ManagedEnvelope::new( @@ -709,6 +711,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: Arc::new(Config::default()), project_info, + rate_limits: Default::default(), sampling_project_info: { let mut state = ProjectInfo::default(); state.config.metric_extraction = diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 559f61555c..be93112a02 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -259,6 +259,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_state), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -389,6 +390,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_info), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -458,6 +460,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_state), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -529,6 +532,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_state), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index 3bc54a4557..002a0753de 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -325,6 +325,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: 
Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -379,6 +380,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -441,6 +443,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -481,6 +484,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -529,6 +533,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 3e709a4357..2f6f9f94dd 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -762,6 +762,7 @@ mod tests { }; use relay_event_schema::protocol::{Contexts, Event, Span}; use relay_protocol::get_value; + use relay_quotas::RateLimits; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator}; use relay_system::Addr; @@ -821,6 +822,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: Arc::new(Config::default()), project_info, + rate_limits: RateLimits::default(), sampling_project_info: None, project_id: ProjectId::new(42), managed_envelope: managed_envelope.try_into().unwrap(), diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index b198d686b3..deb6fca6bf 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -21,7 +21,7 @@ use crate::services::project_cache::{ }; use crate::statsd::RelayCounters; -use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; +use crate::utils::{ApplyLimits, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; pub mod state; @@ -551,7 +551,7 @@ impl Project { let current_limits = self.rate_limits.current_limits(); let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]); - let envelope_limiter = EnvelopeLimiter::new(|item_scoping, _| { + let envelope_limiter = EnvelopeLimiter::new(ApplyLimits::NonIndexed, |item_scoping, _| { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }); @@ -796,59 +796,4 @@ mod tests { assert_eq!(outcome.quantity, expected_quantity); } } - - #[test] - fn test_track_nested_spans_outcomes_span_quota() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["span_indexed"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - - let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); - - let mut transaction = Item::new(ItemType::Transaction); - transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS); - - envelope.add_item(transaction); - - let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); - let (test_store, _) = Addr::custom(); - - let 
managed_envelope = ManagedEnvelope::new( - envelope, - outcome_aggregator.clone(), - test_store, - ProcessingGroup::Transaction, - ); - - let CheckedEnvelope { - envelope, - rate_limits: _, - } = project.check_envelope(managed_envelope).unwrap(); - let envelope = envelope.unwrap(); - let transaction_item = envelope - .envelope() - .items() - .find(|i| *i.ty() == ItemType::Transaction) - .unwrap(); - assert!(transaction_item.spans_extracted()); - - drop(outcome_aggregator); - - let expected = [(DataCategory::SpanIndexed, 3)]; - - for (expected_category, expected_quantity) in expected { - let outcome = outcome_aggregator_rx.blocking_recv().unwrap(); - assert_eq!(outcome.category, expected_category); - assert_eq!(outcome.quantity, expected_quantity); - } - } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 27a8f07be8..62f49cdeae 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -872,6 +872,8 @@ impl ProjectCacheBroker { .. }) = project.check_envelope(managed_envelope) { + let rate_limits = project.current_rate_limits().clone(); + let reservoir_counters = project.reservoir_counters(); let sampling_project_info = managed_envelope @@ -884,6 +886,7 @@ impl ProjectCacheBroker { let process = ProcessEnvelope { envelope: managed_envelope, project_info, + rate_limits, sampling_project_info, reservoir_counters, }; @@ -1156,6 +1159,7 @@ impl ProjectCacheBroker { services.envelope_processor.send(ProcessEnvelope { envelope: managed_envelope, project_info: project_info.clone(), + rate_limits: project.current_rate_limits().clone(), sampling_project_info: sampling_project_info.clone(), reservoir_counters, }); diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index db18c0b234..3b3ade8aa4 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -520,6 +520,39 @@ impl Enforcement { } } +/// Which limits to apply with the [`EnvelopeLimiter`]. +#[derive(Debug, Copy, Clone)] +pub enum ApplyLimits { + /// Applies all limits to the envelope except indexed categories. + /// + /// In the fast path it is necessary to apply cached rate limits but not to enforce indexed rate limits, + /// because at the time of the check the sampling decision for the envelope is not yet known. + /// Additionally, even if the item is later dropped by dynamic sampling, it must still be around to extract metrics + /// and cannot be dropped too early. + NonIndexed, + /// Applies all limits to the envelope. + #[cfg_attr(not(any(feature = "processing", test)), expect(dead_code))] + All, +} + +struct Check<F> { + limits: ApplyLimits, + check: F, +} + +impl<E, F> Check<F> +where + F: FnMut(ItemScoping<'_>, usize) -> Result<RateLimits, E>, +{ + fn apply(&mut self, scoping: ItemScoping<'_>, quantity: usize) -> Result<RateLimits, E> { + if matches!(self.limits, ApplyLimits::NonIndexed) && scoping.category.is_indexed() { + return Ok(RateLimits::default()); + } + + (self.check)(scoping, quantity) + } +} + /// Enforces rate limits with the given `check` function on items in the envelope. /// /// The `check` function is called with the following rules: @@ -532,7 +565,7 @@ impl Enforcement { /// - Attachments are not removed if they create events (e.g. minidumps). /// - Sessions are handled separate to all of the above.
pub struct EnvelopeLimiter<F> { - check: F, + check: Check<F>, event_category: Option<DataCategory>, } impl<E, F> EnvelopeLimiter<F> where F: FnMut(ItemScoping<'_>, usize) -> Result<RateLimits, E>, { /// Create a new `EnvelopeLimiter` with the given `check` function. - pub fn new(check: F) -> Self { + pub fn new(limits: ApplyLimits, check: F) -> Self { Self { - check, + check: Check { check, limits }, event_category: None, } } @@ -590,14 +623,14 @@ where if let Some(category) = summary.event_category { // Check the broad category for limits. - let mut event_limits = (self.check)(scoping.item(category), 1)?; + let mut event_limits = self.check.apply(scoping.item(category), 1)?; enforcement.event = CategoryLimit::new(category, 1, event_limits.longest()); if let Some(index_category) = category.index_category() { // Check the specific/indexed category for limits only if the specific one has not already // an enforced limit. if event_limits.is_empty() { - event_limits.merge((self.check)(scoping.item(index_category), 1)?); + event_limits.merge(self.check.apply(scoping.item(index_category), 1)?); } enforcement.event_indexed = @@ -612,7 +645,9 @@ where limit.clone_for(DataCategory::Attachment, summary.attachment_quantity); } else if summary.attachment_quantity > 0 { let item_scoping = scoping.item(DataCategory::Attachment); - let attachment_limits = (self.check)(item_scoping, summary.attachment_quantity)?; + let attachment_limits = self + .check + .apply(item_scoping, summary.attachment_quantity)?; enforcement.attachments = CategoryLimit::new( DataCategory::Attachment, summary.attachment_quantity, @@ -629,7 +664,7 @@ where if summary.session_quantity > 0 { let item_scoping = scoping.item(DataCategory::Session); - let session_limits = (self.check)(item_scoping, summary.session_quantity)?; + let session_limits = self.check.apply(item_scoping, summary.session_quantity)?; enforcement.sessions = CategoryLimit::new( DataCategory::Session, summary.session_quantity, @@ -647,7 +682,7 @@ where .event_indexed .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity) } else if summary.profile_quantity > 0 { - let mut profile_limits = (self.check)( + let mut profile_limits = self.check.apply( scoping.item(DataCategory::Profile), summary.profile_quantity, )?; @@ -658,7 +693,7 @@ where ); if profile_limits.is_empty() { - profile_limits.merge((self.check)( + profile_limits.merge(self.check.apply( scoping.item(DataCategory::ProfileIndexed), summary.profile_quantity, )?); @@ -675,7 +710,7 @@ where if summary.replay_quantity > 0 { let item_scoping = scoping.item(DataCategory::Replay); - let replay_limits = (self.check)(item_scoping, summary.replay_quantity)?; + let replay_limits = self.check.apply(item_scoping, summary.replay_quantity)?; enforcement.replays = CategoryLimit::new( DataCategory::Replay, summary.replay_quantity, @@ -686,7 +721,7 @@ where if summary.checkin_quantity > 0 { let item_scoping = scoping.item(DataCategory::Monitor); - let checkin_limits = (self.check)(item_scoping, summary.checkin_quantity)?; + let checkin_limits = self.check.apply(item_scoping, summary.checkin_quantity)?; enforcement.check_ins = CategoryLimit::new( DataCategory::Monitor, summary.checkin_quantity, @@ -704,8 +739,9 @@ where .event_indexed .clone_for(DataCategory::SpanIndexed, summary.span_quantity); } else if summary.span_quantity > 0 { - let mut span_limits = - (self.check)(scoping.item(DataCategory::Span), summary.span_quantity)?; + let mut span_limits = self + .check + .apply(scoping.item(DataCategory::Span), summary.span_quantity)?; enforcement.spans =
CategoryLimit::new( DataCategory::Span, summary.span_quantity, @@ -713,7 +749,7 @@ where ); if span_limits.is_empty() { - span_limits.merge((self.check)( + span_limits.merge(self.check.apply( scoping.item(DataCategory::SpanIndexed), summary.span_quantity, )?); @@ -730,7 +766,9 @@ where if summary.profile_chunk_quantity > 0 { let item_scoping = scoping.item(DataCategory::ProfileChunk); - let profile_chunk_limits = (self.check)(item_scoping, summary.profile_chunk_quantity)?; + let profile_chunk_limits = self + .check + .apply(item_scoping, summary.profile_chunk_quantity)?; enforcement.profile_chunks = CategoryLimit::new( DataCategory::ProfileChunk, summary.profile_chunk_quantity, @@ -1072,7 +1110,7 @@ mod tests { let scoping = envelope.scoping(); #[allow(unused_mut)] - let mut limiter = EnvelopeLimiter::new(|s, q| mock.check(s, q)); + let mut limiter = EnvelopeLimiter::new(ApplyLimits::All, |s, q| mock.check(s, q)); #[cfg(feature = "processing")] if let Some(assume_event) = assume_event { limiter.assume_event(assume_event); @@ -1304,6 +1342,26 @@ mod tests { ); } + #[test] + fn test_enforce_transaction_non_indexed() { + let mut envelope = envelope![Transaction, Profile]; + let scoping = envelope.scoping(); + + let mut mock = MockLimiter::default().deny(DataCategory::TransactionIndexed); + + let limiter = EnvelopeLimiter::new(ApplyLimits::NonIndexed, |s, q| mock.check(s, q)); + let (enforcement, limits) = limiter.compute(envelope.envelope_mut(), &scoping).unwrap(); + enforcement.clone().apply_with_outcomes(&mut envelope); + + assert!(!limits.is_limited()); + assert!(!enforcement.event_indexed.is_active()); + assert!(!enforcement.event.is_active()); + assert!(!enforcement.profiles_indexed.is_active()); + assert!(!enforcement.profiles.is_active()); + mock.assert_call(DataCategory::Transaction, 1); + mock.assert_call(DataCategory::Profile, 1); + } + #[test] fn test_enforce_transaction_no_indexing_quota() { let mut envelope = envelope![Transaction];