Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ async fn extension_loop_active(
let dogstatsd_cancel_token = start_dogstatsd(metrics_aggr_handle.clone()).await;

let telemetry_listener_cancel_token =
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;
setup_telemetry_client(&r.extension_id, event_bus.get_sender_copy()).await?;

let otlp_cancel_token = start_otlp_agent(
config,
Expand Down Expand Up @@ -614,7 +614,7 @@ async fn extension_loop_active(
tokio::select! {
biased;
Some(event) = event_bus.rx.recv() => {
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
if let Some(telemetry_event) = handle_event_bus_event(event, logs_agent_channel.clone(), invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
break 'flush_end;
}
Expand Down Expand Up @@ -738,7 +738,7 @@ async fn extension_loop_active(
break 'next_invocation;
}
Some(event) = event_bus.rx.recv() => {
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
handle_event_bus_event(event, logs_agent_channel.clone(), invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
}
_ = race_flush_interval.tick() => {
let mut locked_metrics = metrics_flushers.lock().await;
Expand Down Expand Up @@ -776,7 +776,7 @@ async fn extension_loop_active(
debug!("Received tombstone event, proceeding with shutdown");
break 'shutdown;
}
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
handle_event_bus_event(event, logs_agent_channel.clone(), invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
}
// Add timeout to prevent hanging indefinitely
() = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => {
Expand Down Expand Up @@ -849,6 +849,7 @@ async fn blocking_flush_all(

async fn handle_event_bus_event(
event: Event,
logs_agent_channel: Sender<TelemetryEvent>,
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
tags_provider: Arc<TagProvider>,
Expand All @@ -862,6 +863,14 @@ async fn handle_event_bus_event(
drop(p);
}
Event::Telemetry(event) => {
// Send telemetry event to logs channel
if let Err(e) = logs_agent_channel.send(event.clone()).await {
debug!(
"Failed to send telemetry event to logs agent channel: {}",
e
);
}

debug!("Telemetry event received: {:?}", event);
match event.record {
TelemetryRecord::PlatformInitStart { .. } => {
Expand Down Expand Up @@ -1210,10 +1219,9 @@ async fn start_dogstatsd(metrics_aggr_handle: MetricsAggregatorHandle) -> Cancel

async fn setup_telemetry_client(
extension_id: &str,
logs_agent_channel: Sender<TelemetryEvent>,
event_bus: Sender<Event>,
) -> Result<CancellationToken> {
let telemetry_listener =
TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_agent_channel);
let telemetry_listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, event_bus);

let cancel_token = telemetry_listener.cancel_token();
tokio::spawn(async move {
Expand Down
25 changes: 1 addition & 24 deletions bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ impl LambdaProcessor {

#[allow(clippy::too_many_lines)]
async fn get_message(&mut self, event: TelemetryEvent) -> Result<Message, Box<dyn Error>> {
let copy = event.clone();
match event.record {
TelemetryRecord::Function(v) | TelemetryRecord::Extension(v) => {
let message = match v {
Expand Down Expand Up @@ -115,10 +114,6 @@ impl LambdaProcessor {
runtime_version_arn,
.. // TODO: check if we could do something with this metrics: `initialization_type` and `phase`
} => {
if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
error!("Failed to send PlatformInitStart to the main event bus: {}", e);
}

let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display
let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display

Expand All @@ -130,23 +125,12 @@ impl LambdaProcessor {
None,
))
},
// TODO: check if we could do anything with the fields from `PlatformInitReport`
TelemetryRecord::PlatformInitReport { .. } => {
if let Err(e) = self.event_bus.send(Event::Telemetry(event)).await {
error!("Failed to send PlatformInitReport to the main event bus: {}", e);
}
// We don't need to process any log for this event
Err("Unsupported event type".into())
}
// This is the first log where `request_id` is available
// So we set it here and use it in the unprocessed and following logs.
TelemetryRecord::PlatformStart {
request_id,
version,
} => {
if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
error!("Failed to send PlatformStart to the main event bus: {}", e);
}
// Set request_id for unprocessed and future logs
self.invocation_context.request_id.clone_from(&request_id);

Expand All @@ -160,10 +144,6 @@ impl LambdaProcessor {
))
},
TelemetryRecord::PlatformRuntimeDone { request_id, status, metrics, error_type, .. } => { // TODO: check what to do with rest of the fields
if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
error!("Failed to send PlatformRuntimeDone to the main event bus: {}", e);
}

let mut message = format!("END RequestId: {request_id}");
let mut result_status = "info".to_string();
if let Some(metrics) = metrics {
Expand All @@ -188,10 +168,6 @@ impl LambdaProcessor {
))
},
TelemetryRecord::PlatformReport { request_id, metrics, .. } => { // TODO: check what to do with rest of the fields
if let Err(e) = self.event_bus.send(Event::Telemetry(copy)).await {
error!("Failed to send PlatformReport to the main event bus: {}", e);
}

let mut post_runtime_duration_ms = 0.0;
// Calculate `post_runtime_duration_ms` if we've seen a `runtime_duration_ms`.
if self.invocation_context.runtime_duration_ms > 0.0 {
Expand Down Expand Up @@ -222,6 +198,7 @@ impl LambdaProcessor {
None,
))
},
// TODO: PlatformInitReport
// TODO: PlatformInitRuntimeDone
// TODO: PlatformExtension
// TODO: PlatformTelemetrySubscription
Expand Down
26 changes: 18 additions & 8 deletions bottlecap/src/telemetry/listener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
event_bus::Event,
http::{extract_request_body, handler_not_found},
telemetry::events::{TelemetryEvent, TelemetryRecord},
};
Expand All @@ -22,12 +23,12 @@ pub struct TelemetryListener {
host: [u8; 4],
port: u16,
cancel_token: CancellationToken,
event_bus: Sender<TelemetryEvent>,
event_bus: Sender<Event>,
}

impl TelemetryListener {
#[must_use]
pub fn new(host: [u8; 4], port: u16, event_bus: Sender<TelemetryEvent>) -> Self {
pub fn new(host: [u8; 4], port: u16, event_bus: Sender<Event>) -> Self {
let cancel_token = CancellationToken::new();
Self {
host,
Expand Down Expand Up @@ -74,15 +75,15 @@ impl TelemetryListener {
.with_state(event_bus)
}

async fn graceful_shutdown(cancel_token: CancellationToken, event_bus: Sender<TelemetryEvent>) {
async fn graceful_shutdown(cancel_token: CancellationToken, event_bus: Sender<Event>) {
cancel_token.cancelled().await;
debug!("Telemetry API | Shutdown signal received, sending tombstone event");

// Send tombstone event to signal shutdown
let tombstone_event = TelemetryEvent {
let tombstone_event = Event::Telemetry(TelemetryEvent {
time: Utc::now(),
record: TelemetryRecord::PlatformTombstone,
};
});

if let Err(e) = event_bus.send(tombstone_event).await {
debug!("Failed to send tombstone event: {:?}", e);
Expand All @@ -91,7 +92,7 @@ impl TelemetryListener {
debug!("Telemetry API | Shutting down");
}

async fn handle(State(event_bus): State<Sender<TelemetryEvent>>, request: Request) -> Response {
async fn handle(State(event_bus): State<Sender<Event>>, request: Request) -> Response {
let (_, body) = match extract_request_body(request).await {
Ok(r) => r,
Err(e) => {
Expand Down Expand Up @@ -119,7 +120,10 @@ impl TelemetryListener {
};

for event in telemetry_events.drain(..) {
event_bus.send(event).await.expect("infallible");
event_bus
.send(Event::Telemetry(event))
.await
.expect("infallible");
}

(StatusCode::OK, "OK").into_response()
Expand Down Expand Up @@ -158,9 +162,15 @@ mod tests {
// Check that the response is OK
assert_eq!(response.status(), axum::http::StatusCode::OK);

let telemetry_event = rx.recv().await.unwrap();
let event = rx.recv().await.unwrap();
let expected_time =
DateTime::parse_from_rfc3339("2024-04-25T17:35:59.944Z").expect("failed to parse time");

let telemetry_event = match event {
Event::Telemetry(event) => event,
_ => panic!("Expected telemetry event"),
};

assert_eq!(telemetry_event.time, expected_time);
assert_eq!(telemetry_event.record, TelemetryRecord::PlatformInitStart {
initialization_type: InitType::OnDemand,
Expand Down
Loading