diff --git a/Cargo.lock b/Cargo.lock index 48c21af86c..e06f30b73e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,7 +590,7 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.1.1", "async-executor", - "async-io 2.2.2", + "async-io 2.3.0", "async-lock 3.3.0", "blocking", "futures-lite 2.2.0", @@ -636,9 +636,9 @@ dependencies = [ [[package]] name = "async-io" -version = "2.2.2" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6afaa937395a620e33dc6a742c593c01aced20aa376ffb0f628121198578ccc7" +checksum = "fb41eb19024a91746eba0773aa5e16036045bbf45733766661099e182ea6a744" dependencies = [ "async-lock 3.3.0", "cfg-if", @@ -646,7 +646,7 @@ dependencies = [ "futures-io", "futures-lite 2.2.0", "parking", - "polling 3.3.1", + "polling 3.3.2", "rustix 0.38.28", "slab", "tracing", @@ -707,7 +707,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" dependencies = [ - "async-io 2.2.2", + "async-io 2.3.0", "async-lock 2.8.0", "atomic-waker", "cfg-if", @@ -1881,9 +1881,9 @@ dependencies = [ [[package]] name = "crypto-mac" -version = "0.10.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bff07008ec701e8028e2ceb8f83f0e4274ee62bd2dbdc4fefff2e9a91824081a" +checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" dependencies = [ "generic-array", "subtle", @@ -4737,6 +4737,7 @@ dependencies = [ "async-stream 0.2.1", "async-trait", "atty", + "axum-server", "base64 0.20.0", "bincode", "blake2b_simd 0.5.11", @@ -4791,6 +4792,7 @@ dependencies = [ "regex", "reqwest", "rocksdb", + "rustls 0.20.9", "rustls-acme", "serde", "serde_json", @@ -6124,9 +6126,9 @@ dependencies = [ [[package]] name = "polling" -version = "3.3.1" +version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf63fa624ab313c11656b4cda960bfc46c410187ad493c41f6ba2d8c1e991c9e" +checksum = "545c980a3880efd47b2e262f6a4bb6daad6555cf3367aa9c4e52895f69537a41" dependencies = [ "cfg-if", "concurrent-queue", @@ -7033,6 +7035,7 @@ dependencies = [ "async-h1", "async-io 1.13.0", "async-trait", + "axum-server", "base64 0.13.1", "chrono", "futures", @@ -7048,6 +7051,8 @@ dependencies = [ "serde_json", "smol", "thiserror", + "tokio", + "tokio-util 0.7.10", "url", "webpki-roots 0.21.1", "x509-parser", @@ -7773,9 +7778,9 @@ dependencies = [ [[package]] name = "subtle" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "subtle-encoding" @@ -8722,9 +8727,9 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" [[package]] name = "universal-hash" -version = "0.4.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" dependencies = [ "generic-array", "subtle", diff --git a/crates/bin/pd/Cargo.toml b/crates/bin/pd/Cargo.toml index 925dd7b4a5..091da7a5c6 100644 --- a/crates/bin/pd/Cargo.toml +++ b/crates/bin/pd/Cargo.toml @@ -122,10 +122,13 @@ console-subscriber = "0.2" metrics-tracing-context = "0.11.0" metrics-util = "0.13" clap = { version = "3", features = ["derive", "env"] } -rustls-acme = "0.6" atty = "0.2" fs_extra = "1.3.0" +axum-server = { version = "0.4.7", features = ["tls-rustls"] } +rustls = "0.20.9" +rustls-acme = { version = "0.6.0", features = ["axum"] } + [dev-dependencies] penumbra-proof-params = { path = "../../crypto/proof-params", features = [ "bundled-proving-keys", diff --git a/crates/bin/pd/src/auto_https.rs b/crates/bin/pd/src/auto_https.rs index 6f3ed67e61..f5412ace93 100644 --- a/crates/bin/pd/src/auto_https.rs +++ b/crates/bin/pd/src/auto_https.rs @@ -1,54 +1,94 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; +//! Automatic HTTPS certificate management facilities. +//! +//! See [`axum_acceptor`] for more information. -use pin_project::pin_project; -use rustls_acme::futures_rustls::server::TlsStream; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - net::TcpStream, +use { + anyhow::Error, + futures::Future, + rustls::ServerConfig, + rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig, AcmeState}, + std::{fmt::Debug, path::PathBuf, sync::Arc}, }; -use tokio_util::compat::Compat; -use tonic::transport::server::Connected; - -/// Wrapper type needed to convert between futures_io and tokio traits -#[pin_project] -pub struct Wrapper { - #[pin] - pub inner: Compat>>, -} -impl Connected for Wrapper { - type ConnectInfo = ::ConnectInfo; +/// Protocols supported by this server, in order of preference. +/// +/// See [rfc7301] for more info on ALPN. +/// +/// [rfc7301]: https://datatracker.ietf.org/doc/html/rfc7301 +// +// We also permit HTTP1.1 for backwards-compatibility, specifically for grpc-web. +const ALPN_PROTOCOLS: [&[u8]; 2] = [b"h2", b"http/1.1"]; - fn connect_info(&self) -> Self::ConnectInfo { - self.inner.get_ref().get_ref().0.get_ref().connect_info() - } +/// The location of the file-based certificate cache. +// NB: this must not be an absolute path see [Path::join]. +const CACHE_DIR: &str = "tokio_rustls_acme_cache"; + +/// If true, use the production Let's Encrypt environment. +/// +/// If false, the ACME resolver will use the [staging environment]. +/// +/// [staging environment]: https://letsencrypt.org/docs/staging-environment/ +const PRODUCTION_LETS_ENCRYPT: bool = true; + +/// Use ACME to resolve certificates and handle new connections. +/// +/// This returns a tuple containing an [`AxumAcceptor`] that may be used with [`axum_server`], and +/// a [`Future`] that represents the background task to poll and log for changes in the +/// certificate environment. +pub fn axum_acceptor( + home: PathBuf, + domain: String, +) -> (AxumAcceptor, impl Future>) { + // Use a file-based cache located within the home directory. + let cache = home.join(CACHE_DIR); + let cache = DirCache::new(cache); + + // Create an ACME client, which we will use to resolve certificates. + let state = AcmeConfig::new(vec![domain]) + .cache(cache) + .directory_lets_encrypt(PRODUCTION_LETS_ENCRYPT) + .state(); + + // Define our server configuration, using the ACME certificate resolver. + let mut rustls_config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_cert_resolver(state.resolver()); + rustls_config.alpn_protocols = self::alpn_protocols(); + let rustls_config = Arc::new(rustls_config); + + // Return our connection acceptor and our background worker task. + let acceptor = state.axum_acceptor(rustls_config.clone()); + let worker = self::acme_worker(state); + (acceptor, worker) } -impl AsyncRead for Wrapper { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - self.project().inner.poll_read(cx, buf) +/// This function defines the task responsible for handling ACME events. +/// +/// This function will never return, unless an error is encountered. +#[tracing::instrument(level = "error", skip_all)] +async fn acme_worker(mut state: AcmeState) -> Result<(), anyhow::Error> +where + EC: Debug + 'static, + EA: Debug + 'static, +{ + use futures::StreamExt; + loop { + match state.next().await { + Some(Ok(ok)) => tracing::debug!("received acme event: {:?}", ok), + Some(Err(err)) => tracing::error!("acme error: {:?}", err), + None => { + debug_assert!(false, "acme worker unexpectedly reached end-of-stream"); + tracing::error!("acme worker unexpectedly reached end-of-stream"); + anyhow::bail!("unexpected end-of-stream"); + } + } } } -impl AsyncWrite for Wrapper { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().inner.poll_write(cx, buf) - } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_flush(cx) - } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_shutdown(cx) - } +/// Returns a vector of the protocols supported by this server. +/// +/// This is a convenience method to retrieve an owned copy of [`ALPN_PROTOCOLS`]. +fn alpn_protocols() -> Vec> { + ALPN_PROTOCOLS.into_iter().map(<[u8]>::to_vec).collect() } diff --git a/crates/bin/pd/src/main.rs b/crates/bin/pd/src/main.rs index 8315d70efb..468aa79c3b 100644 --- a/crates/bin/pd/src/main.rs +++ b/crates/bin/pd/src/main.rs @@ -10,7 +10,6 @@ use metrics_util::layers::Stack; use anyhow::Context; use clap::{Parser, Subcommand}; use cnidarium::{StateDelta, Storage}; -use futures::stream::TryStreamExt; use ibc_proto::ibc::core::channel::v1::query_server::QueryServer as ChannelQueryServer; use ibc_proto::ibc::core::client::v1::query_server::QueryServer as ClientQueryServer; use ibc_proto::ibc::core::connection::v1::query_server::QueryServer as ConnectionQueryServer; @@ -30,7 +29,7 @@ use penumbra_tower_trace::remote_addr; use rand::Rng; use rand_core::OsRng; use tendermint_config::net::Address as TendermintAddress; -use tokio::{net::TcpListener, runtime}; +use tokio::runtime; use tonic::transport::Server; use tower_http::cors::CorsLayer; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -292,10 +291,11 @@ async fn main() -> anyhow::Result<()> { const HTTPS_DEFAULT: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 443); let default = || { - grpc_auto_https - .is_some() - .then_some(HTTPS_DEFAULT) - .unwrap_or(HTTP_DEFAULT) + if grpc_auto_https.is_some() { + HTTPS_DEFAULT + } else { + HTTP_DEFAULT + } }; grpc_bind.unwrap_or_else(default) }; @@ -478,39 +478,34 @@ async fn main() -> anyhow::Result<()> { ))); } - let grpc_server = if let Some(domain) = grpc_auto_https { - use pd::auto_https::Wrapper; - use rustls_acme::{caches::DirCache, AcmeConfig}; - use tokio_stream::wrappers::TcpListenerStream; - use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; - - let mut acme_cache = pd_home.clone(); - acme_cache.push("rustls_acme_cache"); - - let bound_listener = TcpListener::bind(grpc_bind) - .await - .context(format!("Failed to bind HTTPS listener on {}", grpc_bind))?; - let listener = TcpListenerStream::new(bound_listener); - // Configure HTTP2 support for the TLS negotiation; we also permit HTTP1.1 - // for backwards-compatibility, specifically for grpc-web. - let alpn_config = vec!["h2".into(), "http/1.1".into()]; - let tls_incoming = AcmeConfig::new([domain.as_str()]) - .cache(DirCache::new(acme_cache)) - .directory_lets_encrypt(true) // Use the production LE environment - .incoming(listener.map_ok(|conn| conn.compat()), alpn_config) - .map_ok(|incoming| Wrapper { - inner: incoming.compat(), - }); - - tokio::task::Builder::new() - .name("grpc_server") - .spawn(grpc_server.serve_with_incoming(tls_incoming)) - .expect("failed to spawn grpc server") - } else { - tokio::task::Builder::new() - .name("grpc_server") - .spawn(grpc_server.serve(grpc_bind)) - .expect("failed to spawn grpc server") + // Now we drop down a layer of abstraction, from tonic to axum. + // + // TODO(kate): this is where we may attach additional routes upon this router in the + // future. see #3646 for more information. + let router = grpc_server.into_router(); + let make_svc = router.into_make_service(); + + // Now start the GRPC server, initializing an ACME client to use as a certificate + // resolver if auto-https has been enabled. + macro_rules! spawn_grpc_server { + ($server:expr) => { + tokio::task::Builder::new() + .name("grpc_server") + .spawn($server.serve(make_svc)) + .expect("failed to spawn grpc server") + }; + } + let grpc_server = axum_server::bind(grpc_bind); + let grpc_server = match grpc_auto_https { + Some(domain) => { + let (acceptor, acme_worker) = pd::auto_https::axum_acceptor(pd_home, domain); + // TODO(kate): we should eventually propagate errors from the ACME worker task. + tokio::spawn(acme_worker); + spawn_grpc_server!(grpc_server.acceptor(acceptor)) + } + None => { + spawn_grpc_server!(grpc_server) + } }; // Configure a Prometheus recorder and exporter.