This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

implement liveness checks based on rdkafka health #41

Merged · 3 commits · Nov 7, 2023
344 changes: 344 additions & 0 deletions capture/src/health.rs
@@ -0,0 +1,344 @@
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use std::collections::HashMap;
use std::ops::Add;
use std::sync::{Arc, RwLock};

use time::Duration;
use tokio::sync::mpsc;
use tracing::{info, warn};

/// Health reporting for components of the service.
///
/// The capture server contains several asynchronous loops, and
/// the process can only be trusted with user data if all the
/// loops are properly running and reporting.
///
/// HealthRegistry allows an arbitrary number of components to
/// be registered and to report their health. The process' health
/// status is the combination of these individual health statuses:
/// - if any component is unhealthy, the process is unhealthy
/// - if all components recently reported healthy, the process is healthy
/// - if a component has not reported healthy within its defined deadline,
/// it is considered stalled, and the check fails.
///
/// Trying to merge the k8s concepts of liveness and readiness into
/// a single state is full of foot-guns, so HealthRegistry does not
/// try to do it. Each probe should use its own instance of the
/// registry to avoid confusion.
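///
/// Illustrative usage (a sketch only; the component name and deadline
/// below are arbitrary, not taken from the capture server):
///
/// ```ignore
/// let liveness = HealthRegistry::new("liveness");
/// let handle = liveness
///     .register("worker".to_string(), time::Duration::seconds(30))
///     .await;
/// loop {
///     // ... process one batch of work ...
///     handle.report_healthy().await; // must happen more often than the deadline
/// }
/// ```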

#[derive(Default, Debug)]
pub struct HealthStatus {
/// The overall status: true if all components are healthy
pub healthy: bool,
/// Current status of each registered component, for display
pub components: HashMap<String, ComponentStatus>,
}
impl IntoResponse for HealthStatus {
/// Computes the axum status code based on the overall health status,
/// and prints each component status in the body for debugging.
fn into_response(self) -> Response {
let body = format!("{:?}", self);
match self.healthy {
true => (StatusCode::OK, body),
false => (StatusCode::INTERNAL_SERVER_ERROR, body),
}
.into_response()
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ComponentStatus {
/// Automatically set when a component is newly registered
Starting,
/// Recently reported healthy, will need to report again before the date
HealthyUntil(time::OffsetDateTime),
/// Reported unhealthy
Unhealthy,
/// Automatically set when the HealthyUntil deadline is reached
Stalled,
}
struct HealthMessage {
component: String,
status: ComponentStatus,
}

pub struct HealthHandle {
component: String,
deadline: Duration,
sender: mpsc::Sender<HealthMessage>,
}

impl HealthHandle {
/// Asynchronously report healthy, returns when the message is queued.
/// Must be called more frequently than the configured deadline.
pub async fn report_healthy(&self) {
self.report_status(ComponentStatus::HealthyUntil(
time::OffsetDateTime::now_utc().add(self.deadline),
))
.await
}

/// Asynchronously report component status, returns when the message is queued.
pub async fn report_status(&self, status: ComponentStatus) {
let message = HealthMessage {
component: self.component.clone(),
status,
};
if let Err(err) = self.sender.send(message).await {
warn!("failed to report health status: {}", err)
}
}

/// Synchronously report as healthy, returns when the message is queued.
/// Must be called more frequently than the configured deadline.
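/// Useful from synchronous code running outside the tokio runtime (for
/// example callbacks invoked on librdkafka's own threads).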
pub fn report_healthy_blocking(&self) {
self.report_status_blocking(ComponentStatus::HealthyUntil(
time::OffsetDateTime::now_utc().add(self.deadline),
))
}

/// Synchronously report component status, returns when the message is queued.
pub fn report_status_blocking(&self, status: ComponentStatus) {
let message = HealthMessage {
component: self.component.clone(),
status,
};
if let Err(err) = self.sender.blocking_send(message) {
warn!("failed to report health status: {}", err)
}
}
}

#[derive(Clone)]
pub struct HealthRegistry {
name: String,
components: Arc<RwLock<HashMap<String, ComponentStatus>>>,
sender: mpsc::Sender<HealthMessage>,
}

impl HealthRegistry {
pub fn new(name: &str) -> Self {
let (tx, mut rx) = mpsc::channel::<HealthMessage>(16);
let registry = Self {
name: name.to_owned(),
components: Default::default(),
sender: tx,
};

let components = registry.components.clone();
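// A single background task owns all writes to the shared map: component
// handles send HealthMessages over the channel, and this loop records the
// latest status reported by each component.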
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if let Ok(mut map) = components.write() {
_ = map.insert(message.component, message.status);
} else {
// Poisoned lock: just warn, the probes will fail and the process will restart
warn!("poisoned HealthRegistry lock")
}
}
});

registry
}

/// Registers a new component in the registry. The returned handle should be passed
/// to the component, to allow it to frequently report its health status.
pub async fn register(&self, component: String, deadline: Duration) -> HealthHandle {
let handle = HealthHandle {
component,
deadline,
sender: self.sender.clone(),
};
handle.report_status(ComponentStatus::Starting).await;
handle
}

/// Returns the overall process status, computed from the status of all the components
/// currently registered. Can be used as an axum handler.
pub fn get_status(&self) -> HealthStatus {
let components = self
.components
.read()
.expect("poisoned HealthRegistry lock");

let result = HealthStatus {
healthy: !components.is_empty(), // unhealthy if no component has registered yet
components: Default::default(),
};
let now = time::OffsetDateTime::now_utc();

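// Fold each component into the result: a still-valid HealthyUntil keeps the
// process healthy, an expired one is downgraded to Stalled, and any other
// status (Starting, Unhealthy) fails the overall check.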
let result = components
.iter()
.fold(result, |mut result, (name, status)| {
match status {
ComponentStatus::HealthyUntil(until) => {
if until.gt(&now) {
_ = result.components.insert(name.clone(), status.clone())
} else {
result.healthy = false;
_ = result
.components
.insert(name.clone(), ComponentStatus::Stalled)
}
}
_ => {
result.healthy = false;
_ = result.components.insert(name.clone(), status.clone())
}
}
result
});
match result.healthy {
true => info!("{} health check ok", self.name),
false => warn!("{} health check failed: {:?}", self.name, result.components),
}
result
}
}

#[cfg(test)]
mod tests {
use crate::health::{ComponentStatus, HealthRegistry, HealthStatus};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use std::ops::{Add, Sub};
use time::{Duration, OffsetDateTime};

async fn assert_or_retry<F>(check: F)
where
F: Fn() -> bool,
{
assert_or_retry_for_duration(check, Duration::seconds(5)).await
}

async fn assert_or_retry_for_duration<F>(check: F, timeout: Duration)
where
F: Fn() -> bool,
{
let deadline = OffsetDateTime::now_utc().add(timeout);
while !check() && OffsetDateTime::now_utc().lt(&deadline) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
assert!(check())
}
#[tokio::test]
async fn defaults_to_unhealthy() {
let registry = HealthRegistry::new("liveness");
assert!(!registry.get_status().healthy);
}

#[tokio::test]
async fn one_component() {
let registry = HealthRegistry::new("liveness");

// New components are registered in Starting
let handle = registry
.register("one".to_string(), Duration::seconds(30))
.await;
assert_or_retry(|| registry.get_status().components.len() == 1).await;
let mut status = registry.get_status();
assert!(!status.healthy);
assert_eq!(
status.components.get("one"),
Some(&ComponentStatus::Starting)
);

// Status goes healthy once the component reports
handle.report_healthy().await;
assert_or_retry(|| registry.get_status().healthy).await;
status = registry.get_status();
assert_eq!(status.components.len(), 1);

// Status goes unhealthy if the component says so
handle.report_status(ComponentStatus::Unhealthy).await;
assert_or_retry(|| !registry.get_status().healthy).await;
status = registry.get_status();
assert_eq!(status.components.len(), 1);
assert_eq!(
status.components.get("one"),
Some(&ComponentStatus::Unhealthy)
);
}

#[tokio::test]
async fn staleness_check() {
let registry = HealthRegistry::new("liveness");
let handle = registry
.register("one".to_string(), Duration::seconds(30))
.await;

// Status goes healthy once the component reports
handle.report_healthy().await;
assert_or_retry(|| registry.get_status().healthy).await;
let mut status = registry.get_status();
assert_eq!(status.components.len(), 1);

// If the component's ping is too old, it is considered stalled and the healthcheck fails
// FIXME: we should mock the time instead
handle
.report_status(ComponentStatus::HealthyUntil(
OffsetDateTime::now_utc().sub(Duration::seconds(1)),
))
.await;
assert_or_retry(|| !registry.get_status().healthy).await;
status = registry.get_status();
assert_eq!(status.components.len(), 1);
assert_eq!(
status.components.get("one"),
Some(&ComponentStatus::Stalled)
);
}

#[tokio::test]
async fn several_components() {
let registry = HealthRegistry::new("liveness");
let handle1 = registry
.register("one".to_string(), Duration::seconds(30))
.await;
let handle2 = registry
.register("two".to_string(), Duration::seconds(30))
.await;
assert_or_retry(|| registry.get_status().components.len() == 2).await;

// First component going healthy is not enough
handle1.report_healthy().await;
assert_or_retry(|| {
registry.get_status().components.get("one").unwrap() != &ComponentStatus::Starting
})
.await;
assert!(!registry.get_status().healthy);

// Second component going healthy brings the health to green
handle2.report_healthy().await;
assert_or_retry(|| {
registry.get_status().components.get("two").unwrap() != &ComponentStatus::Starting
})
.await;
assert!(registry.get_status().healthy);

// First component going unhealthy takes down the health to red
handle1.report_status(ComponentStatus::Unhealthy).await;
assert_or_retry(|| !registry.get_status().healthy).await;

// First component recovering returns the health to green
handle1.report_healthy().await;
assert_or_retry(|| registry.get_status().healthy).await;

// Second component going unhealthy takes down the health to red
handle2.report_status(ComponentStatus::Unhealthy).await;
assert_or_retry(|| !registry.get_status().healthy).await;
}

#[tokio::test]
async fn into_response() {
let nok = HealthStatus::default().into_response();
assert_eq!(nok.status(), StatusCode::INTERNAL_SERVER_ERROR);

let ok = HealthStatus {
healthy: true,
components: Default::default(),
}
.into_response();
assert_eq!(ok.status(), StatusCode::OK);
}
}
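The diff above only adds the registry itself; the rdkafka-based reporting referenced in the PR title happens outside this file. As a sketch of where the blocking reporter fits, a Kafka client context could report liveness from librdkafka's synchronous statistics callback, which runs on librdkafka's own threads (so the async reporter cannot be used there). The `KafkaContext` struct and its wiring below are illustrative assumptions, not part of this PR, and the callback only fires if `statistics.interval.ms` is configured on the client:

use rdkafka::ClientContext;
use rdkafka::statistics::Statistics;

use crate::health::HealthHandle;

// Hypothetical client context holding the handle returned by
// HealthRegistry::register() for the Kafka producer component.
struct KafkaContext {
    liveness: HealthHandle,
}

impl ClientContext for KafkaContext {
    // Called by librdkafka on its own (non-tokio) threads on every
    // statistics tick, so the blocking reporter is used here.
    fn stats(&self, _stats: Statistics) {
        self.liveness.report_healthy_blocking();
    }
}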
1 change: 1 addition & 0 deletions capture/src/lib.rs
@@ -3,6 +3,7 @@ pub mod billing_limits;
pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod prometheus;
pub mod redis;
pub mod router;
4 changes: 4 additions & 0 deletions capture/src/router.rs
@@ -9,6 +9,7 @@ use axum::{
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};

use crate::prometheus::{setup_metrics_recorder, track_metrics};
@@ -31,6 +32,7 @@ pub fn router<
R: Client + Send + Sync + 'static,
>(
timesource: TZ,
liveness: HealthRegistry,
sink: S,
redis: Arc<R>,
billing: BillingLimiter,
@@ -54,6 +56,8 @@
let router = Router::new()
// TODO: use NormalizePathLayer::trim_trailing_slash
.route("/", get(index))
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())))
.route("/i/v0/e", post(capture::event).options(capture::options))
.route("/i/v0/e/", post(capture::event).options(capture::options))
.layer(TraceLayer::new_for_http())
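Because `HealthStatus` implements `IntoResponse`, the new `/_liveness` route can return `liveness.get_status()` directly: it responds 200 with the component map in the body while every registered component keeps reporting within its deadline, and 500 otherwise, so a Kubernetes liveness probe can restart the pod when a loop stalls. Below is a minimal, self-contained sketch of the wiring; it is not the capture server's actual startup code, and the component name, deadline, and reporting interval are illustrative:

use axum::{routing::get, Router};
use capture::health::HealthRegistry;
use std::future::ready;
use time::Duration;

async fn liveness_router() -> Router {
    let liveness = HealthRegistry::new("liveness");

    // Each long-running loop gets its own handle; the deadline should be
    // comfortably longer than the loop's reporting interval.
    let handle = liveness
        .register("rdkafka".to_string(), Duration::seconds(30))
        .await;
    tokio::spawn(async move {
        loop {
            // In the real service this call sits inside the component's own
            // work loop; here it simply ticks every ten seconds.
            handle.report_healthy().await;
            tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        }
    });

    // HealthStatus implements IntoResponse, so get_status() can be returned
    // as the handler body: 200 when healthy, 500 otherwise.
    Router::new().route("/_liveness", get(move || ready(liveness.get_status())))
}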