Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

feat(capture): send historical_migration batches to separate topic #30

Merged
merged 9 commits into from
Apr 30, 2024
Merged
6 changes: 4 additions & 2 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_compression_codec: "none".to_string(),
kafka_hosts: "kafka:9092".to_string(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
Copy link
Contributor Author

@xvello xvello Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need a better story around specifying 10 topics over several clusters, but that's for future us

kafka_tls: false,
},
otel_url: None,
Expand All @@ -61,9 +62,10 @@ pub struct ServerHandle {
}

impl ServerHandle {
pub async fn for_topic(topic: &EphemeralTopic) -> Self {
pub async fn for_topics(main: &EphemeralTopic, historical: &EphemeralTopic) -> Self {
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.kafka.kafka_topic = main.topic_name().to_string();
config.kafka.kafka_historical_topic = historical.topic_name().to_string();
Self::for_config(config).await
}
pub async fn for_config(config: Config) -> Self {
Expand Down
112 changes: 100 additions & 12 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ async fn it_captures_one_event() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic).await;

let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!({
"token": token,
Expand All @@ -24,7 +26,7 @@ async fn it_captures_one_event() -> Result<()> {
let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

let event = topic.next_event()?;
let event = main_topic.next_event()?;
assert_json_include!(
actual: event,
expected: json!({
Expand All @@ -37,14 +39,15 @@ async fn it_captures_one_event() -> Result<()> {
}

#[tokio::test]
async fn it_captures_a_batch() -> Result<()> {
async fn it_captures_a_posthogjs_array() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id1 = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic).await;
let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!([{
"token": token,
Expand All @@ -59,14 +62,98 @@ async fn it_captures_a_batch() -> Result<()> {
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id2
})
);

Ok(())
}

#[tokio::test]
async fn it_captures_a_batch() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id1 = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!({
"token": token,
"batch": [{
"event": "event1",
"distinct_id": distinct_id1
},{
"event": "event2",
"distinct_id": distinct_id2
}]
});
let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id2
})
);

Ok(())
}
#[tokio::test]
async fn it_captures_a_historical_batch() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id1 = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!({
"token": token,
"historical_migration": true,
xvello marked this conversation as resolved.
Show resolved Hide resolved
"batch": [{
"event": "event1",
"distinct_id": distinct_id1
},{
"event": "event2",
"distinct_id": distinct_id2
}]
});
let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: histo_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: histo_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id2
Expand Down Expand Up @@ -175,8 +262,9 @@ async fn it_trims_distinct_id() -> Result<()> {
let distinct_id2 = random_string("id", 222);
let (trimmed_distinct_id2, _) = distinct_id2.split_at(200); // works because ascii chars

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic).await;
let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!([{
"token": token,
Expand All @@ -191,14 +279,14 @@ async fn it_trims_distinct_id() -> Result<()> {
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": trimmed_distinct_id2
Expand Down
9 changes: 8 additions & 1 deletion capture/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,15 @@ impl IntoResponse for CaptureError {
}
}

#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DataType {
AnalyticsMain,
AnalyticsHistorical,
}
#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
#[serde(skip_serializing)]
pub data_type: DataType,
pub uuid: Uuid,
pub distinct_id: String,
pub ip: String,
Expand Down
3 changes: 3 additions & 0 deletions capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ pub struct KafkaConfig {
#[envconfig(default = "none")]
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
pub kafka_hosts: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_topic: String,
#[envconfig(default = "events_plugin_ingestion_historical")]
pub kafka_historical_topic: String,
#[envconfig(default = "false")]
pub kafka_tls: bool,
}
48 changes: 26 additions & 22 deletions capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::{CaptureError, ProcessedEvent};
use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::config::KafkaConfig;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;
Expand Down Expand Up @@ -80,8 +80,9 @@ impl rdkafka::ClientContext for KafkaContext {
#[derive(Clone)]
pub struct KafkaSink {
producer: FutureProducer<KafkaContext>,
topic: String,
partition: OverflowLimiter,
main_topic: String,
historical_topic: String,
}

impl KafkaSink {
Expand Down Expand Up @@ -128,7 +129,8 @@ impl KafkaSink {
Ok(KafkaSink {
producer,
partition,
topic: config.kafka_topic,
main_topic: config.kafka_topic,
historical_topic: config.kafka_historical_topic,
})
}

Expand All @@ -137,22 +139,27 @@ impl KafkaSink {
self.producer.flush(Duration::new(30, 0))
}

async fn kafka_send(
producer: FutureProducer<KafkaContext>,
topic: String,
event: ProcessedEvent,
limited: bool,
) -> Result<DeliveryFuture, CaptureError> {
async fn kafka_send(&self, event: ProcessedEvent) -> Result<DeliveryFuture, CaptureError> {
let payload = serde_json::to_string(&event).map_err(|e| {
error!("failed to serialize event: {}", e);
CaptureError::NonRetryableSinkError
})?;

let key = event.key();
let partition_key = if limited { None } else { Some(key.as_str()) };
let event_key = event.key();
let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish we could get rid of locality in historical too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocked by person processing idempotence, but to be kept in mind. Part of the solution for person processing might be in-memory caches that would benefit from locality.
If we get more instances of historical being overwhelmed, we could implement a very-high threshold overflow detection here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm particularly concerned since historical doesn't overflow, but we also do have a higher SLA.

I do understand that the solution is not here but in person processing. Hoping there won't be much trouble until we get there.

DataType::AnalyticsMain => {
// TODO: deprecate capture-led overflow or move logic in handler
if self.partition.is_limited(&event_key) {
(&self.main_topic, None) // Analytics overflow goes to the main topic without locality
} else {
(&self.main_topic, Some(event_key.as_str()))
}
}
};

match producer.send_result(FutureRecord {
topic: topic.as_str(),
match self.producer.send_result(FutureRecord {
topic,
payload: Some(&payload),
partition: None,
key: partition_key,
Expand Down Expand Up @@ -206,9 +213,7 @@ impl KafkaSink {
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let limited = self.partition.is_limited(&event.key());
let ack =
Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
let ack = self.kafka_send(event).await?;
histogram!("capture_event_batch_size").record(1.0);
Self::process_ack(ack)
.instrument(info_span!("ack_wait_one"))
Expand All @@ -220,12 +225,8 @@ impl Event for KafkaSink {
let mut set = JoinSet::new();
let batch_size = events.len();
for event in events {
let producer = self.producer.clone();
let topic = self.topic.clone();
let limited = self.partition.is_limited(&event.key());

// We await kafka_send to get events in the producer queue sequentially
let ack = Self::kafka_send(producer, topic, event, limited).await?;
let ack = self.kafka_send(event).await?;

// Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
set.spawn(Self::process_ack(ack));
Expand Down Expand Up @@ -259,7 +260,7 @@ impl Event for KafkaSink {

#[cfg(test)]
mod tests {
use crate::api::{CaptureError, ProcessedEvent};
use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::config;
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
Expand Down Expand Up @@ -292,6 +293,7 @@ mod tests {
kafka_compression_codec: "none".to_string(),
kafka_hosts: cluster.bootstrap_servers(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
kafka_tls: false,
};
let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
Expand All @@ -305,6 +307,7 @@ mod tests {

let (cluster, sink) = start_on_mocked_sink().await;
let event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
Expand Down Expand Up @@ -336,6 +339,7 @@ mod tests {
.map(char::from)
.collect();
let big_event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
Expand Down
4 changes: 2 additions & 2 deletions capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct PrintSink {}
#[async_trait]
impl Event for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
info!("single {:?} event: {:?}", event.data_type, event);
xvello marked this conversation as resolved.
Show resolved Hide resolved
counter!("capture_events_ingested_total").increment(1);

Ok(())
Expand All @@ -22,7 +22,7 @@ impl Event for PrintSink {
histogram!("capture_event_batch_size").record(events.len() as f64);
counter!("capture_events_ingested_total").increment(events.len() as u64);
for event in events {
info!("event: {:?}", event);
info!("{:?} event: {:?}", event.data_type, event);
}

Ok(())
Expand Down
9 changes: 8 additions & 1 deletion capture/src/v0_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::limiters::billing::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::v0_request::{Compression, ProcessingContext, RawRequest};
use crate::{
api::{CaptureError, CaptureResponse, CaptureResponseCode, ProcessedEvent},
api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent},
router, sinks,
utils::uuid_v7,
v0_request::{EventFormData, EventQuery, RawEvent},
Expand Down Expand Up @@ -125,6 +125,7 @@ pub async fn event(
token,
now: state.timesource.current_time(),
client_ip: ip.to_string(),
is_historical,
};

let billing_limited = state
Expand Down Expand Up @@ -174,12 +175,18 @@ pub fn process_single_event(
return Err(CaptureError::MissingEventName);
}

let data_type = match context.is_historical {
true => DataType::AnalyticsHistorical,
false => DataType::AnalyticsMain,
};

let data = serde_json::to_string(&event).map_err(|e| {
tracing::error!("failed to encode data field: {}", e);
CaptureError::NonRetryableSinkError
})?;

Ok(ProcessedEvent {
data_type,
uuid: event.uuid.unwrap_or_else(uuid_v7),
distinct_id: event.extract_distinct_id()?,
ip: context.client_ip.clone(),
Expand Down
Loading
Loading