From 2045ab415d404c9d6cf8ab3f0944b3e208e58679 Mon Sep 17 00:00:00 2001 From: Daniel Griesser Date: Tue, 8 Oct 2024 09:23:22 +0200 Subject: [PATCH 1/8] ref(filter): Add Rails 7 Health Endpoint (#4117) Rails 7 uses `/up` as a default health check endpoint, this only adds the tests for it, change is necessary in Sentry to actually filter. https://github.com/getsentry/sentry-docs/pull/11505 https://github.com/getsentry/sentry/pull/78751 --- relay-filter/src/transaction_name.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/relay-filter/src/transaction_name.rs b/relay-filter/src/transaction_name.rs index 5968b7369b..d60a9e10ed 100644 --- a/relay-filter/src/transaction_name.rs +++ b/relay-filter/src/transaction_name.rs @@ -49,6 +49,7 @@ mod tests { "*/health", "*/healthz", "*/ping", + "*/up", ] .map(|val| val.to_string()) .to_vec(); @@ -88,6 +89,8 @@ mod tests { "123/health", "123/healthz", "123/ping", + "/up", + "123/up", ]; for name in transaction_names { @@ -119,6 +122,7 @@ mod tests { "delivery", "notready", "already", + "/upload", ]; let config = _get_config(); From c9dc813eb965e77249b2fcac85676ae2ede2a819 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 8 Oct 2024 10:18:28 +0200 Subject: [PATCH 2/8] feat(spool): Prioritize regular messages over dequeued envelopes (#4116) During INC-913 we saw that we accumulated a backlog on the project cache (`CheckEnvelope`) while unspooling, even though we restrict unspooled envelopes in a custom bounded channel. Since there's always unspooled envelopes available, and they get prioritized over regular service messages, they seem to starve the request handler. 
--- relay-server/src/services/project_cache.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 27a8f07be8..ef93f49005 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1480,16 +1480,16 @@ impl Service for ProjectCacheService { broker.handle_periodic_unspool() }) } - Some(message) = envelopes_rx.recv() => { - metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_envelope", { - broker.handle_envelope(message) - }) - } Some(message) = rx.recv() => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message", { broker.handle_message(message) }) } + Some(message) = envelopes_rx.recv() => { + metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_envelope", { + broker.handle_envelope(message) + }) + } else => break, } } From d4e4b5e9c4429a537aabf4d2f430041638b62a48 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 8 Oct 2024 11:58:44 +0200 Subject: [PATCH 3/8] ref(processor): Process metrics in batches (#4103) Instead of sending many messages for metrics, sends one big message. The idea is to make backlogs when they occur more actionable. 
--- relay-server/src/endpoints/common.rs | 7 +- relay-server/src/services/processor.rs | 129 ++++++++++++--------- relay-server/src/services/project.rs | 35 ++---- relay-server/src/services/project_cache.rs | 79 ++++++++----- 4 files changed, 135 insertions(+), 115 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 9d5e9c50fc..401f68c196 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -7,6 +7,7 @@ use relay_event_schema::protocol::{EventId, EventType}; use relay_quotas::RateLimits; use relay_statsd::metric; use serde::Deserialize; +use smallvec::smallvec; use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items}; use crate::service::ServiceState; @@ -275,10 +276,12 @@ fn queue_envelope( if !metric_items.is_empty() { relay_log::trace!("sending metrics into processing queue"); state.project_cache().send(ProcessMetrics { - data: MetricData::Raw(metric_items.into_vec()), + data: smallvec![( + envelope.meta().public_key(), + MetricData::Raw(metric_items.into_vec()) + )], start_time: envelope.meta().start_time().into(), sent_at: envelope.sent_at(), - project_key: envelope.meta().public_key(), source: envelope.meta().into(), }); } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 9b09ffa6d0..2689e47283 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -851,6 +851,21 @@ pub struct ProcessEnvelope { /// the Envelope specifies the [`sent_at`](Envelope::sent_at) header. #[derive(Debug)] pub struct ProcessProjectMetrics { + pub data: SmallVec<[ProcessProjectData; 1]>, + /// Whether to keep or reset the metric metadata. + pub source: BucketSource, + /// The instant at which the request was received. + pub start_time: Instant, + /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift + /// correction. 
+ pub sent_at: Option>, +} + +/// Raw metric data associated with project data. +#[derive(Debug)] +pub struct ProcessProjectData { + /// The target project. + pub project_key: ProjectKey, /// The project state the metrics belong to. /// /// The project state can be pending, in which case cached rate limits @@ -859,18 +874,8 @@ pub struct ProcessProjectMetrics { pub project_state: ProjectState, /// Currently active cached rate limits for this project. pub rate_limits: RateLimits, - - /// A list of metric items. + /// The metric data. pub data: MetricData, - /// The target project. - pub project_key: ProjectKey, - /// Whether to keep or reset the metric metadata. - pub source: BucketSource, - /// The instant at which the request was received. - pub start_time: Instant, - /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift - /// correction. - pub sent_at: Option>, } /// Raw unparsed metric data. @@ -2098,10 +2103,7 @@ impl EnvelopeProcessorService { fn handle_process_project_metrics(&self, cogs: &mut Token, message: ProcessProjectMetrics) { let ProcessProjectMetrics { - project_state, - rate_limits, data, - project_key, start_time, sent_at, source, @@ -2109,49 +2111,57 @@ impl EnvelopeProcessorService { let received_timestamp = UnixTimestamp::from_instant(start_time); - let mut buckets = data.into_buckets(received_timestamp); - if buckets.is_empty() { - return; - }; - cogs.update(relay_metrics::cogs::BySize(&buckets)); + for ProcessProjectData { + project_key, + project_state, + rate_limits, + data, + } in data + { + let mut buckets = data.into_buckets(received_timestamp); + if buckets.is_empty() { + continue; + }; + cogs.update(relay_metrics::cogs::BySize(&buckets)); - let received = relay_common::time::instant_to_date_time(start_time); - let clock_drift_processor = - ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); + let received = relay_common::time::instant_to_date_time(start_time); + let 
clock_drift_processor = + ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - buckets.retain_mut(|bucket| { - if let Err(error) = relay_metrics::normalize_bucket(bucket) { - relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}"); - return false; - } + buckets.retain_mut(|bucket| { + if let Err(error) = relay_metrics::normalize_bucket(bucket) { + relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}"); + return false; + } - if !self::metrics::is_valid_namespace(bucket, source) { - return false; - } + if !self::metrics::is_valid_namespace(bucket, source) { + return false; + } - clock_drift_processor.process_timestamp(&mut bucket.timestamp); + clock_drift_processor.process_timestamp(&mut bucket.timestamp); - if !matches!(source, BucketSource::Internal) { - bucket.metadata = BucketMetadata::new(received_timestamp); - } + if !matches!(source, BucketSource::Internal) { + bucket.metadata = BucketMetadata::new(received_timestamp); + } - true - }); + true + }); - // Best effort check to filter and rate limit buckets, if there is no project state - // available at the current time, we will check again after flushing. - let buckets = match project_state.enabled() { - Some(project_info) => { - self.check_buckets(project_key, &project_info, &rate_limits, buckets) - } - None => buckets, - }; + // Best effort check to filter and rate limit buckets, if there is no project state + // available at the current time, we will check again after flushing. 
+ let buckets = match project_state.enabled() { + Some(project_info) => { + self.check_buckets(project_key, &project_info, &rate_limits, buckets) + } + None => buckets, + }; - relay_log::trace!("merging metric buckets into the aggregator"); - self.inner - .addrs - .aggregator - .send(MergeBuckets::new(project_key, buckets)); + relay_log::trace!("merging metric buckets into the aggregator"); + self.inner + .addrs + .aggregator + .send(MergeBuckets::new(project_key, buckets)); + } } fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) { @@ -2184,8 +2194,7 @@ impl EnvelopeProcessorService { feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); self.inner.addrs.project_cache.send(ProcessMetrics { - data: MetricData::Parsed(buckets), - project_key, + data: smallvec![(project_key, MetricData::Parsed(buckets))], source, start_time: start_time.into(), sent_at, @@ -3731,10 +3740,12 @@ mod tests { (BucketSource::Internal, None), ] { let message = ProcessProjectMetrics { - data: MetricData::Raw(vec![item.clone()]), - project_state: ProjectState::Pending, - rate_limits: Default::default(), - project_key, + data: smallvec![ProcessProjectData { + project_key, + project_state: ProjectState::Pending, + rate_limits: Default::default(), + data: MetricData::Raw(vec![item.clone()]), + }], source, start_time, sent_at: Some(Utc::now()), @@ -3814,11 +3825,15 @@ mod tests { }; let mut messages = vec![pm1, pm2]; - messages.sort_by_key(|pm| pm.project_key); + messages.sort_by_key(|pm| pm.data[0].0); let actual = messages .into_iter() - .map(|pm| (pm.project_key, pm.data, pm.source)) + .map(|mut pm| { + assert_eq!(pm.data.len(), 1); + let (project_key, data) = pm.data.pop().unwrap(); + (project_key, data, pm.source) + }) .collect::>(); assert_debug_snapshot!(actual, @r###" diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index 778eca9f89..7f8d51322f 100644 --- 
a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -15,11 +15,9 @@ use tokio::time::Instant; use crate::envelope::ItemType; use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics}; +use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor}; use crate::services::project::state::ExpiryState; -use crate::services::project_cache::{ - CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate, -}; +use crate::services::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate}; use crate::utils::{Enforcement, SeqCount}; use crate::statsd::RelayCounters; @@ -164,23 +162,6 @@ impl Project { self.last_updated_at = Instant::now(); } - /// Collects internal project state and assembles a [`ProcessProjectMetrics`] message. - pub fn process_metrics(&mut self, message: ProcessMetrics) -> ProcessProjectMetrics { - let project_state = self.current_state(); - let rate_limits = self.rate_limits.current_limits().clone(); - - ProcessProjectMetrics { - project_state, - rate_limits, - - data: message.data, - project_key: message.project_key, - source: message.source, - start_time: message.start_time.into(), - sent_at: message.sent_at, - } - } - /// Returns a list of buckets back to the aggregator. /// /// This is used to return flushed buckets back to the aggregator if the project has not been @@ -292,7 +273,7 @@ impl Project { /// needs to be upgraded with the `no_cache` flag to ensure a more recent update. 
fn fetch_state( &mut self, - project_cache: Addr, + project_cache: &Addr, no_cache: bool, ) -> &mut StateChannel { // If there is a running request and we do not need to upgrade it to no_cache, or if the @@ -322,7 +303,7 @@ impl Project { fn get_or_fetch_state( &mut self, - project_cache: Addr, + project_cache: &Addr, mut no_cache: bool, ) -> GetOrFetch<'_> { // count number of times we are looking for the project state @@ -374,7 +355,7 @@ impl Project { /// To wait for a valid state instead, use [`get_state`](Self::get_state). pub fn get_cached_state( &mut self, - project_cache: Addr, + project_cache: &Addr, no_cache: bool, ) -> ProjectState { match self.get_or_fetch_state(project_cache, no_cache) { @@ -393,7 +374,7 @@ impl Project { /// are in the [grace period](Config::project_grace_period). pub fn get_state( &mut self, - project_cache: Addr, + project_cache: &Addr, sender: ProjectSender, no_cache: bool, ) { @@ -418,7 +399,7 @@ impl Project { /// point. Therefore, this method is useful to trigger an update early if it is already clear /// that the project state will be needed soon. To retrieve an updated state, use /// [`Project::get_state`] instead. 
- pub fn prefetch(&mut self, project_cache: Addr, no_cache: bool) -> &mut Self { + pub fn prefetch(&mut self, project_cache: &Addr, no_cache: bool) -> &mut Self { self.get_cached_state(project_cache, no_cache); self } @@ -744,7 +725,7 @@ mod tests { &envelope_processor, false, ); - project.fetch_state(addr, false); + project.fetch_state(&addr, false); } fn create_project(config: Option) -> Project { diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ef93f49005..ad3c459b3a 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -7,7 +7,8 @@ use crate::extractors::RequestMeta; use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError}; use crate::services::global_config; use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, + EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessProjectData, + ProcessProjectMetrics, ProcessingGroup, ProjectMetrics, }; use crate::services::project::state::UpstreamProjectState; use crate::Envelope; @@ -22,6 +23,7 @@ use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Sender, Service}; +use smallvec::SmallVec; #[cfg(feature = "processing")] use tokio::sync::Semaphore; use tokio::sync::{mpsc, watch}; @@ -210,13 +212,11 @@ impl From<&RequestMeta> for BucketSource { /// Starts the processing flow for received metrics. /// /// Enriches the raw data with projcet information and forwards -/// the metrics using [`ProcessProjectMetrics`](crate::services::processor::ProcessProjectMetrics). +/// the metrics using [`ProcessProjectMetrics`]. #[derive(Debug)] pub struct ProcessMetrics { - /// A list of metric items. - pub data: MetricData, - /// The target project. - pub project_key: ProjectKey, + /// Project associated metrics. 
+ pub data: SmallVec<[(ProjectKey, MetricData); 1]>, /// Whether to keep or reset the metric metadata. pub source: BucketSource, /// The instant at which the request was received. @@ -795,13 +795,13 @@ impl ProjectCacheBroker { let project_cache = self.services.project_cache.clone(); let project = self.get_or_create_project(project_key); - project.get_state(project_cache, sender, no_cache); + project.get_state(&project_cache, sender, no_cache); } fn handle_get_cached(&mut self, message: GetCachedProjectState) -> ProjectState { let project_cache = self.services.project_cache.clone(); self.get_or_create_project(message.project_key) - .get_cached_state(project_cache, false) + .get_cached_state(&project_cache, false) } fn handle_check_envelope( @@ -814,7 +814,7 @@ impl ProjectCacheBroker { if let Some(sampling_key) = context.envelope().sampling_key() { if sampling_key != project_key { let sampling_project = self.get_or_create_project(sampling_key); - sampling_project.prefetch(project_cache.clone(), false); + sampling_project.prefetch(&project_cache, false); } } let project = self.get_or_create_project(project_key); @@ -822,7 +822,7 @@ impl ProjectCacheBroker { // Preload the project cache so that it arrives a little earlier in processing. However, // do not pass `no_cache`. In case the project is rate limited, we do not want to force // a full reload. Fetching must not block the store request. 
- project.prefetch(project_cache, false); + project.prefetch(&project_cache, false); project.check_envelope(context) } @@ -838,14 +838,14 @@ impl ProjectCacheBroker { ); let mut project = Project::new(project_key, self.config.clone()); - project.prefetch(self.services.project_cache.clone(), false); + project.prefetch(&self.services.project_cache, false); self.projects.insert(project_key, project); self.enqueue(key, managed_envelope); return; }; let project_cache = self.services.project_cache.clone(); - let project_info = match project.get_cached_state(project_cache.clone(), false) { + let project_info = match project.get_cached_state(&project_cache, false) { ProjectState::Enabled(info) => info, ProjectState::Disabled => { managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); @@ -878,7 +878,7 @@ impl ProjectCacheBroker { .envelope() .sampling_key() .and_then(|key| self.projects.get_mut(&key)) - .and_then(|p| p.get_cached_state(project_cache, false).enabled()) + .and_then(|p| p.get_cached_state(&project_cache, false).enabled()) .filter(|state| state.organization_id == project_info.organization_id); let process = ProcessEnvelope { @@ -917,8 +917,7 @@ impl ProjectCacheBroker { let own_key = envelope.meta().public_key(); let project = self.get_or_create_project(own_key); - let project_state = - project.get_cached_state(project_cache.clone(), envelope.meta().no_cache()); + let project_state = project.get_cached_state(&project_cache, envelope.meta().no_cache()); let project_state = match project_state { ProjectState::Enabled(state) => Some(state), @@ -935,7 +934,7 @@ impl ProjectCacheBroker { let sampling_state = if let Some(sampling_key) = sampling_key { let state = self .get_or_create_project(sampling_key) - .get_cached_state(project_cache, envelope.meta().no_cache()); + .get_cached_state(&project_cache, envelope.meta().no_cache()); match state { ProjectState::Enabled(state) => Some(state), ProjectState::Disabled => { @@ -979,12 +978,34 @@ impl 
ProjectCacheBroker { fn handle_process_metrics(&mut self, message: ProcessMetrics) { let project_cache = self.services.project_cache.clone(); - let message = self - .get_or_create_project(message.project_key) - .prefetch(project_cache, false) - .process_metrics(message); - - self.services.envelope_processor.send(message); + let data = message + .data + .into_iter() + .map(|(project_key, data)| { + let project = self + .get_or_create_project(project_key) + .prefetch(&project_cache, false); + + let project_state = project.current_state(); + let rate_limits = project.current_rate_limits().clone(); + + ProcessProjectData { + project_key, + project_state, + rate_limits, + data, + } + }) + .collect(); + + self.services + .envelope_processor + .send(ProcessProjectMetrics { + data, + source: message.source, + start_time: message.start_time.into(), + sent_at: message.sent_at, + }); } fn handle_add_metric_meta(&mut self, message: AddMetricMeta) { @@ -1007,7 +1028,7 @@ impl ProjectCacheBroker { ProjectState::Pending => { no_project += 1; // Schedule an update for the project just in case. 
- project.prefetch(project_cache.clone(), false); + project.prefetch(&project_cache, false); project.return_buckets(&aggregator, buckets); continue; } @@ -1070,10 +1091,10 @@ impl ProjectCacheBroker { let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); spool_v1.index.insert(key); self.get_or_create_project(key.own_key) - .prefetch(project_cache.clone(), false); + .prefetch(&project_cache, false); if key.own_key != key.sampling_key { self.get_or_create_project(key.sampling_key) - .prefetch(project_cache.clone(), false); + .prefetch(&project_cache, false); } } } @@ -1088,7 +1109,7 @@ impl ProjectCacheBroker { let own_key = envelope.meta().public_key(); let project = self.get_or_create_project(own_key); - let project_state = project.get_cached_state(services.project_cache.clone(), false); + let project_state = project.get_cached_state(&services.project_cache, false); // Check if project config is enabled. let project_info = match project_state { @@ -1114,7 +1135,7 @@ impl ProjectCacheBroker { ( sampling_key, self.get_or_create_project(sampling_key) - .get_cached_state(services.project_cache, false), + .get_cached_state(&services.project_cache, false), ) }) { Some((_, ProjectState::Enabled(info))) => { @@ -1177,7 +1198,7 @@ impl ProjectCacheBroker { } let no_cache = false; - project.prefetch(project_cache, no_cache); + project.prefetch(&project_cache, no_cache); } /// Returns backoff timeout for an unspool attempt. @@ -1209,7 +1230,7 @@ impl ProjectCacheBroker { // Returns `Some` if the project is cached otherwise None and also triggers refresh // in background. 
!project - .get_cached_state(self.services.project_cache.clone(), false) + .get_cached_state(&self.services.project_cache, false) .is_pending() }) }) From 844106c2d43f2a89038fb279249ad9bef6dd2ae8 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 8 Oct 2024 12:21:42 +0200 Subject: [PATCH 4/8] ref(quotas): Cache and enforce indexed quotas in processor (#4106) Instead of never caching indexed rate limits, cache them but only selectively enforce them. With the indexed rate limits being cached, this allows us to enforce cached rate limits in the processor without ever going to Redis. --- relay-quotas/src/rate_limit.rs | 65 +----- relay-server/src/services/processor.rs | 193 +++++++++++++----- .../services/processor/dynamic_sampling.rs | 3 + .../src/services/processor/profile.rs | 4 + relay-server/src/services/processor/report.rs | 5 + .../src/services/processor/span/processing.rs | 2 + relay-server/src/services/project.rs | 4 +- relay-server/src/services/project_cache.rs | 4 + relay-server/src/utils/rate_limits.rs | 90 ++++++-- tests/integration/fixtures/processing.py | 12 +- tests/integration/test_store.py | 25 ++- 11 files changed, 273 insertions(+), 134 deletions(-) diff --git a/relay-quotas/src/rate_limit.rs b/relay-quotas/src/rate_limit.rs index b77c612952..c1162d340e 100644 --- a/relay-quotas/src/rate_limit.rs +++ b/relay-quotas/src/rate_limit.rs @@ -300,7 +300,11 @@ impl RateLimits { /// /// If no limits or quotas match, then the returned `RateLimits` instance evalutes `is_ok`. /// Otherwise, it contains rate limits that match the given scoping. 
- pub fn check_with_quotas(&self, quotas: &[Quota], scoping: ItemScoping<'_>) -> Self { + pub fn check_with_quotas<'a>( + &self, + quotas: impl IntoIterator, + scoping: ItemScoping<'_>, + ) -> Self { let mut applied_limits = Self::new(); for quota in quotas { @@ -396,13 +400,7 @@ impl<'a> IntoIterator for &'a RateLimits { /// Like [`RateLimits`], a collection of scoped rate limits but with all the checks /// necessary to cache the limits. /// -/// The data structure makes sure no expired rate limits are enforced as well -/// as removing any indexed rate limit. -/// -/// Cached rate limits don't enforce indexed rate limits because at the time of the check -/// the decision whether an envelope is sampled or not is not yet known. Additionally -/// even if the item is later dropped by dynamic sampling, it must still be around to extract metrics -/// and cannot be dropped too early. +/// The data structure makes sure no expired rate limits are enforced. #[derive(Debug, Default)] pub struct CachedRateLimits(RateLimits); @@ -415,13 +413,7 @@ impl CachedRateLimits { /// Adds a limit to this collection. /// /// See also: [`RateLimits::add`]. 
- pub fn add(&mut self, mut limit: RateLimit) { - if !limit.categories.is_empty() { - limit.categories.retain(|category| !category.is_indexed()); - if limit.categories.is_empty() { - return; - } - } + pub fn add(&mut self, limit: RateLimit) { self.0.add(limit); } @@ -1083,7 +1075,7 @@ mod tests { }); rate_limits1.add(RateLimit { - categories: smallvec![DataCategory::Transaction], + categories: smallvec![DataCategory::TransactionIndexed], scope: RateLimitScope::Organization(42), reason_code: None, retry_after: RetryAfter::from_secs(1), @@ -1114,7 +1106,7 @@ mod tests { ), RateLimit( categories: [ - transaction, + transaction_indexed, ], scope: Organization(42), reason_code: None, @@ -1166,43 +1158,4 @@ mod tests { ) "###); } - - #[test] - fn test_cached_rate_limits_indexed() { - let mut cached = CachedRateLimits::new(); - - cached.add(RateLimit { - categories: smallvec![DataCategory::Transaction, DataCategory::TransactionIndexed], - scope: RateLimitScope::Organization(42), - reason_code: None, - retry_after: RetryAfter::from_secs(5), - namespaces: smallvec![], - }); - - cached.add(RateLimit { - categories: smallvec![DataCategory::TransactionIndexed], - scope: RateLimitScope::Project(ProjectId::new(21)), - reason_code: None, - retry_after: RetryAfter::from_secs(5), - namespaces: smallvec![], - }); - - let rate_limits = cached.current_limits(); - - insta::assert_ron_snapshot!(rate_limits, @r###" - RateLimits( - limits: [ - RateLimit( - categories: [ - transaction, - ], - scope: Organization(42), - reason_code: None, - retry_after: RetryAfter(5), - namespaces: [], - ), - ], - ) - "###); - } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 2689e47283..d86609a7f9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -45,7 +45,7 @@ use smallvec::{smallvec, SmallVec}; #[cfg(feature = "processing")] use { crate::services::store::{Store, StoreEnvelope}, - crate::utils::{sample, 
Enforcement, EnvelopeLimiter, ItemAction}, + crate::utils::{sample, CheckLimits, Enforcement, EnvelopeLimiter, ItemAction}, itertools::Itertools, relay_cardinality::{ CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter, @@ -638,24 +638,59 @@ impl ProcessingExtractedMetrics { /// This is used to apply rate limits which have been enforced on sampled items of an envelope /// to also consistently apply to the metrics extracted from these items. #[cfg(feature = "processing")] - fn apply_enforcement(&mut self, enforcement: &Enforcement) { - for (namespace, limit) in [ - (MetricNamespace::Transactions, &enforcement.event), - (MetricNamespace::Spans, &enforcement.spans), + fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) { + // Metric namespaces which need to be dropped. + let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![]; + // Metrics belonging to this metric namespace need to have the `extracted_from_indexed` + // flag reset to `false`. + let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![]; + + for (namespace, limit, indexed) in [ + ( + MetricNamespace::Transactions, + &enforcement.event, + &enforcement.event_indexed, + ), + ( + MetricNamespace::Spans, + &enforcement.spans, + &enforcement.spans_indexed, + ), ] { if limit.is_active() { - relay_log::trace!( - "dropping {namespace} metrics, due to enforced limit on envelope" - ); - self.retain(|bucket| bucket.name.try_namespace() != Some(namespace)); + drop_namespaces.push(namespace); + } else if indexed.is_active() && !enforced_consistently { + // If the enforcement was not computed by consistently checking the limits, + // the quota for the metrics has not yet been incremented. + // In this case we have a dropped indexed payload but a metric which still needs to + // be accounted for, make sure the metric will still be rate limited. 
+ reset_extracted_from_indexed.push(namespace); } } + + if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() { + self.retain_mut(|bucket| { + let Some(namespace) = bucket.name.try_namespace() else { + return true; + }; + + if drop_namespaces.contains(&namespace) { + return false; + } + + if reset_extracted_from_indexed.contains(&namespace) { + bucket.metadata.extracted_from_indexed = false; + } + + true + }); + } } #[cfg(feature = "processing")] - fn retain(&mut self, mut f: impl FnMut(&Bucket) -> bool) { - self.metrics.project_metrics.retain(&mut f); - self.metrics.sampling_metrics.retain(&mut f); + fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) { + self.metrics.project_metrics.retain_mut(&mut f); + self.metrics.sampling_metrics.retain_mut(&mut f); } } @@ -722,6 +757,10 @@ struct ProcessEnvelopeState<'a, Group> { /// The state of the project that this envelope belongs to. project_info: Arc, + /// Currently active cached rate limits of the project this envelope belongs to. + #[cfg_attr(not(feature = "processing"), expect(dead_code))] + rate_limits: RateLimits, + /// The config of this Relay instance. config: Arc, @@ -832,9 +871,15 @@ pub struct ProcessEnvelopeResponse { /// - Rate limiters and inbound filters on events in processing mode. #[derive(Debug)] pub struct ProcessEnvelope { + /// Envelope to process. pub envelope: ManagedEnvelope, + /// The project info. pub project_info: Arc, + /// Currently active cached rate limits for this project. + pub rate_limits: RateLimits, + /// Root sampling project info. pub sampling_project_info: Option>, + /// Sampling reservoir counters. 
pub reservoir_counters: ReservoirCounters, } @@ -1256,6 +1301,7 @@ impl EnvelopeProcessorService { mut managed_envelope: TypedEnvelope, project_id: ProjectId, project_info: Arc, + rate_limits: RateLimits, sampling_project_info: Option>, reservoir_counters: Arc>>, ) -> ProcessEnvelopeState { @@ -1296,6 +1342,7 @@ impl EnvelopeProcessorService { metrics: Metrics::default(), extracted_metrics, project_info, + rate_limits, config, sampling_project_info, project_id, @@ -1310,55 +1357,25 @@ impl EnvelopeProcessorService { &self, state: &mut ProcessEnvelopeState, ) -> Result<(), ProcessingError> { + let global_config = self.inner.global_config.current(); let rate_limiter = match self.inner.rate_limiter.as_ref() { Some(rate_limiter) => rate_limiter, None => return Ok(()), }; - let project_info = &state.project_info; - let global_config = self.inner.global_config.current(); - let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas()); - - if quotas.is_empty() { - return Ok(()); - } - - let event_category = state.event_category(); - - // When invoking the rate limiter, capture if the event item has been rate limited to also - // remove it from the processing state eventually. - let mut envelope_limiter = EnvelopeLimiter::new(|item_scope, quantity| { - rate_limiter.is_rate_limited(quotas, item_scope, quantity, false) - }); - - // Tell the envelope limiter about the event, since it has been removed from the Envelope at - // this stage in processing. - if let Some(category) = event_category { - envelope_limiter.assume_event(category); - } - - let scoping = state.managed_envelope.scoping(); - let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), { - envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)? 
- }); - let event_active = enforcement.is_event_active(); + // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not + // applied in the fast path, all cached quotas can be applied here. + let _ = RateLimiter::Cached.enforce(&global_config, state)?; - // Use the same rate limits as used for the envelope on the metrics. - // Those rate limits should not be checked for expiry or similar to ensure a consistent - // limiting of envelope items and metrics. - state.extracted_metrics.apply_enforcement(&enforcement); - enforcement.apply_with_outcomes(&mut state.managed_envelope); - - if event_active { - state.remove_event(); - debug_assert!(state.envelope().is_empty()); - } + // Enforce all quotas consistently with Redis. + let limits = RateLimiter::Consistent(rate_limiter).enforce(&global_config, state)?; + // Update cached rate limits with the freshly computed ones. if !limits.is_empty() { - self.inner - .addrs - .project_cache - .send(UpdateRateLimits::new(scoping.project_key, limits)); + self.inner.addrs.project_cache.send(UpdateRateLimits::new( + state.managed_envelope.scoping().project_key, + limits, + )); } Ok(()) @@ -1887,6 +1904,7 @@ impl EnvelopeProcessorService { mut managed_envelope: ManagedEnvelope, project_id: ProjectId, project_info: Arc, + rate_limits: RateLimits, sampling_project_info: Option>, reservoir_counters: Arc>>, ) -> Result { @@ -1911,6 +1929,7 @@ impl EnvelopeProcessorService { managed_envelope, project_id, project_info, + rate_limits, sampling_project_info, reservoir_counters, ); @@ -1987,6 +2006,7 @@ impl EnvelopeProcessorService { let ProcessEnvelope { envelope: mut managed_envelope, project_info, + rate_limits, sampling_project_info, reservoir_counters, } = message; @@ -2035,6 +2055,7 @@ impl EnvelopeProcessorService { managed_envelope, project_id, project_info, + rate_limits, sampling_project_info, reservoir_counters, ) { @@ -2916,6 +2937,70 @@ impl Service for EnvelopeProcessorService { } } +#[cfg(feature = 
"processing")] +enum RateLimiter<'a> { + Cached, + Consistent(&'a RedisRateLimiter), +} + +#[cfg(feature = "processing")] +impl<'a> RateLimiter<'a> { + fn enforce( + &self, + global_config: &GlobalConfig, + state: &mut ProcessEnvelopeState, + ) -> Result { + if state.envelope().is_empty() && !state.has_event() { + return Ok(RateLimits::default()); + } + + let quotas = CombinedQuotas::new(global_config, state.project_info.get_quotas()); + if quotas.is_empty() { + return Ok(RateLimits::default()); + } + + let event_category = state.event_category(); + + // When invoking the rate limiter, capture if the event item has been rate limited to also + // remove it from the processing state eventually. + let mut envelope_limiter = + EnvelopeLimiter::new(CheckLimits::All, |item_scope, quantity| match self { + RateLimiter::Cached => Ok(state.rate_limits.check_with_quotas(quotas, item_scope)), + RateLimiter::Consistent(rl) => Ok::<_, ProcessingError>( + rl.is_rate_limited(quotas, item_scope, quantity, false)?, + ), + }); + + // Tell the envelope limiter about the event, since it has been removed from the Envelope at + // this stage in processing. + if let Some(category) = event_category { + envelope_limiter.assume_event(category); + } + + let scoping = state.managed_envelope.scoping(); + let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), { + envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)? + }); + let event_active = enforcement.is_event_active(); + + // Use the same rate limits as used for the envelope on the metrics. + // Those rate limits should not be checked for expiry or similar to ensure a consistent + // limiting of envelope items and metrics. 
+ state + .extracted_metrics + .apply_enforcement(&enforcement, matches!(self, Self::Consistent(_))); + enforcement.apply_with_outcomes(&mut state.managed_envelope); + + if event_active { + state.remove_event(); + debug_assert!(state.envelope().is_empty()); + debug_assert!(!state.has_event()); + } + + Ok(limits) + } +} + fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result { let envelope_body: Vec = match http_encoding { HttpEncoding::Identity => return Ok(body.clone()), @@ -3473,6 +3558,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_info), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -3535,6 +3621,7 @@ mod tests { let process_message = ProcessEnvelope { envelope: managed_envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index cb81de7efc..1267015444 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -298,6 +298,7 @@ mod tests { let message = ProcessEnvelope { envelope: ManagedEnvelope::new(envelope, outcome_aggregator, test_store, group), project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info, reservoir_counters: ReservoirCounters::default(), }; @@ -438,6 +439,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: config.clone(), project_info, + rate_limits: Default::default(), sampling_project_info: None, project_id: ProjectId::new(42), managed_envelope: ManagedEnvelope::new( @@ -709,6 +711,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: Arc::new(Config::default()), project_info, + rate_limits: 
Default::default(), sampling_project_info: { let mut state = ProjectInfo::default(); state.config.metric_extraction = diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 559f61555c..be93112a02 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -259,6 +259,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_state), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -389,6 +390,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_info), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -458,6 +460,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_state), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -529,6 +532,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(project_state), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index 3bc54a4557..002a0753de 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -325,6 +325,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -379,6 +380,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: 
ReservoirCounters::default(), }; @@ -441,6 +443,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -481,6 +484,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; @@ -529,6 +533,7 @@ mod tests { let message = ProcessEnvelope { envelope, project_info: Arc::new(ProjectInfo::default()), + rate_limits: Default::default(), sampling_project_info: None, reservoir_counters: ReservoirCounters::default(), }; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 3e709a4357..2f6f9f94dd 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -762,6 +762,7 @@ mod tests { }; use relay_event_schema::protocol::{Contexts, Event, Span}; use relay_protocol::get_value; + use relay_quotas::RateLimits; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator}; use relay_system::Addr; @@ -821,6 +822,7 @@ mod tests { extracted_metrics: ProcessingExtractedMetrics::new(), config: Arc::new(Config::default()), project_info, + rate_limits: RateLimits::default(), sampling_project_info: None, project_id: ProjectId::new(42), managed_envelope: managed_envelope.try_into().unwrap(), diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index 7f8d51322f..e32d7ebbd1 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -21,7 +21,7 @@ use crate::services::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdat use crate::utils::{Enforcement, SeqCount}; use crate::statsd::RelayCounters; -use 
crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; +use crate::utils::{CheckLimits, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; pub mod state; @@ -534,7 +534,7 @@ impl Project { let current_limits = self.rate_limits.current_limits(); let quotas = state.as_deref().map(|s| s.get_quotas()).unwrap_or(&[]); - let envelope_limiter = EnvelopeLimiter::new(|item_scoping, _| { + let envelope_limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |item_scoping, _| { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }); diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ad3c459b3a..4804f6f170 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -872,6 +872,8 @@ impl ProjectCacheBroker { .. }) = project.check_envelope(managed_envelope) { + let rate_limits = project.current_rate_limits().clone(); + let reservoir_counters = project.reservoir_counters(); let sampling_project_info = managed_envelope @@ -884,6 +886,7 @@ impl ProjectCacheBroker { let process = ProcessEnvelope { envelope: managed_envelope, project_info, + rate_limits, sampling_project_info, reservoir_counters, }; @@ -1177,6 +1180,7 @@ impl ProjectCacheBroker { services.envelope_processor.send(ProcessEnvelope { envelope: managed_envelope, project_info: project_info.clone(), + rate_limits: project.current_rate_limits().clone(), sampling_project_info: sampling_project_info.clone(), reservoir_counters, }); diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 4adaf92c53..2dabb2ed3e 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -510,6 +510,39 @@ impl Enforcement { } } +/// Which limits to check with the [`EnvelopeLimiter`]. +#[derive(Debug, Copy, Clone)] +pub enum CheckLimits { + /// Checks all limits except indexed categories. 
+ /// + /// In the fast path it is necessary to apply cached rate limits but to not enforce indexed rate limits. + /// Because at the time of the check the decision whether an envelope is sampled or not is not yet known. + /// Additionally even if the item is later dropped by dynamic sampling, it must still be around to extract metrics + /// and cannot be dropped too early. + NonIndexed, + /// Checks all limits against the envelope. + #[cfg_attr(not(any(feature = "processing", test)), expect(dead_code))] + All, +} + +struct Check { + limits: CheckLimits, + check: F, +} + +impl Check +where + F: FnMut(ItemScoping<'_>, usize) -> Result, +{ + fn apply(&mut self, scoping: ItemScoping<'_>, quantity: usize) -> Result { + if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() { + return Ok(RateLimits::default()); + } + + (self.check)(scoping, quantity) + } +} + /// Enforces rate limits with the given `check` function on items in the envelope. /// /// The `check` function is called with the following rules: @@ -522,7 +555,7 @@ impl Enforcement { /// - Attachments are not removed if they create events (e.g. minidumps). /// - Sessions are handled separate to all of the above. pub struct EnvelopeLimiter { - check: F, + check: Check, event_category: Option, } @@ -531,9 +564,9 @@ where F: FnMut(ItemScoping<'_>, usize) -> Result, { /// Create a new `EnvelopeLimiter` with the given `check` function. - pub fn new(check: F) -> Self { + pub fn new(limits: CheckLimits, check: F) -> Self { Self { - check, + check: Check { check, limits }, event_category: None, } } @@ -580,14 +613,14 @@ where if let Some(category) = summary.event_category { // Check the broad category for limits. 
- let mut event_limits = (self.check)(scoping.item(category), 1)?; + let mut event_limits = self.check.apply(scoping.item(category), 1)?; enforcement.event = CategoryLimit::new(category, 1, event_limits.longest()); if let Some(index_category) = category.index_category() { // Check the specific/indexed category for limits only if the specific one has not already // an enforced limit. if event_limits.is_empty() { - event_limits.merge((self.check)(scoping.item(index_category), 1)?); + event_limits.merge(self.check.apply(scoping.item(index_category), 1)?); } enforcement.event_indexed = @@ -602,7 +635,9 @@ where limit.clone_for(DataCategory::Attachment, summary.attachment_quantity); } else if summary.attachment_quantity > 0 { let item_scoping = scoping.item(DataCategory::Attachment); - let attachment_limits = (self.check)(item_scoping, summary.attachment_quantity)?; + let attachment_limits = self + .check + .apply(item_scoping, summary.attachment_quantity)?; enforcement.attachments = CategoryLimit::new( DataCategory::Attachment, summary.attachment_quantity, @@ -619,7 +654,7 @@ where if summary.session_quantity > 0 { let item_scoping = scoping.item(DataCategory::Session); - let session_limits = (self.check)(item_scoping, summary.session_quantity)?; + let session_limits = self.check.apply(item_scoping, summary.session_quantity)?; enforcement.sessions = CategoryLimit::new( DataCategory::Session, summary.session_quantity, @@ -637,7 +672,7 @@ where .event_indexed .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity) } else if summary.profile_quantity > 0 { - let mut profile_limits = (self.check)( + let mut profile_limits = self.check.apply( scoping.item(DataCategory::Profile), summary.profile_quantity, )?; @@ -648,7 +683,7 @@ where ); if profile_limits.is_empty() { - profile_limits.merge((self.check)( + profile_limits.merge(self.check.apply( scoping.item(DataCategory::ProfileIndexed), summary.profile_quantity, )?); @@ -665,7 +700,7 @@ where if 
summary.replay_quantity > 0 { let item_scoping = scoping.item(DataCategory::Replay); - let replay_limits = (self.check)(item_scoping, summary.replay_quantity)?; + let replay_limits = self.check.apply(item_scoping, summary.replay_quantity)?; enforcement.replays = CategoryLimit::new( DataCategory::Replay, summary.replay_quantity, @@ -676,7 +711,7 @@ where if summary.checkin_quantity > 0 { let item_scoping = scoping.item(DataCategory::Monitor); - let checkin_limits = (self.check)(item_scoping, summary.checkin_quantity)?; + let checkin_limits = self.check.apply(item_scoping, summary.checkin_quantity)?; enforcement.check_ins = CategoryLimit::new( DataCategory::Monitor, summary.checkin_quantity, @@ -694,8 +729,9 @@ where .event_indexed .clone_for(DataCategory::SpanIndexed, summary.span_quantity); } else if summary.span_quantity > 0 { - let mut span_limits = - (self.check)(scoping.item(DataCategory::Span), summary.span_quantity)?; + let mut span_limits = self + .check + .apply(scoping.item(DataCategory::Span), summary.span_quantity)?; enforcement.spans = CategoryLimit::new( DataCategory::Span, summary.span_quantity, @@ -703,7 +739,7 @@ where ); if span_limits.is_empty() { - span_limits.merge((self.check)( + span_limits.merge(self.check.apply( scoping.item(DataCategory::SpanIndexed), summary.span_quantity, )?); @@ -720,7 +756,9 @@ where if summary.profile_chunk_quantity > 0 { let item_scoping = scoping.item(DataCategory::ProfileChunk); - let profile_chunk_limits = (self.check)(item_scoping, summary.profile_chunk_quantity)?; + let profile_chunk_limits = self + .check + .apply(item_scoping, summary.profile_chunk_quantity)?; enforcement.profile_chunks = CategoryLimit::new( DataCategory::ProfileChunk, summary.profile_chunk_quantity, @@ -1062,7 +1100,7 @@ mod tests { let scoping = envelope.scoping(); #[allow(unused_mut)] - let mut limiter = EnvelopeLimiter::new(|s, q| mock.check(s, q)); + let mut limiter = EnvelopeLimiter::new(CheckLimits::All, |s, q| mock.check(s, q)); 
#[cfg(feature = "processing")] if let Some(assume_event) = assume_event { limiter.assume_event(assume_event); @@ -1294,6 +1332,26 @@ mod tests { ); } + #[test] + fn test_enforce_transaction_non_indexed() { + let mut envelope = envelope![Transaction, Profile]; + let scoping = envelope.scoping(); + + let mut mock = MockLimiter::default().deny(DataCategory::TransactionIndexed); + + let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |s, q| mock.check(s, q)); + let (enforcement, limits) = limiter.compute(envelope.envelope_mut(), &scoping).unwrap(); + enforcement.clone().apply_with_outcomes(&mut envelope); + + assert!(!limits.is_limited()); + assert!(!enforcement.event_indexed.is_active()); + assert!(!enforcement.event.is_active()); + assert!(!enforcement.profiles_indexed.is_active()); + assert!(!enforcement.profiles.is_active()); + mock.assert_call(DataCategory::Transaction, 1); + mock.assert_call(DataCategory::Profile, 1); + } + #[test] fn test_enforce_transaction_no_indexing_quota() { let mut envelope = envelope![Transaction]; diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index a9bf48bd47..d9bf9fc7a9 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -226,6 +226,8 @@ def assert_empty(self, timeout=None): def category_value(category): + if isinstance(category, DataCategory): + return category return DataCategory.parse(category) @@ -243,7 +245,13 @@ def get_outcome(self, timeout=None): return outcomes[0] def assert_rate_limited( - self, reason, key_id=None, categories=None, quantity=None, timeout=1 + self, + reason, + key_id=None, + categories=None, + quantity=None, + timeout=1, + ignore_other=False, ): if categories is None: outcome = self.get_outcome(timeout=timeout) @@ -253,6 +261,8 @@ def assert_rate_limited( outcomes = self.get_outcomes(timeout=timeout) expected = {category_value(category) for category in categories} actual = {outcome["category"] for 
outcome in outcomes} + if ignore_other: + actual = actual & set(categories) assert actual == expected, (actual, expected) for outcome in outcomes: diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 8056d9407d..60576a1e81 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -7,6 +7,8 @@ from datetime import UTC, datetime, timedelta, timezone from time import sleep +from sentry_relay.consts import DataCategory + from .asserts import time_within_delta from .consts import ( TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, @@ -860,6 +862,7 @@ def test_processing_quota_transaction_indexing( relay_with_processing, metrics_consumer, transactions_consumer, + outcomes_consumer, extraction_version, ): relay = relay_with_processing( @@ -876,6 +879,7 @@ def test_processing_quota_transaction_indexing( metrics_consumer = metrics_consumer() tx_consumer = transactions_consumer() + outcomes_consumer = outcomes_consumer() project_id = 42 projectconfig = mini_sentry.add_full_project_config(project_id) @@ -907,20 +911,29 @@ def test_processing_quota_transaction_indexing( relay.send_event(project_id, make_transaction({"message": "1st tx"})) event, _ = tx_consumer.get_event() assert event["logentry"]["formatted"] == "1st tx" - buckets = list(metrics_consumer.get_metrics()) - assert len(buckets) > 0 + assert len(list(metrics_consumer.get_metrics())) > 0 relay.send_event(project_id, make_transaction({"message": "2nd tx"})) - tx_consumer.assert_empty() - buckets = list(metrics_consumer.get_metrics()) - assert len(buckets) > 0 + assert len(list(metrics_consumer.get_metrics())) > 0 + outcomes_consumer.assert_rate_limited( + "get_lost", categories=[DataCategory.TRANSACTION_INDEXED], ignore_other=True + ) relay.send_event(project_id, make_transaction({"message": "3rd tx"})) - tx_consumer.assert_empty() + outcomes_consumer.assert_rate_limited( + "get_lost", + categories=[DataCategory.TRANSACTION, DataCategory.TRANSACTION_INDEXED], + 
ignore_other=True, + ) with pytest.raises(HTTPError) as exc_info: relay.send_event(project_id, make_transaction({"message": "4nd tx"})) assert exc_info.value.response.status_code == 429, "Expected a 429 status code" + outcomes_consumer.assert_rate_limited( + "get_lost", + categories=[DataCategory.TRANSACTION, DataCategory.TRANSACTION_INDEXED], + ignore_other=True, + ) def test_events_buffered_before_auth(relay, mini_sentry): From ee00d5585857f8b8209218e0fdbc3e879ccf4f76 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 8 Oct 2024 14:54:31 +0200 Subject: [PATCH 5/8] style(lints): Deny print macros (#4120) Make sure leftovers from debugging are caught early. --- Cargo.toml | 2 ++ relay-auth/src/lib.rs | 1 + relay-filter/src/error_messages.rs | 7 ------- relay-log/src/utils.rs | 1 + relay-pii/src/builtin.rs | 1 - relay-server/src/services/spooler/mod.rs | 1 + relay/src/main.rs | 6 +++++- 7 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2ef37da754..100fca3242 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,8 @@ strip = true [workspace.lints.clippy] dbg_macro = "warn" +print_stdout = "warn" +print_stderr = "warn" [workspace.dependencies] relay-auth = { path = "relay-auth" } diff --git a/relay-auth/src/lib.rs b/relay-auth/src/lib.rs index 6b8afa3662..49fbeba1a1 100644 --- a/relay-auth/src/lib.rs +++ b/relay-auth/src/lib.rs @@ -817,6 +817,7 @@ mod tests { /// exchanged authentication structures. 
/// It follows test_registration but instead of asserting it prints the strings #[test] + #[allow(clippy::print_stdout, reason = "helper test to generate output")] fn test_generate_strings_for_test_auth_py() { let max_age = Duration::minutes(15); println!("Generating test data for test_auth.py..."); diff --git a/relay-filter/src/error_messages.rs b/relay-filter/src/error_messages.rs index 34a223a1ab..579897f46d 100644 --- a/relay-filter/src/error_messages.rs +++ b/relay-filter/src/error_messages.rs @@ -142,13 +142,6 @@ mod tests { for config in &configs[..] { for &case in &cases[..] { - // Useful output to debug which testcase fails. Hidden if the test passes. - println!( - "------------------------------------------------------------------------" - ); - println!("Config: {config:?}"); - println!("Case: {case:?}"); - let (exc_type, exc_value, logentry_formatted, should_ingest) = case; let event = Event { exceptions: Annotated::new(Values::new(vec![Annotated::new(Exception { diff --git a/relay-log/src/utils.rs b/relay-log/src/utils.rs index f92ed96fb6..5041a0add7 100644 --- a/relay-log/src/utils.rs +++ b/relay-log/src/utils.rs @@ -23,6 +23,7 @@ pub fn backtrace_enabled() -> bool { /// Prefer to use [`relay_log::error`](crate::error) over this function whenever possible. This /// function is intended to be used during startup, where initializing the logger may fail or when /// errors need to be logged before the logger has been initialized. 
+#[allow(clippy::print_stderr, reason = "necessary for early logging")] pub fn ensure_error>(error: E) { if tracing::event_enabled!(Level::ERROR) { crate::error!(error = error.as_ref()); diff --git a/relay-pii/src/builtin.rs b/relay-pii/src/builtin.rs index 873182800a..0cb61ea34d 100644 --- a/relay-pii/src/builtin.rs +++ b/relay-pii/src/builtin.rs @@ -1321,7 +1321,6 @@ HdmUCGvfKiF2CodxyLon1XkK8pX+Ap86MbJhluqK ] { for redaction_method in &["mask", "remove", "hash", "replace"] { let key = format!("@{rule_type}:{redaction_method}"); - println!("looking up {key}"); assert!(BUILTIN_RULES_MAP.contains_key(key.as_str())); } } diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index a1bbc3bffe..36debfc9bf 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1698,6 +1698,7 @@ mod tests { #[ignore] // Slow. Should probably be a criterion benchmark. #[tokio::test] + #[allow(clippy::print_stdout, reason = "benchmark test")] async fn compare_counts() { let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); let options = SqliteConnectOptions::new() diff --git a/relay/src/main.rs b/relay/src/main.rs index 3a3e9fba81..0a5b271392 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -116,7 +116,11 @@ html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png", html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" )] -#![allow(clippy::derive_partial_eq_without_eq)] +#![allow( + clippy::derive_partial_eq_without_eq, + clippy::print_stdout, + clippy::print_stderr +)] mod cli; mod cliapp; From d9f405c58c92b2324e90bfa21742b73c7935500e Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 8 Oct 2024 15:17:31 +0200 Subject: [PATCH 6/8] ref(pattern): Add a Patterns matcher for multiple patterns (#4101) Adds `Patterns` to support what `glob3` is currently doing, matching multiple patterns at 
once. Also adds `TypedPatterns` which is like `TypedPattern` just for `Patterns`. Possibly controversial decision: We currently ignore failing patterns when deserializing a sequence of patterns in `glob3`, `TypedPatterns` also silently ignores invalid patterns. This is most likely the default behaviour we want (at least when switching from `glob3` to `TypedPatterns`) and using a custom deserializer for each occurrence would be really annoying. --- relay-pattern/src/lib.rs | 202 +++++++++++++++++++++++++++++++++-- relay-pattern/src/typed.rs | 208 ++++++++++++++++++++++++++++++++++++- 2 files changed, 400 insertions(+), 10 deletions(-) diff --git a/relay-pattern/src/lib.rs b/relay-pattern/src/lib.rs index 24f2737810..51a9dab4c8 100644 --- a/relay-pattern/src/lib.rs +++ b/relay-pattern/src/lib.rs @@ -120,14 +120,7 @@ impl Pattern { /// Returns `true` if the pattern matches the passed string. pub fn is_match(&self, haystack: &str) -> bool { - match &self.strategy { - MatchStrategy::Literal(literal) => match_literal(literal, haystack, self.options), - MatchStrategy::Prefix(prefix) => match_prefix(prefix, haystack, self.options), - MatchStrategy::Suffix(suffix) => match_suffix(suffix, haystack, self.options), - MatchStrategy::Contains(contains) => match_contains(contains, haystack, self.options), - MatchStrategy::Static(matches) => *matches, - MatchStrategy::Wildmatch(tokens) => wildmatch::is_match(haystack, tokens, self.options), - } + self.strategy.is_match(haystack, self.options) } } @@ -137,6 +130,45 @@ impl fmt::Display for Pattern { } } +/// A collection of [`Pattern`]s sharing the same configuration. +#[derive(Debug, Clone)] +pub struct Patterns { + strategies: Vec, + options: Options, +} + +impl Patterns { + /// Creates an empty [`Patterns`] instance which never matches anything. 
+ /// + /// ``` + /// # use relay_pattern::Patterns; + /// let patterns = Patterns::empty(); + /// + /// assert!(!patterns.is_match("")); + /// assert!(!patterns.is_match("foobar")); + /// ``` + pub fn empty() -> Self { + Self { + strategies: Vec::new(), + options: Options::default(), + } + } + + /// Returns a [`PatternsBuilder`]. + pub fn builder() -> PatternsBuilder { + PatternsBuilder { + options: Options::default(), + } + } + + /// Returns `true` if any of the contained patterns matches the passed string. + pub fn is_match(&self, haystack: &str) -> bool { + self.strategies + .iter() + .any(|s| s.is_match(haystack, self.options)) + } +} + /// A builder for a [`Pattern`]. #[derive(Debug)] pub struct PatternBuilder<'a> { @@ -165,7 +197,7 @@ impl<'a> PatternBuilder<'a> { self } - /// build a new [`Pattern`] from the passed pattern and configured options. + /// Build a new [`Pattern`] from the passed pattern and configured options. pub fn build(&self) -> Result { let mut parser = Parser::new(self.pattern, self.options); parser.parse().map_err(|kind| Error { @@ -197,6 +229,88 @@ impl<'a> PatternBuilder<'a> { } } +/// A builder for a collection of [`Patterns`]. +#[derive(Debug)] +pub struct PatternsBuilder { + options: Options, +} + +impl PatternsBuilder { + /// If enabled matches the pattern case insensitive. + /// + /// This is disabled by default. + pub fn case_insensitive(&mut self, enabled: bool) -> &mut Self { + self.options.case_insensitive = enabled; + self + } + + /// Returns a [`PatternsBuilderConfigured`] builder which allows adding patterns. + pub fn patterns(&mut self) -> PatternsBuilderConfigured { + PatternsBuilderConfigured { + strategies: Vec::new(), + options: self.options, + } + } + + /// Adds a pattern to the builder and returns the resulting [`PatternsBuilderConfigured`]. 
+ pub fn add(&mut self, pattern: &str) -> Result { + let mut builder = PatternsBuilderConfigured { + strategies: Vec::with_capacity(1), + options: self.options, + }; + builder.add(pattern)?; + Ok(builder) + } +} + +/// A [`PatternsBuilder`] with all options configured. +/// +/// The second step after [`PatternsBuilder`]. +#[derive(Debug)] +pub struct PatternsBuilderConfigured { + strategies: Vec, + options: Options, +} + +impl PatternsBuilderConfigured { + /// Adds a pattern to the builder. + pub fn add(&mut self, pattern: &str) -> Result<&mut Self, Error> { + let mut parser = Parser::new(pattern, self.options); + parser.parse().map_err(|kind| Error { + pattern: pattern.to_owned(), + kind, + })?; + + let strategy = + MatchStrategy::from_tokens(parser.tokens, self.options).map_err(|kind| Error { + pattern: pattern.to_owned(), + kind, + })?; + + self.strategies.push(strategy); + + Ok(self) + } + + /// Builds a [`Patterns`] from the contained patterns. + pub fn build(self) -> Patterns { + Patterns { + strategies: self.strategies, + options: self.options, + } + } + + /// Returns [`Patterns`] containing all added patterns and removes them from the builder. + /// + /// The builder can still be used afterwards, it keeps the configuration. + pub fn take(&mut self) -> Patterns { + Patterns { + strategies: std::mem::take(&mut self.strategies), + options: self.options, + } + } +} + /// Options to influence [`Pattern`] matching behaviour. #[derive(Debug, Clone, Copy, Default)] struct Options { @@ -263,6 +377,18 @@ impl MatchStrategy { Ok(s) } + + /// Returns `true` if the pattern matches the passed string. 
+ pub fn is_match(&self, haystack: &str, options: Options) -> bool { + match &self { + MatchStrategy::Literal(literal) => match_literal(literal, haystack, options), + MatchStrategy::Prefix(prefix) => match_prefix(prefix, haystack, options), + MatchStrategy::Suffix(suffix) => match_suffix(suffix, haystack, options), + MatchStrategy::Contains(contains) => match_contains(contains, haystack, options), + MatchStrategy::Static(matches) => *matches, + MatchStrategy::Wildmatch(tokens) => wildmatch::is_match(haystack, tokens, options), + } + } } #[inline(always)] @@ -1661,4 +1787,62 @@ mod tests { .build() .is_ok()); } + + #[test] + fn test_patterns() { + let patterns = Patterns::builder() + .add("foobaR") + .unwrap() + .add("a*") + .unwrap() + .add("*a") + .unwrap() + .add("[0-9]*baz") + .unwrap() + .take(); + + assert!(patterns.is_match("foobaR")); + assert!(patterns.is_match("abc")); + assert!(patterns.is_match("cba")); + assert!(patterns.is_match("3baz")); + assert!(patterns.is_match("123456789baz")); + assert!(!patterns.is_match("foobar")); + assert!(!patterns.is_match("FOOBAR")); + } + + #[test] + fn test_patterns_case_insensitive() { + let patterns = Patterns::builder() + .case_insensitive(true) + .add("fOObar") + .unwrap() + .add("a*") + .unwrap() + .add("*a") + .unwrap() + .add("[0-9]*baz") + .unwrap() + .take(); + + assert!(patterns.is_match("FooBar")); + assert!(patterns.is_match("abC")); + assert!(patterns.is_match("cbA")); + assert!(patterns.is_match("3BAZ")); + assert!(patterns.is_match("123456789baz")); + assert!(!patterns.is_match("b")); + } + + #[test] + fn test_patterns_take_clears_builder() { + let mut builder = Patterns::builder().add("foo").unwrap(); + + let patterns = builder.take(); + assert!(patterns.is_match("foo")); + assert!(!patterns.is_match("bar")); + + builder.add("bar").unwrap(); + let patterns = builder.build(); + assert!(!patterns.is_match("foo")); + assert!(patterns.is_match("bar")); + } } diff --git a/relay-pattern/src/typed.rs 
b/relay-pattern/src/typed.rs index 6e444db21f..56835132d6 100644 --- a/relay-pattern/src/typed.rs +++ b/relay-pattern/src/typed.rs @@ -1,7 +1,7 @@ use std::marker::PhantomData; use std::ops::Deref; -use crate::{Error, Pattern}; +use crate::{Error, Pattern, Patterns, PatternsBuilderConfigured}; /// Compile time configuration for a [`TypedPattern`]. pub trait PatternConfig { @@ -99,6 +99,12 @@ impl serde::Serialize for TypedPattern { } } +impl From> for Pattern { + fn from(value: TypedPattern) -> Self { + value.pattern + } +} + impl AsRef for TypedPattern { fn as_ref(&self) -> &Pattern { &self.pattern @@ -113,6 +119,132 @@ impl Deref for TypedPattern { } } +/// [`Patterns`] with a compile time configured [`PatternConfig`]. +pub struct TypedPatterns { + patterns: Patterns, + #[cfg(feature = "serde")] + raw: Vec, + _phantom: PhantomData, +} + +impl TypedPatterns { + pub fn builder() -> TypedPatternsBuilder { + let builder = Patterns::builder() + .case_insensitive(C::CASE_INSENSITIVE) + .patterns(); + + TypedPatternsBuilder { + builder, + raw: Vec::new(), + _phantom: PhantomData, + } + } +} + +/// Deserializes patterns from a sequence of strings. +/// +/// Invalid patterns are ignored while deserializing. +#[cfg(feature = "serde")] +impl<'de, C: PatternConfig> serde::Deserialize<'de> for TypedPatterns { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct Visitor(PhantomData); + + impl<'a, C: PatternConfig> serde::de::Visitor<'a> for Visitor { + type Value = TypedPatterns; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a sequence of patterns") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'a>, + { + let mut builder = TypedPatterns::::builder(); + + while let Some(item) = seq.next_element()? { + // Ignore invalid patterns as documented. 
+ let _ = builder.add(item); + } + + Ok(builder.build()) + } + } + + deserializer.deserialize_seq(Visitor(PhantomData)) + } +} + +#[cfg(feature = "serde")] +impl serde::Serialize for TypedPatterns { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.raw.serialize(serializer) + } +} + +impl From> for Patterns { + fn from(value: TypedPatterns) -> Self { + value.patterns + } +} + +impl AsRef for TypedPatterns { + fn as_ref(&self) -> &Patterns { + &self.patterns + } +} + +impl Deref for TypedPatterns { + type Target = Patterns; + + fn deref(&self) -> &Self::Target { + &self.patterns + } +} + +pub struct TypedPatternsBuilder { + builder: PatternsBuilderConfigured, + #[cfg(feature = "serde")] + raw: Vec, + _phantom: PhantomData, +} + +impl TypedPatternsBuilder { + /// Adds a pattern to the builder. + pub fn add(&mut self, pattern: String) -> Result<&mut Self, Error> { + self.builder.add(&pattern)?; + #[cfg(feature = "serde")] + self.raw.push(pattern); + Ok(self) + } + + /// Builds a [`TypedPatterns`] from the contained patterns. + pub fn build(self) -> TypedPatterns { + TypedPatterns { + patterns: self.builder.build(), + #[cfg(feature = "serde")] + raw: self.raw, + _phantom: PhantomData, + } + } + + /// Builds a [`TypedPatterns`] from the contained patterns and clears the builder. 
+ pub fn take(&mut self) -> TypedPatterns { + TypedPatterns { + patterns: self.builder.take(), + #[cfg(feature = "serde")] + raw: std::mem::take(&mut self.raw), + _phantom: PhantomData, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -171,4 +303,78 @@ mod tests { let pattern: TypedPattern = TypedPattern::new("*[rt]x").unwrap(); assert_eq!(serde_json::to_string(&pattern).unwrap(), r#""*[rt]x""#); } + + #[test] + fn test_patterns_default() { + let patterns: TypedPatterns = TypedPatterns::builder() + .add("*[rt]x".to_owned()) + .unwrap() + .add("foobar".to_owned()) + .unwrap() + .take(); + assert!(patterns.is_match("f/o_rx")); + assert!(patterns.is_match("foobar")); + assert!(!patterns.is_match("Foobar")); + } + + #[test] + fn test_patterns_case_insensitive() { + let patterns: TypedPatterns = TypedPatterns::builder() + .add("*[rt]x".to_owned()) + .unwrap() + .add("foobar".to_owned()) + .unwrap() + .take(); + assert!(patterns.is_match("f/o_rx")); + assert!(patterns.is_match("f/o_Rx")); + assert!(patterns.is_match("foobar")); + assert!(patterns.is_match("Foobar")); + } + + #[test] + #[cfg(feature = "serde")] + fn test_patterns_deserialize() { + let pattern: TypedPatterns = + serde_json::from_str(r#"["*[rt]x","foobar"]"#).unwrap(); + assert!(pattern.is_match("foobar_rx")); + assert!(pattern.is_match("FOOBAR")); + } + + #[test] + #[cfg(feature = "serde")] + fn test_patterns_deserialize_err() { + let r: TypedPatterns = + serde_json::from_str(r#"["[invalid","foobar"]"#).unwrap(); + assert!(r.is_match("foobar")); + assert!(r.is_match("FOOBAR")); + + // The invalid element is dropped. 
+ assert_eq!(serde_json::to_string(&r).unwrap(), r#"["foobar"]"#); + } + + #[test] + #[cfg(feature = "serde")] + fn test_patterns_serialize() { + let pattern: TypedPatterns = TypedPatterns::builder() + .add("*[rt]x".to_owned()) + .unwrap() + .add("foobar".to_owned()) + .unwrap() + .take(); + assert_eq!( + serde_json::to_string(&pattern).unwrap(), + r#"["*[rt]x","foobar"]"# + ); + + let pattern: TypedPatterns = TypedPatterns::builder() + .add("*[rt]x".to_owned()) + .unwrap() + .add("foobar".to_owned()) + .unwrap() + .take(); + assert_eq!( + serde_json::to_string(&pattern).unwrap(), + r#"["*[rt]x","foobar"]"# + ); + } } From 89a453ff7787b19f0fb376164c66da305fb257ce Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 8 Oct 2024 15:25:10 +0200 Subject: [PATCH 7/8] ref(server): Send batched metrics batched to project cache (#4121) Follow up from #4103, this now actually forwards batches. Implemented with a custom deserializer to minimize iterations over the parsed data (to compute feature weights) as well as minimizing allocations for contained items, deserializing directly into the proper smallvec. --- relay-server/src/services/processor.rs | 104 ++++++++++++++++--------- 1 file changed, 68 insertions(+), 36 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index d86609a7f9..12888b6c8c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2193,12 +2193,66 @@ impl EnvelopeProcessorService { sent_at, } = message; + /// Custom struct to optimize deserialization. + /// + /// Equivalent to deserializing to `HashMap<ProjectKey, Vec<Bucket>>` and then transforming + /// to `SmallVec<[(ProjectKey, MetricData); _]>` in a separate step. + /// + /// But implemented with a custom deserializer to minimize iterations over the parsed data + /// (to compute feature weights) as well as minimizing allocations for contained items by + /// deserializing directly into the proper smallvec. 
+ struct Buckets { + data: SmallVec<[(ProjectKey, MetricData); 1]>, + feature_weights: FeatureWeights, + } + + impl<'de> serde::Deserialize<'de> for Buckets { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct Visitor; + + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = Buckets; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a mapping of project key to list of buckets") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut data = smallvec![]; + let mut feature_weights = FeatureWeights::none(); + + while let Some((project_key, buckets)) = map.next_entry::<_, Vec<_>>()? { + feature_weights = + feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); + data.push((project_key, MetricData::Parsed(buckets))); + } + + Ok(Buckets { + data, + feature_weights, + }) + } + } + + deserializer.deserialize_map(Visitor) + } + } + #[derive(serde::Deserialize)] struct Wrapper { - buckets: HashMap>, + buckets: Buckets, } - let buckets = match serde_json::from_slice(&payload) { + let Buckets { + data, + feature_weights, + } = match serde_json::from_slice(&payload) { Ok(Wrapper { buckets }) => buckets, Err(error) => { relay_log::debug!( @@ -2210,21 +2264,14 @@ impl EnvelopeProcessorService { } }; - let mut feature_weights = FeatureWeights::none(); - for (project_key, buckets) in buckets { - feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into()); + cogs.update(feature_weights); - self.inner.addrs.project_cache.send(ProcessMetrics { - data: smallvec![(project_key, MetricData::Parsed(buckets))], - source, - start_time: start_time.into(), - sent_at, - }); - } - - if !feature_weights.is_empty() { - cogs.update(feature_weights); - } + self.inner.addrs.project_cache.send(ProcessMetrics { + data, + source, + start_time: start_time.into(), + sent_at, + }); } fn 
handle_process_metric_meta(&self, message: ProcessMetricMeta) { @@ -3903,27 +3950,12 @@ mod tests { processor.handle_process_batched_metrics(&mut token, message); let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::ProcessMetrics(pm1) = value else { + let ProjectCache::ProcessMetrics(pm) = value else { panic!() }; - let value = project_cache_rx.recv().await.unwrap(); - let ProjectCache::ProcessMetrics(pm2) = value else { - panic!() - }; - - let mut messages = vec![pm1, pm2]; - messages.sort_by_key(|pm| pm.data[0].0); - let actual = messages - .into_iter() - .map(|mut pm| { - assert_eq!(pm.data.len(), 1); - let (project_key, data) = pm.data.pop().unwrap(); - (project_key, data, pm.source) - }) - .collect::>(); - - assert_debug_snapshot!(actual, @r###" + assert_eq!(pm.source, BucketSource::Internal); + assert_debug_snapshot!(pm.data, @r###" [ ( ProjectKey("11111111111111111111111111111111"), @@ -3951,7 +3983,6 @@ mod tests { }, ], ), - Internal, ), ( ProjectKey("22222222222222222222222222222222"), @@ -3977,9 +4008,10 @@ mod tests { }, ], ), - Internal, ), ] "###); + + assert!(project_cache_rx.try_recv().is_err()); } } From d1e950f20193a19e9b812e911f11e0faa711433b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 8 Oct 2024 15:37:29 +0200 Subject: [PATCH 8/8] fix(tests): Fix flaky `test_readiness_not_enough_memory_bytes` (#4115) Unflake `test_readiness_not_enough_memory` and `test_readiness_not_enough_bytes`. The relay errors expected by these tests did not always occur in order, and not always within 1 second. 
I confirmed by brute force that these tests no longer fail, even with 100 repetitions: https://github.com/getsentry/relay/actions/runs/11235435151/job/31233375132?pr=4115#step:8:735 --- tests/integration/test_healthchecks.py | 39 +++++++++++++++----------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py index c92ff5d7ca..69063a371f 100644 --- a/tests/integration/test_healthchecks.py +++ b/tests/integration/test_healthchecks.py @@ -2,6 +2,7 @@ Test the health check endpoints """ +import queue import time import tempfile import os @@ -80,6 +81,26 @@ def test_readiness_proxy(mini_sentry, relay): assert response.status_code == 200 +def assert_not_enough_memory(relay, mini_sentry, expected_comparison): + response = wait_get(relay, "/api/relay/healthcheck/ready/") + assert response.status_code == 503 + errors = "" + for _ in range(100): + try: + errors += f"{mini_sentry.test_failures.get(timeout=10)}\n" + except queue.Empty: + break + if ( + "Not enough memory" in errors + and expected_comparison in errors + and "Health check probe 'system memory'" in errors + and "Health check probe 'spool health'" in errors + ): + return + + assert False, f"Not all errors represented: {errors}" + + def test_readiness_not_enough_memory_bytes(mini_sentry, relay): relay = relay( mini_sentry, @@ -87,14 +108,7 @@ def test_readiness_not_enough_memory_bytes(mini_sentry, relay): wait_health_check=False, ) - response = wait_get(relay, "/api/relay/healthcheck/ready/") - error = str(mini_sentry.test_failures.get(timeout=2)) - assert "Not enough memory" in error and ">= 42" in error - error = str(mini_sentry.test_failures.get(timeout=1)) - assert "Health check probe 'system memory'" in error - error = str(mini_sentry.test_failures.get(timeout=1)) - assert "Health check probe 'spool health'" in error - assert response.status_code == 503 + assert_not_enough_memory(relay, mini_sentry, ">= 42") def 
test_readiness_not_enough_memory_percent(mini_sentry, relay): @@ -103,15 +117,8 @@ def test_readiness_not_enough_memory_percent(mini_sentry, relay): {"relay": {"mode": "proxy"}, "health": {"max_memory_percent": 0.01}}, wait_health_check=False, ) - response = wait_get(relay, "/api/relay/healthcheck/ready/") - error = str(mini_sentry.test_failures.get(timeout=2)) - assert "Not enough memory" in error and ">= 1.00%" in error - error = str(mini_sentry.test_failures.get(timeout=1)) - assert "Health check probe 'system memory'" in error - error = str(mini_sentry.test_failures.get(timeout=1)) - assert "Health check probe 'spool health'" in error - assert response.status_code == 503 + assert_not_enough_memory(relay, mini_sentry, ">= 1.00%") def test_readiness_depends_on_aggregator_being_full(mini_sentry, relay):