diff --git a/Cargo.toml b/Cargo.toml index 9357a67..5241c0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ hex = "0.4.3" itertools = "0.12.0" nom = "7.1.3" reqwest = { version = "0.11.23", default-features = false } -sec = { version = "1.0.0", features = ["deserialize"] } +sec = { version = "1.0.0", features = [ "deserialize", "serialize" ] } serde = { version = "1.0.193", features = [ "derive" ] } serde_json = "1.0.108" sha2 = "0.10.8" diff --git a/README.md b/README.md index ab19055..a512c45 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,13 @@ # rockslide +## Container runtime configuration + +``` +curl -u :devpw localhost:3000/_rockslide/config/foo/bar/prod > foobarprod.toml +curl -v -X PUT -u :devpw localhost:3000/_rockslide/config/foo/bar/prod --data-binar +y "@foobarprod.toml" +``` + ## macOS suppport macOS is supported as a tier 2 platform to develop rockslide itself, although currently completely untested for production use. [podman can run on Mac OS X](https://podman.io/docs/installation), where it will launch a Linux virtual machine to run containers. The `rockslide` application itself and its supporting nix-derivation all account for being built on macOS. @@ -21,4 +29,4 @@ podman run -it debian:latest /bin/sh -c 'echo everything is working fine' `rockslide` will check an envvar `PODMAN_IS_REMOTE`, if it is `true`, it will assume a remote instance and act accordingly. This envvar is set to `true` automatically when running `nix-shell` on a macOS machine. -With these prerequisites fulfilled, `rockslide` should operate normally as it does on Linux. \ No newline at end of file +With these prerequisites fulfilled, `rockslide` should operate normally as it does on Linux. diff --git a/src/config.rs b/src/config.rs index dc10fef..b72d16d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,7 +6,7 @@ use sec::Secret; use serde::Deserialize; use crate::{ - podman_is_remote, + podman::podman_is_remote, registry::{AuthProvider, UnverifiedCredentials}, }; diff --git a/src/container_orchestrator.rs b/src/container_orchestrator.rs new file mode 100644 index 0000000..5eacd0d --- /dev/null +++ b/src/container_orchestrator.rs @@ -0,0 +1,420 @@ +use std::collections::HashMap; +use std::fs; +use std::net::Ipv4Addr; +use std::path::PathBuf; +use std::str::FromStr; +use std::{net::SocketAddr, path::Path, sync::Arc}; + +use crate::podman::podman_is_remote; +use crate::{ + podman::Podman, + registry::{storage::ImageLocation, ManifestReference, Reference, RegistryHooks}, + reverse_proxy::ReverseProxy, +}; + +use anyhow::Context; +use axum::async_trait; +use axum::body::Body; +use axum::http::header::CONTENT_TYPE; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use sec::Secret; +use serde::{Deserialize, Deserializer, Serialize}; +use tracing::{debug, error, info}; + +macro_rules! try_quiet { + ($ex:expr, $msg:expr) => { + match $ex { + Ok(v) => v, + Err(err) => { + error!(%err, $msg); + return; + } + } + }; +} + +pub(crate) struct ContainerOrchestrator { + podman: Podman, + reverse_proxy: Arc, + local_addr: SocketAddr, + registry_credentials: (String, Secret), + configs_dir: PathBuf, +} + +#[derive(Clone, Debug)] +pub(crate) struct PublishedContainer { + host_addr: SocketAddr, + manifest_reference: ManifestReference, + config: Arc, +} + +impl PublishedContainer { + pub(crate) fn manifest_reference(&self) -> &ManifestReference { + &self.manifest_reference + } + + pub(crate) fn host_addr(&self) -> SocketAddr { + self.host_addr + } + + pub(crate) fn config(&self) -> &Arc { + &self.config + } +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] +pub(crate) struct RuntimeConfig { + #[serde(default)] + pub(crate) http: Http, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] +pub(crate) struct Http { + #[serde(default)] + pub(crate) access: Option>>, +} + +impl IntoResponse for RuntimeConfig { + fn into_response(self) -> axum::response::Response { + toml::to_string_pretty(&self) + .ok() + .and_then(|config_toml| { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/toml") + .body(Body::from(config_toml)) + .ok() + }) + .unwrap_or_else(|| StatusCode::INTERNAL_SERVER_ERROR.into_response()) + } +} + +impl ContainerOrchestrator { + pub(crate) fn new, Q: AsRef>( + podman_path: P, + reverse_proxy: Arc, + local_addr: SocketAddr, + registry_credentials: (String, Secret), + runtime_dir: Q, + ) -> anyhow::Result { + let podman = Podman::new(podman_path, podman_is_remote()); + + let configs_dir = runtime_dir + .as_ref() + .canonicalize() + .context("could not canonicalize runtime config dir")? + .join("configs"); + + if !configs_dir.exists() { + fs::create_dir(&configs_dir).context("could not create config dir")?; + } + + Ok(Self { + podman, + reverse_proxy, + local_addr, + registry_credentials, + configs_dir, + }) + } + + fn config_path(&self, manifest_reference: &ManifestReference) -> PathBuf { + let location = manifest_reference.location(); + + self.configs_dir + .join(location.repository()) + .join(location.image()) + .join( + manifest_reference + .reference() + .to_string() + .trim_start_matches(':'), + ) + } + + pub(crate) async fn load_config( + &self, + manifest_reference: &ManifestReference, + ) -> anyhow::Result { + let config_path = self.config_path(manifest_reference); + + if !config_path.exists() { + return Ok(Default::default()); + } + + let raw = tokio::fs::read_to_string(config_path) + .await + .context("could not read config")?; + + toml::from_str(&raw).context("could not parse configuration") + } + + pub(crate) async fn save_config( + &self, + manifest_reference: &ManifestReference, + config: &RuntimeConfig, + ) -> anyhow::Result { + let config_path = self.config_path(manifest_reference); + let parent_dir = config_path + .parent() + .context("could not determine parent path")?; + + if !parent_dir.exists() { + tokio::fs::create_dir_all(parent_dir) + .await + .context("could not create parent path")?; + } + + let toml = toml::to_string_pretty(config).context("could not serialize new config")?; + + // TODO: Do atomic replace. + tokio::fs::write(config_path, toml) + .await + .context("failed to write new toml config")?; + + // Read back to verify. + self.load_config(manifest_reference).await + } + + async fn fetch_managed_containers(&self, all: bool) -> anyhow::Result> { + debug!("refreshing running containers"); + + let value = self.podman.ps(all).await?; + let all_containers: Vec = serde_json::from_value(value)?; + + debug!(?all_containers, "fetched containers"); + + let mut rv = Vec::new(); + for container in all_containers { + // TODO: Just log error instead of returning. + if let Some(pc) = self.load_managed_container(container).await? { + rv.push(pc); + } + } + Ok(rv) + } + + async fn load_managed_container( + &self, + container_json: ContainerJson, + ) -> anyhow::Result> { + let manifest_reference = if let Some(val) = container_json.manifest_reference() { + val + } else { + return Ok(None); + }; + + let port_mapping = if let Some(val) = container_json.active_published_port() { + val + } else { + return Ok(None); + }; + + let config = Arc::new(self.load_config(&manifest_reference).await?); + + Ok(Some(PublishedContainer { + host_addr: port_mapping + .get_host_listening_addr() + .context("could not get host listening address")?, + manifest_reference, + config, + })) + } + + pub(crate) async fn updated_published_set(&self) { + let running: Vec<_> = try_quiet!( + self.fetch_managed_containers(false).await, + "could not fetch running containers" + ); + + info!(?running, "updating running container set"); + self.reverse_proxy + .update_containers(running.into_iter()) + .await; + } + + async fn synchronize_container_state(&self, manifest_reference: &ManifestReference) { + // TODO: Make configurable? + let production_tag = "prod"; + + if matches!(manifest_reference.reference(), Reference::Tag(tag) if tag == production_tag) { + let location = manifest_reference.location(); + let name = format!("rockslide-{}-{}", location.repository(), location.image()); + + info!(%name, "removing (potentially nonexistant) container"); + try_quiet!( + self.podman.rm(&name, true).await, + "failed to remove container" + ); + + let image_url = format!( + "{}/{}/{}:{}", + self.local_addr, + location.repository(), + location.image(), + production_tag + ); + + info!(%name, "loggging in"); + try_quiet!( + self.podman + .login( + &self.registry_credentials.0, + self.registry_credentials.1.as_str(), + self.local_addr.to_string().as_ref(), + false + ) + .await, + "failed to login to local registry" + ); + + // We always pull the container to ensure we have the latest version. + info!(%name, "pulling container"); + try_quiet!( + self.podman.pull(&image_url).await, + "failed to pull container" + ); + + info!(%name, "starting container"); + try_quiet!( + self.podman + .run(&image_url) + .rm() + .rmi() + .name(name) + .tls_verify(false) + .publish("127.0.0.1::8000") + .env("PORT", "8000") + .execute() + .await, + "failed to launch container" + ); + + info!(?manifest_reference, "new production image running"); + } + } + + pub(crate) async fn synchronize_all(&self) -> anyhow::Result<()> { + for container in self.fetch_managed_containers(true).await? { + self.synchronize_container_state(container.manifest_reference()) + .await; + } + + Ok(()) + } +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +#[allow(dead_code)] +struct ContainerJson { + id: String, + image: String, + names: Vec, + #[serde(deserialize_with = "nullable_array")] + ports: Vec, +} + +impl ContainerJson { + fn image_location(&self) -> Option { + const PREFIX: &str = "rockslide-"; + + for name in &self.names { + if let Some(subname) = name.strip_prefix(PREFIX) { + if let Some((left, right)) = subname.split_once('-') { + return Some(ImageLocation::new(left.to_owned(), right.to_owned())); + } + } + } + + None + } + + fn image_tag(&self) -> Option { + let idx = self.image.rfind(':')?; + + // TODO: Handle Reference::Digest here. + Some(Reference::Tag(self.image[idx..].to_owned())) + } + + fn manifest_reference(&self) -> Option { + Some(ManifestReference::new( + self.image_location()?, + self.image_tag()?, + )) + } + + fn active_published_port(&self) -> Option<&PortMapping> { + self.ports.get(0) + } +} + +#[async_trait] +impl RegistryHooks for Arc { + async fn on_manifest_uploaded(&self, manifest_reference: &ManifestReference) { + self.synchronize_container_state(manifest_reference).await; + + self.updated_published_set().await; + } +} + +fn nullable_array<'de, D, T>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: Deserialize<'de>, +{ + let opt: Option> = Deserialize::deserialize(deserializer)?; + + Ok(opt.unwrap_or_default()) +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +struct PortMapping { + host_ip: String, + container_port: u16, + host_port: u16, + range: u16, + protocol: String, +} + +impl PortMapping { + fn get_host_listening_addr(&self) -> Option { + let ip = Ipv4Addr::from_str(&self.host_ip).ok()?; + + Some((ip, self.host_port).into()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use sec::Secret; + + use crate::container_orchestrator::Http; + + use super::RuntimeConfig; + + #[test] + fn can_parse_sample_configs() { + let example = r#" + [http] + access = { someuser = "somepw" } + "#; + + let parsed: RuntimeConfig = toml::from_str(example).expect("should parse"); + + let mut pw_map = HashMap::new(); + pw_map.insert("someuser".to_owned(), Secret::new("somepw".to_owned())); + assert_eq!( + parsed, + RuntimeConfig { + http: Http { + access: Some(pw_map) + } + } + ) + } +} diff --git a/src/main.rs b/src/main.rs index 82bc784..e4dc80f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,230 +1,26 @@ mod config; -mod podman; +mod container_orchestrator; +pub(crate) mod podman; pub(crate) mod registry; mod reverse_proxy; use std::{ env, fs, - net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}, - path::Path, - str::FromStr, + net::{IpAddr, SocketAddr, ToSocketAddrs}, sync::Arc, }; use anyhow::Context; -use axum::{async_trait, Router}; +use axum::Router; use config::Config; use gethostname::gethostname; -use podman::Podman; -use registry::{ - storage::ImageLocation, ContainerRegistry, ManifestReference, Reference, RegistryHooks, -}; -use reverse_proxy::{PublishedContainer, ReverseProxy}; -use sec::Secret; -use serde::{Deserialize, Deserializer}; +use registry::ContainerRegistry; +use reverse_proxy::ReverseProxy; use tower_http::trace::TraceLayer; -use tracing::{debug, error, info}; +use tracing::{debug, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -macro_rules! try_quiet { - ($ex:expr, $msg:expr) => { - match $ex { - Ok(v) => v, - Err(err) => { - error!(%err, $msg); - return; - } - } - }; -} - -struct PodmanHook { - podman: Podman, - reverse_proxy: Arc, - local_addr: SocketAddr, - registry_credentials: (String, Secret), -} - -impl PodmanHook { - fn new>( - podman_path: P, - reverse_proxy: Arc, - local_addr: SocketAddr, - registry_credentials: (String, Secret), - ) -> Self { - let podman = Podman::new(podman_path, podman_is_remote()); - Self { - podman, - reverse_proxy, - local_addr, - registry_credentials, - } - } - - async fn fetch_running_containers(&self) -> anyhow::Result> { - debug!("refreshing running containers"); - - let value = self.podman.ps(false).await?; - let rv: Vec = serde_json::from_value(value)?; - - debug!(?rv, "fetched containers"); - - Ok(rv) - } - - async fn updated_published_set(&self) { - let running: Vec<_> = try_quiet!( - self.fetch_running_containers().await, - "could not fetch running containers" - ) - .iter() - .filter_map(ContainerJson::published_container) - .collect(); - - info!(?running, "updating running container set"); - self.reverse_proxy - .update_containers(running.into_iter()) - .await; - } -} - -pub(crate) fn podman_is_remote() -> bool { - env::var("PODMAN_IS_REMOTE").unwrap_or_default() == "true" -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -#[allow(dead_code)] -struct ContainerJson { - id: String, - names: Vec, - #[serde(deserialize_with = "nullable_array")] - ports: Vec, -} - -impl ContainerJson { - fn image_location(&self) -> Option { - const PREFIX: &str = "rockslide-"; - - for name in &self.names { - if let Some(subname) = name.strip_prefix(PREFIX) { - if let Some((left, right)) = subname.split_once('-') { - return Some(ImageLocation::new(left.to_owned(), right.to_owned())); - } - } - } - - None - } - - fn active_published_port(&self) -> Option<&PortMapping> { - self.ports.get(0) - } - - fn published_container(&self) -> Option { - let image_location = self.image_location()?; - let port_mapping = self.active_published_port()?; - - Some(PublishedContainer::new( - port_mapping.get_host_listening_addr()?, - image_location, - )) - } -} - -fn nullable_array<'de, D, T>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, - T: Deserialize<'de>, -{ - let opt: Option> = Deserialize::deserialize(deserializer)?; - - Ok(opt.unwrap_or_default()) -} - -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct PortMapping { - host_ip: String, - container_port: u16, - host_port: u16, - range: u16, - protocol: String, -} - -impl PortMapping { - fn get_host_listening_addr(&self) -> Option { - let ip = Ipv4Addr::from_str(&self.host_ip).ok()?; - - Some((ip, self.host_port).into()) - } -} - -#[async_trait] -impl RegistryHooks for PodmanHook { - async fn on_manifest_uploaded(&self, manifest_reference: &ManifestReference) { - // TODO: Make configurable? - let production_tag = "prod"; - - if matches!(manifest_reference.reference(), Reference::Tag(tag) if tag == production_tag) { - let location = manifest_reference.location(); - let name = format!("rockslide-{}-{}", location.repository(), location.image()); - - info!(%name, "removing (potentially nonexistant) container"); - try_quiet!( - self.podman.rm(&name, true).await, - "failed to remove container" - ); - - let image_url = format!( - "{}/{}/{}:{}", - self.local_addr, - location.repository(), - location.image(), - production_tag - ); - - info!(%name, "loggging in"); - try_quiet!( - self.podman - .login( - &self.registry_credentials.0, - self.registry_credentials.1.as_str(), - self.local_addr.to_string().as_ref(), - false - ) - .await, - "failed to login to local registry" - ); - - // We always pull the container to ensure we have the latest version. - info!(%name, "pulling container"); - try_quiet!( - self.podman.pull(&image_url).await, - "failed to pull container" - ); - - info!(%name, "starting container"); - try_quiet!( - self.podman - .run(&image_url) - .rm() - .rmi() - .name(name) - .tls_verify(false) - .publish("127.0.0.1::8000") - .env("PORT", "8000") - .execute() - .await, - "failed to launch container" - ); - - info!(?manifest_reference, "new production image uploaded"); - - self.updated_published_set().await; - } - } -} +use crate::{container_orchestrator::ContainerOrchestrator, podman::podman_is_remote}; fn load_config() -> anyhow::Result { match env::args().len() { @@ -259,6 +55,9 @@ async fn main() -> anyhow::Result<()> { debug!(?cfg, "loaded configuration"); + let rockslide_pw = cfg.rockslide.master_key.as_secret_string(); + let auth_provider = Arc::new(cfg.rockslide.master_key); + let local_ip: IpAddr = if podman_is_remote() { info!("podman is remote, trying to guess IP address"); let local_hostname = gethostname(); @@ -278,24 +77,27 @@ async fn main() -> anyhow::Result<()> { }; let local_addr = SocketAddr::from((local_ip, cfg.reverse_proxy.http_bind.port())); + // TODO: Fix (see #34). + let local_addr = SocketAddr::from(([127, 0, 0, 1], cfg.reverse_proxy.http_bind.port())); info!(%local_addr, "guessing local registry address"); - let reverse_proxy = ReverseProxy::new(); + let reverse_proxy = ReverseProxy::new(auth_provider.clone()); - let credentials = ( - "rockslide-podman".to_owned(), - cfg.rockslide.master_key.as_secret_string(), - ); - let hooks = PodmanHook::new( + let credentials = ("rockslide-podman".to_owned(), rockslide_pw); + let orchestrator = Arc::new(ContainerOrchestrator::new( &cfg.containers.podman_path, reverse_proxy.clone(), local_addr, credentials, - ); - hooks.updated_published_set().await; + &cfg.registry.storage_path, + )?); + reverse_proxy.set_orchestrator(orchestrator.clone()); + + // TODO: Probably should not fail if synchronization fails. + orchestrator.synchronize_all().await?; + orchestrator.updated_published_set().await; - let registry = - ContainerRegistry::new(&cfg.registry.storage_path, hooks, cfg.rockslide.master_key)?; + let registry = ContainerRegistry::new(&cfg.registry.storage_path, orchestrator, auth_provider)?; let app = Router::new() .merge(registry.make_router()) diff --git a/src/podman.rs b/src/podman.rs index dd06ffd..a54bd4c 100644 --- a/src/podman.rs +++ b/src/podman.rs @@ -1,4 +1,5 @@ use std::{ + env, fmt::Display, io::{self, Seek, SeekFrom, Write}, path::{Path, PathBuf}, @@ -55,7 +56,7 @@ impl Podman { let mut pw_file = tempfile()?; - pw_file.write(password.reveal().as_bytes())?; + pw_file.write_all(password.reveal().as_bytes())?; pw_file.seek(SeekFrom::Start(0))?; cmd.stdin(Stdio::from(pw_file)); @@ -89,8 +90,8 @@ impl Podman { Ok(()) } - pub(crate) fn run(&self, image_url: &str) -> StartCommand { - StartCommand { + pub(crate) fn run(&self, image_url: &str) -> RunCommand { + RunCommand { podman: self, image_url: image_url.to_owned(), rm: false, @@ -129,7 +130,7 @@ impl Podman { } } -pub(crate) struct StartCommand<'a> { +pub(crate) struct RunCommand<'a> { podman: &'a Podman, env: Vec<(String, String)>, image_url: String, @@ -140,7 +141,7 @@ pub(crate) struct StartCommand<'a> { publish: Vec, } -impl<'a> StartCommand<'a> { +impl<'a> RunCommand<'a> { pub fn env, S2: Into>(&mut self, var: S1, value: S2) -> &mut Self { self.env.push((var.into(), value.into())); self @@ -286,3 +287,7 @@ async fn fetch_json(cmd: Command) -> Result { Ok(parsed) } + +pub(crate) fn podman_is_remote() -> bool { + env::var("PODMAN_IS_REMOTE").unwrap_or_default() == "true" +} diff --git a/src/registry.rs b/src/registry.rs index 6a4a4fd..8227228 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -92,26 +92,22 @@ impl IntoResponse for AppError { pub(crate) struct ContainerRegistry { realm: String, - auth_provider: Box, + auth_provider: Arc, storage: Box, hooks: Box, } impl ContainerRegistry { - pub(crate) fn new< - P: AsRef, - T: RegistryHooks + 'static, - A: AuthProvider + 'static, - >( + pub(crate) fn new, T: RegistryHooks + 'static>( storage_path: P, - hooks: T, - auth_provider: A, + orchestrator: T, + auth_provider: Arc, ) -> Result, FilesystemStorageError> { Ok(Arc::new(ContainerRegistry { realm: "ContainerRegistry".to_string(), - auth_provider: Box::new(auth_provider), + auth_provider: auth_provider, storage: Box::new(FilesystemStorage::new(storage_path)?), - hooks: Box::new(hooks), + hooks: Box::new(orchestrator), })) } @@ -516,7 +512,7 @@ mod tests { let tmp = TempDir::new("rockslide-test").expect("could not create temporary directory"); let password = "random-test-password".to_owned(); - let master_key = MasterKey::new_key(password.clone()); + let master_key = Arc::new(MasterKey::new_key(password.clone())); let registry = ContainerRegistry::new(tmp.as_ref(), (), master_key) .expect("should not fail to create app"); diff --git a/src/registry/auth.rs b/src/registry/auth.rs index 8f7887a..ff677af 100644 --- a/src/registry/auth.rs +++ b/src/registry/auth.rs @@ -1,4 +1,4 @@ -use std::{str, sync::Arc}; +use std::{collections::HashMap, str, sync::Arc}; use axum::{ async_trait, @@ -96,6 +96,30 @@ impl AuthProvider for bool { } } +#[async_trait] +impl AuthProvider for HashMap> { + async fn check_credentials( + &self, + UnverifiedCredentials { + username: unverified_username, + password: unverified_password, + }: &UnverifiedCredentials, + ) -> bool { + if let Some(correct_password) = self.get(unverified_username) { + // TODO: Use constant-time compare. Maybe add to `sec`? + if correct_password == unverified_password { + return true; + } + } + + false + } + + async fn has_access_to(&self, _username: &str, _namespace: &str, _image: &str) -> bool { + true + } +} + #[async_trait] impl AuthProvider for Box where diff --git a/src/registry/storage.rs b/src/registry/storage.rs index c1b125a..8d9a1ed 100644 --- a/src/registry/storage.rs +++ b/src/registry/storage.rs @@ -55,7 +55,7 @@ pub(crate) struct ImageLocation { image: String, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub(crate) struct ManifestReference { #[serde(flatten)] location: ImageLocation, @@ -97,7 +97,7 @@ impl ImageLocation { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) enum Reference { Tag(String), Digest(Digest), @@ -150,6 +150,15 @@ impl Reference { } } +impl Display for Reference { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Reference::Tag(tag) => Display::fmt(tag, f), + Reference::Digest(digest) => Display::fmt(digest, f), + } + } +} + #[derive(Debug, Error)] pub(crate) enum Error { #[error("given upload does not exist")] diff --git a/src/reverse_proxy.rs b/src/reverse_proxy.rs index 8415e3f..a23e78c 100644 --- a/src/reverse_proxy.rs +++ b/src/reverse_proxy.rs @@ -2,9 +2,8 @@ use std::{ collections::HashMap, fmt::{self, Display}, mem, - net::SocketAddr, str::{self, FromStr}, - sync::Arc, + sync::{Arc, OnceLock}, }; use axum::{ @@ -13,26 +12,27 @@ use axum::{ http::{ header::HOST, uri::{Authority, Parts, PathAndQuery, Scheme}, - StatusCode, Uri, + Method, StatusCode, Uri, }, response::{IntoResponse, Response}, - Router, + RequestExt, Router, }; use itertools::Itertools; use tokio::sync::RwLock; use tracing::{trace, warn}; -use crate::registry::storage::ImageLocation; +use crate::{ + container_orchestrator::{ContainerOrchestrator, PublishedContainer, RuntimeConfig}, + registry::{ + storage::ImageLocation, AuthProvider, ManifestReference, Reference, UnverifiedCredentials, + }, +}; pub(crate) struct ReverseProxy { + auth_provider: Arc, client: reqwest::Client, routing_table: RwLock, -} - -#[derive(Clone, Debug)] -pub(crate) struct PublishedContainer { - host_addr: SocketAddr, - image_location: ImageLocation, + orchestrator: OnceLock>, } #[derive(Debug, Default)] @@ -73,17 +73,29 @@ impl PartialEq for Domain { } } +#[derive(Debug)] +enum Destination { + ReverseProxied { + uri: Uri, + config: Arc, + }, + Internal(Uri), + NotFound, +} + impl RoutingTable { fn from_containers(containers: impl IntoIterator) -> Self { let mut path_maps = HashMap::new(); let mut domain_maps = HashMap::new(); for container in containers { - if let Some(domain) = Domain::new(&container.image_location.repository()) { + if let Some(domain) = + Domain::new(container.manifest_reference().location().repository()) + { domain_maps.insert(domain, container.clone()); } - path_maps.insert(container.image_location.clone(), container); + path_maps.insert(container.manifest_reference().location().clone(), container); } Self { @@ -92,8 +104,7 @@ impl RoutingTable { } } - // TODO: Consider return a `Uri`` instead. - fn get_destination_uri_from_request(&self, request: &Request) -> Option { + fn get_destination_uri_from_request(&self, request: &Request) -> Destination { let req_uri = request.uri(); // First, attempt to match a domain. @@ -118,17 +129,25 @@ impl RoutingTable { let mut parts = req_uri.clone().into_parts(); parts.scheme = Some(Scheme::HTTP); parts.authority = Some( - Authority::from_str(&pc.host_addr.to_string()) + Authority::from_str(&pc.host_addr().to_string()) .expect("SocketAddr should never fail to convert to Authority"), ); - return Some(Uri::from_parts(parts).expect("should not have invalidated Uri")); + return Destination::ReverseProxied { + uri: Uri::from_parts(parts).expect("should not have invalidated Uri"), + config: pc.config().clone(), + }; } // Matching a domain did not succeed, let's try with a path. + // First, we attempt to match a special `_rockslide` path: + if req_uri.path().starts_with("/_rockslide") { + return Destination::Internal(req_uri.to_owned()); + } + // Reconstruct image location from path segments, keeping remainder intact. if let Some((image_location, remainder)) = split_path_base_url(req_uri) { if let Some(pc) = self.get_path_route(&image_location) { - let container_addr = pc.host_addr; + let container_addr = pc.host_addr(); let mut dest_path_and_query = remainder; @@ -146,19 +165,28 @@ impl RoutingTable { parts.authority = Some(Authority::from_str(&container_addr.to_string()).unwrap()); parts.path_and_query = Some(PathAndQuery::from_str(&dest_path_and_query).unwrap()); - return Some(Uri::from_parts(parts).unwrap()); + return Destination::ReverseProxied { + uri: Uri::from_parts(parts).unwrap(), + config: pc.config().clone(), + }; } } - None + Destination::NotFound } } #[derive(Debug)] enum AppError { NoSuchContainer, + InternalUrlInvalid, AssertionFailed(&'static str), NonUtf8Header, + AuthFailure { + realm: &'static str, + status: StatusCode, + }, + InvalidPayload, Internal(anyhow::Error), } @@ -167,8 +195,11 @@ impl Display for AppError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { AppError::NoSuchContainer => f.write_str("no such container"), + AppError::InternalUrlInvalid => f.write_str("internal url invalid"), AppError::AssertionFailed(msg) => f.write_str(msg), AppError::NonUtf8Header => f.write_str("a header contained non-utf8 data"), + AppError::AuthFailure { .. } => f.write_str("authentication missing or not present"), + AppError::InvalidPayload => f.write_str("invalid payload"), AppError::Internal(err) => Display::fmt(err, f), } } @@ -189,8 +220,15 @@ impl IntoResponse for AppError { fn into_response(self) -> Response { match self { AppError::NoSuchContainer => StatusCode::NOT_FOUND.into_response(), + AppError::InternalUrlInvalid => StatusCode::NOT_FOUND.into_response(), AppError::AssertionFailed(msg) => (StatusCode::NOT_FOUND, msg).into_response(), AppError::NonUtf8Header => StatusCode::BAD_REQUEST.into_response(), + AppError::AuthFailure { realm, status } => Response::builder() + .status(status) + .header("WWW-Authenticate", format!("basic realm={realm}")) + .body(Body::empty()) + .expect("should never fail to build auth failure response"), + AppError::InvalidPayload => StatusCode::BAD_REQUEST.into_response(), AppError::Internal(err) => { (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response() } @@ -198,20 +236,13 @@ impl IntoResponse for AppError { } } -impl PublishedContainer { - pub(crate) fn new(host_addr: SocketAddr, image_location: ImageLocation) -> Self { - Self { - host_addr, - image_location, - } - } -} - impl ReverseProxy { - pub(crate) fn new() -> Arc { + pub(crate) fn new(auth_provider: Arc) -> Arc { Arc::new(ReverseProxy { + auth_provider, client: reqwest::Client::new(), routing_table: RwLock::new(Default::default()), + orchestrator: OnceLock::new(), }) } @@ -228,6 +259,14 @@ impl ReverseProxy { let mut guard = self.routing_table.write().await; mem::swap(&mut *guard, &mut routing_table); } + + pub(crate) fn set_orchestrator(&self, orchestrator: Arc) -> &Self { + self.orchestrator + .set(orchestrator) + .map_err(|_| ()) + .expect("set already set orchestrator"); + self + } } fn split_path_base_url(uri: &Uri) -> Option<(ImageLocation, String)> { @@ -246,56 +285,154 @@ fn split_path_base_url(uri: &Uri) -> Option<(ImageLocation, String)> { async fn route_request( State(rp): State>, - request: Request, + mut request: Request, ) -> Result { let dest_uri = { let routing_table = rp.routing_table.read().await; routing_table.get_destination_uri_from_request(&request) }; - let dest = dest_uri.ok_or(AppError::NoSuchContainer)?; - trace!(%dest, "reverse proxying"); - - // Note: `reqwest` and `axum` currently use different versions of `http` - let method = - request.method().to_string().parse().map_err(|_| { - AppError::AssertionFailed("method http version mismatch workaround failed") - })?; - let response = rp.client.request(method, dest.to_string()).send().await; - - match response { - Ok(response) => { - let mut bld = Response::builder().status(response.status().as_u16()); - for (key, value) in response.headers() { - if HOP_BY_HOP.contains(key) { - continue; + match dest_uri { + Destination::ReverseProxied { uri: dest, config } => { + trace!(%dest, "reverse proxying"); + + // First, check if http authentication is enabled. + if let Some(ref http_access) = config.http.access { + let creds = request + .extract_parts::() + .await + .map_err(|status| AppError::AuthFailure { + // TODO: Output container name? + realm: "password protected container", + status, + })?; + + if !http_access.check_credentials(&creds).await { + return Err(AppError::AuthFailure { + realm: "password protected container", + status: StatusCode::UNAUTHORIZED, + }); } + } - let key_string = key.to_string(); - let value_str = value.to_str().map_err(|_| AppError::NonUtf8Header)?; - - bld = bld.header(key_string, value_str); + // Note: `reqwest` and `axum` currently use different versions of `http` + let method = request.method().to_string().parse().map_err(|_| { + AppError::AssertionFailed("method http version mismatch workaround failed") + })?; + let response = rp.client.request(method, dest.to_string()).send().await; + + match response { + Ok(response) => { + let mut bld = Response::builder().status(response.status().as_u16()); + for (key, value) in response.headers() { + if HOP_BY_HOP.contains(key) { + continue; + } + + let key_string = key.to_string(); + let value_str = value.to_str().map_err(|_| AppError::NonUtf8Header)?; + + bld = bld.header(key_string, value_str); + } + Ok(bld.body(Body::from(response.bytes().await?)).map_err(|_| { + AppError::AssertionFailed("should not fail to construct response") + })?) + } + Err(err) => { + warn!(%err, %dest, "failed request"); + Ok(Response::builder() + .status(500) + .body(Body::empty()) + .map_err(|_| { + AppError::AssertionFailed("should not fail to construct error response") + })?) + } } - Ok(bld - .body(Body::from(response.bytes().await?)) - .map_err(|_| AppError::AssertionFailed("should not fail to construct response"))?) } - Err(err) => { - warn!(%err, %dest, "failed request"); - Ok(Response::builder() - .status(500) - .body(Body::empty()) - .map_err(|_| { - AppError::AssertionFailed("should not fail to construct error response") - })?) + Destination::Internal(uri) => { + let method = request.method().clone(); + // Note: The auth functionality has been lifted from `registry`. It may need to be + // refactored out because of that. + let creds: UnverifiedCredentials = + request + .extract_parts() + .await + .map_err(|status| AppError::AuthFailure { + realm: "internal", + status, + })?; + + let opt_body = request + .extract::, _>() + .await + .expect("infallible"); + + // Any internal URL is subject to requiring auth through the master key. + if !rp.auth_provider.check_credentials(&creds).await { + return Err(AppError::AuthFailure { + realm: "internal", + status: StatusCode::UNAUTHORIZED, + }); + } + + let remainder = uri + .path() + .strip_prefix("/_rockslide/config/") + .ok_or(AppError::InternalUrlInvalid)?; + + let parts = remainder.split('/').collect::>(); + if parts.len() != 3 { + return Err(AppError::InternalUrlInvalid); + } + + if parts[2] != "prod" { + return Err(AppError::InternalUrlInvalid); + } + + let manifest_reference = ManifestReference::new( + ImageLocation::new(parts[0].to_owned(), parts[1].to_owned()), + Reference::new_tag(parts[2]), + ); + + let orchestrator = rp + .orchestrator + .get() + .ok_or_else(|| AppError::AssertionFailed("no orchestrator configured"))?; + + match method { + Method::GET => { + let config = orchestrator + .load_config(&manifest_reference) + .await + .map_err(AppError::Internal)?; + + Ok(config.into_response()) + } + Method::PUT => { + let raw = opt_body.ok_or(AppError::InvalidPayload)?; + let new_config: RuntimeConfig = + toml::from_str(&raw).map_err(|_| AppError::InvalidPayload)?; + let stored = orchestrator + .save_config(&manifest_reference, &new_config) + .await + .map_err(AppError::Internal)?; + + // Update containers. + orchestrator.updated_published_set().await; + + Ok(stored.into_response()) + } + _ => Err(AppError::InternalUrlInvalid), + } } + Destination::NotFound => Err(AppError::NoSuchContainer), } } /// HTTP/1.1 hop-by-hop headers mod hop_by_hop { use reqwest::header::HeaderName; - pub(super) const HOP_BY_HOP: [HeaderName; 8] = [ + pub(super) static HOP_BY_HOP: [HeaderName; 8] = [ HeaderName::from_static("keep-alive"), HeaderName::from_static("transfer-encoding"), HeaderName::from_static("te"),