diff --git a/crates/rustapi-extras/src/insight/export.rs b/crates/rustapi-extras/src/insight/export.rs index 0591eeb..33e797d 100644 --- a/crates/rustapi-extras/src/insight/export.rs +++ b/crates/rustapi-extras/src/insight/export.rs @@ -222,73 +222,89 @@ pub struct WebhookExporter { config: WebhookConfig, buffer: Arc>>, #[cfg(feature = "webhook")] - client: reqwest::Client, + sender: tokio::sync::mpsc::Sender>, + #[cfg(not(feature = "webhook"))] + _marker: std::marker::PhantomData<()>, } impl WebhookExporter { /// Create a new webhook exporter. pub fn new(config: WebhookConfig) -> Self { #[cfg(feature = "webhook")] - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(config.timeout_secs)) - .build() - .expect("Failed to build HTTP client"); + { + // Allow buffering up to 100 batches before dropping + let (tx, mut rx) = tokio::sync::mpsc::channel::>(100); + let config_clone = config.clone(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(config_clone.timeout_secs)) + .build() + .expect("Failed to build HTTP client"); + + rt.block_on(async move { + while let Some(insights) = rx.recv().await { + let mut request = client.post(&config_clone.url).json(&insights); + + if let Some(ref auth_value) = config_clone.auth_header { + request = request.header("Authorization", auth_value); + } + + // Add custom headers + for (k, v) in &config_clone.headers { + request = request.header(k, v); + } + + match request.send().await { + Ok(response) => { + if !response.status().is_success() { + tracing::error!( + "Webhook returned status {}", + response.status() + ); + } + } + Err(e) => { + tracing::error!("Webhook error: {}", e); + } + } + } + }); + }); + Self { + config, + buffer: Arc::new(Mutex::new(Vec::new())), + sender: tx, + } + } + + #[cfg(not(feature = "webhook"))] Self { config, buffer: Arc::new(Mutex::new(Vec::new())), - #[cfg(feature = "webhook")] - client, + _marker: std::marker::PhantomData, } } /// Send insights to the webhook. #[cfg(feature = "webhook")] fn send_insights(&self, insights: &[InsightData]) -> ExportResult<()> { - use std::sync::mpsc; - - // Use a channel to get the result from the async context - let (tx, rx) = mpsc::channel(); - let client = self.client.clone(); - let url = self.config.url.clone(); - let auth = self.config.auth_header.clone(); - let insights = insights.to_vec(); - - // Spawn a blocking task to run the async request - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let result = rt.block_on(async { - let mut request = client.post(&url).json(&insights); - - if let Some(auth_value) = auth { - request = request.header("Authorization", auth_value); - } - - match request.send().await { - Ok(response) => { - if response.status().is_success() { - Ok(()) - } else { - Err(ExportError::Unavailable(format!( - "Webhook returned status {}", - response.status() - ))) - } - } - Err(e) => Err(ExportError::Unavailable(e.to_string())), - } - }); - - let _ = tx.send(result); - }); - - // Wait for the result with timeout - rx.recv_timeout(std::time::Duration::from_secs(self.config.timeout_secs + 1)) - .map_err(|_| ExportError::Unavailable("Webhook request timed out".to_string()))? + match self.sender.try_send(insights.to_vec()) { + Ok(_) => Ok(()), + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!("Webhook exporter channel full, dropping batch"); + Ok(()) + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => Err( + ExportError::Unavailable("Webhook worker channel closed".to_string()), + ), + } } /// Send insights to the webhook (stub when webhook feature is disabled). diff --git a/crates/rustapi-extras/tests/webhook_performance.rs b/crates/rustapi-extras/tests/webhook_performance.rs new file mode 100644 index 0000000..ab510db --- /dev/null +++ b/crates/rustapi-extras/tests/webhook_performance.rs @@ -0,0 +1,55 @@ +#![cfg(feature = "webhook")] + +use rustapi_extras::insight::export::{InsightExporter, WebhookConfig, WebhookExporter}; +use rustapi_extras::insight::InsightData; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +#[tokio::test] +async fn test_webhook_blocking_behavior() { + // Start a dummy server that sleeps + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (mut socket, _) = listener.accept().await.unwrap(); + tokio::spawn(async move { + let mut buf = [0; 1024]; + let _ = socket.read(&mut buf).await; + // Simulate slow processing + tokio::time::sleep(Duration::from_millis(500)).await; + let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; + let _ = socket.write_all(response.as_bytes()).await; + }); + } + }); + + // Configure exporter with batch size 1 to trigger send immediately + let config = WebhookConfig::new(format!("http://{}", addr)) + .batch_size(1) + .timeout(2); + + let exporter = WebhookExporter::new(config); + let insight = InsightData::new("test", "GET", "/"); + + let start = Instant::now(); + // This should trigger a send because batch_size is 1. + // In current implementation, it blocks waiting for response. + match exporter.export(&insight) { + Ok(_) => println!("Export successful"), + Err(e) => println!("Export failed: {:?}", e), + } + let duration = start.elapsed(); + + println!("Export took: {:?}", duration); + + // If it blocks, it should take at least 500ms (due to 500ms server sleep or timeout) + if duration.as_millis() >= 400 { + panic!( + "Performance regression: Export blocked for {:?}. Expected non-blocking behavior.", + duration + ); + } +}