diff --git a/Cargo.lock b/Cargo.lock index cffb1d41db..6580b5082a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6836,6 +6836,16 @@ dependencies = [ "regex", ] +[[package]] +name = "sanitize-filename" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ed72fbaf78e6f2d41744923916966c4fbe3d7c74e3037a8ee482f1115572603" +dependencies = [ + "lazy_static 1.4.0", + "regex", +] + [[package]] name = "saturating" version = "0.1.0" @@ -7731,6 +7741,7 @@ version = "2.7.0-pre0" dependencies = [ "async-trait", "cap-primitives 3.0.0", + "spin-common", "spin-factors", "spin-factors-test", "tokio", @@ -8217,7 +8228,7 @@ dependencies = [ "outbound-redis", "rustls-pemfile 2.1.2", "rustls-pki-types", - "sanitize-filename", + "sanitize-filename 0.4.0", "serde 1.0.197", "serde_json", "spin-app", @@ -8297,6 +8308,47 @@ dependencies = [ "webpki-roots 0.26.1", ] +[[package]] +name = "spin-trigger-http2" +version = "2.7.0-pre0" +dependencies = [ + "anyhow", + "async-trait", + "clap 3.2.25", + "futures", + "futures-util", + "http 1.1.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "indexmap 1.9.3", + "percent-encoding", + "rustls 0.22.4", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde 1.0.197", + "serde_json", + "spin-app", + "spin-core", + "spin-factor-outbound-http", + "spin-factor-wasi", + "spin-http", + "spin-outbound-networking", + "spin-telemetry", + "spin-trigger2", + "spin-world", + "terminal", + "tls-listener", + "tokio", + "tokio-rustls 0.25.0", + "tracing", + "url", + "wasmtime", + "wasmtime-wasi", + "wasmtime-wasi-http", + "webpki-roots 0.26.1", +] + [[package]] name = "spin-trigger-redis" version = "2.7.0-pre0" @@ -8318,6 +8370,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "spin-trigger2" +version = "2.7.0-pre0" +dependencies = [ + "anyhow", + "clap 3.2.25", + "ctrlc", + "futures", + "sanitize-filename 0.5.0", + "serde 1.0.197", + "serde_json", + "spin-app", + "spin-common", + "spin-core", + "spin-factor-wasi", + "spin-factors", + "spin-factors-executor", + "spin-telemetry", + "tokio", + "tracing", +] + [[package]] name = "spin-variables" version = "2.7.0-pre0" diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 22d97ceba8..e3aa54ac36 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -101,7 +101,7 @@ impl App { } /// Returns the trigger metadata for a specific trigger type. - pub fn get_trigger_metadata<'this, T: Deserialize<'this> + Default>( + pub fn get_trigger_metadata<'this, T: Deserialize<'this>>( &'this self, trigger_type: &str, ) -> Result> { @@ -140,6 +140,20 @@ impl App { .filter(move |trigger| trigger.locked.trigger_type == trigger_type) } + /// Returns an iterator of trigger IDs and deserialized trigger configs for + /// the given `trigger_type`. + pub fn trigger_configs<'a, T: Deserialize<'a>>( + &'a self, + trigger_type: &'a str, + ) -> Result> { + self.triggers_with_type(trigger_type) + .map(|trigger| { + let config = trigger.typed_config::()?; + Ok((trigger.id(), config)) + }) + .collect::>>() + } + /// Checks that the application does not have any host requirements /// outside the supported set. The error case returns a comma-separated /// list of unmet requirements. @@ -215,12 +229,12 @@ pub struct AppTrigger<'a> { impl<'a> AppTrigger<'a> { /// Returns this trigger's app-unique ID. - pub fn id(&self) -> &str { + pub fn id(&self) -> &'a str { &self.locked.id } /// Returns the Trigger's type. - pub fn trigger_type(&self) -> &str { + pub fn trigger_type(&self) -> &'a str { &self.locked.trigger_type } diff --git a/crates/core/src/store.rs b/crates/core/src/store.rs index 7ad7168b5a..9afbd3cfd3 100644 --- a/crates/core/src/store.rs +++ b/crates/core/src/store.rs @@ -38,6 +38,16 @@ impl Store { }; self.inner.set_epoch_deadline(ticks); } + + /// Provides access to the inner [`wasmtime::Store`]'s data. + pub fn data(&self) -> &T { + self.inner.data() + } + + /// Provides access to the inner [`wasmtime::Store`]'s data. + pub fn data_mut(&mut self) -> &mut T { + self.inner.data_mut() + } } impl AsRef> for Store { diff --git a/crates/core/tests/integration_test.rs b/crates/core/tests/integration_test.rs index 8db91940d4..ad47912b4d 100644 --- a/crates/core/tests/integration_test.rs +++ b/crates/core/tests/integration_test.rs @@ -140,8 +140,7 @@ async fn run_test( let app = App::new("test-app", locked); let configured_app = factors.configure_app(app, Default::default())?; let mut builders = factors.prepare(&configured_app, "test-component")?; - // FIXME: it is unfortunate that we have to unwrap here... - builders.wasi.as_mut().unwrap().args(args); + builders.wasi().args(args); let instance_state = factors.build_instance_state(builders)?; let state = TestState { core: State::default(), diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index e17babf2e4..ed01d137e9 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -1,7 +1,7 @@ mod spin; mod wasi; -mod wasi_2023_10_18; -mod wasi_2023_11_10; +pub mod wasi_2023_10_18; +pub mod wasi_2023_11_10; use spin_factor_outbound_networking::{OutboundAllowedHosts, OutboundNetworkingFactor}; use spin_factors::{ diff --git a/crates/factor-outbound-http/src/wasi_2023_10_18.rs b/crates/factor-outbound-http/src/wasi_2023_10_18.rs index 92a9cd4ccc..891853ec6c 100644 --- a/crates/factor-outbound-http/src/wasi_2023_10_18.rs +++ b/crates/factor-outbound-http/src/wasi_2023_10_18.rs @@ -17,6 +17,10 @@ mod bindings { interfaces: r#" include wasi:http/proxy@0.2.0-rc-2023-10-18; "#, + async: { + // Only need async exports + only_imports: [], + }, with: { "wasi:io/poll/pollable": latest::io::poll::Pollable, "wasi:io/streams/input-stream": latest::io::streams::InputStream, @@ -41,6 +45,12 @@ mod wasi { pub use super::bindings::wasi::{http0_2_0_rc_2023_10_18 as http, io0_2_0_rc_2023_10_18 as io}; } +pub mod exports { + pub mod wasi { + pub use super::super::bindings::exports::wasi::http0_2_0_rc_2023_10_18 as http; + } +} + use wasi::http::types::{ Error as HttpError, Fields, FutureIncomingResponse, FutureTrailers, Headers, IncomingBody, IncomingRequest, IncomingResponse, Method, OutgoingBody, OutgoingRequest, OutgoingResponse, @@ -51,8 +61,9 @@ use wasi::io::streams::{InputStream, OutputStream}; use crate::wasi::WasiHttpImplInner; -pub fn add_to_linker(linker: &mut Linker, closure: F) -> Result<()> +pub(crate) fn add_to_linker(linker: &mut Linker, closure: F) -> Result<()> where + T: Send, F: Fn(&mut T) -> WasiHttpImpl + Send + Sync + Copy + 'static, { wasi::http::types::add_to_linker_get_host(linker, closure)?; diff --git a/crates/factor-outbound-http/src/wasi_2023_11_10.rs b/crates/factor-outbound-http/src/wasi_2023_11_10.rs index 439003d158..0a878cfcfb 100644 --- a/crates/factor-outbound-http/src/wasi_2023_11_10.rs +++ b/crates/factor-outbound-http/src/wasi_2023_11_10.rs @@ -20,6 +20,10 @@ mod bindings { interfaces: r#" include wasi:http/proxy@0.2.0-rc-2023-11-10; "#, + async: { + // Only need async exports + only_imports: [], + }, with: { "wasi:io/poll/pollable": latest::io::poll::Pollable, "wasi:io/streams/input-stream": latest::io::streams::InputStream, @@ -45,6 +49,12 @@ mod wasi { pub use super::bindings::wasi::{http0_2_0_rc_2023_11_10 as http, io0_2_0_rc_2023_11_10 as io}; } +pub mod exports { + pub mod wasi { + pub use super::super::bindings::exports::wasi::http0_2_0_rc_2023_11_10 as http; + } +} + use wasi::http::types::{ DnsErrorPayload, ErrorCode as HttpErrorCode, FieldSizePayload, Fields, FutureIncomingResponse, FutureTrailers, HeaderError, Headers, IncomingBody, IncomingRequest, IncomingResponse, Method, @@ -56,8 +66,9 @@ use wasi::io::streams::{Error as IoError, InputStream, OutputStream}; use crate::wasi::WasiHttpImplInner; -pub fn add_to_linker(linker: &mut Linker, closure: F) -> Result<()> +pub(crate) fn add_to_linker(linker: &mut Linker, closure: F) -> Result<()> where + T: Send, F: Fn(&mut T) -> WasiHttpImpl + Send + Sync + Copy + 'static, { wasi::http::types::add_to_linker_get_host(linker, closure)?; diff --git a/crates/factor-variables/src/spin_cli/mod.rs b/crates/factor-variables/src/spin_cli/mod.rs index 49ea1261cd..f03a12472d 100644 --- a/crates/factor-variables/src/spin_cli/mod.rs +++ b/crates/factor-variables/src/spin_cli/mod.rs @@ -17,7 +17,7 @@ use crate::runtime_config::RuntimeConfig; /// Resolves a runtime configuration for the variables factor from a TOML table. pub fn runtime_config_from_toml(table: &toml::Table) -> anyhow::Result { // Always include the environment variable provider. - let mut providers = vec![Box::new(EnvVariablesProvider::default()) as _]; + let mut providers = vec![Box::::default() as _]; let Some(array) = table.get("variable_provider") else { return Ok(RuntimeConfig { providers }); }; diff --git a/crates/factor-wasi/Cargo.toml b/crates/factor-wasi/Cargo.toml index 6fb2dfc8e6..b35bfc04e6 100644 --- a/crates/factor-wasi/Cargo.toml +++ b/crates/factor-wasi/Cargo.toml @@ -7,6 +7,7 @@ edition = { workspace = true } [dependencies] async-trait = "0.1" cap-primitives = "3.0.0" +spin-common = { path = "../common" } spin-factors = { path = "../factors" } tokio = { version = "1" } wasmtime = { workspace = true } diff --git a/crates/factor-wasi/src/lib.rs b/crates/factor-wasi/src/lib.rs index 2579b15e68..b7cc8a90f1 100644 --- a/crates/factor-wasi/src/lib.rs +++ b/crates/factor-wasi/src/lib.rs @@ -1,3 +1,4 @@ +pub mod spin; mod wasi_2023_10_18; mod wasi_2023_11_10; @@ -122,7 +123,7 @@ impl Factor for WasiFactor { } } -pub trait FilesMounter { +pub trait FilesMounter: Send + Sync { fn mount_files( &self, app_component: &AppComponent, diff --git a/crates/factor-wasi/src/spin.rs b/crates/factor-wasi/src/spin.rs new file mode 100644 index 0000000000..25de63b5e0 --- /dev/null +++ b/crates/factor-wasi/src/spin.rs @@ -0,0 +1,48 @@ +use std::path::PathBuf; + +use spin_common::{ui::quoted_path, url::parse_file_url}; +use spin_factors::anyhow::{ensure, Context}; + +use crate::FilesMounter; + +pub struct SpinFilesMounter { + working_dir: PathBuf, + allow_transient_writes: bool, +} + +impl SpinFilesMounter { + pub fn new(working_dir: impl Into, allow_transient_writes: bool) -> Self { + Self { + working_dir: working_dir.into(), + allow_transient_writes, + } + } +} + +impl FilesMounter for SpinFilesMounter { + fn mount_files( + &self, + app_component: &spin_factors::AppComponent, + mut ctx: crate::MountFilesContext, + ) -> spin_factors::anyhow::Result<()> { + for content_dir in app_component.files() { + let source_uri = content_dir + .content + .source + .as_deref() + .with_context(|| format!("Missing 'source' on files mount {content_dir:?}"))?; + let source_path = self.working_dir.join(parse_file_url(source_uri)?); + ensure!( + source_path.is_dir(), + "SpinFilesMounter only supports directory mounts; {} is not a directory", + quoted_path(&source_path), + ); + let guest_path = &content_dir.path; + let guest_path = guest_path + .to_str() + .with_context(|| format!("guest path {guest_path:?} not valid UTF-8"))?; + ctx.preopened_dir(source_path, guest_path, self.allow_transient_writes)?; + } + Ok(()) + } +} diff --git a/crates/factors-derive/src/lib.rs b/crates/factors-derive/src/lib.rs index 921196c8b7..df97924482 100644 --- a/crates/factors-derive/src/lib.rs +++ b/crates/factors-derive/src/lib.rs @@ -207,7 +207,16 @@ fn expand_factors(input: &DeriveInput) -> syn::Result { #vis struct #builders_name { #( - pub #factor_names: Option<<#factor_types as #Factor>::InstanceBuilder>, + #factor_names: Option<<#factor_types as #Factor>::InstanceBuilder>, + )* + } + + #[allow(dead_code)] + impl #builders_name { + #( + pub fn #factor_names(&mut self) -> &mut <#factor_types as #Factor>::InstanceBuilder { + self.#factor_names.as_mut().unwrap() + } )* } diff --git a/crates/factors-executor/src/lib.rs b/crates/factors-executor/src/lib.rs index c624a02f02..2d1c00330a 100644 --- a/crates/factors-executor/src/lib.rs +++ b/crates/factors-executor/src/lib.rs @@ -10,85 +10,154 @@ use spin_factors::{AsInstanceState, ConfiguredApp, RuntimeFactors, RuntimeFactor /// `Factors` is the executor's [`RuntimeFactors`]. `ExecutorInstanceState` /// holds any other per-instance state needed by the caller. pub struct FactorsExecutor { - factors: T, core_engine: spin_core::Engine>, - configured_app: ConfiguredApp, - // Maps component IDs -> InstancePres - component_instance_pres: HashMap>, + factors: T, + hooks: Vec>>, } -type InstancePre = - spin_core::InstancePre::InstanceState, U>>; - impl FactorsExecutor { /// Constructs a new executor. pub fn new( - core_config: &spin_core::Config, + mut core_engine_builder: spin_core::EngineBuilder< + InstanceState<::InstanceState, U>, + >, mut factors: T, - app: App, - mut component_loader: impl ComponentLoader, - runtime_config: T::RuntimeConfig, ) -> anyhow::Result { - let core_engine = { - let mut builder = - spin_core::Engine::builder(core_config).context("failed to initialize engine")?; - factors - .init(builder.linker()) - .context("failed to initialize factors")?; - builder.build() - }; + factors + .init(core_engine_builder.linker()) + .context("failed to initialize factors")?; + Ok(Self { + factors, + core_engine: core_engine_builder.build(), + hooks: Default::default(), + }) + } - let configured_app = factors + /// Adds the given [`ExecutorHooks`] to this executor. + /// + /// Hooks are run in the order they are added. + pub fn add_hooks(&mut self, hooks: impl ExecutorHooks + 'static) { + self.hooks.push(Box::new(hooks)); + } + + /// Loads a [`FactorsApp`] with this executor. + pub fn load_app( + mut self, + app: App, + runtime_config: T::RuntimeConfig, + mut component_loader: impl ComponentLoader, + ) -> anyhow::Result> { + let configured_app = self + .factors .configure_app(app, runtime_config) .context("failed to configure app")?; + for hooks in &mut self.hooks { + hooks.configure_app(&configured_app)?; + } + let component_instance_pres = configured_app .app() .components() .map(|app_component| { let component = - component_loader.load_component(core_engine.as_ref(), &app_component)?; - let instance_pre = core_engine.instantiate_pre(&component)?; + component_loader.load_component(self.core_engine.as_ref(), &app_component)?; + let instance_pre = self.core_engine.instantiate_pre(&component)?; Ok((app_component.id().to_string(), instance_pre)) }) .collect::>>()?; - Ok(Self { - factors, - core_engine, + Ok(FactorsExecutorApp { + executor: self, configured_app, component_instance_pres, }) } +} + +pub trait ExecutorHooks: Send + Sync { + /// Configure app hooks run immediately after [`RuntimeFactors::configure_app`]. + fn configure_app(&mut self, configured_app: &ConfiguredApp) -> anyhow::Result<()> { + let _ = configured_app; + Ok(()) + } + + /// Prepare instance hooks run immediately before [`FactorsExecutor::prepare`] returns. + fn prepare_instance(&self, builder: &mut FactorsInstanceBuilder) -> anyhow::Result<()> { + let _ = builder; + Ok(()) + } +} + +/// A ComponentLoader is responsible for loading Wasmtime [`Component`]s. +pub trait ComponentLoader { + /// Loads a [`Component`] for the given [`AppComponent`]. + fn load_component( + &mut self, + engine: &spin_core::wasmtime::Engine, + component: &AppComponent, + ) -> anyhow::Result; +} + +type InstancePre = + spin_core::InstancePre::InstanceState, U>>; + +/// A FactorsExecutorApp represents a loaded Spin app, ready for instantiation. +pub struct FactorsExecutorApp { + executor: FactorsExecutor, + configured_app: ConfiguredApp, + // Maps component IDs -> InstancePres + component_instance_pres: HashMap>, +} + +impl FactorsExecutorApp { + pub fn engine(&self) -> &spin_core::Engine> { + &self.executor.core_engine + } + + pub fn app(&self) -> &App { + self.configured_app.app() + } + + pub fn get_component(&self, component_id: &str) -> anyhow::Result<&Component> { + let instance_pre = self + .component_instance_pres + .get(component_id) + .with_context(|| format!("no such component {component_id:?}"))?; + Ok(instance_pre.component()) + } /// Returns an instance builder for the given component ID. - pub fn prepare(&mut self, component_id: &str) -> anyhow::Result> { + pub fn prepare(&self, component_id: &str) -> anyhow::Result> { let app_component = self .configured_app .app() .get_component(component_id) .with_context(|| format!("no such component {component_id:?}"))?; + let instance_pre = self.component_instance_pres.get(component_id).unwrap(); - let factor_builders = self.factors.prepare(&self.configured_app, component_id)?; - let store_builder = self.core_engine.store_builder(); - Ok(FactorsInstanceBuilder { + + let factor_builders = self + .executor + .factors + .prepare(&self.configured_app, component_id)?; + + let store_builder = self.executor.core_engine.store_builder(); + + let mut builder = FactorsInstanceBuilder { store_builder, factor_builders, instance_pre, app_component, - factors: &self.factors, - }) - } -} + factors: &self.executor.factors, + }; -/// A ComponentLoader is responsible for loading Wasmtime [`Component`]s. -pub trait ComponentLoader { - /// Loads a [`Component`] for the given [`AppComponent`]. - fn load_component( - &mut self, - engine: &spin_core::wasmtime::Engine, - component: &AppComponent, - ) -> anyhow::Result; + for hooks in &self.executor.hooks { + hooks.prepare_instance(&mut builder)?; + } + + Ok(builder) + } } /// A FactorsInstanceBuilder manages the instantiation of a Spin component @@ -101,7 +170,7 @@ pub struct FactorsInstanceBuilder<'a, T: RuntimeFactors, U> { factors: &'a T, } -impl<'a, T: RuntimeFactors, U: Send> FactorsInstanceBuilder<'a, T, U> { +impl<'a, T: RuntimeFactors, U> FactorsInstanceBuilder<'a, T, U> { /// Returns the app component for the instance. pub fn app_component(&self) -> &AppComponent { &self.app_component @@ -116,7 +185,9 @@ impl<'a, T: RuntimeFactors, U: Send> FactorsInstanceBuilder<'a, T, U> { pub fn factor_builders(&mut self) -> &mut T::InstanceBuilders { &mut self.factor_builders } +} +impl<'a, T: RuntimeFactors, U: Send> FactorsInstanceBuilder<'a, T, U> { /// Instantiates the instance with the given executor instance state pub async fn instantiate( self, @@ -137,14 +208,24 @@ impl<'a, T: RuntimeFactors, U: Send> FactorsInstanceBuilder<'a, T, U> { } /// InstanceState is the [`spin_core::Store`] `data` for an instance. -pub struct InstanceState { +pub struct InstanceState { core: spin_core::State, - factors: FactorsState, - executor: ExecutorInstanceState, + factors: T, + executor: U, } impl InstanceState { - /// Provides access to the `ExecutorInstanceState`. + /// Provides access to the [`spin_core::State`]. + pub fn core_state(&self) -> &spin_core::State { + &self.core + } + + /// Provides access to the [`RuntimeFactors::InstanceState`]. + pub fn factors_instance_state(&mut self) -> &mut T { + &mut self.factors + } + + /// Provides access to the `Self::ExecutorInstanceState`. pub fn executor_instance_state(&mut self) -> &mut U { &mut self.executor } @@ -184,15 +265,12 @@ mod tests { let locked = env.build_locked_app().await?; let app = App::new("test-app", locked); - let mut executor = FactorsExecutor::new( - &Default::default(), - env.factors, - app, - DummyComponentLoader, - Default::default(), - )?; + let engine_builder = spin_core::Engine::builder(&Default::default())?; + let executor = FactorsExecutor::new(engine_builder, env.factors)?; + + let factors_app = executor.load_app(app, Default::default(), DummyComponentLoader)?; - let mut instance_builder = executor.prepare("empty")?; + let mut instance_builder = factors_app.prepare("empty")?; assert_eq!(instance_builder.app_component().id(), "empty"); diff --git a/crates/http/src/trigger.rs b/crates/http/src/trigger.rs index ca53b33544..37a030ed0d 100644 --- a/crates/http/src/trigger.rs +++ b/crates/http/src/trigger.rs @@ -1,8 +1,4 @@ use serde::{Deserialize, Serialize}; -use spin_locked_app::MetadataKey; - -/// Http trigger metadata key -pub const METADATA_KEY: MetadataKey = MetadataKey::new("trigger"); #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] diff --git a/crates/trigger-http2/Cargo.toml b/crates/trigger-http2/Cargo.toml new file mode 100644 index 0000000000..0abb814365 --- /dev/null +++ b/crates/trigger-http2/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "spin-trigger-http2" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } + +[lib] +doctest = false + +[dependencies] +anyhow = "1.0" +async-trait = "0.1" +clap = "3" +futures = "0.3" +futures-util = "0.3.8" +http = "1.0.0" +hyper = { workspace = true } +hyper-util = { version = "0.1.2", features = ["tokio"] } +http-body-util = { workspace = true } +indexmap = "1" +percent-encoding = "2" +rustls = { version = "0.22.4" } +rustls-pemfile = "2.1.2" +rustls-pki-types = "1.7" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" +spin-app = { path = "../app" } +spin-core = { path = "../core" } +spin-factor-outbound-http = { path = "../factor-outbound-http" } +spin-factor-wasi = { path = "../factor-wasi" } +spin-http = { path = "../http" } +spin-outbound-networking = { path = "../outbound-networking" } +spin-telemetry = { path = "../telemetry" } +spin-trigger2 = { path = "../trigger2" } +spin-world = { path = "../world" } +terminal = { path = "../terminal" } +tls-listener = { version = "0.10.0", features = ["rustls"] } +tokio = { version = "1.23", features = ["full"] } +tokio-rustls = { version = "0.25.0" } +url = "2.4.1" +tracing = { workspace = true } +wasmtime = { workspace = true } +wasmtime-wasi = { workspace = true } +wasmtime-wasi-http = { workspace = true } +webpki-roots = { version = "0.26.0" } + +[lints] +workspace = true diff --git a/crates/trigger-http2/src/handler.rs b/crates/trigger-http2/src/handler.rs new file mode 100644 index 0000000000..1631c5e162 --- /dev/null +++ b/crates/trigger-http2/src/handler.rs @@ -0,0 +1,411 @@ +use std::{net::SocketAddr, str, str::FromStr}; + +use anyhow::{anyhow, Context, Result}; +use futures::TryFutureExt; +use http::{HeaderName, HeaderValue}; +use http_body_util::BodyExt; +use hyper::{Request, Response}; +use spin_core::{Component, Instance}; +use spin_factor_outbound_http::wasi_2023_10_18::exports::wasi::http::incoming_handler::Guest as IncomingHandler2023_10_18; +use spin_factor_outbound_http::wasi_2023_11_10::exports::wasi::http::incoming_handler::Guest as IncomingHandler2023_11_10; +use spin_http::body; +use spin_http::routes::RouteMatch; +use spin_world::v1::http_types; +use tokio::{sync::oneshot, task}; +use tracing::{instrument, Instrument, Level}; +use wasmtime_wasi_http::{body::HyperIncomingBody as Body, proxy::Proxy, WasiHttpView}; + +use crate::{server::HttpExecutor, Store, TriggerInstanceBuilder}; + +#[derive(Clone)] +pub struct HttpHandlerExecutor { + pub handler_type: HandlerType, +} + +impl HttpExecutor for HttpHandlerExecutor { + #[instrument(name = "spin_trigger_http.execute_wasm", skip_all, err(level = Level::INFO), fields(otel.name = format!("execute_wasm_component {}", route_match.component_id())))] + async fn execute( + &self, + instance_builder: TriggerInstanceBuilder<'_>, + route_match: &RouteMatch, + req: Request, + client_addr: SocketAddr, + ) -> Result> { + let component_id = route_match.component_id(); + + tracing::trace!("Executing request using the Spin executor for component {component_id}"); + + let (instance, store) = instance_builder.instantiate(()).await?; + + let resp = match self.handler_type { + HandlerType::Spin => self + .execute_spin(store, instance, route_match, req, client_addr) + .await + .map_err(contextualise_err)?, + _ => { + self.execute_wasi(store, instance, route_match, req, client_addr) + .await? + } + }; + + tracing::info!( + "Request finished, sending response with status code {}", + resp.status() + ); + Ok(resp) + } +} + +impl HttpHandlerExecutor { + pub async fn execute_spin( + &self, + mut store: Store, + instance: Instance, + route_match: &RouteMatch, + req: Request, + client_addr: SocketAddr, + ) -> Result> { + let headers = Self::headers(&req, route_match, client_addr)?; + let func = instance + .exports(&mut store) + .instance("fermyon:spin/inbound-http") + // Safe since we have already checked that this instance exists + .expect("no fermyon:spin/inbound-http found") + .typed_func::<(http_types::Request,), (http_types::Response,)>("handle-request")?; + + let (parts, body) = req.into_parts(); + let bytes = body.collect().await?.to_bytes().to_vec(); + + let method = if let Some(method) = Self::method(&parts.method) { + method + } else { + return Ok(Response::builder() + .status(http::StatusCode::METHOD_NOT_ALLOWED) + .body(body::empty())?); + }; + + // Preparing to remove the params field. We are leaving it in place for now + // to avoid breaking the ABI, but no longer pass or accept values in it. + // https://github.com/fermyon/spin/issues/663 + let params = vec![]; + + let uri = match parts.uri.path_and_query() { + Some(u) => u.to_string(), + None => parts.uri.to_string(), + }; + + let req = http_types::Request { + method, + uri, + headers, + params, + body: Some(bytes), + }; + + let (resp,) = func.call_async(&mut store, (req,)).await?; + + if resp.status < 100 || resp.status > 600 { + tracing::error!("malformed HTTP status code"); + return Ok(Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(body::empty())?); + }; + + let mut response = http::Response::builder().status(resp.status); + if let Some(headers) = response.headers_mut() { + Self::append_headers(headers, resp.headers)?; + } + + let body = match resp.body { + Some(b) => body::full(b.into()), + None => body::empty(), + }; + + Ok(response.body(body)?) + } + + fn method(m: &http::Method) -> Option { + Some(match *m { + http::Method::GET => http_types::Method::Get, + http::Method::POST => http_types::Method::Post, + http::Method::PUT => http_types::Method::Put, + http::Method::DELETE => http_types::Method::Delete, + http::Method::PATCH => http_types::Method::Patch, + http::Method::HEAD => http_types::Method::Head, + http::Method::OPTIONS => http_types::Method::Options, + _ => return None, + }) + } + + async fn execute_wasi( + &self, + mut store: Store, + instance: Instance, + route_match: &RouteMatch, + mut req: Request, + client_addr: SocketAddr, + ) -> anyhow::Result> { + let headers = Self::headers(&req, route_match, client_addr)?; + req.headers_mut().clear(); + req.headers_mut() + .extend(headers.into_iter().filter_map(|(n, v)| { + let Ok(name) = n.parse::() else { + return None; + }; + let Ok(value) = HeaderValue::from_bytes(v.as_bytes()) else { + return None; + }; + Some((name, value)) + })); + + let mut wasi_http = spin_factor_outbound_http::OutboundHttpFactor::get_wasi_http_impl( + store.data_mut().factors_instance_state(), + ) + .context("missing OutboundHttpFactor")?; + + let request = wasi_http.new_incoming_request(req)?; + + let (response_tx, response_rx) = oneshot::channel(); + let response = wasi_http.new_response_outparam(response_tx)?; + + drop(wasi_http); + + enum Handler { + Latest(Proxy), + Handler2023_11_10(IncomingHandler2023_11_10), + Handler2023_10_18(IncomingHandler2023_10_18), + } + + let handler = + { + let mut exports = instance.exports(&mut store); + match self.handler_type { + HandlerType::Wasi2023_10_18 => { + let mut instance = exports + .instance(WASI_HTTP_EXPORT_2023_10_18) + .ok_or_else(|| { + anyhow!("export of `{WASI_HTTP_EXPORT_2023_10_18}` not an instance") + })?; + Handler::Handler2023_10_18(IncomingHandler2023_10_18::new(&mut instance)?) + } + HandlerType::Wasi2023_11_10 => { + let mut instance = exports + .instance(WASI_HTTP_EXPORT_2023_11_10) + .ok_or_else(|| { + anyhow!("export of `{WASI_HTTP_EXPORT_2023_11_10}` not an instance") + })?; + Handler::Handler2023_11_10(IncomingHandler2023_11_10::new(&mut instance)?) + } + HandlerType::Wasi0_2 => { + drop(exports); + Handler::Latest(Proxy::new(&mut store, &instance)?) + } + HandlerType::Spin => panic!("should have used execute_spin instead"), + } + }; + + let span = tracing::debug_span!("execute_wasi"); + let handle = task::spawn( + async move { + let result = match handler { + Handler::Latest(proxy) => { + proxy + .wasi_http_incoming_handler() + .call_handle(&mut store, request, response) + .instrument(span) + .await + } + Handler::Handler2023_10_18(handler) => { + handler + .call_handle(&mut store, request, response) + .instrument(span) + .await + } + Handler::Handler2023_11_10(handler) => { + handler + .call_handle(&mut store, request, response) + .instrument(span) + .await + } + }; + + tracing::trace!( + "wasi-http memory consumed: {}", + store.data().core_state().memory_consumed() + ); + + result + } + .in_current_span(), + ); + + match response_rx.await { + Ok(response) => { + task::spawn( + async move { + handle + .await + .context("guest invocation panicked")? + .context("guest invocation failed")?; + + Ok(()) + } + .map_err(|e: anyhow::Error| { + tracing::warn!("component error after response: {e:?}"); + }), + ); + + Ok(response.context("guest failed to produce a response")?) + } + + Err(_) => { + handle + .await + .context("guest invocation panicked")? + .context("guest invocation failed")?; + + Err(anyhow!( + "guest failed to produce a response prior to returning" + )) + } + } + } + + fn headers( + req: &Request, + route_match: &RouteMatch, + client_addr: SocketAddr, + ) -> Result> { + let mut res = Vec::new(); + for (name, value) in req + .headers() + .iter() + .map(|(name, value)| (name.to_string(), std::str::from_utf8(value.as_bytes()))) + { + let value = value?.to_string(); + res.push((name, value)); + } + + let default_host = http::HeaderValue::from_str("localhost")?; + let host = std::str::from_utf8( + req.headers() + .get("host") + .unwrap_or(&default_host) + .as_bytes(), + )?; + + // Set the environment information (path info, base path, etc) as headers. + // In the future, we might want to have this information in a context + // object as opposed to headers. + for (keys, val) in + crate::server::compute_default_headers(req.uri(), host, route_match, client_addr)? + { + res.push((Self::prepare_header_key(&keys[0]), val)); + } + + Ok(res) + } + + fn prepare_header_key(key: &str) -> String { + key.replace('_', "-").to_ascii_lowercase() + } + + fn append_headers(res: &mut http::HeaderMap, src: Option>) -> Result<()> { + if let Some(src) = src { + for (k, v) in src.iter() { + res.insert( + http::header::HeaderName::from_str(k)?, + http::header::HeaderValue::from_str(v)?, + ); + } + }; + + Ok(()) + } +} + +/// Whether this handler uses the custom Spin http handler interface for wasi-http +#[derive(Copy, Clone)] +pub enum HandlerType { + Spin, + Wasi0_2, + Wasi2023_11_10, + Wasi2023_10_18, +} + +const WASI_HTTP_EXPORT_2023_10_18: &str = "wasi:http/incoming-handler@0.2.0-rc-2023-10-18"; +const WASI_HTTP_EXPORT_2023_11_10: &str = "wasi:http/incoming-handler@0.2.0-rc-2023-11-10"; +const WASI_HTTP_EXPORT_0_2_0: &str = "wasi:http/incoming-handler@0.2.0"; + +impl HandlerType { + /// Determine the handler type from the exports of a component + pub fn from_component( + engine: impl AsRef, + component: &Component, + ) -> Result { + let mut handler_ty = None; + + let mut set = |ty: HandlerType| { + if handler_ty.is_none() { + handler_ty = Some(ty); + Ok(()) + } else { + Err(anyhow!( + "component exports multiple different handlers but \ + it's expected to export only one" + )) + } + }; + let ty = component.component_type(); + for (name, _) in ty.exports(engine.as_ref()) { + match name { + WASI_HTTP_EXPORT_2023_10_18 => set(HandlerType::Wasi2023_10_18)?, + WASI_HTTP_EXPORT_2023_11_10 => set(HandlerType::Wasi2023_11_10)?, + WASI_HTTP_EXPORT_0_2_0 => set(HandlerType::Wasi0_2)?, + "fermyon:spin/inbound-http" => set(HandlerType::Spin)?, + _ => {} + } + } + + handler_ty.ok_or_else(|| { + anyhow!( + "Expected component to either export `{WASI_HTTP_EXPORT_2023_10_18}`, \ + `{WASI_HTTP_EXPORT_2023_11_10}`, `{WASI_HTTP_EXPORT_0_2_0}`, \ + or `fermyon:spin/inbound-http` but it exported none of those" + ) + }) + } +} + +fn contextualise_err(e: anyhow::Error) -> anyhow::Error { + if e.to_string() + .contains("failed to find function export `canonical_abi_free`") + { + e.context( + "component is not compatible with Spin executor - should this use the Wagi executor?", + ) + } else { + e + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_spin_header_keys() { + assert_eq!( + HttpHandlerExecutor::prepare_header_key("SPIN_FULL_URL"), + "spin-full-url".to_string() + ); + assert_eq!( + HttpHandlerExecutor::prepare_header_key("SPIN_PATH_INFO"), + "spin-path-info".to_string() + ); + assert_eq!( + HttpHandlerExecutor::prepare_header_key("SPIN_RAW_COMPONENT_ROUTE"), + "spin-raw-component-route".to_string() + ); + } +} diff --git a/crates/trigger-http2/src/instrument.rs b/crates/trigger-http2/src/instrument.rs new file mode 100644 index 0000000000..aa5b3e0e09 --- /dev/null +++ b/crates/trigger-http2/src/instrument.rs @@ -0,0 +1,93 @@ +use anyhow::Result; +use http::Response; +use tracing::Level; +use wasmtime_wasi_http::body::HyperIncomingBody; + +/// Create a span for an HTTP request. +macro_rules! http_span { + ($request:tt, $addr:tt) => { + tracing::info_span!( + "spin_trigger_http.handle_http_request", + "otel.kind" = "server", + "http.request.method" = %$request.method(), + "network.peer.address" = %$addr.ip(), + "network.peer.port" = %$addr.port(), + "network.protocol.name" = "http", + "url.path" = $request.uri().path(), + "url.query" = $request.uri().query().unwrap_or(""), + "url.scheme" = $request.uri().scheme_str().unwrap_or(""), + "client.address" = $request.headers().get("x-forwarded-for").and_then(|val| val.to_str().ok()), + // Recorded later + "error.type" = ::tracing::field::Empty, + "http.response.status_code" = ::tracing::field::Empty, + "http.route" = ::tracing::field::Empty, + "otel.name" = ::tracing::field::Empty, + ) + }; +} + +pub(crate) use http_span; + +/// Finish setting attributes on the HTTP span. +pub(crate) fn finalize_http_span( + response: Result>, + method: String, +) -> Result> { + let span = tracing::Span::current(); + match response { + Ok(response) => { + let matched_route = response.extensions().get::(); + // Set otel.name and http.route + if let Some(MatchedRoute { route }) = matched_route { + span.record("http.route", route); + span.record("otel.name", format!("{method} {route}")); + } else { + span.record("otel.name", method); + } + + // Set status code + span.record("http.response.status_code", response.status().as_u16()); + + Ok(response) + } + Err(err) => { + instrument_error(&err); + span.record("http.response.status_code", 500); + span.record("otel.name", method); + Err(err) + } + } +} + +/// Marks the current span as errored. +pub(crate) fn instrument_error(err: &anyhow::Error) { + let span = tracing::Span::current(); + tracing::event!(target:module_path!(), Level::INFO, error = %err); + span.record("error.type", format!("{:?}", err)); +} + +/// MatchedRoute is used as a response extension to track the route that was matched for OTel +/// tracing purposes. +#[derive(Clone)] +pub struct MatchedRoute { + pub route: String, +} + +impl MatchedRoute { + pub fn set_response_extension( + resp: &mut Response, + route: impl Into, + ) { + resp.extensions_mut().insert(MatchedRoute { + route: route.into(), + }); + } + + pub fn with_response_extension( + mut resp: Response, + route: impl Into, + ) -> Response { + Self::set_response_extension(&mut resp, route); + resp + } +} diff --git a/crates/trigger-http2/src/lib.rs b/crates/trigger-http2/src/lib.rs new file mode 100644 index 0000000000..f2b9c26b52 --- /dev/null +++ b/crates/trigger-http2/src/lib.rs @@ -0,0 +1,394 @@ +//! Implementation for the Spin HTTP engine. + +mod handler; +mod instrument; +mod server; +mod tls; +mod wagi; + +use std::{ + collections::HashMap, + error::Error, + net::{Ipv4Addr, SocketAddr, ToSocketAddrs}, + path::PathBuf, + sync::Arc, +}; + +use anyhow::{bail, Context}; +use clap::Args; +use serde::Deserialize; +use spin_app::App; +use spin_http::{config::HttpTriggerConfig, routes::Router}; +use spin_trigger2::Trigger; +use tokio::net::TcpListener; +use wasmtime_wasi_http::bindings::wasi::http::types::ErrorCode; + +use server::HttpServer; + +pub use tls::TlsConfig; + +pub(crate) type TriggerApp = spin_trigger2::TriggerApp; +pub(crate) type TriggerInstanceBuilder<'a> = spin_trigger2::TriggerInstanceBuilder<'a, HttpTrigger>; +pub(crate) type Store = spin_trigger2::Store; + +#[derive(Args)] +pub struct CliArgs { + /// IP address and port to listen on + #[clap(long = "listen", env = "SPIN_HTTP_LISTEN_ADDR", default_value = "127.0.0.1:3000", value_parser = parse_listen_addr)] + pub address: SocketAddr, + + /// The path to the certificate to use for https, if this is not set, normal http will be used. The cert should be in PEM format + #[clap(long, env = "SPIN_TLS_CERT", requires = "tls-key")] + pub tls_cert: Option, + + /// The path to the certificate key to use for https, if this is not set, normal http will be used. The key should be in PKCS#8 format + #[clap(long, env = "SPIN_TLS_KEY", requires = "tls-cert")] + pub tls_key: Option, +} + +impl CliArgs { + fn into_tls_config(self) -> Option { + match (self.tls_cert, self.tls_key) { + (Some(cert_path), Some(key_path)) => Some(TlsConfig { + cert_path, + key_path, + }), + (None, None) => None, + _ => unreachable!(), + } + } +} + +pub(crate) type InstanceState = (); + +/// The Spin HTTP trigger. +pub struct HttpTrigger { + listen_addr: SocketAddr, + tls_config: Option, + router: Router, + // Component ID -> component trigger config + component_trigger_configs: HashMap, +} + +impl Trigger for HttpTrigger { + const TYPE: &'static str = "http"; + + type CliArgs = CliArgs; + type InstanceState = InstanceState; + + fn new(cli_args: Self::CliArgs, app: &spin_app::App) -> anyhow::Result { + Self::validate_app(app)?; + + let component_trigger_configs = HashMap::from_iter( + app.trigger_configs::("http")? + .into_iter() + .map(|(_, config)| (config.component.clone(), config)), + ); + + let component_routes = component_trigger_configs + .iter() + .map(|(component_id, config)| (component_id.as_str(), &config.route)); + let (router, duplicate_routes) = Router::build("/", component_routes)?; + if !duplicate_routes.is_empty() { + tracing::error!( + "The following component routes are duplicates and will never be used:" + ); + for dup in &duplicate_routes { + tracing::error!( + " {}: {} (duplicate of {})", + dup.replaced_id, + dup.route(), + dup.effective_id, + ); + } + } + tracing::trace!( + "Constructed router: {:?}", + router.routes().collect::>() + ); + + Ok(Self { + listen_addr: cli_args.address, + tls_config: cli_args.into_tls_config(), + router, + component_trigger_configs, + }) + } + + async fn run(self, trigger_app: TriggerApp) -> anyhow::Result<()> { + let Self { + listen_addr, + tls_config, + router, + component_trigger_configs, + } = self; + + let listener = TcpListener::bind(listen_addr) + .await + .with_context(|| format!("Unable to listen on {listen_addr}"))?; + + let server = Arc::new(HttpServer::new( + listen_addr, + trigger_app, + router, + component_trigger_configs, + )?); + + if let Some(tls_config) = tls_config { + server.serve_tls(listener, tls_config).await? + } else { + server.serve(listener).await? + }; + + Ok(()) + } + + fn supported_host_requirements() -> Vec<&'static str> { + vec![spin_app::locked::SERVICE_CHAINING_KEY] + } +} + +impl HttpTrigger { + fn validate_app(app: &App) -> anyhow::Result<()> { + #[derive(Deserialize)] + #[serde(deny_unknown_fields)] + struct TriggerMetadata { + base: Option, + } + if let Some(TriggerMetadata { base: Some(base) }) = app.get_trigger_metadata("http")? { + if base == "/" { + tracing::warn!("This application has the deprecated trigger 'base' set to the default value '/'. This may be an error in the future!"); + } else { + bail!("This application is using the deprecated trigger 'base' field. The base must be prepended to each [[trigger.http]]'s 'route'.") + } + } + Ok(()) + } +} + +fn parse_listen_addr(addr: &str) -> anyhow::Result { + let addrs: Vec = addr.to_socket_addrs()?.collect(); + // Prefer 127.0.0.1 over e.g. [::1] because CHANGE IS HARD + if let Some(addr) = addrs + .iter() + .find(|addr| addr.is_ipv4() && addr.ip() == Ipv4Addr::LOCALHOST) + { + return Ok(*addr); + } + // Otherwise, take the first addr (OS preference) + addrs.into_iter().next().context("couldn't resolve address") +} + +#[derive(Debug, PartialEq)] +enum NotFoundRouteKind { + Normal(String), + WellKnown, +} + +/// Translate a [`hyper::Error`] to a wasi-http `ErrorCode` in the context of a request. +pub fn hyper_request_error(err: hyper::Error) -> ErrorCode { + // If there's a source, we might be able to extract a wasi-http error from it. + if let Some(cause) = err.source() { + if let Some(err) = cause.downcast_ref::() { + return err.clone(); + } + } + + tracing::warn!("hyper request error: {err:?}"); + + ErrorCode::HttpProtocolError +} + +pub fn dns_error(rcode: String, info_code: u16) -> ErrorCode { + ErrorCode::DnsError(wasmtime_wasi_http::bindings::http::types::DnsErrorPayload { + rcode: Some(rcode), + info_code: Some(info_code), + }) +} + +#[cfg(test)] +mod tests { + use anyhow::Result; + use http::Request; + + use super::{server::*, *}; + + #[test] + fn test_default_headers() -> Result<()> { + let scheme = "https"; + let host = "fermyon.dev"; + let trigger_route = "/foo/..."; + let component_path = "/foo"; + let path_info = "/bar"; + let client_addr: SocketAddr = "127.0.0.1:8777".parse().unwrap(); + + let req_uri = format!( + "{}://{}{}{}?key1=value1&key2=value2", + scheme, host, component_path, path_info + ); + + let req = http::Request::builder() + .method("POST") + .uri(req_uri) + .body("")?; + + let (router, _) = Router::build("/", [("DUMMY", &trigger_route.into())])?; + let route_match = router.route("/foo/bar")?; + + let default_headers = compute_default_headers(req.uri(), host, &route_match, client_addr)?; + + assert_eq!( + search(&FULL_URL, &default_headers).unwrap(), + "https://fermyon.dev/foo/bar?key1=value1&key2=value2".to_string() + ); + assert_eq!( + search(&PATH_INFO, &default_headers).unwrap(), + "/bar".to_string() + ); + assert_eq!( + search(&MATCHED_ROUTE, &default_headers).unwrap(), + "/foo/...".to_string() + ); + assert_eq!( + search(&BASE_PATH, &default_headers).unwrap(), + "/".to_string() + ); + assert_eq!( + search(&RAW_COMPONENT_ROUTE, &default_headers).unwrap(), + "/foo/...".to_string() + ); + assert_eq!( + search(&COMPONENT_ROUTE, &default_headers).unwrap(), + "/foo".to_string() + ); + assert_eq!( + search(&CLIENT_ADDR, &default_headers).unwrap(), + "127.0.0.1:8777".to_string() + ); + + Ok(()) + } + + #[test] + fn test_default_headers_with_named_wildcards() -> Result<()> { + let scheme = "https"; + let host = "fermyon.dev"; + let trigger_route = "/foo/:userid/..."; + let component_path = "/foo"; + let path_info = "/bar"; + let client_addr: SocketAddr = "127.0.0.1:8777".parse().unwrap(); + + let req_uri = format!( + "{}://{}{}/42{}?key1=value1&key2=value2", + scheme, host, component_path, path_info + ); + + let req = http::Request::builder() + .method("POST") + .uri(req_uri) + .body("")?; + + let (router, _) = Router::build("/", [("DUMMY", &trigger_route.into())])?; + let route_match = router.route("/foo/42/bar")?; + + let default_headers = compute_default_headers(req.uri(), host, &route_match, client_addr)?; + + // TODO: we currently replace the scheme with HTTP. When TLS is supported, this should be fixed. + assert_eq!( + search(&FULL_URL, &default_headers).unwrap(), + "https://fermyon.dev/foo/42/bar?key1=value1&key2=value2".to_string() + ); + assert_eq!( + search(&PATH_INFO, &default_headers).unwrap(), + "/bar".to_string() + ); + assert_eq!( + search(&MATCHED_ROUTE, &default_headers).unwrap(), + "/foo/:userid/...".to_string() + ); + assert_eq!( + search(&BASE_PATH, &default_headers).unwrap(), + "/".to_string() + ); + assert_eq!( + search(&RAW_COMPONENT_ROUTE, &default_headers).unwrap(), + "/foo/:userid/...".to_string() + ); + assert_eq!( + search(&COMPONENT_ROUTE, &default_headers).unwrap(), + "/foo/:userid".to_string() + ); + assert_eq!( + search(&CLIENT_ADDR, &default_headers).unwrap(), + "127.0.0.1:8777".to_string() + ); + + assert_eq!( + search( + &["SPIN_PATH_MATCH_USERID", "X_PATH_MATCH_USERID"], + &default_headers + ) + .unwrap(), + "42".to_string() + ); + + Ok(()) + } + + fn search(keys: &[&str; 2], headers: &[([String; 2], String)]) -> Option { + let mut res: Option = None; + for (k, v) in headers { + if k[0] == keys[0] && k[1] == keys[1] { + res = Some(v.clone()); + } + } + + res + } + + #[test] + fn parse_listen_addr_prefers_ipv4() { + let addr = parse_listen_addr("localhost:12345").unwrap(); + assert_eq!(addr.ip(), Ipv4Addr::LOCALHOST); + assert_eq!(addr.port(), 12345); + } + + #[test] + fn forbidden_headers_are_removed() { + let mut req = Request::get("http://test.spin.internal") + .header("Host", "test.spin.internal") + .header("accept", "text/plain") + .body(Default::default()) + .unwrap(); + + strip_forbidden_headers(&mut req); + + assert_eq!(1, req.headers().len()); + assert!(req.headers().get("Host").is_none()); + + let mut req = Request::get("http://test.spin.internal") + .header("Host", "test.spin.internal:1234") + .header("accept", "text/plain") + .body(Default::default()) + .unwrap(); + + strip_forbidden_headers(&mut req); + + assert_eq!(1, req.headers().len()); + assert!(req.headers().get("Host").is_none()); + } + + #[test] + fn non_forbidden_headers_are_not_removed() { + let mut req = Request::get("http://test.example.com") + .header("Host", "test.example.org") + .header("accept", "text/plain") + .body(Default::default()) + .unwrap(); + + strip_forbidden_headers(&mut req); + + assert_eq!(2, req.headers().len()); + assert!(req.headers().get("Host").is_some()); + } +} diff --git a/crates/trigger-http2/src/server.rs b/crates/trigger-http2/src/server.rs new file mode 100644 index 0000000000..c4f4a881ac --- /dev/null +++ b/crates/trigger-http2/src/server.rs @@ -0,0 +1,386 @@ +use std::{collections::HashMap, future::Future, io::IsTerminal, net::SocketAddr, sync::Arc}; + +use http::{uri::Scheme, Request, Response, StatusCode, Uri}; +use http_body_util::BodyExt; +use hyper::{ + body::{Bytes, Incoming}, + server::conn::http1, + service::service_fn, +}; +use hyper_util::rt::TokioIo; +use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY}; +use spin_http::{ + app_info::AppInfo, + body, + config::{HttpExecutorType, HttpTriggerConfig}, + routes::{RouteMatch, Router}, +}; +use spin_outbound_networking::is_service_chaining_host; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::TcpListener, + task, +}; +use tracing::Instrument; +use wasmtime_wasi_http::body::{HyperIncomingBody as Body, HyperOutgoingBody}; + +use crate::{ + handler::{HandlerType, HttpHandlerExecutor}, + instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute}, + wagi::WagiHttpExecutor, + NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder, +}; + +pub struct HttpServer { + listen_addr: SocketAddr, + trigger_app: TriggerApp, + router: Router, + // Component ID -> component trigger config + component_trigger_configs: HashMap, + // Component ID -> handler type + component_handler_types: HashMap, +} + +impl HttpServer { + pub fn new( + listen_addr: SocketAddr, + trigger_app: TriggerApp, + router: Router, + component_trigger_configs: HashMap, + ) -> anyhow::Result { + let component_handler_types = component_trigger_configs + .keys() + .map(|component_id| { + let component = trigger_app.get_component(component_id)?; + let handler_type = HandlerType::from_component(trigger_app.engine(), component)?; + Ok((component_id.clone(), handler_type)) + }) + .collect::>()?; + Ok(Self { + listen_addr, + trigger_app, + router, + component_trigger_configs, + component_handler_types, + }) + } + + pub async fn serve(self: Arc, listener: TcpListener) -> anyhow::Result<()> { + self.print_startup_msgs("http", &listener)?; + loop { + let (stream, client_addr) = listener.accept().await?; + self.clone().serve_connection(stream, client_addr); + } + } + + pub async fn serve_tls( + self: Arc, + listener: TcpListener, + tls_config: TlsConfig, + ) -> anyhow::Result<()> { + self.print_startup_msgs("https", &listener)?; + let acceptor = tls_config.server_config()?; + loop { + let (stream, client_addr) = listener.accept().await?; + match acceptor.accept(stream).await { + Ok(stream) => self.clone().serve_connection(stream, client_addr), + Err(err) => tracing::error!(?err, "Failed to start TLS session"), + } + } + } + + /// Handles incoming requests using an HTTP executor. + pub async fn handle( + &self, + mut req: Request, + scheme: Scheme, + client_addr: SocketAddr, + ) -> anyhow::Result> { + set_req_uri(&mut req, scheme, self.listen_addr)?; + strip_forbidden_headers(&mut req); + + spin_telemetry::extract_trace_context(&req); + + tracing::info!("Processing request on URI {}", req.uri()); + + let path = req.uri().path().to_string(); + + // Handle well-known spin paths + if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) { + return match well_known { + "health" => Ok(MatchedRoute::with_response_extension( + Response::new(body::full(Bytes::from_static(b"OK"))), + path, + )), + "info" => self.app_info(path), + _ => Self::not_found(NotFoundRouteKind::WellKnown), + }; + } + + let app_id = self + .trigger_app + .app() + .get_metadata(APP_NAME_KEY)? + .unwrap_or_else(|| "".into()); + + // Route to app component + match self.router.route(&path) { + Ok(route_match) => { + let component_id = route_match.component_id(); + + spin_telemetry::metrics::monotonic_counter!( + spin.request_count = 1, + trigger_type = "http", + app_id = app_id, + component_id = component_id + ); + + let instance_builder = self.trigger_app.prepare(component_id)?; + let trigger_config = self.component_trigger_configs.get(component_id).unwrap(); + let handler_type = self.component_handler_types.get(component_id).unwrap(); + let executor = trigger_config + .executor + .as_ref() + .unwrap_or(&HttpExecutorType::Http); + + let res = match executor { + HttpExecutorType::Http => { + HttpHandlerExecutor { + handler_type: *handler_type, + } + .execute(instance_builder, &route_match, req, client_addr) + .await + } + HttpExecutorType::Wagi(wagi_config) => { + let executor = WagiHttpExecutor { + wagi_config: wagi_config.clone(), + }; + executor + .execute(instance_builder, &route_match, req, client_addr) + .await + } + }; + match res { + Ok(res) => Ok(MatchedRoute::with_response_extension( + res, + route_match.raw_route(), + )), + Err(err) => { + tracing::error!("Error processing request: {err:?}"); + instrument_error(&err); + Self::internal_error(None, route_match.raw_route()) + } + } + } + Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())), + } + } + + /// Returns spin status information. + fn app_info(&self, route: String) -> anyhow::Result> { + let info = AppInfo::new(self.trigger_app.app()); + let body = serde_json::to_vec_pretty(&info)?; + Ok(MatchedRoute::with_response_extension( + Response::builder() + .header("content-type", "application/json") + .body(body::full(body.into()))?, + route, + )) + } + + /// Creates an HTTP 500 response. + fn internal_error( + body: Option<&str>, + route: impl Into, + ) -> anyhow::Result> { + let body = match body { + Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())), + None => body::empty(), + }; + + Ok(MatchedRoute::with_response_extension( + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(body)?, + route, + )) + } + + /// Creates an HTTP 404 response. + fn not_found(kind: NotFoundRouteKind) -> anyhow::Result> { + use std::sync::atomic::{AtomicBool, Ordering}; + static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false); + if let NotFoundRouteKind::Normal(route) = kind { + if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed) + && std::io::stderr().is_terminal() + { + terminal::warn!("Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."); + } + } + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(body::empty())?) + } + + fn serve_connection( + self: Arc, + stream: S, + client_addr: SocketAddr, + ) { + task::spawn(async move { + if let Err(err) = http1::Builder::new() + .keep_alive(true) + .serve_connection( + TokioIo::new(stream), + service_fn(move |request| { + self.clone().instrumented_service_fn(client_addr, request) + }), + ) + .await + { + tracing::warn!("Error serving HTTP connection: {err:?}"); + } + }); + } + + async fn instrumented_service_fn( + self: Arc, + client_addr: SocketAddr, + request: Request, + ) -> anyhow::Result> { + let span = http_span!(request, client_addr); + let method = request.method().to_string(); + async { + let result = self + .handle( + request.map(|body: Incoming| { + body.map_err(wasmtime_wasi_http::hyper_response_error) + .boxed() + }), + Scheme::HTTP, + client_addr, + ) + .await; + finalize_http_span(result, method) + } + .instrument(span) + .await + } + + fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> { + let local_addr = listener.local_addr()?; + let base_url = format!("{scheme}://{local_addr:?}"); + terminal::step!("\nServing", "{base_url}"); + tracing::info!("Serving {base_url}"); + + println!("Available Routes:"); + for (route, component_id) in self.router.routes() { + println!(" {}: {}{}", component_id, base_url, route); + if let Some(component) = self.trigger_app.app().get_component(component_id) { + if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? { + println!(" {}", description); + } + } + } + Ok(()) + } +} + +/// The incoming request's scheme and authority +/// +/// The incoming request's URI is relative to the server, so we need to set the scheme and authority +fn set_req_uri(req: &mut Request, scheme: Scheme, addr: SocketAddr) -> anyhow::Result<()> { + let uri = req.uri().clone(); + let mut parts = uri.into_parts(); + let authority = format!("{}:{}", addr.ip(), addr.port()).parse().unwrap(); + parts.scheme = Some(scheme); + parts.authority = Some(authority); + *req.uri_mut() = Uri::from_parts(parts).unwrap(); + Ok(()) +} + +pub fn strip_forbidden_headers(req: &mut Request) { + let headers = req.headers_mut(); + if let Some(host_header) = headers.get("Host") { + if let Ok(host) = host_header.to_str() { + if is_service_chaining_host(host) { + headers.remove("Host"); + } + } + } +} + +// We need to make the following pieces of information available to both executors. +// While the values we set are identical, the way they are passed to the +// modules is going to be different, so each executor must must use the info +// in its standardized way (environment variables for the Wagi executor, and custom headers +// for the Spin HTTP executor). +pub const FULL_URL: [&str; 2] = ["SPIN_FULL_URL", "X_FULL_URL"]; +pub const PATH_INFO: [&str; 2] = ["SPIN_PATH_INFO", "PATH_INFO"]; +pub const MATCHED_ROUTE: [&str; 2] = ["SPIN_MATCHED_ROUTE", "X_MATCHED_ROUTE"]; +pub const COMPONENT_ROUTE: [&str; 2] = ["SPIN_COMPONENT_ROUTE", "X_COMPONENT_ROUTE"]; +pub const RAW_COMPONENT_ROUTE: [&str; 2] = ["SPIN_RAW_COMPONENT_ROUTE", "X_RAW_COMPONENT_ROUTE"]; +pub const BASE_PATH: [&str; 2] = ["SPIN_BASE_PATH", "X_BASE_PATH"]; +pub const CLIENT_ADDR: [&str; 2] = ["SPIN_CLIENT_ADDR", "X_CLIENT_ADDR"]; + +pub(crate) fn compute_default_headers( + uri: &Uri, + host: &str, + route_match: &RouteMatch, + client_addr: SocketAddr, +) -> anyhow::Result> { + fn owned(strs: &[&'static str; 2]) -> [String; 2] { + [strs[0].to_owned(), strs[1].to_owned()] + } + + let owned_full_url: [String; 2] = owned(&FULL_URL); + let owned_path_info: [String; 2] = owned(&PATH_INFO); + let owned_matched_route: [String; 2] = owned(&MATCHED_ROUTE); + let owned_component_route: [String; 2] = owned(&COMPONENT_ROUTE); + let owned_raw_component_route: [String; 2] = owned(&RAW_COMPONENT_ROUTE); + let owned_base_path: [String; 2] = owned(&BASE_PATH); + let owned_client_addr: [String; 2] = owned(&CLIENT_ADDR); + + let mut res = vec![]; + let abs_path = uri + .path_and_query() + .expect("cannot get path and query") + .as_str(); + + let path_info = route_match.trailing_wildcard(); + + let scheme = uri.scheme_str().unwrap_or("http"); + + let full_url = format!("{}://{}{}", scheme, host, abs_path); + + res.push((owned_path_info, path_info)); + res.push((owned_full_url, full_url)); + res.push((owned_matched_route, route_match.based_route().to_string())); + + res.push((owned_base_path, "/".to_string())); + res.push(( + owned_raw_component_route, + route_match.raw_route().to_string(), + )); + res.push((owned_component_route, route_match.raw_route_or_prefix())); + res.push((owned_client_addr, client_addr.to_string())); + + for (wild_name, wild_value) in route_match.named_wildcards() { + let wild_header = format!("SPIN_PATH_MATCH_{}", wild_name.to_ascii_uppercase()); // TODO: safer + let wild_wagi_header = format!("X_PATH_MATCH_{}", wild_name.to_ascii_uppercase()); // TODO: safer + res.push(([wild_header, wild_wagi_header], wild_value.clone())); + } + + Ok(res) +} + +/// An HTTP executor. +pub(crate) trait HttpExecutor: Clone + Send + Sync + 'static { + fn execute( + &self, + instance_builder: TriggerInstanceBuilder, + route_match: &RouteMatch, + req: Request, + client_addr: SocketAddr, + ) -> impl Future>>; +} diff --git a/crates/trigger-http2/src/tls.rs b/crates/trigger-http2/src/tls.rs new file mode 100644 index 0000000000..0f75eaac8f --- /dev/null +++ b/crates/trigger-http2/src/tls.rs @@ -0,0 +1,134 @@ +use rustls_pemfile::private_key; +use std::{ + fs, io, + path::{Path, PathBuf}, + sync::Arc, +}; +use tokio_rustls::{rustls, TlsAcceptor}; + +/// TLS configuration for the server. +#[derive(Clone)] +pub struct TlsConfig { + /// Path to TLS certificate. + pub cert_path: PathBuf, + /// Path to TLS key. + pub key_path: PathBuf, +} + +impl TlsConfig { + // Creates a TLS acceptor from server config. + pub(super) fn server_config(&self) -> anyhow::Result { + let certs = load_certs(&self.cert_path)?; + let private_key = load_key(&self.key_path)?; + + let cfg = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, private_key) + .map_err(|e| anyhow::anyhow!("{}", e))?; + + Ok(Arc::new(cfg).into()) + } +} + +// load_certs parse and return the certs from the provided file +fn load_certs( + path: impl AsRef, +) -> io::Result>> { + rustls_pemfile::certs(&mut io::BufReader::new(fs::File::open(path).map_err( + |err| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("failed to read cert file {:?}", err), + ) + }, + )?)) + .collect() +} + +// parse and return the first private key from the provided file +fn load_key(path: impl AsRef) -> io::Result> { + private_key(&mut io::BufReader::new(fs::File::open(path).map_err( + |err| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("failed to read private key file {:?}", err), + ) + }, + )?)) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid private key")) + .transpose() + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "private key file contains no private keys", + ) + })? +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_datadir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("testdata") + } + + #[test] + fn test_read_non_existing_cert() { + let path = test_datadir().join("non-existing-file.pem"); + + let certs = load_certs(path); + assert!(certs.is_err()); + assert_eq!(certs.err().unwrap().to_string(), "failed to read cert file Os { code: 2, kind: NotFound, message: \"No such file or directory\" }"); + } + + #[test] + fn test_read_invalid_cert() { + let path = test_datadir().join("invalid-cert.pem"); + + let certs = load_certs(path); + assert!(certs.is_err()); + assert_eq!( + certs.err().unwrap().to_string(), + "section end \"-----END CERTIFICATE-----\" missing" + ); + } + + #[test] + fn test_read_valid_cert() { + let path = test_datadir().join("valid-cert.pem"); + + let certs = load_certs(path); + assert!(certs.is_ok()); + assert_eq!(certs.unwrap().len(), 2); + } + + #[test] + fn test_read_non_existing_private_key() { + let mut path = test_datadir(); + path.push("non-existing-file.pem"); + + let keys = load_key(path); + assert!(keys.is_err()); + assert_eq!(keys.err().unwrap().to_string(), "failed to read private key file Os { code: 2, kind: NotFound, message: \"No such file or directory\" }"); + } + + #[test] + fn test_read_invalid_private_key() { + let mut path = test_datadir(); + path.push("invalid-private-key.pem"); + + let keys = load_key(path); + assert!(keys.is_err()); + assert_eq!(keys.err().unwrap().to_string(), "invalid private key"); + } + + #[test] + fn test_read_valid_private_key() { + let mut path = test_datadir(); + path.push("valid-private-key.pem"); + + let keys = load_key(path); + assert!(keys.is_ok()); + } +} diff --git a/crates/trigger-http2/src/wagi.rs b/crates/trigger-http2/src/wagi.rs new file mode 100644 index 0000000000..7a0a3701a9 --- /dev/null +++ b/crates/trigger-http2/src/wagi.rs @@ -0,0 +1,135 @@ +use std::{io::Cursor, net::SocketAddr}; + +use anyhow::{anyhow, ensure, Context, Result}; +use http_body_util::BodyExt; +use hyper::{Request, Response}; +use spin_http::{config::WagiTriggerConfig, routes::RouteMatch, wagi}; +use tracing::{instrument, Level}; +use wasmtime_wasi::pipe::MemoryOutputPipe; +use wasmtime_wasi_http::body::HyperIncomingBody as Body; + +use crate::{server::HttpExecutor, TriggerInstanceBuilder}; + +#[derive(Clone)] +pub struct WagiHttpExecutor { + pub wagi_config: WagiTriggerConfig, +} + +impl HttpExecutor for WagiHttpExecutor { + #[instrument(name = "spin_trigger_http.execute_wagi", skip_all, err(level = Level::INFO), fields(otel.name = format!("execute_wagi_component {}", route_match.component_id())))] + async fn execute( + &self, + mut instance_builder: TriggerInstanceBuilder<'_>, + route_match: &RouteMatch, + req: Request, + client_addr: SocketAddr, + ) -> Result> { + let component = route_match.component_id(); + + tracing::trace!( + "Executing request using the Wagi executor for component {}", + component + ); + + let uri_path = req.uri().path(); + + // Build the argv array by starting with the config for `argv` and substituting in + // script name and args where appropriate. + let script_name = uri_path.to_string(); + let args = req.uri().query().unwrap_or_default().replace('&', " "); + let argv = self + .wagi_config + .argv + .clone() + .replace("${SCRIPT_NAME}", &script_name) + .replace("${ARGS}", &args); + + let (parts, body) = req.into_parts(); + + let body = body.collect().await?.to_bytes().to_vec(); + let len = body.len(); + + // TODO + // The default host and TLS fields are currently hard-coded. + let mut headers = + wagi::build_headers(route_match, &parts, len, client_addr, "default_host", false); + + let default_host = http::HeaderValue::from_str("localhost")?; + let host = std::str::from_utf8( + parts + .headers + .get("host") + .unwrap_or(&default_host) + .as_bytes(), + )?; + + // Add the default Spin headers. + // This sets the current environment variables Wagi expects (such as + // `PATH_INFO`, or `X_FULL_URL`). + // Note that this overrides any existing headers previously set by Wagi. + for (keys, val) in + crate::server::compute_default_headers(&parts.uri, host, route_match, client_addr)? + { + headers.insert(keys[1].to_string(), val); + } + + let stdout = MemoryOutputPipe::new(usize::MAX); + + let wasi_builder = instance_builder.factor_builders().wasi(); + + // Set up Wagi environment + wasi_builder.args(argv.split(' ')); + wasi_builder.env(headers); + wasi_builder.stdin_pipe(Cursor::new(body)); + wasi_builder.stdout(stdout.clone()); + + let (instance, mut store) = instance_builder.instantiate(()).await?; + + let start = instance + .get_func(&mut store, &self.wagi_config.entrypoint) + .ok_or_else(|| { + anyhow::anyhow!( + "No such function '{}' in {}", + self.wagi_config.entrypoint, + component + ) + })?; + tracing::trace!("Calling Wasm entry point"); + start + .call_async(&mut store, &[], &mut []) + .await + .or_else(ignore_successful_proc_exit_trap) + .with_context(|| { + anyhow!( + "invoking {} for component {component}", + self.wagi_config.entrypoint + ) + })?; + tracing::info!("Module execution complete"); + + // Drop the store so we're left with a unique reference to `stdout`: + drop(store); + + let stdout = stdout.try_into_inner().unwrap(); + ensure!( + !stdout.is_empty(), + "The {component:?} component is configured to use the WAGI executor \ + but did not write to stdout. Check the `executor` in spin.toml." + ); + + wagi::compose_response(&stdout) + } +} + +fn ignore_successful_proc_exit_trap(guest_err: anyhow::Error) -> Result<()> { + match guest_err + .root_cause() + .downcast_ref::() + { + Some(trap) => match trap.0 { + 0 => Ok(()), + _ => Err(guest_err), + }, + None => Err(guest_err), + } +} diff --git a/crates/trigger-http2/testdata/invalid-cert.pem b/crates/trigger-http2/testdata/invalid-cert.pem new file mode 100644 index 0000000000..f1a952b9c8 --- /dev/null +++ b/crates/trigger-http2/testdata/invalid-cert.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIBkjCCATegAwIBAgIIEOURVvWgx1AwCgYIKoZIzj0EAwIwIzEhMB8GA1UEAwwY +azNzLWNsaWVudC1jYUAxNzE3NzgwNTIwMB4XDTI0MDYwNzE3MTUyMFoXDTI1MDYw +NzE3MTUyMFowMDEXMBUGA1UEChMOc3lzdGVtOm1hc3RlcnMxFTATBgNVBAMTDHN5 +c3RlbTphZG1pbjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABFGE/CVuauj8kmde +i4AagSJ5GYgGnL0eF55ItiXrKSjMmsIf/N8EyeamxQfWPKVk/1xhH7cS9GcQgNe6 +XrRvmLyjSDBGMA4GA1UdDwEB/wQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDAjAf +BgNVHSMEGDAWgBRpihySeW3DafmU1cw6LMnQCQDD4jAKBggqhkjOPQQDAgNJADBG +AiEA/db1wb4mVrqJVctqbPU9xd0bXzJx7cBDzpWgPP9ISfkCIQDNyuskAkXvUMHH +F73/GJnh8Bt2H38qyzThM8nlR9v1eQ== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIBdjCCAR2gAwIBAgIBADAKBggqhkjOPQQDAjAjMSEwHwYDVQQDDBhrM3MtY2xp +ZW50LWNhQDE3MTc3ODA1MjAwHhcNMjQwNjA3MTcxNTIwWhcNMzQwNjA1MTcxNTIw +WjAjMSEwHwYDVQQDDBhrM3MtY2xpZW50LWNhQDE3MTc3ODA1MjAwWTATBgcqhkjO +PQIBBggqhkjOPQMBBwNCAASozciE0YGl8ak3G0Ll1riwXSScfpK0QRle/cFizdlA +HgDowBssBcla0/2a/eWabxqTPzsZH0cVhL7Tialoj8GNo0IwQDAOBgNVHQ8BAf8E +BAMCAqQwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUaYocknltw2n5lNXMOizJ +0AkAw+IwCgYIKoZIzj0EAwIDRwAwRAIgR8YcLA8cH4qAMDRPDsJqLaw4GJFkgjwV +TCrMgyUxSvACIBwyklgm7mgHcC5WM9CqmliAGZJyV0xRPZBK01POrNf0 diff --git a/crates/trigger-http2/testdata/invalid-private-key.pem b/crates/trigger-http2/testdata/invalid-private-key.pem new file mode 100644 index 0000000000..39d7e59ee6 --- /dev/null +++ b/crates/trigger-http2/testdata/invalid-private-key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIA+FBtmKJbd8wBGOWeuJQfHiCKjjXF8ywEPrvj8S1N3VoAoGCCqGSM49 +AwEHoUQDQgAEUYT8JW5q6PySZ16LgBqBInkZiAacvR4Xnki2JespKMyawh/83wTJ +5qbFB9Y8pWT/XGEftxL0ZxCA17petG+YvA== +-----END EC PRIVATE KEY- \ No newline at end of file diff --git a/crates/trigger-http2/testdata/valid-cert.pem b/crates/trigger-http2/testdata/valid-cert.pem new file mode 100644 index 0000000000..e75166d0e6 --- /dev/null +++ b/crates/trigger-http2/testdata/valid-cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIBkjCCATegAwIBAgIIEOURVvWgx1AwCgYIKoZIzj0EAwIwIzEhMB8GA1UEAwwY +azNzLWNsaWVudC1jYUAxNzE3NzgwNTIwMB4XDTI0MDYwNzE3MTUyMFoXDTI1MDYw +NzE3MTUyMFowMDEXMBUGA1UEChMOc3lzdGVtOm1hc3RlcnMxFTATBgNVBAMTDHN5 +c3RlbTphZG1pbjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABFGE/CVuauj8kmde +i4AagSJ5GYgGnL0eF55ItiXrKSjMmsIf/N8EyeamxQfWPKVk/1xhH7cS9GcQgNe6 +XrRvmLyjSDBGMA4GA1UdDwEB/wQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDAjAf +BgNVHSMEGDAWgBRpihySeW3DafmU1cw6LMnQCQDD4jAKBggqhkjOPQQDAgNJADBG +AiEA/db1wb4mVrqJVctqbPU9xd0bXzJx7cBDzpWgPP9ISfkCIQDNyuskAkXvUMHH +F73/GJnh8Bt2H38qyzThM8nlR9v1eQ== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIBdjCCAR2gAwIBAgIBADAKBggqhkjOPQQDAjAjMSEwHwYDVQQDDBhrM3MtY2xp +ZW50LWNhQDE3MTc3ODA1MjAwHhcNMjQwNjA3MTcxNTIwWhcNMzQwNjA1MTcxNTIw +WjAjMSEwHwYDVQQDDBhrM3MtY2xpZW50LWNhQDE3MTc3ODA1MjAwWTATBgcqhkjO +PQIBBggqhkjOPQMBBwNCAASozciE0YGl8ak3G0Ll1riwXSScfpK0QRle/cFizdlA +HgDowBssBcla0/2a/eWabxqTPzsZH0cVhL7Tialoj8GNo0IwQDAOBgNVHQ8BAf8E +BAMCAqQwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUaYocknltw2n5lNXMOizJ +0AkAw+IwCgYIKoZIzj0EAwIDRwAwRAIgR8YcLA8cH4qAMDRPDsJqLaw4GJFkgjwV +TCrMgyUxSvACIBwyklgm7mgHcC5WM9CqmliAGZJyV0xRPZBK01POrNf0 +-----END CERTIFICATE----- \ No newline at end of file diff --git a/crates/trigger-http2/testdata/valid-private-key.pem b/crates/trigger-http2/testdata/valid-private-key.pem new file mode 100644 index 0000000000..2820fbed26 --- /dev/null +++ b/crates/trigger-http2/testdata/valid-private-key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIA+FBtmKJbd8wBGOWeuJQfHiCKjjXF8ywEPrvj8S1N3VoAoGCCqGSM49 +AwEHoUQDQgAEUYT8JW5q6PySZ16LgBqBInkZiAacvR4Xnki2JespKMyawh/83wTJ +5qbFB9Y8pWT/XGEftxL0ZxCA17petG+YvA== +-----END EC PRIVATE KEY----- \ No newline at end of file diff --git a/crates/trigger2/Cargo.toml b/crates/trigger2/Cargo.toml new file mode 100644 index 0000000000..a978ce987e --- /dev/null +++ b/crates/trigger2/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "spin-trigger2" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = "1" +clap = { version = "3.1.18", features = ["derive", "env"] } +ctrlc = { version = "3.2", features = ["termination"] } +futures = "0.3" +sanitize-filename = "0.5" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +spin-app = { path = "../app" } +spin-common = { path = "../common" } +spin-core = { path = "../core" } +spin-factor-wasi = { path = "../factor-wasi" } +spin-factors = { path = "../factors" } +spin-factors-executor = { path = "../factors-executor" } +spin-telemetry = { path = "../telemetry" } +tokio = { version = "1.23", features = ["fs"] } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/crates/trigger2/src/cli.rs b/crates/trigger2/src/cli.rs new file mode 100644 index 0000000000..b3c9ea1c06 --- /dev/null +++ b/crates/trigger2/src/cli.rs @@ -0,0 +1,317 @@ +mod launch_metadata; + +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use clap::{Args, IntoApp, Parser}; +use spin_app::App; +use spin_common::ui::quoted_path; +use spin_common::url::parse_file_url; +use spin_common::{arg_parser::parse_kv, sloth}; +use spin_factors_executor::{ComponentLoader, FactorsExecutor}; + +use crate::factors::TriggerFactors; +use crate::stdio::{FollowComponents, StdioLoggingExecutorHooks}; +use crate::Trigger; +pub use launch_metadata::LaunchMetadata; + +pub const APP_LOG_DIR: &str = "APP_LOG_DIR"; +pub const DISABLE_WASMTIME_CACHE: &str = "DISABLE_WASMTIME_CACHE"; +pub const FOLLOW_LOG_OPT: &str = "FOLLOW_ID"; +pub const WASMTIME_CACHE_FILE: &str = "WASMTIME_CACHE_FILE"; +pub const RUNTIME_CONFIG_FILE: &str = "RUNTIME_CONFIG_FILE"; + +// Set by `spin up` +pub const SPIN_LOCKED_URL: &str = "SPIN_LOCKED_URL"; +pub const SPIN_LOCAL_APP_DIR: &str = "SPIN_LOCAL_APP_DIR"; +pub const SPIN_WORKING_DIR: &str = "SPIN_WORKING_DIR"; + +/// A command that runs a TriggerExecutor. +#[derive(Parser, Debug)] +#[clap( + usage = "spin [COMMAND] [OPTIONS]", + next_help_heading = help_heading::() +)] +pub struct FactorsTriggerCommand { + /// Log directory for the stdout and stderr of components. Setting to + /// the empty string disables logging to disk. + #[clap( + name = APP_LOG_DIR, + short = 'L', + long = "log-dir", + env = "SPIN_LOG_DIR", + )] + pub log: Option, + + /// Disable Wasmtime cache. + #[clap( + name = DISABLE_WASMTIME_CACHE, + long = "disable-cache", + env = DISABLE_WASMTIME_CACHE, + conflicts_with = WASMTIME_CACHE_FILE, + takes_value = false, + )] + pub disable_cache: bool, + + /// Wasmtime cache configuration file. + #[clap( + name = WASMTIME_CACHE_FILE, + long = "cache", + env = WASMTIME_CACHE_FILE, + conflicts_with = DISABLE_WASMTIME_CACHE, + )] + pub cache: Option, + + /// Disable Wasmtime's pooling instance allocator. + #[clap(long = "disable-pooling")] + pub disable_pooling: bool, + + /// Print output to stdout/stderr only for given component(s) + #[clap( + name = FOLLOW_LOG_OPT, + long = "follow", + multiple_occurrences = true, + )] + pub follow_components: Vec, + + /// Silence all component output to stdout/stderr + #[clap( + long = "quiet", + short = 'q', + aliases = &["sh", "shush"], + conflicts_with = FOLLOW_LOG_OPT, + )] + pub silence_component_logs: bool, + + /// Set the static assets of the components in the temporary directory as writable. + #[clap(long = "allow-transient-write")] + pub allow_transient_write: bool, + + /// Configuration file for config providers and wasmtime config. + #[clap( + name = RUNTIME_CONFIG_FILE, + long = "runtime-config-file", + env = RUNTIME_CONFIG_FILE, + )] + pub runtime_config_file: Option, + + /// Set the application state directory path. This is used in the default + /// locations for logs, key value stores, etc. + /// + /// For local apps, this defaults to `.spin/` relative to the `spin.toml` file. + /// For remote apps, this has no default (unset). + /// Passing an empty value forces the value to be unset. + #[clap(long)] + pub state_dir: Option, + + #[clap(flatten)] + pub trigger_args: T::CliArgs, + + /// Set a key/value pair (key=value) in the application's + /// default store. Any existing value will be overwritten. + /// Can be used multiple times. + #[clap(long = "key-value", parse(try_from_str = parse_kv))] + key_values: Vec<(String, String)>, + + /// Run a SQLite statement such as a migration against the default database. + /// To run from a file, prefix the filename with @ e.g. spin up --sqlite @migration.sql + #[clap(long = "sqlite")] + sqlite_statements: Vec, + + #[clap(long = "help-args-only", hide = true)] + pub help_args_only: bool, + + #[clap(long = "launch-metadata-only", hide = true)] + pub launch_metadata_only: bool, +} + +/// An empty implementation of clap::Args to be used as TriggerExecutor::RunConfig +/// for executors that do not need additional CLI args. +#[derive(Args)] +pub struct NoArgs; + +impl FactorsTriggerCommand { + /// Create a new TriggerExecutorBuilder from this TriggerExecutorCommand. + pub async fn run(self) -> Result<()> { + // Handle --help-args-only + if self.help_args_only { + Self::command() + .disable_help_flag(true) + .help_template("{all-args}") + .print_long_help()?; + return Ok(()); + } + + // Handle --launch-metadata-only + if self.launch_metadata_only { + let lm = LaunchMetadata::infer::(); + let json = serde_json::to_string_pretty(&lm)?; + eprintln!("{json}"); + return Ok(()); + } + + // Required env vars + let working_dir = std::env::var(SPIN_WORKING_DIR).context(SPIN_WORKING_DIR)?; + let locked_url = std::env::var(SPIN_LOCKED_URL).context(SPIN_LOCKED_URL)?; + + let follow_components = self.follow_components(); + + // Load App + let app = { + let path = parse_file_url(&locked_url)?; + let contents = std::fs::read(&path) + .with_context(|| format!("failed to read manifest at {}", quoted_path(&path)))?; + let locked = + serde_json::from_slice(&contents).context("failed to parse app lock file JSON")?; + App::new(locked_url, locked) + }; + + // Validate required host features + if let Err(unmet) = app.ensure_needs_only(&T::supported_host_requirements()) { + anyhow::bail!("This application requires the following features that are not available in this version of the '{}' trigger: {unmet}", T::TYPE); + } + + let mut trigger = T::new(self.trigger_args, &app)?; + + let mut core_engine_builder = { + let mut config = spin_core::Config::default(); + + // Apply --cache / --disable-cache + if !self.disable_cache { + config.enable_cache(&self.cache)?; + } + + if self.disable_pooling { + config.disable_pooling(); + } + + trigger.update_core_config(&mut config)?; + + spin_core::Engine::builder(&config)? + }; + trigger.add_to_linker(core_engine_builder.linker())?; + + let factors = TriggerFactors::new(working_dir, self.allow_transient_write); + + // TODO: move these into Factor methods/constructors + // let init_data = crate::HostComponentInitData::new( + // &*self.key_values, + // &*self.sqlite_statements, + // LLmOptions { use_gpu: true }, + // ); + + // TODO: component loader + struct TodoComponentLoader; + impl ComponentLoader for TodoComponentLoader { + fn load_component( + &mut self, + _engine: &spin_core::wasmtime::Engine, + _component: &spin_factors::AppComponent, + ) -> anyhow::Result { + todo!() + } + } + + let mut executor = FactorsExecutor::new(core_engine_builder, factors)?; + + // TODO: integrate with runtime config + let log_dir = self.log.clone(); + executor.add_hooks(StdioLoggingExecutorHooks::new(follow_components, log_dir)); + // TODO: + // builder.hooks(SummariseRuntimeConfigHook::new(&self.runtime_config_file)); + // builder.hooks(KeyValuePersistenceMessageHook); + // builder.hooks(SqlitePersistenceMessageHook); + + let configured_app = { + let _sloth_guard = warn_if_wasm_build_slothful(); + executor.load_app( + app, + Default::default(), // TODO runtime config + TodoComponentLoader, + )? + }; + + // TODO: Construct factors + let run_fut = trigger.run(configured_app); + + let (abortable, abort_handle) = futures::future::abortable(run_fut); + ctrlc::set_handler(move || abort_handle.abort())?; + match abortable.await { + Ok(Ok(())) => { + tracing::info!("Trigger executor shut down: exiting"); + Ok(()) + } + Ok(Err(err)) => { + tracing::error!("Trigger executor failed"); + Err(err) + } + Err(_aborted) => { + tracing::info!("User requested shutdown: exiting"); + Ok(()) + } + } + } + + fn follow_components(&self) -> FollowComponents { + if self.silence_component_logs { + FollowComponents::None + } else if self.follow_components.is_empty() { + FollowComponents::All + } else { + let followed = self.follow_components.clone().into_iter().collect(); + FollowComponents::Named(followed) + } + } +} + +const SLOTH_WARNING_DELAY_MILLIS: u64 = 1250; + +fn warn_if_wasm_build_slothful() -> sloth::SlothGuard { + #[cfg(debug_assertions)] + let message = "\ + This is a debug build - preparing Wasm modules might take a few seconds\n\ + If you're experiencing long startup times please switch to the release build"; + + #[cfg(not(debug_assertions))] + let message = "Preparing Wasm modules is taking a few seconds..."; + + sloth::warn_if_slothful(SLOTH_WARNING_DELAY_MILLIS, format!("{message}\n")) +} + +fn help_heading() -> Option<&'static str> { + if T::TYPE == help::HelpArgsOnlyTrigger::TYPE { + Some("TRIGGER OPTIONS") + } else { + let heading = format!("{} TRIGGER OPTIONS", T::TYPE.to_uppercase()); + let as_str = Box::new(heading).leak(); + Some(as_str) + } +} + +pub mod help { + use super::*; + + /// Null object to support --help-args-only in the absence of + /// a `spin.toml` file. + pub struct HelpArgsOnlyTrigger; + + impl Trigger for HelpArgsOnlyTrigger { + const TYPE: &'static str = "help-args-only"; + type CliArgs = NoArgs; + type InstanceState = (); + + fn new(_cli_args: Self::CliArgs, _app: &App) -> anyhow::Result { + Ok(Self) + } + + async fn run( + self, + _configured_app: spin_factors_executor::FactorsExecutorApp< + TriggerFactors, + Self::InstanceState, + >, + ) -> anyhow::Result<()> { + Ok(()) + } + } +} diff --git a/crates/trigger2/src/cli/launch_metadata.rs b/crates/trigger2/src/cli/launch_metadata.rs new file mode 100644 index 0000000000..be78e3634d --- /dev/null +++ b/crates/trigger2/src/cli/launch_metadata.rs @@ -0,0 +1,83 @@ +use clap::CommandFactory; +use serde::{Deserialize, Serialize}; +use std::ffi::OsString; + +use crate::{cli::FactorsTriggerCommand, Trigger}; + +/// Contains information about the trigger flags (and potentially +/// in future configuration) that a consumer (such as `spin up`) +/// can query using `--launch-metadata-only`. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct LaunchMetadata { + all_flags: Vec, +} + +// This assumes no triggers that want to participate in multi-trigger +// use positional arguments. This is a restriction we'll have to make +// anyway: suppose triggers A and B both take one positional arg, and +// the user writes `spin up 123 456` - which value would go to which trigger? +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +struct LaunchFlag { + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + short: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + long: Option, +} + +impl LaunchMetadata { + pub fn infer() -> Self { + let all_flags: Vec<_> = FactorsTriggerCommand::::command() + .get_arguments() + .map(LaunchFlag::infer) + .collect(); + + LaunchMetadata { all_flags } + } + + pub fn matches<'a>(&self, groups: &[Vec<&'a OsString>]) -> Vec<&'a OsString> { + let mut matches = vec![]; + + for group in groups { + if group.is_empty() { + continue; + } + if self.is_match(group[0]) { + matches.extend(group); + } + } + + matches + } + + fn is_match(&self, arg: &OsString) -> bool { + self.all_flags.iter().any(|f| f.is_match(arg)) + } + + pub fn is_group_match(&self, group: &[&OsString]) -> bool { + if group.is_empty() { + false + } else { + self.all_flags.iter().any(|f| f.is_match(group[0])) + } + } +} + +impl LaunchFlag { + fn infer(arg: &clap::Arg) -> Self { + Self { + long: arg.get_long().map(|s| format!("--{s}")), + short: arg.get_short().map(|ch| format!("-{ch}")), + } + } + + fn is_match(&self, candidate: &OsString) -> bool { + let Some(s) = candidate.to_str() else { + return false; + }; + let candidate = Some(s.to_owned()); + + candidate == self.long || candidate == self.short + } +} diff --git a/crates/trigger2/src/factors.rs b/crates/trigger2/src/factors.rs new file mode 100644 index 0000000000..b433883da8 --- /dev/null +++ b/crates/trigger2/src/factors.rs @@ -0,0 +1,18 @@ +use std::path::PathBuf; + +use spin_factor_wasi::{spin::SpinFilesMounter, WasiFactor}; +use spin_factors::RuntimeFactors; + +#[derive(RuntimeFactors)] +pub struct TriggerFactors { + pub wasi: WasiFactor, +} + +impl TriggerFactors { + pub fn new(working_dir: impl Into, allow_transient_writes: bool) -> Self { + let files_mounter = SpinFilesMounter::new(working_dir, allow_transient_writes); + Self { + wasi: WasiFactor::new(files_mounter), + } + } +} diff --git a/crates/trigger2/src/lib.rs b/crates/trigger2/src/lib.rs new file mode 100644 index 0000000000..095d30e3b3 --- /dev/null +++ b/crates/trigger2/src/lib.rs @@ -0,0 +1,66 @@ +use std::future::Future; + +use clap::Args; +use factors::{TriggerFactors, TriggerFactorsInstanceState}; +use spin_app::App; +use spin_core::Linker; +use spin_factors_executor::{FactorsExecutorApp, FactorsInstanceBuilder}; + +pub mod cli; +mod factors; +mod stdio; + +/// Type alias for a [`FactorsConfiguredApp`] specialized to a [`Trigger`]. +pub type TriggerApp = FactorsExecutorApp::InstanceState>; + +pub type TriggerInstanceBuilder<'a, T> = + FactorsInstanceBuilder<'a, TriggerFactors, ::InstanceState>; + +pub type Store = spin_core::Store>; + +type TriggerInstanceState = spin_factors_executor::InstanceState< + TriggerFactorsInstanceState, + ::InstanceState, +>; + +pub trait Trigger: Sized + Send { + const TYPE: &'static str; + + type CliArgs: Args; + type InstanceState: Send + 'static; + + /// Constructs a new trigger. + fn new(cli_args: Self::CliArgs, app: &App) -> anyhow::Result; + + /// Update the [`spin_core::Config`] for this trigger. + /// + /// !!!Warning!!! This is unsupported; many configurations are likely to + /// cause errors or unexpected behavior, especially in future versions. + #[doc(hidden)] + fn update_core_config(&mut self, config: &mut spin_core::Config) -> anyhow::Result<()> { + let _ = config; + Ok(()) + } + + /// Update the [`Linker`] for this trigger. + fn add_to_linker( + &mut self, + linker: &mut Linker>, + ) -> anyhow::Result<()> { + let _ = linker; + Ok(()) + } + + /// Run this trigger. + fn run( + self, + configured_app: TriggerApp, + ) -> impl Future> + Send; + + /// Returns a list of host requirements supported by this trigger specifically. + /// + /// See [`App::ensure_needs_only`]. + fn supported_host_requirements() -> Vec<&'static str> { + Vec::new() + } +} diff --git a/crates/trigger2/src/stdio.rs b/crates/trigger2/src/stdio.rs new file mode 100644 index 0000000000..fe62c63939 --- /dev/null +++ b/crates/trigger2/src/stdio.rs @@ -0,0 +1,331 @@ +use std::{ + collections::HashSet, + path::{Path, PathBuf}, + task::Poll, +}; + +use anyhow::{Context, Result}; +use spin_common::ui::quoted_path; +use spin_factors_executor::ExecutorHooks; +use tokio::io::AsyncWrite; + +use crate::factors::TriggerFactors; + +/// Which components should have their logs followed on stdout/stderr. +#[derive(Clone, Debug)] +pub enum FollowComponents { + /// No components should have their logs followed. + None, + /// Only the specified components should have their logs followed. + Named(HashSet), + /// All components should have their logs followed. + All, +} + +impl FollowComponents { + /// Whether a given component should have its logs followed on stdout/stderr. + pub fn should_follow(&self, component_id: &str) -> bool { + match self { + Self::None => false, + Self::All => true, + Self::Named(ids) => ids.contains(component_id), + } + } +} + +impl Default for FollowComponents { + fn default() -> Self { + Self::None + } +} + +/// Implements TriggerHooks, writing logs to a log file and (optionally) stderr +pub struct StdioLoggingExecutorHooks { + follow_components: FollowComponents, + log_dir: Option, +} + +impl StdioLoggingExecutorHooks { + pub fn new(follow_components: FollowComponents, log_dir: Option) -> Self { + Self { + follow_components, + log_dir, + } + } + + fn component_stdio_writer( + &self, + component_id: &str, + log_suffix: &str, + log_dir: Option<&Path>, + ) -> Result { + let sanitized_component_id = sanitize_filename::sanitize(component_id); + let log_path = log_dir + .map(|log_dir| log_dir.join(format!("{sanitized_component_id}_{log_suffix}.txt",))); + let log_path = log_path.as_deref(); + + let follow = self.follow_components.should_follow(component_id); + match log_path { + Some(log_path) => ComponentStdioWriter::new_forward(log_path, follow) + .with_context(|| format!("Failed to open log file {}", quoted_path(log_path))), + None => ComponentStdioWriter::new_inherit(), + } + } + + fn validate_follows(&self, app: &spin_app::App) -> anyhow::Result<()> { + match &self.follow_components { + FollowComponents::Named(names) => { + let component_ids: HashSet<_> = + app.components().map(|c| c.id().to_owned()).collect(); + let unknown_names: Vec<_> = names.difference(&component_ids).collect(); + if unknown_names.is_empty() { + Ok(()) + } else { + let unknown_list = bullet_list(&unknown_names); + let actual_list = bullet_list(&component_ids); + let message = anyhow::anyhow!("The following component(s) specified in --follow do not exist in the application:\n{unknown_list}\nThe following components exist:\n{actual_list}"); + Err(message) + } + } + _ => Ok(()), + } + } +} + +impl ExecutorHooks for StdioLoggingExecutorHooks { + fn configure_app( + &mut self, + configured_app: &spin_factors::ConfiguredApp, + ) -> anyhow::Result<()> { + self.validate_follows(configured_app.app())?; + if let Some(dir) = &self.log_dir { + // Ensure log dir exists if set + std::fs::create_dir_all(dir) + .with_context(|| format!("Failed to create log dir {}", quoted_path(dir)))?; + + println!("Logging component stdio to {}", quoted_path(dir.join(""))) + } + Ok(()) + } + + fn prepare_instance( + &self, + builder: &mut spin_factors_executor::FactorsInstanceBuilder, + ) -> anyhow::Result<()> { + let component_id = builder.app_component().id().to_string(); + let wasi_builder = builder.factor_builders().wasi(); + wasi_builder.stdout_pipe(self.component_stdio_writer( + &component_id, + "stdout", + self.log_dir.as_deref(), + )?); + wasi_builder.stderr_pipe(self.component_stdio_writer( + &component_id, + "stderr", + self.log_dir.as_deref(), + )?); + Ok(()) + } +} + +/// ComponentStdioWriter forwards output to a log file, (optionally) stderr, and (optionally) to a +/// tracing compatibility layer. +pub struct ComponentStdioWriter { + inner: ComponentStdioWriterInner, +} + +enum ComponentStdioWriterInner { + /// Inherit stdout/stderr from the parent process. + Inherit, + /// Forward stdout/stderr to a file in addition to the inherited stdout/stderr. + Forward { + sync_file: std::fs::File, + async_file: tokio::fs::File, + state: ComponentStdioWriterState, + follow: bool, + }, +} + +#[derive(Debug)] +enum ComponentStdioWriterState { + File, + Follow(std::ops::Range), +} + +impl ComponentStdioWriter { + fn new_forward(log_path: &Path, follow: bool) -> anyhow::Result { + let sync_file = std::fs::File::options() + .create(true) + .append(true) + .open(log_path)?; + + let async_file = sync_file + .try_clone() + .context("could not get async file handle")? + .into(); + + Ok(Self { + inner: ComponentStdioWriterInner::Forward { + sync_file, + async_file, + state: ComponentStdioWriterState::File, + follow, + }, + }) + } + + fn new_inherit() -> anyhow::Result { + Ok(Self { + inner: ComponentStdioWriterInner::Inherit, + }) + } +} + +impl AsyncWrite for ComponentStdioWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + + loop { + match &mut this.inner { + ComponentStdioWriterInner::Inherit => { + let written = futures::ready!( + std::pin::Pin::new(&mut tokio::io::stderr()).poll_write(cx, buf) + ); + let written = match written { + Ok(w) => w, + Err(e) => return Poll::Ready(Err(e)), + }; + return Poll::Ready(Ok(written)); + } + ComponentStdioWriterInner::Forward { + async_file, + state, + follow, + .. + } => match &state { + ComponentStdioWriterState::File => { + let written = + futures::ready!(std::pin::Pin::new(async_file).poll_write(cx, buf)); + let written = match written { + Ok(w) => w, + Err(e) => return Poll::Ready(Err(e)), + }; + if *follow { + *state = ComponentStdioWriterState::Follow(0..written); + } else { + return Poll::Ready(Ok(written)); + } + } + ComponentStdioWriterState::Follow(range) => { + let written = futures::ready!(std::pin::Pin::new(&mut tokio::io::stderr()) + .poll_write(cx, &buf[range.clone()])); + let written = match written { + Ok(w) => w, + Err(e) => return Poll::Ready(Err(e)), + }; + if range.start + written >= range.end { + let end = range.end; + *state = ComponentStdioWriterState::File; + return Poll::Ready(Ok(end)); + } else { + *state = ComponentStdioWriterState::Follow( + (range.start + written)..range.end, + ); + }; + } + }, + } + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + match &mut this.inner { + ComponentStdioWriterInner::Inherit => { + std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx) + } + ComponentStdioWriterInner::Forward { + async_file, state, .. + } => match state { + ComponentStdioWriterState::File => std::pin::Pin::new(async_file).poll_flush(cx), + ComponentStdioWriterState::Follow(_) => { + std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx) + } + }, + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + match &mut this.inner { + ComponentStdioWriterInner::Inherit => { + std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx) + } + ComponentStdioWriterInner::Forward { + async_file, state, .. + } => match state { + ComponentStdioWriterState::File => std::pin::Pin::new(async_file).poll_shutdown(cx), + ComponentStdioWriterState::Follow(_) => { + std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx) + } + }, + } + } +} + +impl std::io::Write for ComponentStdioWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + spin_telemetry::logs::handle_app_log(buf); + + match &mut self.inner { + ComponentStdioWriterInner::Inherit => { + std::io::stderr().write_all(buf)?; + Ok(buf.len()) + } + ComponentStdioWriterInner::Forward { + sync_file, follow, .. + } => { + let written = sync_file.write(buf)?; + if *follow { + std::io::stderr().write_all(&buf[..written])?; + } + Ok(written) + } + } + } + + fn flush(&mut self) -> std::io::Result<()> { + match &mut self.inner { + ComponentStdioWriterInner::Inherit => std::io::stderr().flush(), + ComponentStdioWriterInner::Forward { + sync_file, follow, .. + } => { + sync_file.flush()?; + if *follow { + std::io::stderr().flush()?; + } + Ok(()) + } + } + } +} + +fn bullet_list(items: impl IntoIterator) -> String { + items + .into_iter() + .map(|item| format!(" - {item}")) + .collect::>() + .join("\n") +} diff --git a/run-factors-tests.sh b/run-factors-tests.sh index 497a0f3190..3b9ca4e42c 100755 --- a/run-factors-tests.sh +++ b/run-factors-tests.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash # TODO(factors): Remove after enabling CI for factors branch -cargo test -p '*factor*' +cargo test -p '*factor*' -p spin-trigger2 -p spin-trigger-http2