ref(quotas): Cache and enforce indexed quotas in processor.
Dav1dde committed Oct 4, 2024
1 parent 5513190 commit d288d84
Showing 9 changed files with 218 additions and 172 deletions.
65 changes: 9 additions & 56 deletions relay-quotas/src/rate_limit.rs
@@ -300,7 +300,11 @@ impl RateLimits {
///
/// If no limits or quotas match, then the returned `RateLimits` instance evaluates `is_ok`.
/// Otherwise, it contains rate limits that match the given scoping.
pub fn check_with_quotas(&self, quotas: &[Quota], scoping: ItemScoping<'_>) -> Self {
pub fn check_with_quotas<'a>(
&self,
quotas: impl IntoIterator<Item = &'a Quota>,
scoping: ItemScoping<'_>,
) -> Self {
let mut applied_limits = Self::new();

for quota in quotas {
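
The relaxed bound means callers no longer need a concrete `&[Quota]`; anything that yields `&Quota` references can be passed. A minimal sketch, illustrative only and not part of this commit (`limits`, `project_quotas`, and `scoping` are hypothetical values):

// Slices keep working, since `&[Quota]` yields `&Quota` items...
let applied = limits.check_with_quotas(project_quotas.as_slice(), scoping);
// ...and so do iterator-backed types such as the processor's `CombinedQuotas`,
// which is exactly what the new cached enforcement path in processor.rs relies on.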
@@ -396,13 +400,7 @@ impl<'a> IntoIterator for &'a RateLimits {
/// Like [`RateLimits`], a collection of scoped rate limits but with all the checks
/// necessary to cache the limits.
///
/// The data structure makes sure no expired rate limits are enforced as well
/// as removing any indexed rate limit.
///
/// Cached rate limits don't enforce indexed rate limits because at the time of the check
/// the decision whether an envelope is sampled or not is not yet known. Additionally
/// even if the item is later dropped by dynamic sampling, it must still be around to extract metrics
/// and cannot be dropped too early.
/// The data structure makes sure no expired rate limits are enforced.
#[derive(Debug, Default)]
pub struct CachedRateLimits(RateLimits);
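
Because `add` no longer filters out indexed categories, an indexed limit now survives the cache. A minimal sketch of the new behavior, illustrative only, reusing the constructors that appear in the tests below:

let mut cached = CachedRateLimits::new();
cached.add(RateLimit {
    // Indexed categories used to be stripped here; they are now kept and
    // enforced later in the processor, after metric extraction.
    categories: smallvec![DataCategory::TransactionIndexed],
    scope: RateLimitScope::Organization(42),
    reason_code: None,
    retry_after: RetryAfter::from_secs(5),
    namespaces: smallvec![],
});
assert!(!cached.current_limits().is_empty());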

@@ -415,13 +413,7 @@ impl CachedRateLimits {
/// Adds a limit to this collection.
///
/// See also: [`RateLimits::add`].
pub fn add(&mut self, mut limit: RateLimit) {
if !limit.categories.is_empty() {
limit.categories.retain(|category| !category.is_indexed());
if limit.categories.is_empty() {
return;
}
}
pub fn add(&mut self, limit: RateLimit) {
self.0.add(limit);
}

Expand Down Expand Up @@ -1083,7 +1075,7 @@ mod tests {
});

rate_limits1.add(RateLimit {
categories: smallvec![DataCategory::Transaction],
categories: smallvec![DataCategory::TransactionIndexed],
scope: RateLimitScope::Organization(42),
reason_code: None,
retry_after: RetryAfter::from_secs(1),
@@ -1114,7 +1106,7 @@
),
RateLimit(
categories: [
transaction,
transaction_indexed,
],
scope: Organization(42),
reason_code: None,
Expand Down Expand Up @@ -1166,43 +1158,4 @@ mod tests {
)
"###);
}

#[test]
fn test_cached_rate_limits_indexed() {
let mut cached = CachedRateLimits::new();

cached.add(RateLimit {
categories: smallvec![DataCategory::Transaction, DataCategory::TransactionIndexed],
scope: RateLimitScope::Organization(42),
reason_code: None,
retry_after: RetryAfter::from_secs(5),
namespaces: smallvec![],
});

cached.add(RateLimit {
categories: smallvec![DataCategory::TransactionIndexed],
scope: RateLimitScope::Project(ProjectId::new(21)),
reason_code: None,
retry_after: RetryAfter::from_secs(5),
namespaces: smallvec![],
});

let rate_limits = cached.current_limits();

insta::assert_ron_snapshot!(rate_limits, @r###"
RateLimits(
limits: [
RateLimit(
categories: [
transaction,
],
scope: Organization(42),
reason_code: None,
retry_after: RetryAfter(5),
namespaces: [],
),
],
)
"###);
}
}
158 changes: 115 additions & 43 deletions relay-server/src/services/processor.rs
@@ -45,7 +45,7 @@ use smallvec::{smallvec, SmallVec};
#[cfg(feature = "processing")]
use {
crate::services::store::{Store, StoreEnvelope},
crate::utils::{sample, Enforcement, EnvelopeLimiter, ItemAction},
crate::utils::{sample, ApplyLimits, Enforcement, EnvelopeLimiter, ItemAction},
itertools::Itertools,
relay_cardinality::{
CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
@@ -722,6 +722,10 @@ struct ProcessEnvelopeState<'a, Group> {
/// The state of the project that this envelope belongs to.
project_info: Arc<ProjectInfo>,

/// Currently active cached rate limits of the project this envelope belongs to.
#[cfg_attr(not(feature = "processing"), expect(dead_code))]
rate_limits: RateLimits,

/// The config of this Relay instance.
config: Arc<Config>,

@@ -832,9 +836,15 @@ pub struct ProcessEnvelopeResponse {
/// - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
/// Envelope to process.
pub envelope: ManagedEnvelope,
/// The project info.
pub project_info: Arc<ProjectInfo>,
/// Currently active cached rate limits for this project.
pub rate_limits: RateLimits,
/// Root sampling project info.
pub sampling_project_info: Option<Arc<ProjectInfo>>,
/// Sampling reservoir counters.
pub reservoir_counters: ReservoirCounters,
}
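
A hedged sketch of how a caller would now build the message, mirroring the test setups further down in this diff; `managed_envelope`, `project_info`, and `cached_limits` are assumed to exist, with `cached_limits` typically coming from the project's cached rate limits:

let message = ProcessEnvelope {
    envelope: managed_envelope,
    project_info: Arc::new(project_info),
    // New field: the project's cached rate limits travel with the envelope,
    // so the processor can enforce them without asking the project cache again.
    rate_limits: cached_limits,
    sampling_project_info: None,
    reservoir_counters: ReservoirCounters::default(),
};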

@@ -1251,6 +1261,7 @@ impl EnvelopeProcessorService {
mut managed_envelope: TypedEnvelope<G>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
rate_limits: RateLimits,
sampling_project_info: Option<Arc<ProjectInfo>>,
reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
) -> ProcessEnvelopeState<G> {
@@ -1291,6 +1302,7 @@
metrics: Metrics::default(),
extracted_metrics,
project_info,
rate_limits,
config,
sampling_project_info,
project_id,
@@ -1305,55 +1317,41 @@
&self,
state: &mut ProcessEnvelopeState<G>,
) -> Result<(), ProcessingError> {
let rate_limiter = match self.inner.rate_limiter.as_ref() {
Some(rate_limiter) => rate_limiter,
None => return Ok(()),
};
self.enforce_cached_quotas(state)?;
self.enforce_consistend_quotas(state)?;
Ok(())
}

let project_info = &state.project_info;
#[cfg(feature = "processing")]
fn enforce_cached_quotas<G>(
&self,
state: &mut ProcessEnvelopeState<G>,
) -> Result<(), ProcessingError> {
let global_config = self.inner.global_config.current();
let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

if quotas.is_empty() {
return Ok(());
}

let event_category = state.event_category();
let _ = RateLimiter::Cached.enforce(&global_config, state)?;

// When invoking the rate limiter, capture if the event item has been rate limited to also
// remove it from the processing state eventually.
let mut envelope_limiter = EnvelopeLimiter::new(|item_scope, quantity| {
rate_limiter.is_rate_limited(quotas, item_scope, quantity, false)
});

// Tell the envelope limiter about the event, since it has been removed from the Envelope at
// this stage in processing.
if let Some(category) = event_category {
envelope_limiter.assume_event(category);
}
Ok(())
}

let scoping = state.managed_envelope.scoping();
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)?
});
let event_active = enforcement.is_event_active();
#[cfg(feature = "processing")]
fn enforce_consistend_quotas<G>(
&self,
state: &mut ProcessEnvelopeState<G>,
) -> Result<(), ProcessingError> {
let global_config = self.inner.global_config.current();
let rate_limiter = match self.inner.rate_limiter.as_ref() {
Some(rate_limiter) => rate_limiter,
None => return Ok(()),
};

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.apply_enforcement(&enforcement);
enforcement.apply_with_outcomes(&mut state.managed_envelope);

if event_active {
state.remove_event();
debug_assert!(state.envelope().is_empty());
}
let limits = RateLimiter::Consistent(rate_limiter).enforce(&global_config, state)?;

// Update cached rate limits with the freshly computed ones.
if !limits.is_empty() {
self.inner
.addrs
.project_cache
.send(UpdateRateLimits::new(scoping.project_key, limits));
self.inner.addrs.project_cache.send(UpdateRateLimits::new(
state.managed_envelope.scoping().project_key,
limits,
));
}

Ok(())
Expand Down Expand Up @@ -1748,6 +1746,13 @@ impl EnvelopeProcessorService {
// Always extract metrics in processing Relays for sampled items.
self.extract_transaction_metrics(state, SamplingDecision::Keep, profile_id)?;

// Enforce cached quotas here, because it is relatively cheap and can prevent
// us from extracting spans when it's not necessary.
//
// But do it *after* metric extraction to make sure we do not drop due to an indexed
// quota and never extract any metrics.
self.enforce_cached_quotas(state)?;

if state
.project_info
.has_feature(Feature::ExtractSpansFromEvent)
@@ -1881,6 +1886,7 @@ impl EnvelopeProcessorService {
mut managed_envelope: ManagedEnvelope,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
rate_limits: RateLimits,
sampling_project_info: Option<Arc<ProjectInfo>>,
reservoir_counters: Arc<Mutex<BTreeMap<RuleId, i64>>>,
) -> Result<ProcessingStateResult, ProcessingError> {
@@ -1905,6 +1911,7 @@
managed_envelope,
project_id,
project_info,
rate_limits,
sampling_project_info,
reservoir_counters,
);
@@ -1981,6 +1988,7 @@ impl EnvelopeProcessorService {
let ProcessEnvelope {
envelope: mut managed_envelope,
project_info,
rate_limits,
sampling_project_info,
reservoir_counters,
} = message;
@@ -2029,6 +2037,7 @@
managed_envelope,
project_id,
project_info,
rate_limits,
sampling_project_info,
reservoir_counters,
) {
@@ -2906,6 +2915,67 @@ impl Service for EnvelopeProcessorService {
}
}

#[cfg(feature = "processing")]
enum RateLimiter<'a> {
Cached,
Consistent(&'a RedisRateLimiter),
}

#[cfg(feature = "processing")]
impl<'a> RateLimiter<'a> {
fn enforce<G>(
&self,
global_config: &GlobalConfig,
state: &mut ProcessEnvelopeState<G>,
) -> Result<RateLimits, ProcessingError> {
if state.envelope().is_empty() {
return Ok(RateLimits::default());
}

let quotas = CombinedQuotas::new(global_config, state.project_info.get_quotas());
if quotas.is_empty() {
return Ok(RateLimits::default());
}

let event_category = state.event_category();

// When invoking the rate limiter, capture if the event item has been rate limited to also
// remove it from the processing state eventually.
let mut envelope_limiter =
EnvelopeLimiter::new(ApplyLimits::All, |item_scope, quantity| match self {
RateLimiter::Cached => Ok(state.rate_limits.check_with_quotas(quotas, item_scope)),
RateLimiter::Consistent(rl) => Ok::<_, ProcessingError>(
rl.is_rate_limited(quotas, item_scope, quantity, false)?,
),
});

// Tell the envelope limiter about the event, since it has been removed from the Envelope at
// this stage in processing.
if let Some(category) = event_category {
envelope_limiter.assume_event(category);
}

let scoping = state.managed_envelope.scoping();
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter.compute(state.managed_envelope.envelope_mut(), &scoping)?
});
let event_active = enforcement.is_event_active();

// Use the same rate limits as used for the envelope on the metrics.
// Those rate limits should not be checked for expiry or similar to ensure a consistent
// limiting of envelope items and metrics.
state.extracted_metrics.apply_enforcement(&enforcement);
enforcement.apply_with_outcomes(&mut state.managed_envelope);

if event_active {
state.remove_event();
debug_assert!(state.envelope().is_empty());
}

Ok(limits)
}
}
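
A brief sketch of how the two variants are driven, matching the call sites earlier in this diff; illustrative only, with `self`, `state`, and `global_config` assumed to be in scope of the processor service:

// Cheap pre-check against the rate limits cached on the state; no Redis
// round trip, so it can run early, for example before span extraction.
let _ = RateLimiter::Cached.enforce(&global_config, state)?;

// Authoritative check against Redis, only available in processing Relays;
// the freshly computed limits are then sent back to the project cache.
if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
    let limits = RateLimiter::Consistent(rate_limiter).enforce(&global_config, state)?;
    // ... UpdateRateLimits is sent with `limits` if it is not empty.
}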

fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
let envelope_body: Vec<u8> = match http_encoding {
HttpEncoding::Identity => return Ok(body.clone()),
@@ -3463,6 +3533,7 @@ mod tests {
let message = ProcessEnvelope {
envelope,
project_info: Arc::new(project_info),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
Expand Down Expand Up @@ -3525,6 +3596,7 @@ mod tests {
let process_message = ProcessEnvelope {
envelope: managed_envelope,
project_info: Arc::new(ProjectInfo::default()),
rate_limits: Default::default(),
sampling_project_info: None,
reservoir_counters: ReservoirCounters::default(),
};
3 changes: 3 additions & 0 deletions relay-server/src/services/processor/dynamic_sampling.rs
@@ -298,6 +298,7 @@ mod tests {
let message = ProcessEnvelope {
envelope: ManagedEnvelope::new(envelope, outcome_aggregator, test_store, group),
project_info: Arc::new(ProjectInfo::default()),
rate_limits: Default::default(),
sampling_project_info,
reservoir_counters: ReservoirCounters::default(),
};
@@ -438,6 +439,7 @@
extracted_metrics: ProcessingExtractedMetrics::new(),
config: config.clone(),
project_info,
rate_limits: Default::default(),
sampling_project_info: None,
project_id: ProjectId::new(42),
managed_envelope: ManagedEnvelope::new(
@@ -709,6 +711,7 @@
extracted_metrics: ProcessingExtractedMetrics::new(),
config: Arc::new(Config::default()),
project_info,
rate_limits: Default::default(),
sampling_project_info: {
let mut state = ProjectInfo::default();
state.config.metric_extraction =