Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into xvello/health
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Nov 2, 2023
2 parents 5c54a3e + 4b3634b commit b68ecf4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ serde_json = "1.0.96"
governor = "0.5.1"
tower_governor = "0.0.4"
time = { version = "0.3.20", features = ["formatting", "macros", "parsing", "serde"] }
tower-http = { version = "0.4.0", features = ["trace"] }
tower-http = { version = "0.4.0", features = ["cors", "trace"] }
bytes = "1"
anyhow = "1.0"
flate2 = "1.0"
Expand Down
6 changes: 6 additions & 0 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub async fn event(
}))
}

pub async fn options() -> Result<Json<CaptureResponse>, CaptureError> {
Ok(Json(CaptureResponse {
status: CaptureResponseCode::Ok,
}))
}

pub fn process_single_event(
event: &RawEvent,
context: &ProcessingContext,
Expand Down
14 changes: 12 additions & 2 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::future::ready;
use std::sync::Arc;

use axum::http::Method;
use axum::{
routing::{get, post},
Router,
};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
Expand Down Expand Up @@ -43,14 +45,22 @@ pub fn router<
billing,
};

// Very permissive CORS policy, as old SDK versions
// and reverse proxies might send funky headers.
let cors = CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_headers(Any)
.allow_origin(AllowOrigin::mirror_request());

let router = Router::new()
// TODO: use NormalizePathLayer::trim_trailing_slash
.route("/", get(index))
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())))
.route("/i/v0/e", post(capture::event))
.route("/i/v0/e/", post(capture::event))
.route("/i/v0/e", post(capture::event).options(capture::options))
.route("/i/v0/e/", post(capture::event).options(capture::options))
.layer(TraceLayer::new_for_http())
.layer(cors)
.layer(axum::middleware::from_fn(track_metrics))
.with_state(state);

Expand Down
14 changes: 10 additions & 4 deletions capture/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct PrintSink {}
#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
tracing::info!("single event: {:?}", event);
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
Expand All @@ -38,7 +38,7 @@ impl EventSink for PrintSink {
histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
tracing::info!("event: {:?}", event);
info!("event: {:?}", event);
}

Ok(())
Expand Down Expand Up @@ -185,12 +185,16 @@ impl KafkaSink {
#[async_trait]
impl EventSink for KafkaSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
Self::kafka_send(self.producer.clone(), self.topic.clone(), event).await
Self::kafka_send(self.producer.clone(), self.topic.clone(), event).await?;

histogram!("capture_event_batch_size", 1.0);
counter!("capture_events_ingested_total", 1);
Ok(())
}

async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let mut set = JoinSet::new();

let batch_size = events.len();
for event in events {
let producer = self.producer.clone();
let topic = self.topic.clone();
Expand All @@ -201,6 +205,8 @@ impl EventSink for KafkaSink {
// Await on all the produce promises
while (set.join_next().await).is_some() {}

histogram!("capture_event_batch_size", batch_size as f64);
counter!("capture_events_ingested_total", batch_size as u64);
Ok(())
}
}

0 comments on commit b68ecf4

Please sign in to comment.