diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ada767ecda..28548848d3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -365,12 +365,6 @@ jobs: REVISION: "${{ github.event.pull_request.head.sha || github.sha }}" steps: - - name: Install cosign - uses: sigstore/cosign-installer@v3.6.0 - - - name: Install regctl - uses: regclient/actions/regctl-installer@2dac4eff5925ed07edbfe12d2d11af6304df29a6 - - name: Login to DockerHub run: docker login --username=sentrybuilder --password ${{ secrets.DOCKER_HUB_RW_TOKEN }} @@ -378,14 +372,14 @@ jobs: run: | # We push 3 tags to Dockerhub: # 1) the full sha of the commit - regctl image copy "${GHCR_DOCKER_IMAGE}:${REVISION}" "${DH_DOCKER_IMAGE}:${REVISION}" + docker buildx imagetools create --tag "${DH_DOCKER_IMAGE}:${REVISION}" "${GHCR_DOCKER_IMAGE}:${REVISION}" # 2) the short sha SHORT_SHA=$(echo ${GITHUB_SHA} | cut -c1-8) - regctl image copy "${GHCR_DOCKER_IMAGE}:${REVISION}" "${DH_DOCKER_IMAGE}:${SHORT_SHA}" + docker buildx imagetools create --tag "${DH_DOCKER_IMAGE}:${SHORT_SHA}" "${GHCR_DOCKER_IMAGE}:${REVISION}" # 3) nightly - regctl image copy "${GHCR_DOCKER_IMAGE}:nightly" "${DH_DOCKER_IMAGE}:nightly" + docker buildx imagetools create --tag "${DH_DOCKER_IMAGE}:nightly" "${GHCR_DOCKER_IMAGE}:${REVISION}" publish-to-gcr: timeout-minutes: 5 @@ -413,9 +407,6 @@ jobs: if: "!startsWith(github.ref, 'refs/heads/release-library/') && !github.event.pull_request.head.repo.fork && github.actor != 'dependabot[bot]' && needs.build-setup.outputs.full_ci == 'true'" steps: - - name: Install cosign - uses: sigstore/cosign-installer@v3.6.0 - - name: Google Auth id: auth uses: google-github-actions/auth@v2 @@ -434,15 +425,12 @@ jobs: run: | gcloud auth configure-docker us-central1-docker.pkg.dev - - name: Install regctl - uses: regclient/actions/regctl-installer@2dac4eff5925ed07edbfe12d2d11af6304df29a6 - - name: Copy Image from GHCR to AR - run: regctl image copy "${GHCR_DOCKER_IMAGE}:${REVISION}" "${AR_DOCKER_IMAGE}:${REVISION}" + run: docker buildx imagetools create --tag "${AR_DOCKER_IMAGE}:${REVISION}" "${GHCR_DOCKER_IMAGE}:${REVISION}" - name: Copy Nightly from GHCR to AR if: github.ref == 'refs/heads/master' - run: regctl image copy "${GHCR_DOCKER_IMAGE}:nightly" "${AR_DOCKER_IMAGE}:nightly" + run: docker buildx imagetools create --tag "${AR_DOCKER_IMAGE}:nightly" "${GHCR_DOCKER_IMAGE}:nightly" gocd-artifacts: timeout-minutes: 5 @@ -508,7 +496,7 @@ jobs: test_integration: name: Integration Tests runs-on: ubuntu-latest - timeout-minutes: 20 + timeout-minutes: 30 # Skip redundant checks for library releases if: "!startsWith(github.ref, 'refs/heads/release-library/')" diff --git a/CHANGELOG.md b/CHANGELOG.md index aef9b78171..6e9333d66c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,6 @@ - Use custom wildcard matching instead of regular expressions. ([#4073](https://github.com/getsentry/relay/pull/4073)) - Allowlist the SentryUptimeBot user-agent. ([#4068](https://github.com/getsentry/relay/pull/4068)) - Feature flags of graduated features are now hard-coded in Relay so they can be removed from Sentry. ([#4076](https://github.com/getsentry/relay/pull/4076), [#4080](https://github.com/getsentry/relay/pull/4080)) -- Prevent span extraction when quota is active to reduce load on redis. 
([#4097](https://github.com/getsentry/relay/pull/4097))

 ## 24.9.0

diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs
index b53df37f6b..afc9b3fb78 100644
--- a/relay-server/src/envelope.rs
+++ b/relay-server/src/envelope.rs
@@ -53,7 +53,6 @@ use smallvec::SmallVec;
 
 use crate::constants::DEFAULT_EVENT_RETENTION;
 use crate::extractors::{PartialMeta, RequestMeta};
-use crate::utils::SeqCount;
 
 pub const CONTENT_TYPE: &str = "application/x-sentry-envelope";
@@ -872,27 +871,6 @@ impl Item {
         self.headers.other.insert(name.into(), value.into())
     }
 
-    /// Counts how many spans are contained in a transaction payload.
-    ///
-    /// The transaction itself represents a span as well, so this function returns
-    /// `len(event.spans) + 1`.
-    ///
-    /// Returns zero if
-    /// - the item is not a transaction,
-    /// - the spans have already been extracted (in which case they are represented elsewhere).
-    pub fn count_nested_spans(&self) -> usize {
-        #[derive(Debug, Deserialize)]
-        struct PartialEvent {
-            spans: SeqCount,
-        }
-
-        if self.ty() != &ItemType::Transaction || self.spans_extracted() {
-            return 0;
-        }
-
-        serde_json::from_slice::<PartialEvent>(&self.payload()).map_or(0, |event| event.spans.0 + 1)
-    }
-
     /// Determines whether the given item creates an event.
     ///
     /// This is only true for literal events and crash report attachments.
diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs
index 98b76fd22b..09a06fa6a2 100644
--- a/relay-server/src/metrics_extraction/event.rs
+++ b/relay-server/src/metrics_extraction/event.rs
@@ -49,12 +49,14 @@ impl Extractable for Span {
 /// If this is a transaction event with spans, metrics will also be extracted from the spans.
 pub fn extract_metrics(
     event: &mut Event,
+    spans_extracted: bool,
     config: CombinedMetricExtractionConfig<'_>,
     max_tag_value_size: usize,
     span_extraction_sample_rate: Option<f32>,
 ) -> Vec<Bucket> {
     let mut metrics = generic::extract_metrics(event, config);
 
-    if sample(span_extraction_sample_rate.unwrap_or(1.0)) {
+    // If spans were already extracted for an event, we rely on span processing to extract metrics.
+ if !spans_extracted && sample(span_extraction_sample_rate.unwrap_or(1.0)) { extract_span_metrics_for_event(event, config, max_tag_value_size, &mut metrics); } @@ -1200,6 +1202,7 @@ mod tests { extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config(features, None).combined(), 200, None, @@ -1410,6 +1413,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1466,6 +1470,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1497,6 +1502,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1759,6 +1765,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1899,7 +1906,13 @@ mod tests { ); let config = binding.combined(); - let _ = extract_metrics(event.value_mut().as_mut().unwrap(), config, 200, None); + let _ = extract_metrics( + event.value_mut().as_mut().unwrap(), + false, + config, + 200, + None, + ); insta::assert_debug_snapshot!(&event.value().unwrap()._metrics_summary); insta::assert_debug_snapshot!( diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index b3915b7924..9b09ffa6d0 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -168,7 +168,7 @@ pub trait EventProcessing {} /// A trait for processing groups that can be dynamically sampled. pub trait Sampling { /// Whether dynamic sampling should run under the given project's conditions. - fn supports_sampling(project_state: &ProjectInfo) -> bool; + fn supports_sampling(project_info: &ProjectInfo) -> bool; /// Whether reservoir sampling applies to this processing group (a.k.a. data type). fn supports_reservoir_sampling() -> bool; @@ -178,9 +178,9 @@ processing_group!(TransactionGroup, Transaction); impl EventProcessing for TransactionGroup {} impl Sampling for TransactionGroup { - fn supports_sampling(project_state: &ProjectInfo) -> bool { + fn supports_sampling(project_info: &ProjectInfo) -> bool { // For transactions, we require transaction metrics to be enabled before sampling. - matches!(&project_state.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled()) + matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled()) } fn supports_reservoir_sampling() -> bool { @@ -199,9 +199,9 @@ processing_group!(CheckInGroup, CheckIn); processing_group!(SpanGroup, Span); impl Sampling for SpanGroup { - fn supports_sampling(project_state: &ProjectInfo) -> bool { + fn supports_sampling(project_info: &ProjectInfo) -> bool { // If no metrics could be extracted, do not sample anything. - matches!(&project_state.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported()) + matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported()) } fn supports_reservoir_sampling() -> bool { @@ -720,14 +720,14 @@ struct ProcessEnvelopeState<'a, Group> { extracted_metrics: ProcessingExtractedMetrics, /// The state of the project that this envelope belongs to. 
-    project_state: Arc<ProjectInfo>,
+    project_info: Arc<ProjectInfo>,
 
     /// The config of this Relay instance.
     config: Arc<Config>,
 
     /// The state of the project that initiated the current trace.
     /// This is the config used for trace-based dynamic sampling.
-    sampling_project_state: Option<Arc<ProjectInfo>>,
+    sampling_project_info: Option<Arc<ProjectInfo>>,
 
     /// The id of the project that this envelope is ingested into.
     ///
@@ -798,7 +798,7 @@ impl<'a, Group> ProcessEnvelopeState<'a, Group> {
     fn should_filter(&self, feature: Feature) -> bool {
         match self.config.relay_mode() {
             RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
-            RelayMode::Managed => !self.project_state.has_feature(feature),
+            RelayMode::Managed => !self.project_info.has_feature(feature),
         }
     }
 }
@@ -1250,15 +1250,15 @@ impl EnvelopeProcessorService {
         config: Arc<Config>,
         mut managed_envelope: TypedEnvelope<Group>,
         project_id: ProjectId,
-        project_state: Arc<ProjectInfo>,
-        sampling_project_state: Option<Arc<ProjectInfo>>,
+        project_info: Arc<ProjectInfo>,
+        sampling_project_info: Option<Arc<ProjectInfo>>,
         reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
     ) -> ProcessEnvelopeState<Group> {
         let envelope = managed_envelope.envelope_mut();
 
         // Set the event retention. Effectively, this value will only be available in processing
         // mode when the full project config is queried from the upstream.
-        if let Some(retention) = project_state.config.event_retention {
+        if let Some(retention) = project_info.config.event_retention {
             envelope.set_retention(retention);
         }
@@ -1290,9 +1290,9 @@
             spans_extracted: false,
             metrics: Metrics::default(),
             extracted_metrics,
-            project_state,
+            project_info,
             config,
-            sampling_project_state,
+            sampling_project_info,
             project_id,
             managed_envelope,
             reservoir,
@@ -1310,9 +1310,9 @@
             None => return Ok(()),
         };
 
-        let project_state = &state.project_state;
+        let project_info = &state.project_info;
         let global_config = self.inner.global_config.current();
-        let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas());
+        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
 
         if quotas.is_empty() {
             return Ok(());
@@ -1378,7 +1378,7 @@
         // it is not present in the actual project config payload.
         let global = self.inner.global_config.current();
         let combined_config = {
-            let config = match &state.project_state.config.metric_extraction {
+            let config = match &state.project_info.config.metric_extraction {
                 ErrorBoundary::Ok(ref config) if config.is_supported() => config,
                 _ => return Ok(()),
             };
@@ -1403,7 +1403,7 @@
         };
 
         // Require a valid transaction metrics config.
- let tx_config = match &state.project_state.config.transaction_metrics { + let tx_config = match &state.project_info.config.transaction_metrics { Some(ErrorBoundary::Ok(tx_config)) => tx_config, Some(ErrorBoundary::Err(e)) => { relay_log::debug!("Failed to parse legacy transaction metrics config: {e}"); @@ -1431,6 +1431,7 @@ impl EnvelopeProcessorService { let metrics = crate::metrics_extraction::event::extract_metrics( event, + state.spans_extracted, combined_config, self.inner .config @@ -1444,7 +1445,7 @@ impl EnvelopeProcessorService { .extracted_metrics .extend_project_metrics(metrics, Some(sampling_decision)); - if !state.project_state.has_feature(Feature::DiscardTransaction) { + if !state.project_info.has_feature(Feature::DiscardTransaction) { let transaction_from_dsc = state .managed_envelope .envelope() @@ -1505,7 +1506,7 @@ impl EnvelopeProcessorService { let http_span_allowed_hosts = global_config.options.http_span_allowed_hosts.as_slice(); let retention_days: i64 = state - .project_state + .project_info .config .event_retention .unwrap_or(DEFAULT_EVENT_RETENTION) @@ -1523,7 +1524,7 @@ impl EnvelopeProcessorService { }; let key_id = state - .project_state + .project_info .get_public_key_config() .and_then(|key| Some(key.numeric_id?.to_string())); if full_normalization && key_id.is_none() { @@ -1538,7 +1539,7 @@ impl EnvelopeProcessorService { client: request_meta.client().map(str::to_owned), key_id, protocol_version: Some(request_meta.version().to_string()), - grouping_config: state.project_state.config.grouping_config.clone(), + grouping_config: state.project_info.config.grouping_config.clone(), client_ip: client_ipaddr.as_ref(), client_sample_rate: state .managed_envelope @@ -1555,20 +1556,20 @@ impl EnvelopeProcessorService { .max_name_length .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD), ), - breakdowns_config: state.project_state.config.breakdowns_v2.as_ref(), - performance_score: state.project_state.config.performance_score.as_ref(), + breakdowns_config: state.project_info.config.breakdowns_v2.as_ref(), + performance_score: state.project_info.config.performance_score.as_ref(), normalize_user_agent: Some(true), transaction_name_config: TransactionNameConfig { - rules: &state.project_state.config.tx_name_rules, + rules: &state.project_info.config.tx_name_rules, }, device_class_synthesis_config: state - .project_state + .project_info .has_feature(Feature::DeviceClassSynthesis), enrich_spans: state - .project_state + .project_info .has_feature(Feature::ExtractSpansFromEvent) || state - .project_state + .project_info .has_feature(Feature::ExtractCommonSpanMetricsFromEvent), max_tag_value_length: self .inner @@ -1579,12 +1580,12 @@ impl EnvelopeProcessorService { is_renormalize: false, remove_other: full_normalization, emit_event_errors: full_normalization, - span_description_rules: state.project_state.config.span_description_rules.as_ref(), + span_description_rules: state.project_info.config.span_description_rules.as_ref(), geoip_lookup: self.inner.geoip_lookup.as_ref(), ai_model_costs: ai_model_costs.as_ref(), enable_trimming: true, measurements: Some(CombinedMeasurementsConfig::new( - state.project_state.config().measurements.as_ref(), + state.project_info.config().measurements.as_ref(), global_config.measurements.as_ref(), )), normalize_spans: true, @@ -1595,7 +1596,7 @@ impl EnvelopeProcessorService { .and_then(|ctx| ctx.replay_id), span_allowed_hosts: http_span_allowed_hosts, scrub_mongo_description: if state - .project_state + .project_info 
                .has_feature(Feature::ScrubMongoDbDescriptions)
            {
                ScrubMongoDescription::Enabled
@@ -1749,7 +1750,7 @@
         self.extract_transaction_metrics(state, SamplingDecision::Keep, profile_id)?;
 
         if state
-            .project_state
+            .project_info
             .has_feature(Feature::ExtractSpansFromEvent)
         {
             span::extract_from_event(state, &global_config, server_sample_rate);
@@ -1880,8 +1881,8 @@
         &self,
         mut managed_envelope: ManagedEnvelope,
         project_id: ProjectId,
-        project_state: Arc<ProjectInfo>,
-        sampling_project_state: Option<Arc<ProjectInfo>>,
+        project_info: Arc<ProjectInfo>,
+        sampling_project_info: Option<Arc<ProjectInfo>>,
         reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
     ) -> Result<ProcessingStateResult, ProcessingError> {
         // Get the group from the managed envelope context, and if it's not set, try to guess it
@@ -1889,7 +1890,7 @@
         let group = managed_envelope.group();
 
         // Pre-process the envelope headers.
-        if let Some(sampling_state) = sampling_project_state.as_ref() {
+        if let Some(sampling_state) = sampling_project_info.as_ref() {
             // Both transactions and standalone span envelopes need a normalized DSC header
             // to make sampling rules based on the segment/transaction name work correctly.
             managed_envelope
@@ -1904,8 +1905,8 @@
                     self.inner.config.clone(),
                     managed_envelope,
                     project_id,
-                    project_state,
-                    sampling_project_state,
+                    project_info,
+                    sampling_project_info,
                     reservoir_counters,
                 );
                 match self.$fn(&mut state) {
@@ -2380,7 +2381,7 @@
     fn rate_limit_buckets(
         &self,
         scoping: Scoping,
-        project_state: &ProjectInfo,
+        project_info: &ProjectInfo,
         mut buckets: Vec<Bucket>,
     ) -> Vec<Bucket> {
         let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else {
@@ -2393,7 +2394,7 @@
             .filter_map(|bucket| bucket.name.try_namespace())
             .counts();
 
-        let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas());
+        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
 
         for (namespace, quantity) in namespaces {
             let item_scoping = scoping.metric_bucket(namespace);
@@ -2429,7 +2430,7 @@
             }
         }
 
-        match MetricsLimiter::create(buckets, project_state.config.quotas.clone(), scoping) {
+        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
             Err(buckets) => buckets,
             Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter),
         }
diff --git a/relay-server/src/services/processor/attachment.rs b/relay-server/src/services/processor/attachment.rs
index a5188dc363..21814def5e 100644
--- a/relay-server/src/services/processor/attachment.rs
+++ b/relay-server/src/services/processor/attachment.rs
@@ -50,7 +50,7 @@ pub fn create_placeholders(state: &mut ProcessEnvelopeState<ErrorGroup>)
 /// logic; otherwise the entire attachment is treated as a single binary blob.
 pub fn scrub(state: &mut ProcessEnvelopeState<ErrorGroup>) {
     let envelope = state.managed_envelope.envelope_mut();
-    if let Some(ref config) = state.project_state.config.pii_config {
+    if let Some(ref config) = state.project_info.config.pii_config {
         let minidump = envelope
             .get_item_by_mut(|item| item.attachment_type() == Some(&AttachmentType::Minidump));
diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs
index b37c6eb375..cb81de7efc 100644
--- a/relay-server/src/services/processor/dynamic_sampling.rs
+++ b/relay-server/src/services/processor/dynamic_sampling.rs
@@ -39,7 +39,7 @@ use crate::utils::{self, SamplingResult};
 ///
 /// If there is no transaction event in the envelope, this function will do nothing.
 pub fn ensure_dsc(state: &mut ProcessEnvelopeState<TransactionGroup>) {
-    if state.envelope().dsc().is_some() && state.sampling_project_state.is_some() {
+    if state.envelope().dsc().is_some() && state.sampling_project_info.is_some() {
         return;
     }
 
@@ -48,13 +48,13 @@ pub fn ensure_dsc(state: &mut ProcessEnvelopeState<TransactionGroup>) {
     let Some(event) = state.event.value() else {
         return;
     };
-    let Some(key_config) = state.project_state.get_public_key_config() else {
+    let Some(key_config) = state.project_info.get_public_key_config() else {
         return;
     };
 
     if let Some(dsc) = utils::dsc_from_event(key_config.public_key, event) {
         state.envelope_mut().set_dsc(dsc);
-        state.sampling_project_state = Some(state.project_state.clone());
+        state.sampling_project_info = Some(state.project_info.clone());
     }
 }
 
@@ -63,16 +63,16 @@
 pub fn run<Group>(state: &mut ProcessEnvelopeState<Group>) -> SamplingResult
 where
     Group: Sampling,
 {
-    if !Group::supports_sampling(&state.project_state) {
+    if !Group::supports_sampling(&state.project_info) {
         return SamplingResult::Pending;
     }
 
-    let sampling_config = match state.project_state.config.sampling {
+    let sampling_config = match state.project_info.config.sampling {
         Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
         _ => None,
     };
 
-    let root_state = state.sampling_project_state.as_ref();
+    let root_state = state.sampling_project_info.as_ref();
     let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
         Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
         _ => None,
@@ -196,7 +196,7 @@ pub fn tag_error_with_sampling_decision(
         return;
     };
 
-    let root_state = state.sampling_project_state.as_ref();
+    let root_state = state.sampling_project_info.as_ref();
     let sampling_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
         Some(ErrorBoundary::Ok(ref config)) => config,
         _ => return,
@@ -414,14 +414,14 @@ mod tests {
             ..Event::default()
         };
 
-        let mut project_state = state_with_rule_and_condition(
+        let mut project_info = state_with_rule_and_condition(
             Some(0.0),
             RuleType::Transaction,
             RuleCondition::all(),
         );
 
         if let Some(version) = version {
-            project_state.config.transaction_metrics =
+            project_info.config.transaction_metrics =
                 ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig {
                     version,
                     ..Default::default()
                 })
                 .into();
         }
 
-        let project_state = Arc::new(project_state);
+        let project_info = Arc::new(project_info);
         let envelope = new_envelope(false, "foo");
 
         ProcessEnvelopeState::<TransactionGroup> {
             event: Annotated::from(event),
             metrics: Default::default(),
             extracted_metrics: ProcessingExtractedMetrics::new(),
             config: config.clone(),
-            project_state,
-            sampling_project_state: None,
+            project_info,
+            sampling_project_info: None,
             project_id: ProjectId::new(42),
             managed_envelope: ManagedEnvelope::new(
                 envelope,
@@ -708,8 +708,8 @@
             metrics: Default::default(),
             extracted_metrics: ProcessingExtractedMetrics::new(),
             config: Arc::new(Config::default()),
-            project_state: project_info,
-            sampling_project_state: {
+            project_info,
+            sampling_project_info: {
                 let mut state = ProjectInfo::default();
                 state.config.metric_extraction =
                     ErrorBoundary::Ok(MetricExtractionConfig::default());
diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs
index 3d3c853ac5..1759d0d078 100644
--- a/relay-server/src/services/processor/event.rs
+++ b/relay-server/src/services/processor/event.rs
@@ -284,7 +284,7 @@ pub fn filter(
     };
 
     let client_ip = state.managed_envelope.envelope().meta().client_addr();
-    let filter_settings = &state.project_state.config.filter_settings;
+    let filter_settings = &state.project_info.config.filter_settings;
 
     metric!(timer(RelayTimers::EventProcessingFiltering), {
         relay_filter::should_filter(event, client_ip, filter_settings, global_config.filters())
@@ -303,7 +303,7 @@
     let supported_generic_filters = global_config.filters.is_ok()
         && relay_filter::are_generic_filters_supported(
             global_config.filters().map(|f| f.version),
-            state.project_state.config.filter_settings.generic.version,
+            state.project_info.config.filter_settings.generic.version,
         );
 
     if supported_generic_filters {
         Ok(FiltersStatus::Ok)
@@ -319,7 +319,7 @@ pub fn scrub<Group: EventProcessing>(
     state: &mut ProcessEnvelopeState<Group>,
 ) -> Result<(), ProcessingError> {
     let event = &mut state.event;
-    let config = &state.project_state.config;
+    let config = &state.project_info.config;
 
     if config.datascrubbing_settings.scrub_data {
         if let Some(event) = event.value_mut() {
diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs
index 586685e627..559f61555c 100644
--- a/relay-server/src/services/processor/profile.rs
+++ b/relay-server/src/services/processor/profile.rs
@@ -89,7 +89,7 @@ pub fn transfer_id(
 /// Processes profiles and sets the profile ID in the profile context on the transaction if successful.
 pub fn process(state: &mut ProcessEnvelopeState<TransactionGroup>) -> Option<ProfileId> {
-    let profiling_enabled = state.project_state.has_feature(Feature::Profiling);
+    let profiling_enabled = state.project_info.has_feature(Feature::Profiling);
     let mut profile_id = None;
 
     state.managed_envelope.retain_items(|item| match item.ty() {
diff --git a/relay-server/src/services/processor/profile_chunk.rs b/relay-server/src/services/processor/profile_chunk.rs
index 938ff0ba6d..93037b30f8 100644
--- a/relay-server/src/services/processor/profile_chunk.rs
+++ b/relay-server/src/services/processor/profile_chunk.rs
@@ -15,9 +15,7 @@ use {
 
 /// Removes profile chunks from the envelope if the feature is not enabled.
 pub fn filter(state: &mut ProcessEnvelopeState<ProfileChunkGroup>) {
-    let continuous_profiling_enabled = state
-        .project_state
-        .has_feature(Feature::ContinuousProfiling);
+    let continuous_profiling_enabled = state.project_info.has_feature(Feature::ContinuousProfiling);
     state.managed_envelope.retain_items(|item| match item.ty() {
         ItemType::ProfileChunk if !continuous_profiling_enabled => ItemAction::DropSilently,
         _ => ItemAction::Keep,
@@ -27,9 +25,7 @@
 
 /// Processes profile chunks.
#[cfg(feature = "processing")] pub fn process(state: &mut ProcessEnvelopeState, config: &Config) { - let continuous_profiling_enabled = state - .project_state - .has_feature(Feature::ContinuousProfiling); + let continuous_profiling_enabled = state.project_info.has_feature(Feature::ContinuousProfiling); state.managed_envelope.retain_items(|item| match item.ty() { ItemType::ProfileChunk => { if !continuous_profiling_enabled { diff --git a/relay-server/src/services/processor/replay.rs b/relay-server/src/services/processor/replay.rs index e26aa3c73f..848d08bb37 100644 --- a/relay-server/src/services/processor/replay.rs +++ b/relay-server/src/services/processor/replay.rs @@ -35,7 +35,7 @@ pub fn process( // If the replay video feature is not enabled check the envelope items for a // replay video event. if state - .project_state + .project_info .has_feature(Feature::SessionReplayVideoDisabled) && count_replay_video_events(state) > 0 { @@ -47,12 +47,12 @@ pub fn process( let meta = state.envelope().meta(); ReplayProcessingConfig { - config: &state.project_state.config, + config: &state.project_info.config, global_config, geoip_lookup, event_id: state.envelope().event_id(), - project_id: state.project_state.project_id, - organization_id: state.project_state.organization_id, + project_id: state.project_info.project_id, + organization_id: state.project_info.organization_id, client_addr: meta.client_addr(), user_agent: RawUserAgentInfo { user_agent: meta.user_agent().map(|s| s.to_owned()), @@ -62,7 +62,7 @@ pub fn process( }; let mut scrubber = if state - .project_state + .project_info .has_feature(Feature::SessionReplayRecordingScrubbing) { let datascrubbing_config = rpc @@ -83,7 +83,7 @@ pub fn process( for item in state.managed_envelope.envelope_mut().items_mut() { if state - .project_state + .project_info .has_feature(Feature::SessionReplayCombinedEnvelopeItems) { item.set_replay_combined_payload(true); diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs index 329b2e511f..3bc54a4557 100644 --- a/relay-server/src/services/processor/report.rs +++ b/relay-server/src/services/processor/report.rs @@ -132,7 +132,7 @@ pub fn process_client_reports( } let retention_days = state - .project_state + .project_info .config() .event_retention .unwrap_or(DEFAULT_EVENT_RETENTION); diff --git a/relay-server/src/services/processor/session.rs b/relay-server/src/services/processor/session.rs index 8565130676..bc889a4642 100644 --- a/relay-server/src/services/processor/session.rs +++ b/relay-server/src/services/processor/session.rs @@ -24,7 +24,7 @@ use crate::utils::ItemAction; /// are out of range after clock drift correction. pub fn process(state: &mut ProcessEnvelopeState, config: &Config) { let received = state.managed_envelope.received_at(); - let metrics_config = state.project_state.config().session_metrics; + let metrics_config = state.project_info.config().session_metrics; let envelope = state.managed_envelope.envelope_mut(); let client = envelope.meta().client().map(|x| x.to_owned()); let client_addr = envelope.meta().client_addr(); diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 785e093ae6..3e709a4357 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -56,21 +56,21 @@ pub fn process( // once for all spans in the envelope. 
     let sampling_result = dynamic_sampling::run(state);
 
-    let span_metrics_extraction_config = match state.project_state.config.metric_extraction {
+    let span_metrics_extraction_config = match state.project_info.config.metric_extraction {
         ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config),
         _ => None,
     };
     let normalize_span_config = NormalizeSpanConfig::new(
         &state.config,
         global_config,
-        state.project_state.config(),
+        state.project_info.config(),
         &state.managed_envelope,
         state.envelope().meta().client_addr().map(IpAddr::from),
         geo_lookup,
     );
 
     let client_ip = state.managed_envelope.envelope().meta().client_addr();
-    let filter_settings = &state.project_state.config.filter_settings;
+    let filter_settings = &state.project_info.config.filter_settings;
 
     let mut dynamic_sampling_dropped_spans = 0;
 
     state.managed_envelope.retain_items(|item| {
@@ -149,7 +149,7 @@ pub fn process(
             return ItemAction::DropSilently;
         }
 
-        if let Err(e) = scrub(&mut annotated_span, &state.project_state.config) {
+        if let Err(e) = scrub(&mut annotated_span, &state.project_info.config) {
             relay_log::error!("failed to scrub span: {e}");
         }
@@ -322,7 +322,7 @@ pub fn extract_from_event(
             .max_tag_value_length,
         &[],
         if state
-            .project_state
+            .project_info
             .config
             .features
             .has(Feature::ScrubMongoDbDescriptions)
@@ -364,7 +364,7 @@
 /// Removes the transaction in case the project has made the transition to spans-only.
 pub fn maybe_discard_transaction(state: &mut ProcessEnvelopeState<TransactionGroup>) {
     if state.event_type() == Some(EventType::Transaction)
-        && state.project_state.has_feature(Feature::DiscardTransaction)
+        && state.project_info.has_feature(Feature::DiscardTransaction)
     {
         state.remove_event();
         state.managed_envelope.update();
@@ -781,13 +781,13 @@ mod tests {
         );
         let dummy_envelope = Envelope::parse_bytes(bytes).unwrap();
-        let mut project_state = ProjectInfo::default();
-        project_state
+        let mut project_info = ProjectInfo::default();
+        project_info
             .config
             .features
             .0
             .insert(Feature::ExtractCommonSpanMetricsFromEvent);
-        let project_state = Arc::new(project_state);
+        let project_info = Arc::new(project_info);
 
         let event = Event {
             ty: EventType::Transaction.into(),
@@ -820,8 +820,8 @@ mod tests {
             metrics: Default::default(),
             extracted_metrics: ProcessingExtractedMetrics::new(),
             config: Arc::new(Config::default()),
-            project_state,
-            sampling_project_state: None,
+            project_info,
+            sampling_project_info: None,
             project_id: ProjectId::new(42),
             managed_envelope: managed_envelope.try_into().unwrap(),
             event_metrics_extracted: false,
diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs
index b198d686b3..778eca9f89 100644
--- a/relay-server/src/services/project.rs
+++ b/relay-server/src/services/project.rs
@@ -12,6 +12,7 @@ use relay_system::{Addr, BroadcastChannel};
 use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
 
+use crate::envelope::ItemType;
 use crate::services::metrics::{Aggregator, MergeBuckets};
 use crate::services::outcome::{DiscardReason, Outcome};
 use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics};
@@ -19,6 +20,7 @@ use crate::services::project::state::ExpiryState;
 use crate::services::project_cache::{
     CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate,
 };
+use crate::utils::{Enforcement, SeqCount};
 
 use crate::statsd::RelayCounters;
 use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff};
@@ -555,9 +557,19 @@ impl Project {
                 Ok(current_limits.check_with_quotas(quotas, item_scoping))
             });
 
-        let (enforcement, mut rate_limits) =
+        let (mut enforcement, mut rate_limits) =
             envelope_limiter.compute(envelope.envelope_mut(), &scoping)?;
 
+        let check_nested_spans = state
+            .as_ref()
+            .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent));
+
+        // If we can extract spans from the event, we try to count the number of nested spans,
+        // so that we can emit correct negative outcomes in case the transaction itself is dropped.
+        if check_nested_spans {
+            sync_spans_to_enforcement(&envelope, &mut enforcement);
+        }
+
         enforcement.apply_with_outcomes(&mut envelope);
 
         envelope.update();
@@ -586,9 +598,52 @@
     }
 }
 
+/// Adds category limits for the nested spans inside a transaction.
+///
+/// On the fast path of rate limiting, the nested spans of a transaction are not extracted as
+/// top-level spans. Thus, if we limited a transaction, we want to count and emit negative
+/// outcomes for each of the spans nested inside that transaction.
+fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) {
+    if !enforcement.is_event_active() {
+        return;
+    }
+
+    let spans_count = count_nested_spans(envelope);
+    if spans_count == 0 {
+        return;
+    }
+
+    if enforcement.event.is_active() {
+        enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count);
+    }
+
+    if enforcement.event_indexed.is_active() {
+        enforcement.spans_indexed = enforcement
+            .event_indexed
+            .clone_for(DataCategory::SpanIndexed, spans_count);
+    }
+}
+
+/// Counts the nested spans inside the first transaction item of the [`Envelope`](crate::envelope::Envelope).
+fn count_nested_spans(envelope: &ManagedEnvelope) -> usize {
+    #[derive(Debug, Deserialize)]
+    struct PartialEvent {
+        spans: SeqCount,
+    }
+
+    envelope
+        .envelope()
+        .items()
+        .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted())
+        .and_then(|item| serde_json::from_slice::<PartialEvent>(&item.payload()).ok())
+        // We add + 1 to also count the transaction itself, since it will be extracted
+        // as a span and counted during the slow path of rate limiting.
+ .map_or(0, |event| event.spans.0 + 1) +} + #[cfg(test)] mod tests { - use crate::envelope::{ContentType, Envelope, Item, ItemType}; + use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; use crate::services::processor::ProcessingGroup; use relay_base_schema::project::ProjectId; @@ -720,7 +775,27 @@ mod tests { RequestMeta::new(dsn) } - const EVENT_WITH_SPANS: &str = r#"{ + #[test] + fn test_track_nested_spans_outcomes() { + let mut project = create_project(Some(json!({ + "features": [ + "organizations:indexed-spans-extraction" + ], + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + }))); + + let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); + + let mut transaction = Item::new(ItemType::Transaction); + transaction.set_payload( + ContentType::Json, + r#"{ "event_id": "52df9022835246eeb317dbd739ccd059", "type": "transaction", "transaction": "I have a stale timestamp, but I'm recent!", @@ -746,27 +821,8 @@ mod tests { "trace_id": "ff62a8b040f340bda5d830223def1d81" } ] -}"#; - - #[test] - fn test_track_nested_spans_outcomes() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - - let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); - - let mut transaction = Item::new(ItemType::Transaction); - transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS); +}"#, + ); envelope.add_item(transaction); @@ -796,59 +852,4 @@ mod tests { assert_eq!(outcome.quantity, expected_quantity); } } - - #[test] - fn test_track_nested_spans_outcomes_span_quota() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["span_indexed"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - - let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); - - let mut transaction = Item::new(ItemType::Transaction); - transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS); - - envelope.add_item(transaction); - - let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); - let (test_store, _) = Addr::custom(); - - let managed_envelope = ManagedEnvelope::new( - envelope, - outcome_aggregator.clone(), - test_store, - ProcessingGroup::Transaction, - ); - - let CheckedEnvelope { - envelope, - rate_limits: _, - } = project.check_envelope(managed_envelope).unwrap(); - let envelope = envelope.unwrap(); - let transaction_item = envelope - .envelope() - .items() - .find(|i| *i.ty() == ItemType::Transaction) - .unwrap(); - assert!(transaction_item.spans_extracted()); - - drop(outcome_aggregator); - - let expected = [(DataCategory::SpanIndexed, 3)]; - - for (expected_category, expected_quantity) in expected { - let outcome = outcome_aggregator_rx.blocking_recv().unwrap(); - assert_eq!(outcome.category, expected_category); - assert_eq!(outcome.quantity, expected_quantity); - } - } } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index db18c0b234..4adaf92c53 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -217,9 +217,6 @@ impl EnvelopeSummary { summary.profile_quantity += source_quantities.profiles; } - // Also count nested spans: - 
summary.span_quantity += item.count_nested_spans(); - summary.payload_size += item.len(); summary.set_quantity(item); } @@ -451,7 +448,6 @@ impl Enforcement { envelope .envelope_mut() .retain_items(|item| self.retain_item(item)); - self.track_outcomes(envelope); } @@ -462,12 +458,6 @@ impl Enforcement { return false; } - if item.ty() == &ItemType::Transaction && self.spans_indexed.is_active() { - // We cannot remove nested spans from the transaction, but we can prevent them - // from being extracted into standalone spans. - item.set_spans_extracted(true); - } - // When checking limits for categories that have an indexed variant, // we only have to check the more specific, the indexed, variant // to determine whether an item is limited. diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index aa19326800..1c419b372e 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -6,7 +6,7 @@ import re import uuid from copy import deepcopy -from queue import Queue +from queue import Empty, Queue import pytest @@ -41,8 +41,7 @@ def __init__(self, server_address, app): self.captured_events = Queue() self.captured_outcomes = Queue() self.captured_metrics = Queue() - self.test_failures = [] - self.reraise_test_failures = True + self.test_failures = Queue() self.hits = {} self.known_relays = {} self.fail_on_relay_error = True @@ -64,9 +63,21 @@ def hit(self, path): self.hits.setdefault(path, 0) self.hits[path] += 1 + def current_test_failures(self): + """Return current list of test failures without waiting for additional failures.""" + try: + while failure := self.test_failures.get_nowait(): + yield failure + except Empty: + return + + def clear_test_failures(self): + """Reset test failures to an empty queue.""" + self.test_failures = Queue() + def format_failures(self): s = "" - for route, error in self.test_failures: + for route, error in self.current_test_failures(): s += f"> {route}: {error}\n" return s @@ -296,7 +307,7 @@ def store_internal_error_event(): if event is not None and sentry.fail_on_relay_error: error = AssertionError("Relay sent us event: " + get_error_message(event)) - sentry.test_failures.append(("/api/666/envelope/", error)) + sentry.test_failures.put(("/api/666/envelope/", error)) return jsonify({"event_id": uuid.uuid4().hex}) @@ -452,10 +463,10 @@ def fail(e): raise e def reraise_test_failures(): - if sentry.test_failures and sentry.reraise_test_failures: + if not sentry.test_failures.empty(): pytest.fail( "{n} exceptions happened in mini_sentry:\n\n{failures}".format( - n=len(sentry.test_failures), failures=sentry.format_failures() + n=sentry.test_failures.qsize(), failures=sentry.format_failures() ) ) diff --git a/tests/integration/fixtures/relay.py b/tests/integration/fixtures/relay.py index fc55f0d031..f0da584cb8 100644 --- a/tests/integration/fixtures/relay.py +++ b/tests/integration/fixtures/relay.py @@ -1,5 +1,6 @@ import json import os +from queue import Queue import sys import uuid import signal @@ -211,11 +212,11 @@ def inner( relay.wait_relay_health_check() # Filter out health check failures, which can happen during startup - mini_sentry.test_failures = [ - f - for f in mini_sentry.test_failures - if "Health check probe" not in str(f) - ] + filtered_test_failures = Queue() + for f in mini_sentry.current_test_failures(): + if "Health check probe" not in str(f): + filtered_test_failures.put(f) + mini_sentry.test_failures = filtered_test_failures return relay diff --git 
a/tests/integration/test_basic.py b/tests/integration/test_basic.py
index d3e42def0f..e48de5c0b3 100644
--- a/tests/integration/test_basic.py
+++ b/tests/integration/test_basic.py
@@ -110,7 +110,7 @@ def get_project_config():
         relay.shutdown(sig=signal.SIGINT)
         pytest.raises(queue.Empty, lambda: mini_sentry.captured_events.get(timeout=1))
 
-        failures = mini_sentry.test_failures
+        failures = list(mini_sentry.current_test_failures())
        assert failures
 
        # we are expecting at least a dropped unfinished future error
@@ -121,7 +121,7 @@ def get_project_config():
                dropped_unfinished_error_found = True
        assert dropped_unfinished_error_found
    finally:
-        mini_sentry.test_failures.clear()
+        mini_sentry.clear_test_failures()
 
 
 @pytest.mark.parametrize("trailing_slash", [True, False])
diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py
index d75ce5519e..c2600df7cb 100644
--- a/tests/integration/test_config.py
+++ b/tests/integration/test_config.py
@@ -14,9 +14,9 @@ def test_invalid_kafka_config_should_fail(mini_sentry, relay_with_processing):
     relay = relay_with_processing(options=options, wait_health_check=False)
     assert relay.wait_for_exit() != 0
 
-    error = str(mini_sentry.test_failures.pop(0))
+    error = str(mini_sentry.test_failures.get_nowait())
     assert "__unknown" in error
-    error = str(mini_sentry.test_failures.pop(0))
+    error = str(mini_sentry.test_failures.get_nowait())
     assert "profiles" in error.lower()
 
 
@@ -26,5 +26,5 @@ def test_invalid_topics_raise_error(mini_sentry, relay_with_processing):
     relay = relay_with_processing(options=options, wait_health_check=False)
     assert relay.wait_for_exit() != 0
 
-    error = str(mini_sentry.test_failures.pop(0))
+    error = str(mini_sentry.test_failures.get_nowait())
     assert "failed to validate the topic with name" in error
diff --git a/tests/integration/test_envelope.py b/tests/integration/test_envelope.py
index da03da2c61..eff1805820 100644
--- a/tests/integration/test_envelope.py
+++ b/tests/integration/test_envelope.py
@@ -465,12 +465,12 @@ def get_project_config():
             include_global = True
 
         # Clear errors, because we log an error when we request the global config yet don't receive it.
-        assert len(mini_sentry.test_failures) > 0
-        assert {str(e) for _, e in mini_sentry.test_failures} == {
+        assert not mini_sentry.test_failures.empty()
+        assert {str(e) for _, e in mini_sentry.current_test_failures()} == {
             "Relay sent us event: global config missing in upstream response"
         }
     finally:
-        mini_sentry.test_failures.clear()
+        mini_sentry.clear_test_failures()
 
     envelopes = []
     # Check that we received exactly {envelope_qty} envelopes.
diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py index a98179fc47..aed453f07a 100644 --- a/tests/integration/test_healthchecks.py +++ b/tests/integration/test_healthchecks.py @@ -53,7 +53,7 @@ def test_readiness(mini_sentry, relay): mini_sentry.app.view_functions["check_challenge"] = original_check_challenge relay.wait_relay_health_check() finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() response = relay.get("/api/relay/healthcheck/ready/") assert response.status_code == 200 @@ -69,7 +69,7 @@ def test_readiness_flag(mini_sentry, relay): response = wait_get(relay, "/api/relay/healthcheck/ready/") assert response.status_code == 200 finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() def test_readiness_proxy(mini_sentry, relay): @@ -88,12 +88,11 @@ def test_readiness_not_enough_memory_bytes(mini_sentry, relay): ) response = wait_get(relay, "/api/relay/healthcheck/ready/") - time.sleep(1.0) # Wait for error - error = str(mini_sentry.test_failures.pop(0)) + error = str(mini_sentry.test_failures.get(timeout=2)) assert "Not enough memory" in error and ">= 42" in error - error = str(mini_sentry.test_failures.pop(0)) + error = str(mini_sentry.test_failures.get(timeout=1)) assert "Health check probe 'system memory'" in error - error = str(mini_sentry.test_failures.pop(0)) + error = str(mini_sentry.test_failures.get(timeout=1)) assert "Health check probe 'spool health'" in error assert response.status_code == 503 @@ -105,12 +104,12 @@ def test_readiness_not_enough_memory_percent(mini_sentry, relay): wait_health_check=False, ) response = wait_get(relay, "/api/relay/healthcheck/ready/") - time.sleep(1.0) # Wait for error - error = str(mini_sentry.test_failures.pop(0)) + + error = str(mini_sentry.test_failures.get(timeout=2)) assert "Not enough memory" in error and ">= 1.00%" in error - error = str(mini_sentry.test_failures.pop(0)) + error = str(mini_sentry.test_failures.get(timeout=1)) assert "Health check probe 'system memory'" in error - error = str(mini_sentry.test_failures.pop(0)) + error = str(mini_sentry.test_failures.get(timeout=1)) assert "Health check probe 'spool health'" in error assert response.status_code == 503 @@ -123,8 +122,8 @@ def test_readiness_depends_on_aggregator_being_full(mini_sentry, relay): ) response = wait_get(relay, "/api/relay/healthcheck/ready/") - time.sleep(0.3) # Wait for error - error = str(mini_sentry.test_failures.pop()) + + error = str(mini_sentry.test_failures.get(timeout=1)) assert "Health check probe 'aggregator'" in error assert response.status_code == 503 @@ -140,12 +139,11 @@ def test_readiness_depends_on_aggregator_being_full_after_metrics(mini_sentry, r for _ in range(100): response = wait_get(relay, "/api/relay/healthcheck/ready/") - print(response, response.status_code) if response.status_code == 503: - error = str(mini_sentry.test_failures.pop()) - assert "Health check probe 'aggregator'" in error - error = str(mini_sentry.test_failures.pop()) + error = str(mini_sentry.test_failures.get(timeout=1)) assert "aggregator limit exceeded" in error + error = str(mini_sentry.test_failures.get(timeout=1)) + assert "Health check probe 'aggregator'" in error return time.sleep(0.1) @@ -153,7 +151,7 @@ def test_readiness_depends_on_aggregator_being_full_after_metrics(mini_sentry, r def test_readiness_disk_spool(mini_sentry, relay): - mini_sentry.reraise_test_failures = False + mini_sentry.fail_on_relay_error = False temp = tempfile.mkdtemp() dbfile = 
os.path.join(temp, "buffer.db") diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 91e8730608..c168a1b2e2 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -1182,7 +1182,7 @@ def test_transaction_metrics_not_extracted_on_unsupported_version( tx_consumer.assert_empty() if unsupported_version < TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION: - error = str(mini_sentry.test_failures.pop(0)) + error = str(mini_sentry.test_failures.get_nowait()) assert "Processing Relay outdated" in error metrics_consumer.assert_empty() @@ -1206,21 +1206,19 @@ def test_no_transaction_metrics_when_filtered(mini_sentry, relay): relay = relay(mini_sentry, options=TEST_CONFIG) relay.send_transaction(project_id, tx) - # The only envelopes received should be outcomes for {Span,Transaction}[Indexed]?: - reports = [mini_sentry.get_client_report() for _ in range(4)] + # The only envelopes received should be outcomes for Transaction{,Indexed}: + reports = [mini_sentry.get_client_report() for _ in range(2)] filtered_events = [ outcome for report in reports for outcome in report["filtered_events"] ] filtered_events.sort(key=lambda x: x["category"]) assert filtered_events == [ - {"reason": "release-version", "category": "span", "quantity": 2}, - {"reason": "release-version", "category": "span_indexed", "quantity": 2}, {"reason": "release-version", "category": "transaction", "quantity": 1}, {"reason": "release-version", "category": "transaction_indexed", "quantity": 1}, ] - assert mini_sentry.captured_events.empty + assert mini_sentry.captured_events.empty() def test_transaction_name_too_long( diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 99c143a376..f1f15910f2 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -254,7 +254,7 @@ def _send_event(relay, project_id=42, event_type="error", event_id=None, trace_i return event_id -@pytest.mark.parametrize("event_type", ["transaction"]) +@pytest.mark.parametrize("event_type", ["error", "transaction"]) def test_outcomes_non_processing(relay, mini_sentry, event_type): """ Test basic outcome functionality. 
@@ -272,12 +272,10 @@ def test_outcomes_non_processing(relay, mini_sentry, event_type): [ DataCategory.TRANSACTION, DataCategory.TRANSACTION_INDEXED, - DataCategory.SPAN, - DataCategory.SPAN_INDEXED, ] if event_type == "transaction" - else [1] - ) # Error + else [DataCategory.ERROR] + ) outcomes = [] for _ in expected_categories: @@ -481,7 +479,7 @@ def test_outcome_forwarding( _send_event(downstream_relay, event_type=event_type) - expected_categories = [1] if event_type == "error" else [2, 9, 12, 16] + expected_categories = [1] if event_type == "error" else [2, 9] outcomes = outcomes_consumer.get_outcomes(n=len(expected_categories)) outcomes.sort(key=lambda x: x["category"]) @@ -754,7 +752,7 @@ def _get_span_payload(): "category,outcome_categories", [ ("session", []), - ("transaction", ["transaction", "transaction_indexed", "span", "span_indexed"]), + ("transaction", ["transaction", "transaction_indexed"]), ("user_report_v2", ["user_report_v2"]), ], ) @@ -771,10 +769,10 @@ def test_outcomes_rate_limit( relay = relay_with_processing(config) project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - reason_code = "banned" + reason_code = "transactions are banned" project_config["config"]["quotas"] = [ { - "id": "some_id", + "id": "transaction category", "categories": [category], "limit": 0, "window": 1600, @@ -1644,19 +1642,17 @@ def test_profile_outcomes_rate_limited( outcomes = outcomes_consumer.get_outcomes() outcomes.sort(key=lambda o: sorted(o.items())) - expected_categories_and_quantities = [ - (DataCategory.PROFILE, 1), - (DataCategory.PROFILE_INDEXED, 1), + expected_categories = [ + DataCategory.PROFILE, + DataCategory.PROFILE_INDEXED, ] # Profile, ProfileIndexed if quota_category == "transaction": # Transaction got rate limited as well: - expected_categories_and_quantities += [ - (DataCategory.TRANSACTION, 1), - (DataCategory.TRANSACTION_INDEXED, 1), - (DataCategory.SPAN, 2), - (DataCategory.SPAN_INDEXED, 2), + expected_categories += [ + DataCategory.TRANSACTION, + DataCategory.TRANSACTION_INDEXED, ] # Transaction, TransactionIndexed - expected_categories_and_quantities.sort() + expected_categories.sort() expected_outcomes = [ { @@ -1665,11 +1661,11 @@ def test_profile_outcomes_rate_limited( "org_id": 1, "outcome": 2, # RateLimited "project_id": 42, - "quantity": quantity, + "quantity": 1, "reason": "profiles_exceeded", "timestamp": time_within_delta(), } - for (category, quantity) in expected_categories_and_quantities + for category in expected_categories ] assert outcomes == expected_outcomes, outcomes @@ -1930,6 +1926,14 @@ def test_span_outcomes_invalid( # Create an envelope with an invalid profile: def make_envelope(): envelope = Envelope() + payload = _get_event_payload("transaction") + payload["spans"][0].pop("span_id", None) + envelope.add_item( + Item( + payload=PayloadRef(bytes=json.dumps(payload).encode()), + type="transaction", + ) + ) payload = _get_span_payload() payload.pop("span_id", None) envelope.add_item( @@ -1943,7 +1947,7 @@ def make_envelope(): envelope = make_envelope() upstream.send_envelope(project_id, envelope) - outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=2) + outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=4) outcomes.sort(key=lambda o: sorted(o.items())) assert outcomes == [ @@ -1954,13 +1958,15 @@ def make_envelope(): "outcome": 3, # Invalid "project_id": 42, "quantity": 1, - "reason": "invalid_span", + "reason": reason, "source": "pop-relay", "timestamp": time_within_delta(), } - for category in [ - 
DataCategory.SPAN, - DataCategory.SPAN_INDEXED, + for (category, reason) in [ + (DataCategory.TRANSACTION, "invalid_transaction"), + (DataCategory.TRANSACTION_INDEXED, "invalid_transaction"), + (DataCategory.SPAN, "invalid_span"), + (DataCategory.SPAN_INDEXED, "invalid_span"), ] ] diff --git a/tests/integration/test_projectconfigs.py b/tests/integration/test_projectconfigs.py index 1d040c8d9b..0b54189bd9 100644 --- a/tests/integration/test_projectconfigs.py +++ b/tests/integration/test_projectconfigs.py @@ -260,11 +260,11 @@ def test_unparsable_project_config(mini_sentry, relay): def assert_clear_test_failures(): try: - assert {str(e) for _, e in mini_sentry.test_failures} == { + assert {str(e) for _, e in mini_sentry.current_test_failures()} == { f"Relay sent us event: error fetching project state {public_key}: invalid type: integer `99`, expected a string", } finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() # Event is not propagated, relay logs an error: relay.send_event(project_key) @@ -343,11 +343,11 @@ def test_cached_project_config(mini_sentry, relay): try: relay.send_event(project_key) time.sleep(0.5) - assert {str(e) for _, e in mini_sentry.test_failures} == { + assert {str(e) for _, e in mini_sentry.current_test_failures()} == { f"Relay sent us event: error fetching project state {public_key}: invalid type: integer `99`, expected a string", } finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() def test_get_global_config(mini_sentry, relay): diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index 0da23c2b59..a934319c2c 100644 --- a/tests/integration/test_query.py +++ b/tests/integration/test_query.py @@ -38,7 +38,7 @@ def test_local_project_config(mini_sentry, relay): dsn_key = config["publicKeys"][0]["publicKey"] relay.wait_relay_health_check() - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() relay.send_event(project_id, dsn_key=dsn_key) event = mini_sentry.captured_events.get(timeout=1).get_event() @@ -148,11 +148,10 @@ def get_project_config(): assert event["logentry"] == {"formatted": "Hello, World!"} assert retry_count == 3 - if mini_sentry.test_failures: - for _, error in mini_sentry.test_failures: - assert isinstance(error, (socket.error, AssertionError)) + for _, error in mini_sentry.current_test_failures(): + assert isinstance(error, (socket.error, AssertionError)) finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() def test_query_retry_maxed_out(mini_sentry, relay_with_processing, events_consumer): @@ -190,18 +189,18 @@ def get_project_config(): ) # No error messages yet - assert not mini_sentry.test_failures + assert mini_sentry.test_failures.empty() try: relay.send_event(42) time.sleep(query_timeout) assert request_count == 1 + RETRIES - assert {str(e) for _, e in mini_sentry.test_failures} == { + assert {str(e) for _, e in mini_sentry.current_test_failures()} == { "Relay sent us event: error fetching project states: upstream request returned error 500 Internal Server Error: no error details", } finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() @pytest.mark.parametrize("disabled", (True, False)) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index fe46a93ed7..bd883b5ed8 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -8,12 +8,10 @@ from requests import HTTPError from sentry_sdk.envelope import Envelope, Item, PayloadRef - from 
.consts import ( METRICS_EXTRACTION_MIN_SUPPORTED_VERSION, TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, ) -from .test_envelope import generate_transaction_item from .test_metrics import TEST_CONFIG from .test_store import make_transaction @@ -1268,84 +1266,6 @@ def summarize_outcomes(): assert usage_metrics() == (1, 2) -def test_rate_limit_spans_without_redis( - mini_sentry, - relay, -): - """Rate limits for total spans are enforced and no metrics are emitted.""" - relay = relay(mini_sentry, TEST_CONFIG) - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["features"] = [ - "projects:span-metrics-extraction", - ] - project_config["config"]["transactionMetrics"] = { - "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION - } - - project_config["config"]["quotas"] = [ - { - "categories": ["span_indexed"], - "limit": 0, - "window": int(datetime.now(UTC).timestamp()), - "id": uuid.uuid4(), - "reasonCode": "foo", - }, - ] - project_config["config"]["sampling"] = ( - { # Drop everything, to trigger metrics extraction - "version": 2, - "rules": [ - { - "id": 1, - "samplingValue": {"type": "sampleRate", "value": 0.0}, - "type": "transaction", - "condition": {"op": "and", "inner": []}, - } - ], - } - ) - - # Send an error event to populate the project cache: - relay.send_event(project_id) - envelope = mini_sentry.captured_events.get() - assert [item.type for item in envelope.items] == ["event"] - - with pytest.raises(HTTPError) as e: - relay.send_transaction(project_id, generate_transaction_item()) - assert ( - e.value.response.headers["x-sentry-rate-limits"] - == "60:span_indexed:organization:foo" - ) - - # Spans were rate limited - client_report = mini_sentry.get_client_report() - del client_report["timestamp"] - assert client_report == { - "discarded_events": [], - "rate_limited_events": [ - {"reason": "foo", "category": "span_indexed", "quantity": 2} - ], - } - - # Transaction was dynamically sampled - client_report = mini_sentry.get_client_report() - del client_report["timestamp"] - assert client_report == { - "discarded_events": [], - "filtered_sampling_events": [ - {"reason": "Sampled:0", "category": "transaction_indexed", "quantity": 1} - ], - } - - # Metrics were received regardless - metrics = mini_sentry.get_metrics() - assert any(metric["name"] == "c:spans/usage@none" for metric in metrics) - - # Nothing else was sent upstream - assert mini_sentry.captured_events.empty() - - @pytest.mark.parametrize( "tags, expected_tags", [ diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index fc139dec54..8056d9407d 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -193,7 +193,7 @@ def configure_static_project(dir): assert event["logentry"] == {"formatted": "Hello, World!"} sleep(1) # Regression test: Relay tried to issue a request for 0 states - if mini_sentry.test_failures: + if not mini_sentry.test_failures.empty(): raise AssertionError( f"Exceptions happened in mini_sentry: {mini_sentry.format_failures()}" ) @@ -230,7 +230,7 @@ def test_store_with_low_memory(mini_sentry, relay): pytest.raises(queue.Empty, lambda: mini_sentry.captured_events.get(timeout=1)) found_queue_error = False - for _, error in mini_sentry.test_failures: + for _, error in mini_sentry.current_test_failures(): assert isinstance(error, AssertionError) if "failed to queue envelope" in str(error): found_queue_error = True @@ -238,7 +238,7 @@ def test_store_with_low_memory(mini_sentry, relay): assert 
found_queue_error finally: - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() def test_store_max_concurrent_requests(mini_sentry, relay): @@ -954,7 +954,7 @@ def server_error(*args, **kwargs): assert event["logentry"] == {"formatted": "Hello, World!"} finally: # Relay reports authentication errors, which is fine. - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() def test_events_are_retried(relay, mini_sentry): @@ -1181,7 +1181,7 @@ def counted_check_challenge(*args, **kwargs): evt.clear() assert evt.wait(2) # clear authentication errors accumulated until now - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() # check that we have had some auth that succeeded auth_count_3 = counter[0] assert auth_count_2 < auth_count_3 @@ -1250,7 +1250,7 @@ def counted_check_challenge(*args, **kwargs): # to be sure verify that we have only been called once (after failing) assert counter[1] == 1 # clear authentication errors accumulated until now - mini_sentry.test_failures.clear() + mini_sentry.clear_test_failures() def test_buffer_events_during_outage(relay, mini_sentry):
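
For context on the `SeqCount` helper that the new `count_nested_spans` builds on: it counts the elements of `event.spans` during deserialization without materializing each span, which is what makes the fast-path counting cheap. The real helper lives in relay-server's utils module; the standalone sketch below only illustrates the technique and is not the exact upstream implementation.

```rust
use serde::de::{Deserializer, IgnoredAny, SeqAccess, Visitor};
use serde::Deserialize;

/// Counts the elements of a JSON array without materializing them.
#[derive(Debug)]
struct SeqCount(usize);

impl<'de> Deserialize<'de> for SeqCount {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        struct CountVisitor;

        impl<'de> Visitor<'de> for CountVisitor {
            type Value = SeqCount;

            fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                f.write_str("a sequence")
            }

            fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<SeqCount, A::Error> {
                let mut count = 0;
                // `IgnoredAny` consumes and discards each element without allocating a value.
                while seq.next_element::<IgnoredAny>()?.is_some() {
                    count += 1;
                }
                Ok(SeqCount(count))
            }
        }

        deserializer.deserialize_seq(CountVisitor)
    }
}

// Mirrors the `PartialEvent` used by `count_nested_spans`: only the `spans`
// field of the transaction payload is inspected, everything else is skipped.
#[derive(Debug, Deserialize)]
struct PartialEvent {
    spans: SeqCount,
}

fn main() {
    let event: PartialEvent =
        serde_json::from_str(r#"{"spans": [{"op": "db"}, {"op": "http"}]}"#).unwrap();
    // Two nested spans; `count_nested_spans` then adds one for the transaction
    // itself, since it is also extracted as a span on the slow path.
    assert_eq!(event.spans.0, 2);
}
```

This is why the rate-limit bookkeeping can run before span extraction: the counter touches only the `spans` array boundary of the JSON payload instead of deserializing full span objects.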