diff --git a/crates/host/src/wasmbus/host_config.rs b/crates/host/src/wasmbus/host_config.rs index a575e50b67..d8a58b7cd2 100644 --- a/crates/host/src/wasmbus/host_config.rs +++ b/crates/host/src/wasmbus/host_config.rs @@ -1,5 +1,7 @@ use crate::OciConfig; +use core::net::SocketAddr; + use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -76,6 +78,8 @@ pub struct Host { pub heartbeat_interval: Option, /// Experimental features that can be enabled in the host pub experimental_features: Features, + /// HTTP administration endpoint address + pub http_admin: Option, } /// Configuration for wasmCloud policy service @@ -126,6 +130,7 @@ impl Default for Host { max_components: MAX_COMPONENTS, heartbeat_interval: None, experimental_features: Features::default(), + http_admin: None, } } } diff --git a/crates/host/src/wasmbus/mod.rs b/crates/host/src/wasmbus/mod.rs index 3740fec5ed..db483ed997 100644 --- a/crates/host/src/wasmbus/mod.rs +++ b/crates/host/src/wasmbus/mod.rs @@ -1,5 +1,7 @@ #![allow(clippy::type_complexity)] +use core::sync::atomic::Ordering; + use std::collections::btree_map::Entry as BTreeMapEntry; use std::collections::hash_map::{self, Entry}; use std::collections::{BTreeMap, HashMap}; @@ -12,6 +14,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::process::Stdio; use std::str::FromStr; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -25,11 +28,13 @@ use cloudevents::{EventBuilder, EventBuilderV10}; use futures::future::Either; use futures::stream::{AbortHandle, Abortable, SelectAll}; use futures::{join, stream, try_join, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use hyper_util::rt::{TokioExecutor, TokioIo}; use nkeys::{KeyPair, KeyPairType, XKey}; use secrecy::Secret; use serde::{Deserialize, Serialize}; use serde_json::json; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::net::TcpListener; use tokio::sync::{broadcast, mpsc, watch, RwLock, Semaphore}; use tokio::task::{JoinHandle, JoinSet}; use tokio::time::{interval_at, Instant}; @@ -393,6 +398,10 @@ pub struct Host { Arc, Arc, async_nats::Client>>>>>>, /// Experimental features to enable in the host that gate functionality experimental_features: Features, + ready: Arc, + /// A set of host tasks + #[allow(unused)] + tasks: JoinSet<()>, } #[derive(Debug, Serialize, Deserialize, Default)] @@ -890,6 +899,59 @@ impl Host { debug!("Feature flags: {:?}", config.experimental_features); + let mut tasks = JoinSet::new(); + let ready = Arc::::default(); + if let Some(addr) = config.http_admin { + let socket = TcpListener::bind(addr) + .await + .context("failed to start health endpoint")?; + let ready = Arc::clone(&ready); + let svc = hyper::service::service_fn(move |req| { + const OK: &str = r#"{"status":"ok"}"#; + const FAIL: &str = r#"{"status":"failure"}"#; + let ready = Arc::clone(&ready); + async move { + let (http::request::Parts { method, uri, .. }, _) = req.into_parts(); + match (method.as_str(), uri.path_and_query().map(|pq| pq.as_str())) { + ("GET", Some("/livez")) => Ok(http::Response::new( + http_body_util::Full::new(Bytes::from(OK)), + )), + ("GET", Some("/readyz")) => { + if ready.load(Ordering::Relaxed) { + Ok(http::Response::new(http_body_util::Full::new(Bytes::from( + OK, + )))) + } else { + Ok(http::Response::new(http_body_util::Full::new(Bytes::from( + FAIL, + )))) + } + } + (method, Some(path)) => { + Err(format!("method `{method}` not supported for path `{path}`")) + } + (method, None) => Err(format!("method `{method}` not supported")), + } + } + }); + let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + tasks.spawn(async move { + loop { + let stream = match socket.accept().await { + Ok((stream, _)) => stream, + Err(err) => { + error!(?err, "failed to accept health endpoint connection"); + continue; + } + }; + let svc = svc.clone(); + if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await { + error!(?err, "failed to serve connection"); + } + } + }); + } + let host = Host { components: Arc::default(), event_builder, @@ -923,6 +985,8 @@ impl Host { metrics: Arc::new(metrics), max_execution_time: max_execution_time_ms, messaging_links: Arc::default(), + ready: Arc::clone(&ready), + tasks, }; let host = Arc::new(host); @@ -1043,6 +1107,7 @@ impl Host { }) .await; + ready.store(true, Ordering::Relaxed); host.publish_event("host_started", start_evt) .await .context("failed to publish start event")?; @@ -1052,6 +1117,7 @@ impl Host { ); Ok((Arc::clone(&host), async move { + ready.store(false, Ordering::Relaxed); heartbeat_abort.abort(); queue_abort.abort(); data_watch_abort.abort(); @@ -1067,7 +1133,7 @@ impl Host { .context("failed to publish stop event")?; // Before we exit, make sure to flush all messages or we may lose some that we've // thought were sent (like the host_stopped event) - try_join!(host.ctl_nats.flush(), host.rpc_nats.flush(),) + try_join!(host.ctl_nats.flush(), host.rpc_nats.flush()) .context("failed to flush NATS clients")?; Ok(()) })) @@ -1532,6 +1598,7 @@ impl Host { payload: impl AsRef<[u8]>, transport_host_id: &str, ) -> anyhow::Result> { + self.ready.store(false, Ordering::Relaxed); // Allow an empty payload to be used for stopping hosts let timeout = if payload.as_ref().is_empty() { None diff --git a/src/main.rs b/src/main.rs index 5603de8517..eebb2f38d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use core::net::SocketAddr; + use std::collections::{HashMap, HashSet}; use std::env; use std::path::PathBuf; @@ -344,6 +346,10 @@ struct Args { hide = true )] help_markdown: bool, + + #[clap(long = "http-admin", env = "WASMCLOUD_HTTP_ADMIN")] + /// HTTP administration endpoint address + http_admin: Option, } const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); @@ -510,6 +516,7 @@ async fn main() -> anyhow::Result<()> { heartbeat_interval: args.heartbeat_interval, // NOTE(brooks): Summing the feature flags "OR"s the multiple flags together. experimental_features: args.experimental_features.into_iter().sum(), + http_admin: args.http_admin, })) .await .context("failed to initialize host")?;