Skip to content

Commit

Permalink
feat(host): implement health check API
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
  • Loading branch information
rvolosatovs committed Dec 23, 2024
1 parent 6d7b07d commit 85ede3c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
5 changes: 5 additions & 0 deletions crates/host/src/wasmbus/host_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::OciConfig;

use core::net::SocketAddr;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -76,6 +78,8 @@ pub struct Host {
pub heartbeat_interval: Option<Duration>,
/// Experimental features that can be enabled in the host
pub experimental_features: Features,
/// HTTP administration endpoint address
pub http_admin: Option<SocketAddr>,
}

/// Configuration for wasmCloud policy service
Expand Down Expand Up @@ -126,6 +130,7 @@ impl Default for Host {
max_components: MAX_COMPONENTS,
heartbeat_interval: None,
experimental_features: Features::default(),
http_admin: None,
}
}
}
69 changes: 68 additions & 1 deletion crates/host/src/wasmbus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::type_complexity)]

use core::sync::atomic::Ordering;

use std::collections::btree_map::Entry as BTreeMapEntry;
use std::collections::hash_map::{self, Entry};
use std::collections::{BTreeMap, HashMap};
Expand All @@ -12,6 +14,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::process::Stdio;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
Expand All @@ -25,11 +28,13 @@ use cloudevents::{EventBuilder, EventBuilderV10};
use futures::future::Either;
use futures::stream::{AbortHandle, Abortable, SelectAll};
use futures::{join, stream, try_join, Stream, StreamExt, TryFutureExt, TryStreamExt};
use hyper_util::rt::{TokioExecutor, TokioIo};
use nkeys::{KeyPair, KeyPairType, XKey};
use secrecy::Secret;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, mpsc, watch, RwLock, Semaphore};
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{interval_at, Instant};
Expand Down Expand Up @@ -393,6 +398,10 @@ pub struct Host {
Arc<RwLock<HashMap<Arc<str>, Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>>>>,
/// Experimental features to enable in the host that gate functionality
experimental_features: Features,
ready: Arc<AtomicBool>,
/// A set of host tasks
#[allow(unused)]
tasks: JoinSet<()>,
}

#[derive(Debug, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -890,6 +899,59 @@ impl Host {

debug!("Feature flags: {:?}", config.experimental_features);

// Background tasks owned by the host, and the readiness flag reported by `/readyz`.
// `ready` starts out `false` (the `AtomicBool` default).
let mut tasks = JoinSet::new();
let ready = Arc::<AtomicBool>::default();
// The HTTP administration (health check) server is only started when an address is configured.
if let Some(addr) = config.http_admin {
    let socket = TcpListener::bind(addr)
        .await
        .context("failed to start health endpoint")?;
    let ready = Arc::clone(&ready);
    // Request handler:
    // - `GET /livez` always answers `200` with the OK body.
    // - `GET /readyz` answers `200` with the OK body while `ready` is set, otherwise `503`
    //   with the failure body.
    // - Any other method/path combination is returned as a service error.
    let svc = hyper::service::service_fn(move |req| {
        const OK: &str = r#"{"status":"ok"}"#;
        const FAIL: &str = r#"{"status":"failure"}"#;
        let ready = Arc::clone(&ready);
        async move {
            let (http::request::Parts { method, uri, .. }, _) = req.into_parts();
            match (method.as_str(), uri.path_and_query().map(|pq| pq.as_str())) {
                ("GET", Some("/livez")) => Ok(http::Response::new(
                    http_body_util::Full::new(Bytes::from(OK)),
                )),
                ("GET", Some("/readyz")) => {
                    // Report readiness through the status code, not just the body: HTTP
                    // probes that only inspect the status (e.g. Kubernetes `httpGet`
                    // probes) would otherwise always observe success.
                    let (status, body) = if ready.load(Ordering::Relaxed) {
                        (http::StatusCode::OK, OK)
                    } else {
                        (http::StatusCode::SERVICE_UNAVAILABLE, FAIL)
                    };
                    let mut res =
                        http::Response::new(http_body_util::Full::new(Bytes::from(body)));
                    *res.status_mut() = status;
                    Ok(res)
                }
                (method, Some(path)) => {
                    Err(format!("method `{method}` not supported for path `{path}`"))
                }
                (method, None) => Err(format!("method `{method}` not supported")),
            }
        }
    });
    let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
    tasks.spawn(async move {
        // Connections are served on their own tasks so that a slow or keep-alive client
        // cannot block other probes from being accepted. The set lives inside the accept
        // task, so aborting the accept task drops it and aborts in-flight connections too.
        let mut conns = JoinSet::new();
        loop {
            // Reap connection tasks that have already finished so the set does not grow
            // without bound.
            while conns.try_join_next().is_some() {}
            let stream = match socket.accept().await {
                Ok((stream, _)) => stream,
                Err(err) => {
                    error!(?err, "failed to accept health endpoint connection");
                    continue;
                }
            };
            let svc = svc.clone();
            // `serve_connection` borrows the builder, so each connection task gets its
            // own clone to own for the duration of the connection.
            let srv = srv.clone();
            conns.spawn(async move {
                if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
                    error!(?err, "failed to serve connection");
                }
            });
        }
    });
}

