diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index 25d65d7fa..7fbb95274 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -578,7 +578,7 @@ async fn extension_loop_active(
     let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await;
 
     let telemetry_listener_cancel_token =
-        setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;
+        setup_telemetry_client(&r.extension_id, event_bus.get_sender_copy()).await?;
 
     let otlp_cancel_token = start_otlp_agent(
         config,
@@ -614,7 +614,7 @@ async fn extension_loop_active(
         tokio::select! {
             biased;
             Some(event) = event_bus.rx.recv() => {
-                if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
+                if let Some(telemetry_event) = handle_event_bus_event(event, logs_agent_channel.clone(), invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
                     if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
                         break 'flush_end;
                     }
@@ -738,7 +738,7 @@ async fn extension_loop_active(
                     break 'next_invocation;
                 }
                 Some(event) = event_bus.rx.recv() => {
-                    handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
+                    handle_event_bus_event(event, logs_agent_channel.clone(), invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
                 }
                 _ = race_flush_interval.tick() => {
                     let mut locked_metrics = metrics_flushers.lock().await;
@@ -776,7 +776,7 @@ async fn extension_loop_active(
                         debug!("Received tombstone event, proceeding with shutdown");
                         break 'shutdown;
                     }
-                    handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
+                    handle_event_bus_event(event, logs_agent_channel.clone(), invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
                 }
                 // Add timeout to prevent hanging indefinitely
                 () = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => {
@@ -849,6 +849,7 @@ async fn blocking_flush_all(
 
 async fn handle_event_bus_event(
     event: Event,
+    logs_agent_channel: Sender<TelemetryEvent>,
     invocation_processor: Arc>,
     appsec_processor: Option>>,
     tags_provider: Arc,
@@ -862,6 +863,14 @@ async fn handle_event_bus_event(
             drop(p);
         }
         Event::Telemetry(event) => {
+            // Send telemetry event to logs channel
+            if let Err(e) = logs_agent_channel.send(event.clone()).await {
+                debug!(
+                    "Failed to send telemetry event to logs agent channel: {}",
+                    e
+                );
+            }
+
            debug!("Telemetry event received: {:?}", event);
            match event.record {
                TelemetryRecord::PlatformInitStart { .. } => {
@@ -1210,10 +1219,9 @@ async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> Cancel
 
 async fn setup_telemetry_client(
     extension_id: &str,
-    logs_agent_channel: Sender<TelemetryEvent>,
+    event_bus: Sender<Event>,
 ) -> Result {
-    let telemetry_listener =
-        TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_agent_channel);
+    let telemetry_listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, event_bus);
     let cancel_token = telemetry_listener.cancel_token();
 
     tokio::spawn(async move {
diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs
index 1cefdbc78..0de069525 100644
--- a/bottlecap/src/logs/lambda/processor.rs
+++ b/bottlecap/src/logs/lambda/processor.rs
@@ -82,7 +82,6 @@ impl LambdaProcessor {
 
     #[allow(clippy::too_many_lines)]
     async fn get_message(&mut self, event: TelemetryEvent) -> Result> {
-        let copy = event.clone();
         match event.record {
             TelemetryRecord::Function(v) | TelemetryRecord::Extension(v) => {
                 let message = match v {
@@ -115,10 +114,6 @@ impl LambdaProcessor {
                 runtime_version_arn,
                 .. // TODO: check if we could do something with this metrics: `initialization_type` and `phase`
             } => {
-                if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
-                    error!("Failed to send PlatformInitStart to the main event bus: {}", e);
-                }
-
                 let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display
                 let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display
 
@@ -130,23 +125,12 @@ impl LambdaProcessor {
                     None,
                 ))
             },
-            // TODO: check if we could do anything with the fields from `PlatformInitReport`
-            TelemetryRecord::PlatformInitReport { .. } => {
-                if let Err(e) = self.event_bus.send(Event::Telemetry(event)).await {
-                    error!("Failed to send PlatformInitReport to the main event bus: {}", e);
-                }
-                // We don't need to process any log for this event
-                Err("Unsupported event type".into())
-            }
             // This is the first log where `request_id` is available
             // So we set it here and use it in the unprocessed and following logs.
             TelemetryRecord::PlatformStart {
                 request_id,
                 version,
             } => {
-                if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
-                    error!("Failed to send PlatformStart to the main event bus: {}", e);
-                }
                 // Set request_id for unprocessed and future logs
                 self.invocation_context.request_id.clone_from(&request_id);
 
@@ -160,10 +144,6 @@ impl LambdaProcessor {
                 ))
             },
             TelemetryRecord::PlatformRuntimeDone { request_id, status, metrics, error_type, .. } => { // TODO: check what to do with rest of the fields
-                if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
-                    error!("Failed to send PlatformRuntimeDone to the main event bus: {}", e);
-                }
-
                 let mut message = format!("END RequestId: {request_id}");
                 let mut result_status = "info".to_string();
                 if let Some(metrics) = metrics {
@@ -188,10 +168,6 @@ impl LambdaProcessor {
                 ))
             },
             TelemetryRecord::PlatformReport { request_id, metrics, .. } => { // TODO: check what to do with rest of the fields
-                if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
-                    error!("Failed to send PlatformReport to the main event bus: {}", e);
-                }
-
                 let mut post_runtime_duration_ms = 0.0;
                 // Calculate `post_runtime_duration_ms` if we've seen a `runtime_duration_ms`.
                 if self.invocation_context.runtime_duration_ms > 0.0 {
@@ -222,6 +198,7 @@ impl LambdaProcessor {
                     None,
                 ))
             },
+            // TODO: PlatformInitReport
             // TODO: PlatformInitRuntimeDone
             // TODO: PlatformExtension
             // TODO: PlatformTelemetrySubscription
diff --git a/bottlecap/src/telemetry/listener.rs b/bottlecap/src/telemetry/listener.rs
index 9f1126359..f83aea886 100644
--- a/bottlecap/src/telemetry/listener.rs
+++ b/bottlecap/src/telemetry/listener.rs
@@ -1,4 +1,5 @@
 use crate::{
+    event_bus::Event,
     http::{extract_request_body, handler_not_found},
     telemetry::events::{TelemetryEvent, TelemetryRecord},
 };
@@ -22,12 +23,12 @@ pub struct TelemetryListener {
     host: [u8; 4],
     port: u16,
     cancel_token: CancellationToken,
-    event_bus: Sender<TelemetryEvent>,
+    event_bus: Sender<Event>,
 }
 
 impl TelemetryListener {
     #[must_use]
-    pub fn new(host: [u8; 4], port: u16, event_bus: Sender<TelemetryEvent>) -> Self {
+    pub fn new(host: [u8; 4], port: u16, event_bus: Sender<Event>) -> Self {
         let cancel_token = CancellationToken::new();
         Self {
             host,
@@ -74,15 +75,15 @@ impl TelemetryListener {
             .with_state(event_bus)
     }
 
-    async fn graceful_shutdown(cancel_token: CancellationToken, event_bus: Sender<TelemetryEvent>) {
+    async fn graceful_shutdown(cancel_token: CancellationToken, event_bus: Sender<Event>) {
         cancel_token.cancelled().await;
         debug!("Telemetry API | Shutdown signal received, sending tombstone event");
 
         // Send tombstone event to signal shutdown
-        let tombstone_event = TelemetryEvent {
+        let tombstone_event = Event::Telemetry(TelemetryEvent {
             time: Utc::now(),
             record: TelemetryRecord::PlatformTombstone,
-        };
+        });
 
         if let Err(e) = event_bus.send(tombstone_event).await {
             debug!("Failed to send tombstone event: {:?}", e);
@@ -91,7 +92,7 @@ impl TelemetryListener {
         debug!("Telemetry API | Shutting down");
     }
 
-    async fn handle(State(event_bus): State<Sender<TelemetryEvent>>, request: Request) -> Response {
+    async fn handle(State(event_bus): State<Sender<Event>>, request: Request) -> Response {
         let (_, body) = match extract_request_body(request).await {
             Ok(r) => r,
             Err(e) => {
@@ -119,7 +120,10 @@ impl TelemetryListener {
         };
 
         for event in telemetry_events.drain(..) {
-            event_bus.send(event).await.expect("infallible");
+            event_bus
+                .send(Event::Telemetry(event))
+                .await
+                .expect("infallible");
         }
 
         (StatusCode::OK, "OK").into_response()
@@ -158,9 +162,15 @@ mod tests {
 
         // Check that the response is OK
         assert_eq!(response.status(), axum::http::StatusCode::OK);
 
-        let telemetry_event = rx.recv().await.unwrap();
+        let event = rx.recv().await.unwrap();
         let expected_time = DateTime::parse_from_rfc3339("2024-04-25T17:35:59.944Z").expect("failed to parse time");
+
+        let telemetry_event = match event {
+            Event::Telemetry(event) => event,
+            _ => panic!("Expected telemetry event"),
+        };
+
         assert_eq!(telemetry_event.time, expected_time);
         assert_eq!(telemetry_event.record, TelemetryRecord::PlatformInitStart {
             initialization_type: InitType::OnDemand,
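
Taken together, the change reverses the direction of telemetry fan-out: the Telemetry API listener now publishes `Event::Telemetry` onto the main event bus, and `handle_event_bus_event` in `main.rs` forwards a clone of each telemetry event to the logs agent channel before the extension loop inspects the record. Below is a minimal, self-contained sketch of that routing with simplified stand-in types (not the crate's real `Event`, `TelemetryEvent`, or channel wiring); it assumes a `tokio` dependency with the `sync`, `macros`, and `rt-multi-thread` features.

```rust
use tokio::sync::mpsc::{channel, Sender};

// Simplified stand-ins for the crate's telemetry and event-bus types.
#[derive(Clone)]
struct TelemetryEvent {
    record: String,
}

#[allow(dead_code)]
enum Event {
    Telemetry(TelemetryEvent),
    Tombstone,
}

// Stand-in for the listener: wrap each telemetry event in Event::Telemetry
// and send it to the main event bus instead of the logs agent channel.
async fn listener_publish(event_bus: &Sender<Event>, event: TelemetryEvent) {
    let _ = event_bus.send(Event::Telemetry(event)).await;
}

// Stand-in for handle_event_bus_event: forward a clone of the telemetry
// event to the logs agent channel, then let the main loop act on the record.
async fn handle_event_bus_event(event: Event, logs_agent_channel: &Sender<TelemetryEvent>) {
    if let Event::Telemetry(event) = event {
        if let Err(e) = logs_agent_channel.send(event.clone()).await {
            eprintln!("Failed to send telemetry event to logs agent channel: {e}");
        }
        println!("main loop handling record: {}", event.record);
    }
}

#[tokio::main]
async fn main() {
    let (bus_tx, mut bus_rx) = channel::<Event>(32);
    let (logs_tx, mut logs_rx) = channel::<TelemetryEvent>(32);

    // Listener -> event bus.
    listener_publish(&bus_tx, TelemetryEvent { record: "platform.start".into() }).await;

    // Event bus -> main loop, which fans out to the logs agent channel.
    if let Some(event) = bus_rx.recv().await {
        handle_event_bus_event(event, &logs_tx).await;
    }
    if let Some(event) = logs_rx.recv().await {
        println!("logs agent received record: {}", event.record);
    }
}
```

In the actual diff the listener obtains its `Sender<Event>` from `event_bus.get_sender_copy()` in `setup_telemetry_client`, and the forwarding step lives at the top of the `Event::Telemetry` arm of `handle_event_bus_event`.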