From fe12513ec9b6388a6af4582b6709afc1b64f474b Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Mon, 6 Nov 2023 18:47:13 +0100
Subject: [PATCH 1/3] implement liveness checks based on rdkafka health

---
 capture/src/health.rs          | 338 +++++++++++++++++++++++++++++++++
 capture/src/lib.rs             |   1 +
 capture/src/router.rs          |   4 +
 capture/src/server.rs          |  17 +-
 capture/src/sink.rs            |  13 +-
 capture/tests/django_compat.rs |  11 +-
 6 files changed, 378 insertions(+), 6 deletions(-)
 create mode 100644 capture/src/health.rs

diff --git a/capture/src/health.rs b/capture/src/health.rs
new file mode 100644
index 0000000..1c4624a
--- /dev/null
+++ b/capture/src/health.rs
@@ -0,0 +1,338 @@
+use axum::http::StatusCode;
+use axum::response::{IntoResponse, Response};
+use std::collections::HashMap;
+use std::ops::Add;
+use std::sync::{Arc, RwLock};
+
+use time::Duration;
+use tokio::sync::mpsc;
+use tracing::log::warn;
+
+/// Health reporting for components of the service.
+///
+/// The capture server contains several asynchronous loops, and
+/// the process can only be trusted with user data if all the
+/// loops are properly running and reporting.
+///
+/// HealthRegistry allows an arbitrary number of components to
+/// be registered and report their health. The process' health
+/// status is the combination of these individual health statuses:
+///   - if any component is unhealthy, the process is unhealthy
+///   - if all components recently reported healthy, the process is healthy
+///   - if a component failed to report healthy within its defined deadline,
+///     it is considered unhealthy, and the check fails.
+///
+/// Trying to merge the k8s concepts of liveness and readiness in
+/// a single state is full of foot-guns, so HealthRegistry does not
+/// try to do it. Each probe should have its own instance of
+/// the registry to avoid confusion.
+
+#[derive(Default, Debug)]
+pub struct HealthStatus {
+    /// The overall status: true if all components are healthy
+    pub healthy: bool,
+    /// Current status of each registered component, for display
+    pub components: HashMap<String, ComponentStatus>,
+}
+impl IntoResponse for HealthStatus {
+    /// Computes the axum status code based on the overall health status,
+    /// and prints each component status in the body for debugging.
+    fn into_response(self) -> Response {
+        let body = format!("{:?}", self);
+        match self.healthy {
+            true => (StatusCode::OK, body),
+            false => (StatusCode::INTERNAL_SERVER_ERROR, body),
+        }
+        .into_response()
+    }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub enum ComponentStatus {
+    /// Automatically set when a component is newly registered
+    Starting,
+    /// Recently reported healthy, will need to report again before the date
+    HealthyUntil(time::OffsetDateTime),
+    /// Reported unhealthy
+    Unhealthy,
+    /// Automatically set when the HealthyUntil deadline is reached
+    Stalled,
+}
+
+struct HealthMessage {
+    component: String,
+    status: ComponentStatus,
+}
+
+pub struct HealthHandle {
+    component: String,
+    deadline: Duration,
+    sender: mpsc::Sender<HealthMessage>,
+}
+
+impl HealthHandle {
+    /// Asynchronously report healthy, returns when the message is queued.
+    /// Must be called more frequently than the configured deadline.
+    pub async fn report_healthy(&self) {
+        self.report_status(ComponentStatus::HealthyUntil(
+            time::OffsetDateTime::now_utc().add(self.deadline),
+        ))
+        .await
+    }
+
+    /// Asynchronously report component status, returns when the message is queued.
+    pub async fn report_status(&self, status: ComponentStatus) {
+        let message = HealthMessage {
+            component: self.component.clone(),
+            status,
+        };
+        if let Err(err) = self.sender.send(message).await {
+            warn!("failed to report health status: {}", err)
+        }
+    }
+
+    /// Synchronously report as healthy, returns when the message is queued.
+    /// Must be called more frequently than the configured deadline.
+    pub fn report_healthy_blocking(&self) {
+        self.report_status_blocking(ComponentStatus::HealthyUntil(
+            time::OffsetDateTime::now_utc().add(self.deadline),
+        ))
+    }
+
+    /// Synchronously report component status, returns when the message is queued.
+    pub fn report_status_blocking(&self, status: ComponentStatus) {
+        let message = HealthMessage {
+            component: self.component.clone(),
+            status,
+        };
+        if let Err(err) = self.sender.blocking_send(message) {
+            warn!("failed to report health status: {}", err)
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct HealthRegistry {
+    components: Arc<RwLock<HashMap<String, ComponentStatus>>>,
+    sender: mpsc::Sender<HealthMessage>,
+}
+
+impl HealthRegistry {
+    #[allow(clippy::new_without_default)]
+    pub fn new() -> Self {
+        let (tx, mut rx) = mpsc::channel::<HealthMessage>(16);
+        let registry = Self {
+            components: Default::default(),
+            sender: tx,
+        };
+
+        let components = registry.components.clone();
+        tokio::spawn(async move {
+            while let Some(message) = rx.recv().await {
+                if let Ok(mut map) = components.write() {
+                    _ = map.insert(message.component, message.status);
+                } else {
+                    // Poisoned mutex: just warn, the probes will fail and the process will restart
+                    warn!("poisoned HealthRegistry mutex")
+                }
+            }
+        });
+
+        registry
+    }
+
+    /// Registers a new component in the registry. The returned handle should be passed
+    /// to the component, to allow it to frequently report its health status.
+    pub async fn register(&self, component: String, deadline: Duration) -> HealthHandle {
+        let handle = HealthHandle {
+            component,
+            deadline,
+            sender: self.sender.clone(),
+        };
+        handle.report_status(ComponentStatus::Starting).await;
+        handle
+    }
+
+    /// Returns the overall process status, computed from the status of all the components
+    /// currently registered. Can be used as an axum handler.
+    pub fn get_status(&self) -> HealthStatus {
+        let components = self
+            .components
+            .read()
+            .expect("poisoned HealthRegistry mutex");
+
+        let result = HealthStatus {
+            healthy: !components.is_empty(), // unhealthy if no component has registered yet
+            components: Default::default(),
+        };
+        let now = time::OffsetDateTime::now_utc();
+
+        components
+            .iter()
+            .fold(result, |mut result, (name, status)| {
+                match status {
+                    ComponentStatus::HealthyUntil(until) => {
+                        if until.gt(&now) {
+                            _ = result.components.insert(name.clone(), status.clone())
+                        } else {
+                            result.healthy = false;
+                            _ = result
+                                .components
+                                .insert(name.clone(), ComponentStatus::Stalled)
+                        }
+                    }
+                    _ => {
+                        result.healthy = false;
+                        _ = result.components.insert(name.clone(), status.clone())
+                    }
+                }
+                result
+            })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::health::{ComponentStatus, HealthRegistry, HealthStatus};
+    use axum::http::StatusCode;
+    use axum::response::IntoResponse;
+    use std::ops::{Add, Sub};
+    use time::{Duration, OffsetDateTime};
+
+    async fn assert_or_retry<F>(check: F)
+    where
+        F: Fn() -> bool,
+    {
+        assert_or_retry_for_duration(check, Duration::seconds(5)).await
+    }
+
+    async fn assert_or_retry_for_duration<F>(check: F, timeout: Duration)
+    where
+        F: Fn() -> bool,
+    {
+        let deadline = OffsetDateTime::now_utc().add(timeout);
+        while !check() && OffsetDateTime::now_utc().lt(&deadline) {
+            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+        }
+        assert!(check())
+    }
+    #[tokio::test]
+    async fn defaults_to_unhealthy() {
+        let registry = HealthRegistry::new();
+        assert!(!registry.get_status().healthy);
+    }
+
+    #[tokio::test]
+    async fn one_component() {
+        let registry = HealthRegistry::new();
+
+        // New components are registered in Starting
+        let handle = registry
+            .register("one".to_string(), Duration::seconds(30))
+            .await;
+        assert_or_retry(|| registry.get_status().components.len() == 1).await;
+        let mut status = registry.get_status();
+        assert!(!status.healthy);
+        assert_eq!(
+            status.components.get("one"),
+            Some(&ComponentStatus::Starting)
+        );
+
+        // Status goes healthy once the component reports
+        handle.report_healthy().await;
+        assert_or_retry(|| registry.get_status().healthy).await;
+        status = registry.get_status();
+        assert_eq!(status.components.len(), 1);
+
+        // Status goes unhealthy if the component says so
+        handle.report_status(ComponentStatus::Unhealthy).await;
+        assert_or_retry(|| !registry.get_status().healthy).await;
+        status = registry.get_status();
+        assert_eq!(status.components.len(), 1);
+        assert_eq!(
+            status.components.get("one"),
+            Some(&ComponentStatus::Unhealthy)
+        );
+    }
+
+    #[tokio::test]
+    async fn staleness_check() {
+        let registry = HealthRegistry::new();
+        let handle = registry
+            .register("one".to_string(), Duration::seconds(30))
+            .await;
+
+        // Status goes healthy once the component reports
+        handle.report_healthy().await;
+        assert_or_retry(|| registry.get_status().healthy).await;
+        let mut status = registry.get_status();
+        assert_eq!(status.components.len(), 1);
+
+        // If the component's ping is too old, it is considered stalled and the healthcheck fails
+        // FIXME: we should mock the time instead
+        handle
+            .report_status(ComponentStatus::HealthyUntil(
+                OffsetDateTime::now_utc().sub(Duration::seconds(1)),
+            ))
+            .await;
+        assert_or_retry(|| !registry.get_status().healthy).await;
+        status = registry.get_status();
+        assert_eq!(status.components.len(), 1);
+        assert_eq!(
+            status.components.get("one"),
+            Some(&ComponentStatus::Stalled)
+        );
+    }
+
+    #[tokio::test]
+    async fn several_components() {
+        let registry = HealthRegistry::new();
+        let handle1 = registry
+            .register("one".to_string(), Duration::seconds(30))
+            .await;
+        let handle2 = registry
+            .register("two".to_string(), Duration::seconds(30))
+            .await;
+        assert_or_retry(|| registry.get_status().components.len() == 2).await;
+
+        // First component going healthy is not enough
+        handle1.report_healthy().await;
+        assert_or_retry(|| {
+            registry.get_status().components.get("one").unwrap() != &ComponentStatus::Starting
+        })
+        .await;
+        assert!(!registry.get_status().healthy);
+
+        // Second component going healthy brings the health to green
+        handle2.report_healthy().await;
+        assert_or_retry(|| {
+            registry.get_status().components.get("two").unwrap() != &ComponentStatus::Starting
+        })
+        .await;
+        assert!(registry.get_status().healthy);
+
+        // First component going unhealthy takes down the health to red
+        handle1.report_status(ComponentStatus::Unhealthy).await;
+        assert_or_retry(|| !registry.get_status().healthy).await;
+
+        // First component recovering returns the health to green
+        handle1.report_healthy().await;
+        assert_or_retry(|| registry.get_status().healthy).await;
+
+        // Second component going unhealthy takes down the health to red
+        handle2.report_status(ComponentStatus::Unhealthy).await;
+        assert_or_retry(|| !registry.get_status().healthy).await;
+    }
+
+    #[tokio::test]
+    async fn into_response() {
+        let nok = HealthStatus::default().into_response();
+        assert_eq!(nok.status(), StatusCode::INTERNAL_SERVER_ERROR);
+
+        let ok = HealthStatus {
+            healthy: true,
+            components: Default::default(),
+        }
+        .into_response();
+        assert_eq!(ok.status(), StatusCode::OK);
+    }
+}
diff --git a/capture/src/lib.rs b/capture/src/lib.rs
index 70f7548..50f6705 100644
--- a/capture/src/lib.rs
+++ b/capture/src/lib.rs
@@ -3,6 +3,7 @@ pub mod billing_limits;
 pub mod capture;
 pub mod config;
 pub mod event;
+pub mod health;
 pub mod prometheus;
 pub mod redis;
 pub mod router;
diff --git a/capture/src/router.rs b/capture/src/router.rs
index 9acc7f4..bae787c 100644
--- a/capture/src/router.rs
+++ b/capture/src/router.rs
@@ -9,6 +9,7 @@ use axum::{
 use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
 use tower_http::trace::TraceLayer;
 
+use crate::health::HealthRegistry;
 use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};
 
 use crate::prometheus::{setup_metrics_recorder, track_metrics};
@@ -31,6 +32,7 @@ pub fn router<
     R: Client + Send + Sync + 'static,
 >(
     timesource: TZ,
+    liveness: HealthRegistry,
     sink: S,
     redis: Arc<R>,
     billing: BillingLimiter,
@@ -54,6 +56,8 @@ pub fn router<
     let router = Router::new()
         // TODO: use NormalizePathLayer::trim_trailing_slash
         .route("/", get(index))
+        .route("/_readiness", get(index))
+        .route("/_liveness", get(move || ready(liveness.get_status())))
         .route("/i/v0/e", post(capture::event).options(capture::options))
         .route("/i/v0/e/", post(capture::event).options(capture::options))
         .layer(TraceLayer::new_for_http())
diff --git a/capture/src/server.rs b/capture/src/server.rs
index e4372ae..c6b11b5 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -6,13 +6,15 @@ use time::Duration;
 
 use crate::billing_limits::BillingLimiter;
 use crate::config::Config;
+use crate::health::{ComponentStatus, HealthRegistry};
 use crate::redis::RedisClient;
 use crate::{router, sink};
-
 pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
     F: Future<Output = ()>,
 {
+    let liveness = HealthRegistry::new();
+
     let redis_client =
         Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));
 
@@ -20,17 +22,28 @@ where
         .expect("failed to create billing limiter");
 
     let app = if config.print_sink {
+        // Print sink is only used for local debug, don't allow a container with it to run on prod
+        liveness
+            .register("print_sink".to_string(), Duration::seconds(30))
+            .await
+            .report_status(ComponentStatus::Unhealthy)
+            .await;
         router::router(
             crate::time::SystemTime {},
+            liveness,
            sink::PrintSink {},
             redis_client,
             billing,
             config.export_prometheus,
         )
     } else {
-        let sink = sink::KafkaSink::new(config.kafka).unwrap();
+        let sink_liveness = liveness
+            .register("rdkafka".to_string(), Duration::seconds(30))
+            .await;
+        let sink = sink::KafkaSink::new(config.kafka, sink_liveness).unwrap();
         router::router(
             crate::time::SystemTime {},
+            liveness,
             sink,
             redis_client,
             billing,
diff --git a/capture/src/sink.rs b/capture/src/sink.rs
index d6b60d8..f044df0 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sink.rs
@@ -13,6 +13,7 @@ use tracing::{debug, info};
 use crate::api::CaptureError;
 use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
+use crate::health::HealthHandle;
 use crate::prometheus::report_dropped_events;
 
 #[async_trait]
@@ -45,10 +46,16 @@ impl EventSink for PrintSink {
     }
 }
 
-struct KafkaContext;
+struct KafkaContext {
+    liveness: HealthHandle,
+}
 
 impl rdkafka::ClientContext for KafkaContext {
     fn stats(&self, stats: rdkafka::Statistics) {
+        // Signal liveness, as the main rdkafka loop is running and calling us
+        self.liveness.report_healthy_blocking();
+
+        // Update exported metrics
         gauge!("capture_kafka_callback_queue_depth", stats.replyq as f64);
         gauge!("capture_kafka_producer_queue_depth", stats.msg_cnt as f64);
         gauge!(
@@ -107,7 +114,7 @@ pub struct KafkaSink {
 }
 
 impl KafkaSink {
-    pub fn new(config: KafkaConfig) -> anyhow::Result<KafkaSink> {
+    pub fn new(config: KafkaConfig, liveness: HealthHandle) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
         let mut client_config = ClientConfig::new();
@@ -129,7 +136,7 @@ impl KafkaSink {
         debug!("rdkafka configuration: {:?}", client_config);
 
         let producer: FutureProducer<KafkaContext> =
-            client_config.create_with_context(KafkaContext)?;
+            client_config.create_with_context(KafkaContext { liveness })?;
 
         // Ping the cluster to make sure we can reach brokers, fail after 10 seconds
         _ = producer.client().fetch_metadata(
diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs
index d418996..3c47cc1 100644
--- a/capture/tests/django_compat.rs
+++ b/capture/tests/django_compat.rs
@@ -7,6 +7,7 @@ use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
 use capture::billing_limits::BillingLimiter;
 use capture::event::ProcessedEvent;
+use capture::health::HealthRegistry;
 use capture::redis::MockRedisClient;
 use capture::router::router;
 use capture::sink::EventSink;
@@ -76,6 +77,7 @@ impl EventSink for MemorySink {
 async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
     let file = File::open(REQUESTS_DUMP_FILE_NAME)?;
     let reader = BufReader::new(file);
+    let liveness = HealthRegistry::new();
 
     let mut mismatches = 0;
 
@@ -100,7 +102,14 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
         let billing = BillingLimiter::new(Duration::weeks(1), redis.clone())
             .expect("failed to create billing limiter");
 
-        let app = router(timesource, sink.clone(), redis, billing, false);
+        let app = router(
+            timesource,
+            liveness.clone(),
+            sink.clone(),
+            redis,
+            billing,
+            false,
+        );
 
         let client = TestClient::new(app);
         let mut req = client.post(&format!("/i/v0{}", case.path)).body(raw_body);

From 897fb903b6e19b639bcdbf5cb27208ba46947665 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Mon, 6 Nov 2023 19:04:20 +0100
Subject: [PATCH 2/3] log health check result

---
 capture/src/health.rs | 24 +++++++++++++++---------
 capture/src/server.rs |  2 +-
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/capture/src/health.rs b/capture/src/health.rs
index 1c4624a..dcddbe4 100644
--- a/capture/src/health.rs
+++ b/capture/src/health.rs
@@ -6,7 +6,7 @@ use std::sync::{Arc, RwLock};
 
 use time::Duration;
 use tokio::sync::mpsc;
-use tracing::log::warn;
+use tracing::{info, warn};
 
 /// Health reporting for components of the service.
 ///
@@ -112,15 +112,16 @@ impl HealthHandle {
 
 #[derive(Clone)]
 pub struct HealthRegistry {
+    name: String,
     components: Arc<RwLock<HashMap<String, ComponentStatus>>>,
     sender: mpsc::Sender<HealthMessage>,
 }
 
 impl HealthRegistry {
-    #[allow(clippy::new_without_default)]
-    pub fn new() -> Self {
+    pub fn new(name: &str) -> Self {
         let (tx, mut rx) = mpsc::channel::<HealthMessage>(16);
         let registry = Self {
+            name: name.to_owned(),
             components: Default::default(),
             sender: tx,
         };
@@ -166,7 +167,7 @@ impl HealthRegistry {
         };
         let now = time::OffsetDateTime::now_utc();
 
-        components
+        let result = components
             .iter()
             .fold(result, |mut result, (name, status)| {
                 match status {
@@ -186,7 +187,12 @@ impl HealthRegistry {
                     }
                 }
                 result
-            })
+            });
+        match result.healthy {
+            true => info!("{} health check ok", self.name),
+            false => warn!("{} health check failed: {:?}", self.name, result.components),
+        }
+        result
     }
 }
 
@@ -217,13 +223,13 @@ mod tests {
     }
     #[tokio::test]
     async fn defaults_to_unhealthy() {
-        let registry = HealthRegistry::new();
+        let registry = HealthRegistry::new("liveness");
         assert!(!registry.get_status().healthy);
     }
 
     #[tokio::test]
     async fn one_component() {
-        let registry = HealthRegistry::new();
+        let registry = HealthRegistry::new("liveness");
 
         // New components are registered in Starting
         let handle = registry
@@ -256,7 +262,7 @@ mod tests {
 
     #[tokio::test]
     async fn staleness_check() {
-        let registry = HealthRegistry::new();
+        let registry = HealthRegistry::new("liveness");
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
@@ -285,7 +291,7 @@ mod tests {
 
     #[tokio::test]
     async fn several_components() {
-        let registry = HealthRegistry::new();
+        let registry = HealthRegistry::new("liveness");
         let handle1 = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
diff --git a/capture/src/server.rs b/capture/src/server.rs
index c6b11b5..8c40fd3 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -13,7 +13,7 @@ pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
     F: Future<Output = ()>,
 {
-    let liveness = HealthRegistry::new();
+    let liveness = HealthRegistry::new("liveness");
 
     let redis_client =
         Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));

From de443cf422e6b45a4a99f81455f97ba121ebe62e Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Mon, 6 Nov 2023 19:07:10 +0100
Subject: [PATCH 3/3] fix test

---
 capture/tests/django_compat.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs
index 3c47cc1..b95c78e 100644
--- a/capture/tests/django_compat.rs
+++ b/capture/tests/django_compat.rs
@@ -77,7 +77,7 @@ impl EventSink for MemorySink {
 async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
     let file = File::open(REQUESTS_DUMP_FILE_NAME)?;
     let reader = BufReader::new(file);
-    let liveness = HealthRegistry::new();
+    let liveness = HealthRegistry::new("dummy");
 
     let mut mismatches = 0;
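
Usage note: a minimal sketch (not part of the patches above) of how a component loop could drive the HealthRegistry API introduced in PATCH 1; the worker_loop name, the "worker" component name, and the 10-second reporting interval are illustrative assumptions, not taken from the series.

    use capture::health::HealthRegistry;
    use time::Duration;

    async fn worker_loop(registry: HealthRegistry) {
        // Register with a 30s deadline: the probe fails if we stop reporting in time.
        let handle = registry
            .register("worker".to_string(), Duration::seconds(30))
            .await;
        loop {
            // ... do one unit of work ...

            // Must be called more often than the deadline, otherwise the
            // component is marked Stalled and get_status() turns unhealthy.
            handle.report_healthy().await;
            tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        }
    }

The `/_liveness` route added in router.rs then serves `liveness.get_status()`, which returns 200 only while every registered component keeps reporting within its deadline.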