Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
carve out the limiters and sinks sub-modules (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Dec 13, 2023
1 parent 454b96d commit 1a7e87a
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 89 deletions.
4 changes: 2 additions & 2 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
burst_limit: NonZeroU32::new(5).unwrap(),
per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_burst_limit: NonZeroU32::new(5).unwrap(),
overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_forced_keys: None,
kafka: KafkaConfig {
kafka_producer_linger_ms: 0, // Send messages as soon as possible
Expand Down
12 changes: 6 additions & 6 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn it_captures_a_batch() -> Result<()> {
}

#[tokio::test]
async fn it_is_limited_with_burst() -> Result<()> {
async fn it_overflows_events_on_burst() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
Expand All @@ -87,8 +87,8 @@ async fn it_is_limited_with_burst() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(2).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

Expand Down Expand Up @@ -125,7 +125,7 @@ async fn it_is_limited_with_burst() -> Result<()> {
}

#[tokio::test]
async fn it_does_not_partition_limit_different_ids() -> Result<()> {
async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
Expand All @@ -136,8 +136,8 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

Expand Down
6 changes: 3 additions & 3 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use metrics::counter;
use time::OffsetDateTime;
use tracing::instrument;

use crate::billing_limits::QuotaResource;
use crate::event::{Compression, ProcessingContext};
use crate::limiters::billing::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::token::validate_token;
use crate::{
api::{CaptureError, CaptureResponse, CaptureResponseCode},
event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
router, sink,
router, sinks,
utils::uuid_v7,
};

Expand Down Expand Up @@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureEr

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sink::EventSink + Send + Sync>,
sink: Arc<dyn sinks::Event + Send + Sync>,
events: &'a [RawEvent],
context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
Expand Down
4 changes: 2 additions & 2 deletions capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ pub struct Config {
pub otel_url: Option<String>,

#[envconfig(default = "100")]
pub per_second_limit: NonZeroU32,
pub overflow_per_second_limit: NonZeroU32,

#[envconfig(default = "1000")]
pub burst_limit: NonZeroU32,
pub overflow_burst_limit: NonZeroU32,

pub overflow_forced_keys: Option<String>, // Coma-delimited keys

Expand Down
5 changes: 2 additions & 3 deletions capture/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
pub mod api;
pub mod billing_limits;
pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod partition_limits;
pub mod limiters;
pub mod prometheus;
pub mod redis;
pub mod router;
pub mod server;
pub mod sink;
pub mod sinks;
pub mod time;
pub mod token;
pub mod utils;
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod tests {
use time::Duration;

use crate::{
billing_limits::{BillingLimiter, QuotaResource},
limiters::billing::{BillingLimiter, QuotaResource},
redis::MockRedisClient,
};

Expand Down
2 changes: 2 additions & 0 deletions capture/src/limiters/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod billing;
pub mod overflow;
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/// When a customer is writing too often to the same key, we get hot partitions. This negatively
/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
/// possible, but this does require that we map key -> partition.
/// The analytics ingestion pipeline provides ordering guarantees for events of the same
/// token and distinct_id. We currently achieve this through a locality constraint on the
/// Kafka partition (consistent partition hashing through a computed key).
///
/// If the write-rate reaches a certain amount, we need to be able to handle the hot partition
/// before it causes a negative impact. In this case, instead of passing the error to the customer
/// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
/// customers data will be spread across all partitions.
/// Volume spikes to a given key can create lag on the destination partition and induce
/// ingestion lag. To protect the downstream systems, capture can relax this locality
/// constraint when bursts are detected. When that happens, the excess traffic will be
/// spread across all partitions and be processed by the overflow consumer, without
/// strict ordering guarantees.
use std::collections::HashSet;
use std::num::NonZeroU32;
use std::sync::Arc;
Expand All @@ -16,12 +17,12 @@ use rand::Rng;

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
pub struct PartitionLimiter {
pub struct OverflowLimiter {
limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
forced_keys: HashSet<String>,
}

impl PartitionLimiter {
impl OverflowLimiter {
pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
let quota = Quota::per_second(per_second).allow_burst(burst);
let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
Expand All @@ -31,7 +32,7 @@ impl PartitionLimiter {
Some(values) => values.split(',').map(String::from).collect(),
};

PartitionLimiter {
OverflowLimiter {
limiter,
forced_keys,
}
Expand Down Expand Up @@ -71,12 +72,12 @@ impl PartitionLimiter {

#[cfg(test)]
mod tests {
use crate::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use std::num::NonZeroU32;

#[tokio::test]
async fn low_limits() {
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(1).unwrap(),
None,
Expand All @@ -89,7 +90,7 @@ mod tests {

#[tokio::test]
async fn bursting() {
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(3).unwrap(),
None,
Expand All @@ -109,7 +110,7 @@ mod tests {
let key_three = String::from("three");
let forced_keys = Some(String::from("one,three"));

let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(1).unwrap(),
forced_keys,
Expand Down
6 changes: 3 additions & 3 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};
use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};

use crate::prometheus::{setup_metrics_recorder, track_metrics};

#[derive(Clone)]
pub struct State {
pub sink: Arc<dyn sink::EventSink + Send + Sync>,
pub sink: Arc<dyn sinks::Event + Send + Sync>,
pub timesource: Arc<dyn TimeSource + Send + Sync>,
pub redis: Arc<dyn Client + Send + Sync>,
pub billing: BillingLimiter,
Expand All @@ -28,7 +28,7 @@ async fn index() -> &'static str {

pub fn router<
TZ: TimeSource + Send + Sync + 'static,
S: sink::EventSink + Send + Sync + 'static,
S: sinks::Event + Send + Sync + 'static,
R: Client + Send + Sync + 'static,
>(
timesource: TZ,
Expand Down
22 changes: 10 additions & 12 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use std::sync::Arc;

use time::Duration;

use crate::billing_limits::BillingLimiter;
use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
use crate::partition_limits::PartitionLimiter;
use crate::limiters::billing::BillingLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::redis::RedisClient;
use crate::{router, sink};
use crate::router;
use crate::sinks::kafka::KafkaSink;
use crate::sinks::print::PrintSink;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
Expand All @@ -34,7 +36,7 @@ where
router::router(
crate::time::SystemTime {},
liveness,
sink::PrintSink {},
PrintSink {},
redis_client,
billing,
config.export_prometheus,
Expand All @@ -44,9 +46,9 @@ where
.register("rdkafka".to_string(), Duration::seconds(30))
.await;

let partition = PartitionLimiter::new(
config.per_second_limit,
config.burst_limit,
let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
Expand All @@ -55,18 +57,14 @@ where
partition.report_metrics().await;
});
}

{
// Ensure that the rate limiter state does not grow unbounded

let partition = partition.clone();

tokio::spawn(async move {
partition.clean_state().await;
});
}

let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

router::router(
Expand Down
51 changes: 11 additions & 40 deletions capture/src/sink.rs → capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::time::Duration;

use async_trait::async_trait;
use metrics::{absolute_counter, counter, gauge, histogram};
use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};
Expand All @@ -15,38 +14,9 @@ use crate::api::CaptureError;
use crate::config::KafkaConfig;
use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;

#[async_trait]
pub trait EventSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
info!("event: {:?}", event);
}

Ok(())
}
}
use crate::sinks::Event;

struct KafkaContext {
liveness: HealthHandle,
Expand Down Expand Up @@ -113,14 +83,14 @@ impl rdkafka::ClientContext for KafkaContext {
pub struct KafkaSink {
producer: FutureProducer<KafkaContext>,
topic: String,
partition: PartitionLimiter,
partition: OverflowLimiter,
}

impl KafkaSink {
pub fn new(
config: KafkaConfig,
liveness: HealthHandle,
partition: PartitionLimiter,
partition: OverflowLimiter,
) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

Expand Down Expand Up @@ -234,7 +204,7 @@ impl KafkaSink {
}

#[async_trait]
impl EventSink for KafkaSink {
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let limited = self.partition.is_limited(&event.key());
Expand Down Expand Up @@ -294,8 +264,9 @@ mod tests {
use crate::config;
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::partition_limits::PartitionLimiter;
use crate::sink::{EventSink, KafkaSink};
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
use crate::sinks::Event;
use crate::utils::uuid_v7;
use rand::distributions::Alphanumeric;
use rand::Rng;
Expand All @@ -310,7 +281,7 @@ mod tests {
let handle = registry
.register("one".to_string(), Duration::seconds(30))
.await;
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(10).unwrap(),
NonZeroU32::new(10).unwrap(),
None,
Expand Down
13 changes: 13 additions & 0 deletions capture/src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;

pub mod kafka;
pub mod print;

#[async_trait]
pub trait Event {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}
Loading

0 comments on commit 1a7e87a

Please sign in to comment.