Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd: 🔨 rework RootCommand::start auto-https logic #3652

Merged
merged 4 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/bin/pd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,13 @@ console-subscriber = "0.2"
metrics-tracing-context = "0.11.0"
metrics-util = "0.13"
clap = { version = "3", features = ["derive", "env"] }
rustls-acme = "0.6"
atty = "0.2"
fs_extra = "1.3.0"

axum-server = { version = "0.4.7", features = ["tls-rustls"] }
rustls = "0.20.9"
rustls-acme = { version = "0.6.0", features = ["axum"] }

[dev-dependencies]
penumbra-proof-params = { path = "../../crypto/proof-params", features = [
"bundled-proving-keys",
Expand Down
128 changes: 84 additions & 44 deletions crates/bin/pd/src/auto_https.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,94 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
//! Automatic HTTPS certificate management facilities.
//!
//! See [`axum_acceptor`] for more information.

use pin_project::pin_project;
use rustls_acme::futures_rustls::server::TlsStream;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::TcpStream,
use {
anyhow::Error,
futures::Future,
rustls::ServerConfig,
rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig, AcmeState},
std::{fmt::Debug, path::PathBuf, sync::Arc},
};
use tokio_util::compat::Compat;
use tonic::transport::server::Connected;

/// Wrapper type needed to convert between futures_io and tokio traits
#[pin_project]
pub struct Wrapper {
#[pin]
pub inner: Compat<TlsStream<Compat<TcpStream>>>,
}

impl Connected for Wrapper {
type ConnectInfo = <TcpStream as Connected>::ConnectInfo;
/// Protocols supported by this server, in order of preference.
///
/// See [rfc7301] for more info on ALPN.
///
/// [rfc7301]: https://datatracker.ietf.org/doc/html/rfc7301
//
// We also permit HTTP1.1 for backwards-compatibility, specifically for grpc-web.
const ALPN_PROTOCOLS: [&[u8]; 2] = [b"h2", b"http/1.1"];

fn connect_info(&self) -> Self::ConnectInfo {
self.inner.get_ref().get_ref().0.get_ref().connect_info()
}
/// The location of the file-based certificate cache.
// NB: this must not be an absolute path see [Path::join].
const CACHE_DIR: &str = "tokio_rustls_acme_cache";

/// If true, use the production Let's Encrypt environment.
///
/// If false, the ACME resolver will use the [staging environment].
///
/// [staging environment]: https://letsencrypt.org/docs/staging-environment/
const PRODUCTION_LETS_ENCRYPT: bool = true;
cratelyn marked this conversation as resolved.
Show resolved Hide resolved

/// Use ACME to resolve certificates and handle new connections.
///
/// This returns a tuple containing an [`AxumAcceptor`] that may be used with [`axum_server`], and
/// a [`Future`] that represents the background task to poll and log for changes in the
/// certificate environment.
pub fn axum_acceptor(
home: PathBuf,
domain: String,
) -> (AxumAcceptor, impl Future<Output = Result<(), Error>>) {
// Use a file-based cache located within the home directory.
let cache = home.join(CACHE_DIR);
let cache = DirCache::new(cache);

// Create an ACME client, which we will use to resolve certificates.
let state = AcmeConfig::new(vec![domain])
.cache(cache)
.directory_lets_encrypt(PRODUCTION_LETS_ENCRYPT)
.state();

// Define our server configuration, using the ACME certificate resolver.
let mut rustls_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(state.resolver());
rustls_config.alpn_protocols = self::alpn_protocols();
let rustls_config = Arc::new(rustls_config);

// Return our connection acceptor and our background worker task.
let acceptor = state.axum_acceptor(rustls_config.clone());
let worker = self::acme_worker(state);
(acceptor, worker)
}

impl AsyncRead for Wrapper {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
self.project().inner.poll_read(cx, buf)
/// This function defines the task responsible for handling ACME events.
///
/// This function will never return, unless an error is encountered.
#[tracing::instrument(level = "error", skip_all)]
async fn acme_worker<EC, EA>(mut state: AcmeState<EC, EA>) -> Result<(), anyhow::Error>
where
EC: Debug + 'static,
EA: Debug + 'static,
{
use futures::StreamExt;
loop {
match state.next().await {
Some(Ok(ok)) => tracing::debug!("received acme event: {:?}", ok),
Some(Err(err)) => tracing::error!("acme error: {:?}", err),
None => {
debug_assert!(false, "acme worker unexpectedly reached end-of-stream");
tracing::error!("acme worker unexpectedly reached end-of-stream");
anyhow::bail!("unexpected end-of-stream");
}
}
}
}

impl AsyncWrite for Wrapper {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}
/// Returns a vector of the protocols supported by this server.
///
/// This is a convenience method to retrieve an owned copy of [`ALPN_PROTOCOLS`].
fn alpn_protocols() -> Vec<Vec<u8>> {
ALPN_PROTOCOLS.into_iter().map(<[u8]>::to_vec).collect()
}
73 changes: 34 additions & 39 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use metrics_util::layers::Stack;
use anyhow::Context;
use clap::{Parser, Subcommand};
use cnidarium::{StateDelta, Storage};
use futures::stream::TryStreamExt;
use ibc_proto::ibc::core::channel::v1::query_server::QueryServer as ChannelQueryServer;
use ibc_proto::ibc::core::client::v1::query_server::QueryServer as ClientQueryServer;
use ibc_proto::ibc::core::connection::v1::query_server::QueryServer as ConnectionQueryServer;
Expand All @@ -30,7 +29,7 @@ use penumbra_tower_trace::remote_addr;
use rand::Rng;
use rand_core::OsRng;
use tendermint_config::net::Address as TendermintAddress;
use tokio::{net::TcpListener, runtime};
use tokio::runtime;
use tonic::transport::Server;
use tower_http::cors::CorsLayer;
use tracing_subscriber::{prelude::*, EnvFilter};
Expand Down Expand Up @@ -292,10 +291,11 @@ async fn main() -> anyhow::Result<()> {
const HTTPS_DEFAULT: SocketAddr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 443);
let default = || {
grpc_auto_https
.is_some()
.then_some(HTTPS_DEFAULT)
.unwrap_or(HTTP_DEFAULT)
if grpc_auto_https.is_some() {
HTTPS_DEFAULT
} else {
HTTP_DEFAULT
}
};
grpc_bind.unwrap_or_else(default)
};
Expand Down Expand Up @@ -478,39 +478,34 @@ async fn main() -> anyhow::Result<()> {
)));
}