let host = Host {
components: Arc::default(),
event_builder,
Expand Down Expand Up @@ -923,6 +985,8 @@ impl Host {
metrics: Arc::new(metrics),
max_execution_time: max_execution_time_ms,
messaging_links: Arc::default(),
ready: Arc::clone(&ready),
tasks,
};

let host = Arc::new(host);
Expand Down Expand Up @@ -1043,6 +1107,7 @@ impl Host {
})
.await;

ready.store(true, Ordering::Relaxed);
host.publish_event("host_started", start_evt)
.await
.context("failed to publish start event")?;
Expand All @@ -1052,6 +1117,7 @@ impl Host {
);

Ok((Arc::clone(&host), async move {
ready.store(false, Ordering::Relaxed);
heartbeat_abort.abort();
queue_abort.abort();
data_watch_abort.abort();
Expand All @@ -1067,7 +1133,7 @@ impl Host {
.context("failed to publish stop event")?;
// Before we exit, make sure to flush all messages or we may lose some that we've
// thought were sent (like the host_stopped event)
try_join!(host.ctl_nats.flush(), host.rpc_nats.flush(),)
try_join!(host.ctl_nats.flush(), host.rpc_nats.flush())
.context("failed to flush NATS clients")?;
Ok(())
}))
Expand Down Expand Up @@ -1532,6 +1598,7 @@ impl Host {
payload: impl AsRef<[u8]>,
transport_host_id: &str,
) -> anyhow::Result<CtlResponse<()>> {
self.ready.store(false, Ordering::Relaxed);
// Allow an empty payload to be used for stopping hosts
let timeout = if payload.as_ref().is_empty() {
None
Expand Down
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use core::net::SocketAddr;

use std::collections::{HashMap, HashSet};
use std::env;
use std::path::PathBuf;
Expand Down Expand Up @@ -344,6 +346,10 @@ struct Args {
hide = true
)]
help_markdown: bool,

#[clap(long = "http-admin", env = "WASMCLOUD_HTTP_ADMIN")]
/// HTTP administration endpoint address
http_admin: Option<SocketAddr>,
}

const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -510,6 +516,7 @@ async fn main() -> anyhow::Result<()> {
heartbeat_interval: args.heartbeat_interval,
// NOTE(brooks): Summing the feature flags "OR"s the multiple flags together.
experimental_features: args.experimental_features.into_iter().sum(),
http_admin: args.http_admin,
}))
.await
.context("failed to initialize host")?;
Expand Down

0 comments on commit 85ede3c

Please sign in to comment.