From 1a7e87aa267aa36d6988a186a8cb4f7851582e15 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Wed, 13 Dec 2023 15:33:45 +0100
Subject: [PATCH] carve out the limiters and sinks sub-modules (#70)

---
 capture-server/tests/common.rs          |  4 +-
 capture-server/tests/events.rs          | 12 ++---
 capture/src/capture.rs                  |  6 +--
 capture/src/config.rs                   |  4 +-
 capture/src/lib.rs                      |  5 +-
 .../billing.rs}                         |  2 +-
 capture/src/limiters/mod.rs             |  2 +
 .../overflow.rs}                        | 29 ++++++-----
 capture/src/router.rs                   |  6 +--
 capture/src/server.rs                   | 22 ++++----
 capture/src/{sink.rs => sinks/kafka.rs} | 51 ++++---------------
 capture/src/sinks/mod.rs                | 13 +++++
 capture/src/sinks/print.rs              | 31 +++++++++++
 capture/tests/django_compat.rs          |  6 +--
 14 files changed, 104 insertions(+), 89 deletions(-)
 rename capture/src/{billing_limits.rs => limiters/billing.rs} (99%)
 create mode 100644 capture/src/limiters/mod.rs
 rename capture/src/{partition_limits.rs => limiters/overflow.rs} (80%)
 rename capture/src/{sink.rs => sinks/kafka.rs} (91%)
 create mode 100644 capture/src/sinks/mod.rs
 create mode 100644 capture/src/sinks/print.rs

diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs
index ce31897..214ecc8 100644
--- a/capture-server/tests/common.rs
+++ b/capture-server/tests/common.rs
@@ -28,8 +28,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
-    burst_limit: NonZeroU32::new(5).unwrap(),
-    per_second_limit: NonZeroU32::new(10).unwrap(),
+    overflow_burst_limit: NonZeroU32::new(5).unwrap(),
+    overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
diff --git a/capture-server/tests/events.rs b/capture-server/tests/events.rs
index b38ac5a..56fcdf7 100644
--- a/capture-server/tests/events.rs
+++ b/capture-server/tests/events.rs
@@ -77,7 +77,7 @@ async fn it_captures_a_batch() -> Result<()> {
 }
 
 #[tokio::test]
-async fn it_is_limited_with_burst() -> Result<()> {
+async fn it_overflows_events_on_burst() -> Result<()> {
     setup_tracing();
 
     let token = random_string("token", 16);
@@ -87,8 +87,8 @@ async fn it_is_limited_with_burst() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
-    config.burst_limit = NonZeroU32::new(2).unwrap();
-    config.per_second_limit = NonZeroU32::new(1).unwrap();
+    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
     let server = ServerHandle::for_config(config);
 
@@ -125,7 +125,7 @@ async fn it_is_limited_with_burst() -> Result<()> {
 }
 
 #[tokio::test]
-async fn it_does_not_partition_limit_different_ids() -> Result<()> {
+async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
     setup_tracing();
 
     let token = random_string("token", 16);
@@ -136,8 +136,8 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
-    config.burst_limit = NonZeroU32::new(1).unwrap();
-    config.per_second_limit = NonZeroU32::new(1).unwrap();
+    config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
     let server = ServerHandle::for_config(config);
 
diff --git a/capture/src/capture.rs b/capture/src/capture.rs
index 37e2872..6a90378 100644
--- a/capture/src/capture.rs
+++ b/capture/src/capture.rs
@@ -15,14 +15,14 @@ use metrics::counter;
 use time::OffsetDateTime;
 use tracing::instrument;
 
-use crate::billing_limits::QuotaResource;
 use crate::event::{Compression, ProcessingContext};
+use crate::limiters::billing::QuotaResource;
 use crate::prometheus::report_dropped_events;
 use crate::token::validate_token;
 use crate::{
     api::{CaptureError, CaptureResponse, CaptureResponseCode},
     event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
-    router, sink,
+    router, sinks,
     utils::uuid_v7,
 };
 
@@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> {
 
 #[instrument(skip_all)]
 pub async fn process_events<'a>(
-    sink: Arc<dyn sink::EventSink + Send + Sync>,
+    sink: Arc<dyn sinks::Event + Send + Sync>,
     events: &'a [RawEvent],
     context: &'a ProcessingContext,
 ) -> Result<(), CaptureError> {
diff --git a/capture/src/config.rs b/capture/src/config.rs
index 69a085d..0c6ab1c 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -14,10 +14,10 @@ pub struct Config {
     pub otel_url: Option<String>,
 
     #[envconfig(default = "100")]
-    pub per_second_limit: NonZeroU32,
+    pub overflow_per_second_limit: NonZeroU32,
 
     #[envconfig(default = "1000")]
-    pub burst_limit: NonZeroU32,
+    pub overflow_burst_limit: NonZeroU32,
 
     pub overflow_forced_keys: Option<String>, // Comma-delimited keys
 
diff --git a/capture/src/lib.rs b/capture/src/lib.rs
index eea915c..058e994 100644
--- a/capture/src/lib.rs
+++ b/capture/src/lib.rs
@@ -1,15 +1,14 @@
 pub mod api;
-pub mod billing_limits;
 pub mod capture;
 pub mod config;
 pub mod event;
 pub mod health;
-pub mod partition_limits;
+pub mod limiters;
 pub mod prometheus;
 pub mod redis;
 pub mod router;
 pub mod server;
-pub mod sink;
+pub mod sinks;
 pub mod time;
 pub mod token;
 pub mod utils;
diff --git a/capture/src/billing_limits.rs b/capture/src/limiters/billing.rs
similarity index 99%
rename from capture/src/billing_limits.rs
rename to capture/src/limiters/billing.rs
index 9fa0fdd..b908519 100644
--- a/capture/src/billing_limits.rs
+++ b/capture/src/limiters/billing.rs
@@ -166,7 +166,7 @@ mod tests {
     use time::Duration;
 
     use crate::{
-        billing_limits::{BillingLimiter, QuotaResource},
+        limiters::billing::{BillingLimiter, QuotaResource},
         redis::MockRedisClient,
     };
 
diff --git a/capture/src/limiters/mod.rs b/capture/src/limiters/mod.rs
new file mode 100644
index 0000000..58b2dcc
--- /dev/null
+++ b/capture/src/limiters/mod.rs
@@ -0,0 +1,2 @@
+pub mod billing;
+pub mod overflow;
diff --git a/capture/src/partition_limits.rs b/capture/src/limiters/overflow.rs
similarity index 80%
rename from capture/src/partition_limits.rs
rename to capture/src/limiters/overflow.rs
index 7059b45..0e91a99 100644
--- a/capture/src/partition_limits.rs
+++ b/capture/src/limiters/overflow.rs
@@ -1,11 +1,12 @@
-/// When a customer is writing too often to the same key, we get hot partitions. This negatively
-/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
-/// possible, but this does require that we map key -> partition.
+/// The analytics ingestion pipeline provides ordering guarantees for events of the same
+/// token and distinct_id. We currently achieve this through a locality constraint on the
+/// Kafka partition (consistent partition hashing through a computed key).
 ///
-/// If the write-rate reaches a certain amount, we need to be able to handle the hot partition
-/// before it causes a negative impact. In this case, instead of passing the error to the customer
-/// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
-/// customers data will be spread across all partitions.
+/// Volume spikes to a given key can create a backlog on the destination partition and
+/// induce ingestion lag. To protect the downstream systems, capture can relax this
+/// locality constraint when bursts are detected. When that happens, the excess traffic
+/// will be spread across all partitions and be processed by the overflow consumer,
+/// without strict ordering guarantees.
 use std::collections::HashSet;
 use std::num::NonZeroU32;
 use std::sync::Arc;
@@ -16,12 +17,12 @@ use rand::Rng;
 
 // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
 #[derive(Clone)]
-pub struct PartitionLimiter {
+pub struct OverflowLimiter {
     limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
     forced_keys: HashSet<String>,
 }
 
-impl PartitionLimiter {
+impl OverflowLimiter {
     pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
         let quota = Quota::per_second(per_second).allow_burst(burst);
         let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
@@ -31,7 +32,7 @@ impl PartitionLimiter {
             Some(values) => values.split(',').map(String::from).collect(),
         };
 
-        PartitionLimiter {
+        OverflowLimiter {
             limiter,
             forced_keys,
         }
@@ -71,12 +72,12 @@ impl PartitionLimiter {
 
 #[cfg(test)]
 mod tests {
-    use crate::partition_limits::PartitionLimiter;
+    use crate::limiters::overflow::OverflowLimiter;
     use std::num::NonZeroU32;
 
     #[tokio::test]
     async fn low_limits() {
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(1).unwrap(),
             NonZeroU32::new(1).unwrap(),
             None,
@@ -89,7 +90,7 @@ mod tests {
 
     #[tokio::test]
     async fn bursting() {
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(1).unwrap(),
             NonZeroU32::new(3).unwrap(),
             None,
@@ -109,7 +110,7 @@ mod tests {
         let key_three = String::from("three");
         let forced_keys = Some(String::from("one,three"));
 
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(1).unwrap(),
             NonZeroU32::new(1).unwrap(),
             forced_keys,
diff --git a/capture/src/router.rs b/capture/src/router.rs
index 6f2f044..d02e63f 100644
--- a/capture/src/router.rs
+++ b/capture/src/router.rs
@@ -10,13 +10,13 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
 use tower_http::trace::TraceLayer;
 
 use crate::health::HealthRegistry;
-use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};
+use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};
 
 use crate::prometheus::{setup_metrics_recorder, track_metrics};
 
 #[derive(Clone)]
 pub struct State {
-    pub sink: Arc<dyn sink::EventSink + Send + Sync>,
+    pub sink: Arc<dyn sinks::Event + Send + Sync>,
     pub timesource: Arc<dyn TimeSource + Send + Sync>,
     pub redis: Arc<dyn Client + Send + Sync>,
     pub billing: BillingLimiter,
@@ -28,7 +28,7 @@ async fn index() -> &'static str {
 
 pub fn router<
     TZ: TimeSource + Send + Sync + 'static,
-    S: sink::EventSink + Send + Sync + 'static,
+    S: sinks::Event + Send + Sync + 'static,
     R: Client + Send + Sync + 'static,
 >(
     timesource: TZ,
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 9b8d60c..22a1f3b 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -4,12 +4,14 @@
 use std::sync::Arc;
 
 use time::Duration;
-use crate::billing_limits::BillingLimiter;
 use crate::config::Config;
 use crate::health::{ComponentStatus, HealthRegistry};
-use crate::partition_limits::PartitionLimiter;
+use crate::limiters::billing::BillingLimiter;
+use crate::limiters::overflow::OverflowLimiter;
 use crate::redis::RedisClient;
-use crate::{router, sink};
+use crate::router;
+use crate::sinks::kafka::KafkaSink;
+use crate::sinks::print::PrintSink;
 
 pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
@@ -34,7 +36,7 @@ where
         router::router(
             crate::time::SystemTime {},
             liveness,
-            sink::PrintSink {},
+            PrintSink {},
             redis_client,
             billing,
             config.export_prometheus,
@@ -44,9 +46,9 @@ where
             .register("rdkafka".to_string(), Duration::seconds(30))
             .await;
 
-        let partition = PartitionLimiter::new(
-            config.per_second_limit,
-            config.burst_limit,
+        let partition = OverflowLimiter::new(
+            config.overflow_per_second_limit,
+            config.overflow_burst_limit,
             config.overflow_forced_keys,
         );
         if config.export_prometheus {
@@ -55,18 +57,14 @@ where
                 partition.report_metrics().await;
             });
         }
-
         {
             // Ensure that the rate limiter state does not grow unbounded
-
             let partition = partition.clone();
-
             tokio::spawn(async move {
                 partition.clean_state().await;
            });
         }
-
-        let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
+        let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
             .expect("failed to start Kafka sink");
 
         router::router(
diff --git a/capture/src/sink.rs b/capture/src/sinks/kafka.rs
similarity index 91%
rename from capture/src/sink.rs
rename to capture/src/sinks/kafka.rs
index af83e20..dc57c11 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sinks/kafka.rs
@@ -2,11 +2,10 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use metrics::{absolute_counter, counter, gauge, histogram};
-use rdkafka::config::ClientConfig;
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
-use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
-use rdkafka::producer::{DeliveryFuture, Producer};
+use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
 use rdkafka::util::Timeout;
+use rdkafka::ClientConfig;
 use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
 use tracing::{info_span, instrument, Instrument};
@@ -15,38 +14,9 @@ use crate::api::CaptureError;
 use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::health::HealthHandle;
-use crate::partition_limits::PartitionLimiter;
+use crate::limiters::overflow::OverflowLimiter;
 use crate::prometheus::report_dropped_events;
-
-#[async_trait]
-pub trait EventSink {
-    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
-    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
-}
-
-pub struct PrintSink {}
-
-#[async_trait]
-impl EventSink for PrintSink {
-    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
-        info!("single event: {:?}", event);
-        counter!("capture_events_ingested_total", 1);
-
-        Ok(())
-    }
-    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
-        let span = tracing::span!(tracing::Level::INFO, "batch of events");
-        let _enter = span.enter();
-
-        histogram!("capture_event_batch_size", events.len() as f64);
-        counter!("capture_events_ingested_total", events.len() as u64);
-        for event in events {
-            info!("event: {:?}", event);
-        }
-
-        Ok(())
-    }
-}
+use crate::sinks::Event;
 
 struct KafkaContext {
     liveness: HealthHandle,
@@ -113,14 +83,14 @@ impl rdkafka::ClientContext for KafkaContext {
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
     topic: String,
-    partition: PartitionLimiter,
+    partition: OverflowLimiter,
 }
 
 impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: PartitionLimiter,
+        partition: OverflowLimiter,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
@@ -234,7 +204,7 @@ impl KafkaSink {
 }
 
 #[async_trait]
-impl EventSink for KafkaSink {
+impl Event for KafkaSink {
     #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         let limited = self.partition.is_limited(&event.key());
@@ -294,8 +264,9 @@ mod tests {
     use crate::config;
     use crate::event::ProcessedEvent;
     use crate::health::HealthRegistry;
-    use crate::partition_limits::PartitionLimiter;
-    use crate::sink::{EventSink, KafkaSink};
+    use crate::limiters::overflow::OverflowLimiter;
+    use crate::sinks::kafka::KafkaSink;
+    use crate::sinks::Event;
     use crate::utils::uuid_v7;
     use rand::distributions::Alphanumeric;
     use rand::Rng;
@@ -310,7 +281,7 @@ mod tests {
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
-        let limiter = PartitionLimiter::new(
+        let limiter = OverflowLimiter::new(
             NonZeroU32::new(10).unwrap(),
             NonZeroU32::new(10).unwrap(),
             None,
diff --git a/capture/src/sinks/mod.rs b/capture/src/sinks/mod.rs
new file mode 100644
index 0000000..0747f0e
--- /dev/null
+++ b/capture/src/sinks/mod.rs
@@ -0,0 +1,13 @@
+use async_trait::async_trait;
+
+use crate::api::CaptureError;
+use crate::event::ProcessedEvent;
+
+pub mod kafka;
+pub mod print;
+
+#[async_trait]
+pub trait Event {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
+}
diff --git a/capture/src/sinks/print.rs b/capture/src/sinks/print.rs
new file mode 100644
index 0000000..50bc1ad
--- /dev/null
+++ b/capture/src/sinks/print.rs
@@ -0,0 +1,31 @@
+use async_trait::async_trait;
+use metrics::{counter, histogram};
+use tracing::log::info;
+
+use crate::api::CaptureError;
+use crate::event::ProcessedEvent;
+use crate::sinks::Event;
+
+pub struct PrintSink {}
+
+#[async_trait]
+impl Event for PrintSink {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        info!("single event: {:?}", event);
+        counter!("capture_events_ingested_total", 1);
+
+        Ok(())
+    }
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
+        let span = tracing::span!(tracing::Level::INFO, "batch of events");
+        let _enter = span.enter();
+
+        histogram!("capture_event_batch_size", events.len() as f64);
+        counter!("capture_events_ingested_total", events.len() as u64);
+        for event in events {
+            info!("event: {:?}", event);
+        }
+
+        Ok(())
+    }
+}
diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs
index d1d075b..5d77899 100644
--- a/capture/tests/django_compat.rs
+++ b/capture/tests/django_compat.rs
@@ -5,12 +5,12 @@ use axum_test_helper::TestClient;
 use base64::engine::general_purpose;
 use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
-use capture::billing_limits::BillingLimiter;
 use capture::event::ProcessedEvent;
 use capture::health::HealthRegistry;
+use capture::limiters::billing::BillingLimiter;
 use capture::redis::MockRedisClient;
 use capture::router::router;
-use capture::sink::EventSink;
+use capture::sinks::Event;
 use capture::time::TimeSource;
 use serde::Deserialize;
 use serde_json::{json, Value};
@@ -61,7 +61,7 @@ impl MemorySink {
 }
 
 #[async_trait]
-impl EventSink for MemorySink {
+impl Event for MemorySink {
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         self.events.lock().unwrap().push(event);
         Ok(())
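
After this refactor, adding a sink means implementing the `sinks::Event` trait and registering the module in `capture/src/sinks/mod.rs`, exactly as `MemorySink` does above for the test suite. A minimal sketch of what that could look like — the `NullSink` name and the `sinks/null.rs` module path are hypothetical, not part of this patch:

```rust
// capture/src/sinks/null.rs (hypothetical): a sink that acknowledges
// events without forwarding them anywhere, e.g. to keep Kafka out of
// local load tests.
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::sinks::Event;

pub struct NullSink {}

#[async_trait]
impl Event for NullSink {
    async fn send(&self, _event: ProcessedEvent) -> Result<(), CaptureError> {
        // Drop the event on the floor and report success.
        Ok(())
    }

    async fn send_batch(&self, _events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
        Ok(())
    }
}
```

It would then be wired up with a `pub mod null;` declaration next to the existing `pub mod kafka;` and `pub mod print;` lines in `sinks/mod.rs`.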
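The renamed `OverflowLimiter` keeps the governor-backed semantics that the `bursting` test exercises: each key may pass up to `burst` checks back-to-back, after which the `per_second` replenishment rate applies, and limited events lose their partition locality instead of being rejected with a 429. A sketch of that behaviour, assuming the crate layout introduced by this patch (the key string here is invented for illustration; the real key is computed from the event's token and distinct_id):

```rust
use std::num::NonZeroU32;

use capture::limiters::overflow::OverflowLimiter;

#[tokio::main]
async fn main() {
    // Replenish one slot per second per key, allow bursts of up to three.
    let limiter = OverflowLimiter::new(
        NonZeroU32::new(1).unwrap(),
        NonZeroU32::new(3).unwrap(),
        None, // no keys forced into overflow
    );

    let key = String::from("token:distinct_id"); // illustrative key

    // The first three checks fit within the burst allowance...
    assert!(!limiter.is_limited(&key).await);
    assert!(!limiter.is_limited(&key).await);
    assert!(!limiter.is_limited(&key).await);
    // ...the fourth exceeds it: the event is spread across all partitions
    // for the overflow consumer instead of being dropped or rejected.
    assert!(limiter.is_limited(&key).await);
}
```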