let grpc_server = if let Some(domain) = grpc_auto_https {
use pd::auto_https::Wrapper;
use rustls_acme::{caches::DirCache, AcmeConfig};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};

let mut acme_cache = pd_home.clone();
acme_cache.push("rustls_acme_cache");

let bound_listener = TcpListener::bind(grpc_bind)
.await
.context(format!("Failed to bind HTTPS listener on {}", grpc_bind))?;
let listener = TcpListenerStream::new(bound_listener);
// Configure HTTP2 support for the TLS negotiation; we also permit HTTP1.1
// for backwards-compatibility, specifically for grpc-web.
let alpn_config = vec!["h2".into(), "http/1.1".into()];
let tls_incoming = AcmeConfig::new([domain.as_str()])
.cache(DirCache::new(acme_cache))
.directory_lets_encrypt(true) // Use the production LE environment
.incoming(listener.map_ok(|conn| conn.compat()), alpn_config)
.map_ok(|incoming| Wrapper {
inner: incoming.compat(),
});

tokio::task::Builder::new()
.name("grpc_server")
.spawn(grpc_server.serve_with_incoming(tls_incoming))
.expect("failed to spawn grpc server")
} else {
tokio::task::Builder::new()
.name("grpc_server")
.spawn(grpc_server.serve(grpc_bind))
.expect("failed to spawn grpc server")
// Now we drop down a layer of abstraction, from tonic to axum.
//
// TODO(kate): this is where we may attach additional routes upon this router in the
// future. see #3646 for more information.
let router = grpc_server.into_router();
let make_svc = router.into_make_service();

// Now start the GRPC server, initializing an ACME client to use as a certificate
// resolver if auto-https has been enabled.
macro_rules! spawn_grpc_server {
($server:expr) => {
tokio::task::Builder::new()
.name("grpc_server")
.spawn($server.serve(make_svc))
.expect("failed to spawn grpc server")
};
}
let grpc_server = axum_server::bind(grpc_bind);
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
let grpc_server = match grpc_auto_https {
Some(domain) => {
let (acceptor, acme_worker) = pd::auto_https::axum_acceptor(pd_home, domain);
// TODO(kate): we should eventually propagate errors from the ACME worker task.
tokio::spawn(acme_worker);
spawn_grpc_server!(grpc_server.acceptor(acceptor))
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
}
None => {
spawn_grpc_server!(grpc_server)
}
};

// Configure a Prometheus recorder and exporter.
Expand Down
Loading