diff --git a/Cargo.toml b/Cargo.toml index b8b4d08..318b180 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ serde_json = "1.0.96" governor = "0.5.1" tower_governor = "0.0.4" time = { version = "0.3.20", features = ["formatting", "macros", "parsing", "serde"] } -tower-http = { version = "0.4.0", features = ["trace"] } +tower-http = { version = "0.4.0", features = ["cors", "trace"] } bytes = "1" anyhow = "1.0" flate2 = "1.0" diff --git a/capture/src/capture.rs b/capture/src/capture.rs index 6475940..45fc5f7 100644 --- a/capture/src/capture.rs +++ b/capture/src/capture.rs @@ -104,6 +104,12 @@ pub async fn event( })) } +pub async fn options() -> Result, CaptureError> { + Ok(Json(CaptureResponse { + status: CaptureResponseCode::Ok, + })) +} + pub fn process_single_event( event: &RawEvent, context: &ProcessingContext, diff --git a/capture/src/router.rs b/capture/src/router.rs index d6cf60e..d5a3a03 100644 --- a/capture/src/router.rs +++ b/capture/src/router.rs @@ -1,10 +1,12 @@ use std::future::ready; use std::sync::Arc; +use axum::http::Method; use axum::{ routing::{get, post}, Router, }; +use tower_http::cors::{AllowOrigin, Any, CorsLayer}; use tower_http::trace::TraceLayer; use crate::health::HealthRegistry; @@ -43,14 +45,22 @@ pub fn router< billing, }; + // Very permissive CORS policy, as old SDK versions + // and reverse proxies might send funky headers. + let cors = CorsLayer::new() + .allow_methods([Method::GET, Method::POST, Method::OPTIONS]) + .allow_headers(Any) + .allow_origin(AllowOrigin::mirror_request()); + let router = Router::new() // TODO: use NormalizePathLayer::trim_trailing_slash .route("/", get(index)) .route("/_readiness", get(index)) .route("/_liveness", get(move || ready(liveness.get_status()))) - .route("/i/v0/e", post(capture::event)) - .route("/i/v0/e/", post(capture::event)) + .route("/i/v0/e", post(capture::event).options(capture::options)) + .route("/i/v0/e/", post(capture::event).options(capture::options)) .layer(TraceLayer::new_for_http()) + .layer(cors) .layer(axum::middleware::from_fn(track_metrics)) .with_state(state); diff --git a/capture/src/sink.rs b/capture/src/sink.rs index b2c8ca8..63c3f35 100644 --- a/capture/src/sink.rs +++ b/capture/src/sink.rs @@ -26,7 +26,7 @@ pub struct PrintSink {} #[async_trait] impl EventSink for PrintSink { async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> { - tracing::info!("single event: {:?}", event); + info!("single event: {:?}", event); counter!("capture_events_ingested_total", 1); Ok(()) @@ -38,7 +38,7 @@ impl EventSink for PrintSink { histogram!("capture_event_batch_size", events.len() as f64); counter!("capture_events_ingested_total", events.len() as u64); for event in events { - tracing::info!("event: {:?}", event); + info!("event: {:?}", event); } Ok(()) @@ -185,12 +185,16 @@ impl KafkaSink { #[async_trait] impl EventSink for KafkaSink { async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> { - Self::kafka_send(self.producer.clone(), self.topic.clone(), event).await + Self::kafka_send(self.producer.clone(), self.topic.clone(), event).await?; + + histogram!("capture_event_batch_size", 1.0); + counter!("capture_events_ingested_total", 1); + Ok(()) } async fn send_batch(&self, events: Vec) -> Result<(), CaptureError> { let mut set = JoinSet::new(); - + let batch_size = events.len(); for event in events { let producer = self.producer.clone(); let topic = self.topic.clone(); @@ -201,6 +205,8 @@ impl EventSink for KafkaSink { // Await on all the produce promises while (set.join_next().await).is_some() {} + histogram!("capture_event_batch_size", batch_size as f64); + counter!("capture_events_ingested_total", batch_size as u64); Ok(()) } }