From c578fdd1a32aef0f02cc11e648e0a0f3e4137488 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Tue, 20 Aug 2024 17:50:58 +0200 Subject: [PATCH 1/3] Refactor http-trigger to be more reusable outside of main flow Signed-off-by: Ryan Levick --- crates/trigger-http2/src/lib.rs | 61 +++--- crates/trigger-http2/src/outbound_http.rs | 13 +- crates/trigger-http2/src/server.rs | 226 ++++++++++++++-------- 3 files changed, 180 insertions(+), 120 deletions(-) diff --git a/crates/trigger-http2/src/lib.rs b/crates/trigger-http2/src/lib.rs index abb7319827..1ee4262c49 100644 --- a/crates/trigger-http2/src/lib.rs +++ b/crates/trigger-http2/src/lib.rs @@ -23,7 +23,6 @@ use serde::Deserialize; use spin_app::App; use spin_http::{config::HttpTriggerConfig, routes::Router}; use spin_trigger2::Trigger; -use tokio::net::TcpListener; use wasmtime_wasi_http::bindings::wasi::http::types::ErrorCode; use server::HttpServer; @@ -67,7 +66,7 @@ pub(crate) type InstanceState = (); /// The Spin HTTP trigger. pub struct HttpTrigger { - /// The address the server will listen on. + /// The address the server should listen on. /// /// Note that this might not be the actual socket address that ends up being bound to. /// If the port is set to 0, the actual address will be determined by the OS. @@ -85,6 +84,29 @@ impl Trigger for HttpTrigger { type InstanceState = InstanceState; fn new(cli_args: Self::CliArgs, app: &spin_app::App) -> anyhow::Result { + Self::new(app, cli_args.address, cli_args.into_tls_config()) + } + + async fn run(self, trigger_app: TriggerApp) -> anyhow::Result<()> { + let server = self.into_server(trigger_app)?; + + server.serve().await?; + + Ok(()) + } + + fn supported_host_requirements() -> Vec<&'static str> { + vec![spin_app::locked::SERVICE_CHAINING_KEY] + } +} + +impl HttpTrigger { + /// Create a new `HttpTrigger`. + pub fn new( + app: &spin_app::App, + listen_addr: SocketAddr, + tls_config: Option, + ) -> anyhow::Result { Self::validate_app(app)?; let component_trigger_configs = HashMap::from_iter( @@ -114,55 +136,32 @@ impl Trigger for HttpTrigger { "Constructed router: {:?}", router.routes().collect::>() ); - Ok(Self { - listen_addr: cli_args.address, - tls_config: cli_args.into_tls_config(), + listen_addr, + tls_config, router, component_trigger_configs, }) } - async fn run(self, trigger_app: TriggerApp) -> anyhow::Result<()> { + /// Turn this [`HttpTrigger`] into an [`HttpServer`]. + pub fn into_server(self, trigger_app: TriggerApp) -> anyhow::Result> { let Self { listen_addr, tls_config, router, component_trigger_configs, } = self; - - let listener = TcpListener::bind(listen_addr) - .await - .with_context(|| format!("Unable to listen on {listen_addr}"))?; - - // Get the address the server is actually listening on - // We can't use `self.listen_addr` because it might not - // be fully resolved (e.g, port 0). - let listen_addr = listener - .local_addr() - .context("failed to retrieve address server is listening on")?; let server = Arc::new(HttpServer::new( listen_addr, + tls_config, trigger_app, router, component_trigger_configs, )?); - - if let Some(tls_config) = tls_config { - server.serve_tls(listener, tls_config).await? - } else { - server.serve(listener).await? - }; - - Ok(()) + Ok(server) } - fn supported_host_requirements() -> Vec<&'static str> { - vec![spin_app::locked::SERVICE_CHAINING_KEY] - } -} - -impl HttpTrigger { fn validate_app(app: &App) -> anyhow::Result<()> { #[derive(Deserialize)] #[serde(deny_unknown_fields)] diff --git a/crates/trigger-http2/src/outbound_http.rs b/crates/trigger-http2/src/outbound_http.rs index 623eb6b69e..38219b8f7e 100644 --- a/crates/trigger-http2/src/outbound_http.rs +++ b/crates/trigger-http2/src/outbound_http.rs @@ -11,16 +11,17 @@ use spin_http::routes::RouteMatch; use spin_outbound_networking::parse_service_chaining_target; use wasmtime_wasi_http::types::IncomingResponse; -use crate::server::HttpServer; +use crate::server::RequestHandler; +/// An outbound HTTP interceptor that handles service chaining requests. pub struct OutboundHttpInterceptor { - server: Arc, + handler: Arc, origin: SelfRequestOrigin, } impl OutboundHttpInterceptor { - pub fn new(server: Arc, origin: SelfRequestOrigin) -> Self { - Self { server, origin } + pub fn new(handler: Arc, origin: SelfRequestOrigin) -> Self { + Self { handler, origin } } } @@ -40,10 +41,10 @@ impl spin_factor_outbound_http::OutboundHttpInterceptor for OutboundHttpIntercep let route_match = RouteMatch::synthetic(&component_id, uri.path()); let req = std::mem::take(request); let between_bytes_timeout = config.between_bytes_timeout; - let server = self.server.clone(); + let server = self.handler.clone(); let resp_fut = async move { match server - .handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) + .handle_trigger_route(req, &route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) .await { Ok(resp) => Ok(Ok(IncomingResponse { diff --git a/crates/trigger-http2/src/server.rs b/crates/trigger-http2/src/server.rs index 396a1870bc..acea0eb9eb 100644 --- a/crates/trigger-http2/src/server.rs +++ b/crates/trigger-http2/src/server.rs @@ -36,42 +36,52 @@ use crate::{ Body, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder, }; +/// An HTTP server which runs Spin apps. pub struct HttpServer { - /// The address the server is listening on. - listen_addr: SocketAddr, - trigger_app: TriggerApp, + tls_config: Option, router: Router, - // Component ID -> component trigger config - component_trigger_configs: HashMap, - // Component ID -> handler type - component_handler_types: HashMap, + handler: Arc, } impl HttpServer { + /// Create a new [`HttpServer`]. pub fn new( listen_addr: SocketAddr, + tls_config: Option, trigger_app: TriggerApp, router: Router, component_trigger_configs: HashMap, ) -> anyhow::Result { - let component_handler_types = component_trigger_configs - .keys() - .map(|component_id| { - let component = trigger_app.get_component(component_id)?; - let handler_type = HandlerType::from_component(trigger_app.engine(), component)?; - Ok((component_id.clone(), handler_type)) - }) - .collect::>()?; Ok(Self { - listen_addr, - trigger_app, + tls_config, router, - component_trigger_configs, - component_handler_types, + handler: Arc::new(RequestHandler::new( + listen_addr, + trigger_app, + component_trigger_configs, + )?), }) } - pub async fn serve(self: Arc, listener: TcpListener) -> anyhow::Result<()> { + /// Serve incoming requests over the provided [`TcpListener`]. + pub async fn serve(self: Arc) -> anyhow::Result<()> { + let listener = TcpListener::bind(self.handler.listen_addr) + .await + .with_context(|| { + format!( + "Unable to listen on {listen_addr}", + listen_addr = self.handler.listen_addr + ) + })?; + if let Some(tls_config) = self.tls_config.clone() { + self.serve_https(listener, tls_config).await?; + } else { + self.serve_http(listener).await?; + } + Ok(()) + } + + async fn serve_http(self: Arc, listener: TcpListener) -> anyhow::Result<()> { self.print_startup_msgs("http", &listener)?; loop { let (stream, client_addr) = listener.accept().await?; @@ -80,7 +90,7 @@ impl HttpServer { } } - pub async fn serve_tls( + async fn serve_https( self: Arc, listener: TcpListener, tls_config: TlsConfig, @@ -140,70 +150,15 @@ impl HttpServer { /// Handles a successful route match. pub async fn handle_trigger_route( self: &Arc, - mut req: Request, + req: Request, route_match: RouteMatch, server_scheme: Scheme, client_addr: SocketAddr, ) -> anyhow::Result> { - set_req_uri(&mut req, server_scheme.clone())?; - let app_id = self - .trigger_app - .app() - .get_metadata(APP_NAME_KEY)? - .unwrap_or_else(|| "".into()); - - let component_id = route_match.component_id(); - - spin_telemetry::metrics::monotonic_counter!( - spin.request_count = 1, - trigger_type = "http", - app_id = app_id, - component_id = component_id - ); - - let mut instance_builder = self.trigger_app.prepare(component_id)?; - - // Set up outbound HTTP request origin and service chaining - let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr)?; - instance_builder - .factor_builders() - .outbound_http() - .set_request_interceptor(OutboundHttpInterceptor::new(self.clone(), origin))?; - - // Prepare HTTP executor - let trigger_config = self.component_trigger_configs.get(component_id).unwrap(); - let handler_type = self.component_handler_types.get(component_id).unwrap(); - let executor = trigger_config - .executor - .as_ref() - .unwrap_or(&HttpExecutorType::Http); - - let res = match executor { - HttpExecutorType::Http => match handler_type { - HandlerType::Spin => { - SpinHttpExecutor - .execute(instance_builder, &route_match, req, client_addr) - .await - } - HandlerType::Wasi0_2 - | HandlerType::Wasi2023_11_10 - | HandlerType::Wasi2023_10_18 => { - WasiHttpExecutor { - handler_type: *handler_type, - } - .execute(instance_builder, &route_match, req, client_addr) - .await - } - }, - HttpExecutorType::Wagi(wagi_config) => { - let executor = WagiHttpExecutor { - wagi_config: wagi_config.clone(), - }; - executor - .execute(instance_builder, &route_match, req, client_addr) - .await - } - }; + let res = self + .handler + .handle_trigger_route(req, &route_match, server_scheme, client_addr) + .await; match res { Ok(res) => Ok(MatchedRoute::with_response_extension( res, @@ -219,7 +174,7 @@ impl HttpServer { /// Returns spin status information. fn app_info(&self, route: String) -> anyhow::Result> { - let info = AppInfo::new(self.trigger_app.app()); + let info = AppInfo::new(self.handler.trigger_app.app()); let body = serde_json::to_vec_pretty(&info)?; Ok(MatchedRoute::with_response_extension( Response::builder() @@ -323,7 +278,7 @@ impl HttpServer { println!("Available Routes:"); for (route, component_id) in self.router.routes() { println!(" {}: {}{}", component_id, base_url, route); - if let Some(component) = self.trigger_app.app().get_component(component_id) { + if let Some(component) = self.handler.trigger_app.app().get_component(component_id) { if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? { println!(" {}", description); } @@ -333,6 +288,111 @@ impl HttpServer { } } +/// Handles a routed HTTP trigger request. +pub struct RequestHandler { + /// The address the server is listening on. + pub(crate) listen_addr: SocketAddr, + /// The app being triggered. + trigger_app: TriggerApp, + // Component ID -> component trigger config + component_trigger_configs: HashMap, + // Component ID -> handler type + component_handler_types: HashMap, +} + +impl RequestHandler { + /// Create a new [`RequestHandler`] + pub fn new( + listen_addr: SocketAddr, + trigger_app: TriggerApp, + component_trigger_configs: HashMap, + ) -> anyhow::Result { + let component_handler_types = component_trigger_configs + .keys() + .map(|component_id| { + let component = trigger_app.get_component(component_id)?; + let handler_type = HandlerType::from_component(trigger_app.engine(), component)?; + Ok((component_id.clone(), handler_type)) + }) + .collect::>()?; + Ok(Self { + listen_addr, + trigger_app, + component_trigger_configs, + component_handler_types, + }) + } + + /// Handle a routed request. + pub async fn handle_trigger_route( + self: &Arc, + mut req: Request, + route_match: &RouteMatch, + server_scheme: Scheme, + client_addr: SocketAddr, + ) -> anyhow::Result> { + set_req_uri(&mut req, server_scheme.clone())?; + let app_id = self + .trigger_app + .app() + .get_metadata(APP_NAME_KEY)? + .unwrap_or_else(|| "".into()); + + let component_id = route_match.component_id(); + + spin_telemetry::metrics::monotonic_counter!( + spin.request_count = 1, + trigger_type = "http", + app_id = app_id, + component_id = component_id + ); + + let mut instance_builder = self.trigger_app.prepare(component_id)?; + + // Set up outbound HTTP request origin and service chaining + let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr)?; + instance_builder + .factor_builders() + .outbound_http() + .set_request_interceptor(OutboundHttpInterceptor::new(self.clone(), origin))?; + + // Prepare HTTP executor + let trigger_config = self.component_trigger_configs.get(component_id).unwrap(); + let handler_type = self.component_handler_types.get(component_id).unwrap(); + let executor = trigger_config + .executor + .as_ref() + .unwrap_or(&HttpExecutorType::Http); + + match executor { + HttpExecutorType::Http => match handler_type { + HandlerType::Spin => { + SpinHttpExecutor + .execute(instance_builder, route_match, req, client_addr) + .await + } + HandlerType::Wasi0_2 + | HandlerType::Wasi2023_11_10 + | HandlerType::Wasi2023_10_18 => { + WasiHttpExecutor { + handler_type: *handler_type, + } + .execute(instance_builder, route_match, req, client_addr) + .await + } + }, + HttpExecutorType::Wagi(wagi_config) => { + let executor = WagiHttpExecutor { + wagi_config: wagi_config.clone(), + }; + executor + .execute(instance_builder, route_match, req, client_addr) + .await + } + } + } +} + /// The incoming request's scheme and authority /// /// The incoming request's URI is relative to the server, so we need to set the scheme and authority. From 6a67cefe0cdabcf550d9e0be0c92589defbcb88e Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Tue, 20 Aug 2024 19:07:48 +0200 Subject: [PATCH 2/3] Get things compiling Signed-off-by: Ryan Levick --- Cargo.lock | 6 +- crates/trigger-http2/src/lib.rs | 2 +- crates/trigger2/src/cli.rs | 218 ++++++++++++------ crates/trigger2/src/stdio.rs | 9 +- tests/conformance-tests/src/lib.rs | 9 +- tests/testing-framework/Cargo.toml | 6 +- .../src/runtimes/in_process_spin.rs | 70 +++--- 7 files changed, 199 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aed3f025a3..20e822f0e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8811,10 +8811,12 @@ dependencies = [ "nix 0.26.4", "regex", "reqwest 0.12.4", + "spin-app", + "spin-factors-executor", "spin-http", "spin-loader", - "spin-trigger", - "spin-trigger-http", + "spin-trigger-http2", + "spin-trigger2", "temp-dir", "test-environment", "tokio", diff --git a/crates/trigger-http2/src/lib.rs b/crates/trigger-http2/src/lib.rs index 1ee4262c49..8263915b1d 100644 --- a/crates/trigger-http2/src/lib.rs +++ b/crates/trigger-http2/src/lib.rs @@ -25,7 +25,7 @@ use spin_http::{config::HttpTriggerConfig, routes::Router}; use spin_trigger2::Trigger; use wasmtime_wasi_http::bindings::wasi::http::types::ErrorCode; -use server::HttpServer; +pub use server::HttpServer; pub use tls::TlsConfig; diff --git a/crates/trigger2/src/cli.rs b/crates/trigger2/src/cli.rs index db095fd458..2aab88adb0 100644 --- a/crates/trigger2/src/cli.rs +++ b/crates/trigger2/src/cli.rs @@ -1,6 +1,7 @@ mod launch_metadata; -use std::path::PathBuf; +use std::future::Future; +use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; use clap::{Args, IntoApp, Parser}; @@ -13,7 +14,7 @@ use spin_runtime_config::ResolvedRuntimeConfig; use crate::factors::{TriggerFactors, TriggerFactorsRuntimeConfig}; use crate::stdio::{FollowComponents, StdioLoggingExecutorHooks}; -use crate::Trigger; +use crate::{Trigger, TriggerApp}; pub use launch_metadata::LaunchMetadata; pub const APP_LOG_DIR: &str = "APP_LOG_DIR"; @@ -172,43 +173,154 @@ impl FactorsTriggerCommand { anyhow::bail!("This application requires the following features that are not available in this version of the '{}' trigger: {unmet}", T::TYPE); } - let mut trigger = T::new(self.trigger_args, &app)?; + let trigger = T::new(self.trigger_args, &app)?; + let mut builder = TriggerAppBuilder::new(trigger, PathBuf::from(working_dir)); + let config = builder.engine_config(); - let mut core_engine_builder = { - let mut config = spin_core::Config::default(); + // Apply --cache / --disable-cache + if !self.disable_cache { + config.enable_cache(&self.cache)?; + } - // Apply --cache / --disable-cache - if !self.disable_cache { - config.enable_cache(&self.cache)?; - } + if self.disable_pooling { + config.disable_pooling(); + } - if self.disable_pooling { - config.disable_pooling(); + let run_fut = builder + .run( + app, + TriggerAppOptions { + runtime_config_file: self.runtime_config_file.as_deref(), + state_dir: self.state_dir.as_deref(), + initial_key_values: self.key_values, + allow_transient_write: self.allow_transient_write, + follow_components, + log_dir: self.log, + }, + ) + .await?; + + let (abortable, abort_handle) = futures::future::abortable(run_fut); + ctrlc::set_handler(move || abort_handle.abort())?; + match abortable.await { + Ok(Ok(())) => { + tracing::info!("Trigger executor shut down: exiting"); + Ok(()) + } + Ok(Err(err)) => { + tracing::error!("Trigger executor failed"); + Err(err) + } + Err(_aborted) => { + tracing::info!("User requested shutdown: exiting"); + Ok(()) } + } + } + + fn follow_components(&self) -> FollowComponents { + if self.silence_component_logs { + FollowComponents::None + } else if self.follow_components.is_empty() { + FollowComponents::All + } else { + let followed = self.follow_components.clone().into_iter().collect(); + FollowComponents::Named(followed) + } + } +} + +const SLOTH_WARNING_DELAY_MILLIS: u64 = 1250; + +fn warn_if_wasm_build_slothful() -> sloth::SlothGuard { + #[cfg(debug_assertions)] + let message = "\ + This is a debug build - preparing Wasm modules might take a few seconds\n\ + If you're experiencing long startup times please switch to the release build"; + + #[cfg(not(debug_assertions))] + let message = "Preparing Wasm modules is taking a few seconds..."; + + sloth::warn_if_slothful(SLOTH_WARNING_DELAY_MILLIS, format!("{message}\n")) +} + +fn help_heading() -> Option<&'static str> { + if T::TYPE == help::HelpArgsOnlyTrigger::TYPE { + Some("TRIGGER OPTIONS") + } else { + let heading = format!("{} TRIGGER OPTIONS", T::TYPE.to_uppercase()); + let as_str = Box::new(heading).leak(); + Some(as_str) + } +} + +/// A builder for a [`TriggerApp`]. +pub struct TriggerAppBuilder { + engine_config: spin_core::Config, + working_dir: PathBuf, + pub trigger: T, +} - trigger.update_core_config(&mut config)?; +/// Options for building a [`TriggerApp`]. +#[derive(Default)] +pub struct TriggerAppOptions<'a> { + /// Path to the runtime config file. + runtime_config_file: Option<&'a Path>, + /// Path to the state directory. + state_dir: Option<&'a str>, + /// Initial key/value pairs to set in the app's default store. + initial_key_values: Vec<(String, String)>, + /// Whether to allow transient writes to mounted files + allow_transient_write: bool, + /// Which components should have their logs followed. + follow_components: FollowComponents, + /// Log directory for component stdout/stderr. + log_dir: Option, +} + +impl TriggerAppBuilder { + pub fn new(trigger: T, working_dir: PathBuf) -> Self { + Self { + engine_config: spin_core::Config::default(), + working_dir, + trigger, + } + } + + pub fn engine_config(&mut self) -> &mut spin_core::Config { + &mut self.engine_config + } - spin_core::Engine::builder(&config)? + /// Build a [`TriggerApp`] from the given [`App`] and options. + pub async fn build( + &mut self, + app: App, + options: TriggerAppOptions<'_>, + ) -> anyhow::Result> { + let mut core_engine_builder = { + self.trigger.update_core_config(&mut self.engine_config)?; + + spin_core::Engine::builder(&self.engine_config)? }; - trigger.add_to_linker(core_engine_builder.linker())?; + self.trigger.add_to_linker(core_engine_builder.linker())?; - let runtime_config = match &self.runtime_config_file { + let runtime_config = match options.runtime_config_file { Some(runtime_config_path) => { ResolvedRuntimeConfig::::from_file( runtime_config_path, - self.state_dir.as_deref(), + options.state_dir, )? } - None => ResolvedRuntimeConfig::default(self.state_dir.as_deref()), + None => ResolvedRuntimeConfig::default(options.state_dir), }; runtime_config - .set_initial_key_values(&self.key_values) + .set_initial_key_values(&options.initial_key_values) .await?; let factors = TriggerFactors::new( - working_dir, - self.allow_transient_write, + self.working_dir.clone(), + options.allow_transient_write, runtime_config.key_value_resolver, runtime_config.sqlite_resolver, ); @@ -249,8 +361,10 @@ impl FactorsTriggerCommand { let mut executor = FactorsExecutor::new(core_engine_builder, factors)?; - let log_dir = self.log.clone(); - executor.add_hooks(StdioLoggingExecutorHooks::new(follow_components, log_dir)); + executor.add_hooks(StdioLoggingExecutorHooks::new( + options.follow_components, + options.log_dir, + )); // TODO: // builder.hooks(SummariseRuntimeConfigHook::new(&self.runtime_config_file)); // builder.hooks(KeyValuePersistenceMessageHook); @@ -261,59 +375,17 @@ impl FactorsTriggerCommand { executor.load_app(app, runtime_config.runtime_config, SimpleComponentLoader)? }; - let run_fut = trigger.run(configured_app); - - let (abortable, abort_handle) = futures::future::abortable(run_fut); - ctrlc::set_handler(move || abort_handle.abort())?; - match abortable.await { - Ok(Ok(())) => { - tracing::info!("Trigger executor shut down: exiting"); - Ok(()) - } - Ok(Err(err)) => { - tracing::error!("Trigger executor failed"); - Err(err) - } - Err(_aborted) => { - tracing::info!("User requested shutdown: exiting"); - Ok(()) - } - } - } - - fn follow_components(&self) -> FollowComponents { - if self.silence_component_logs { - FollowComponents::None - } else if self.follow_components.is_empty() { - FollowComponents::All - } else { - let followed = self.follow_components.clone().into_iter().collect(); - FollowComponents::Named(followed) - } + Ok(configured_app) } -} - -const SLOTH_WARNING_DELAY_MILLIS: u64 = 1250; - -fn warn_if_wasm_build_slothful() -> sloth::SlothGuard { - #[cfg(debug_assertions)] - let message = "\ - This is a debug build - preparing Wasm modules might take a few seconds\n\ - If you're experiencing long startup times please switch to the release build"; - - #[cfg(not(debug_assertions))] - let message = "Preparing Wasm modules is taking a few seconds..."; - - sloth::warn_if_slothful(SLOTH_WARNING_DELAY_MILLIS, format!("{message}\n")) -} -fn help_heading() -> Option<&'static str> { - if T::TYPE == help::HelpArgsOnlyTrigger::TYPE { - Some("TRIGGER OPTIONS") - } else { - let heading = format!("{} TRIGGER OPTIONS", T::TYPE.to_uppercase()); - let as_str = Box::new(heading).leak(); - Some(as_str) + /// Run the [`TriggerApp`] with the given [`App`] and options. + pub async fn run( + mut self, + app: App, + options: TriggerAppOptions<'_>, + ) -> anyhow::Result>> { + let configured_app = self.build(app, options).await?; + Ok(self.trigger.run(configured_app)) } } diff --git a/crates/trigger2/src/stdio.rs b/crates/trigger2/src/stdio.rs index fe62c63939..398d59f727 100644 --- a/crates/trigger2/src/stdio.rs +++ b/crates/trigger2/src/stdio.rs @@ -12,8 +12,9 @@ use tokio::io::AsyncWrite; use crate::factors::TriggerFactors; /// Which components should have their logs followed on stdout/stderr. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub enum FollowComponents { + #[default] /// No components should have their logs followed. None, /// Only the specified components should have their logs followed. @@ -33,12 +34,6 @@ impl FollowComponents { } } -impl Default for FollowComponents { - fn default() -> Self { - Self::None - } -} - /// Implements TriggerHooks, writing logs to a log file and (optionally) stderr pub struct StdioLoggingExecutorHooks { follow_components: FollowComponents, diff --git a/tests/conformance-tests/src/lib.rs b/tests/conformance-tests/src/lib.rs index a030848196..bed1488a42 100644 --- a/tests/conformance-tests/src/lib.rs +++ b/tests/conformance-tests/src/lib.rs @@ -1,4 +1,5 @@ use anyhow::Context as _; +use test_environment::http::Request; use testing_framework::runtimes::spin_cli::{SpinCli, SpinConfig}; /// Run a single conformance test against the supplied spin binary. @@ -56,9 +57,11 @@ pub fn run_test( let conformance_tests::config::Invocation::Http(mut invocation) = invocation; invocation.request.substitute_from_env(&mut env)?; let spin = env.runtime_mut(); - let actual = invocation - .request - .send(|request| spin.make_http_request(request))?; + let actual = invocation.request.send(|request| { + let request = + Request::full(request.method, request.path, request.headers, request.body); + spin.make_http_request(request) + })?; conformance_tests::assertions::assert_response(&invocation.response, &actual) .with_context(|| { diff --git a/tests/testing-framework/Cargo.toml b/tests/testing-framework/Cargo.toml index d247b4123b..cc388246f3 100644 --- a/tests/testing-framework/Cargo.toml +++ b/tests/testing-framework/Cargo.toml @@ -14,9 +14,11 @@ regex = "1.10.2" reqwest = { workspace = true } temp-dir = "0.1.11" test-environment = { workspace = true } -spin-trigger-http = { path = "../../crates/trigger-http" } +spin-factors-executor = { path = "../../crates/factors-executor" } +spin-app = { path = "../../crates/app" } +spin-trigger-http2 = { path = "../../crates/trigger-http2" } spin-http = { path = "../../crates/http" } -spin-trigger = { path = "../../crates/trigger" } +spin-trigger2 = { path = "../../crates/trigger2" } spin-loader = { path = "../../crates/loader" } toml = "0.8.6" tokio = "1.23" diff --git a/tests/testing-framework/src/runtimes/in_process_spin.rs b/tests/testing-framework/src/runtimes/in_process_spin.rs index 5574c72b55..16ad654af9 100644 --- a/tests/testing-framework/src/runtimes/in_process_spin.rs +++ b/tests/testing-framework/src/runtimes/in_process_spin.rs @@ -1,6 +1,11 @@ //! The Spin runtime running in the same process as the test +use std::{path::PathBuf, sync::Arc}; + use anyhow::Context as _; +use spin_http::routes::RouteMatch; +use spin_trigger2::cli::{TriggerAppBuilder, TriggerAppOptions}; +use spin_trigger_http2::{HttpServer, HttpTrigger}; use test_environment::{ http::{Request, Response}, services::ServicesConfig, @@ -11,7 +16,7 @@ use test_environment::{ /// /// Use `runtimes::spin_cli::SpinCli` if you'd rather use Spin as a separate process pub struct InProcessSpin { - trigger: spin_trigger_http::HttpTrigger, + server: Arc, } impl InProcessSpin { @@ -32,31 +37,45 @@ impl InProcessSpin { } /// Create a new instance of Spin running in the same process as the tests - pub fn new(trigger: spin_trigger_http::HttpTrigger) -> Self { - Self { trigger } + pub fn new(server: Arc) -> Self { + Self { server } } /// Make an HTTP request to the Spin instance pub fn make_http_request(&self, req: Request<'_, &[u8]>) -> anyhow::Result { tokio::runtime::Runtime::new()?.block_on(async { let method: reqwest::Method = req.method.into(); - let req = http::request::Request::builder() + let mut builder = http::request::Request::builder() .method(method) - .uri(req.path) - // TODO(rylev): convert headers and body as well - .body(spin_http::body::empty()) - .unwrap(); + .uri(req.path); + + for (key, value) in req.headers { + builder = builder.header(*key, *value); + } + // TODO(rylev): convert body as well + let req = builder.body(spin_http::body::empty()).unwrap(); + let route_match = RouteMatch::synthetic("test", "/"); let response = self - .trigger - .handle( + .server + .handle_trigger_route( req, + route_match, http::uri::Scheme::HTTP, - (std::net::Ipv4Addr::LOCALHOST, 3000).into(), (std::net::Ipv4Addr::LOCALHOST, 7000).into(), ) .await?; use http_body_util::BodyExt; let status = response.status().as_u16(); + let headers = response + .headers() + .iter() + .map(|(k, v)| { + ( + k.as_str().to_owned(), + String::from_utf8(v.as_bytes().to_owned()).unwrap(), + ) + }) + .collect(); let body = response.into_body(); let chunks = body .collect() @@ -64,7 +83,7 @@ impl InProcessSpin { .context("could not get runtime test HTTP response")? .to_bytes() .to_vec(); - Ok(Response::full(status, Default::default(), chunks)) + Ok(Response::full(status, headers, chunks)) }) } } @@ -79,33 +98,18 @@ impl Runtime for InProcessSpin { async fn initialize_trigger( env: &mut TestEnvironment, ) -> anyhow::Result { - use spin_trigger::{ - loader::TriggerLoader, HostComponentInitData, RuntimeConfig, TriggerExecutorBuilder, - }; - use spin_trigger_http::HttpTrigger; - - // Create the locked app and write it to a file let locked_app = spin_loader::from_file( env.path().join("spin.toml"), spin_loader::FilesMountStrategy::Direct, None, ) .await?; - let json = locked_app.to_json()?; - std::fs::write(env.path().join("locked.json"), json)?; - // Create a loader and trigger builder - let loader = TriggerLoader::new(env.path().join(".working_dir"), false); - let mut builder = TriggerExecutorBuilder::::new(loader); - builder.hooks(spin_trigger::network::Network::default()); + let app = spin_app::App::new("my-app", locked_app); + let trigger = HttpTrigger::new(&app, "127.0.0.1:80".parse().unwrap(), None)?; + let mut builder = TriggerAppBuilder::new(trigger, PathBuf::from(".")); + let trigger_app = builder.build(app, TriggerAppOptions::default()).await?; + let server = builder.trigger.into_server(trigger_app)?; - // Build the trigger - let trigger = builder - .build( - format!("file:{}", env.path().join("locked.json").display()), - RuntimeConfig::default(), - HostComponentInitData::default(), - ) - .await?; - Ok(InProcessSpin::new(trigger)) + Ok(InProcessSpin::new(server)) } From 32f6da45eb37d97c64d875f4adadedfe96736749 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 21 Aug 2024 12:52:22 +0200 Subject: [PATCH 3/3] Revert RequestHandler type refactoring Signed-off-by: Ryan Levick --- crates/trigger-http2/src/outbound_http.rs | 12 +- crates/trigger-http2/src/server.rs | 218 ++++++++---------- tests/conformance-tests/src/lib.rs | 9 +- tests/testing-framework/Cargo.toml | 6 +- .../src/runtimes/in_process_spin.rs | 5 +- 5 files changed, 104 insertions(+), 146 deletions(-) diff --git a/crates/trigger-http2/src/outbound_http.rs b/crates/trigger-http2/src/outbound_http.rs index 38219b8f7e..45e63ab751 100644 --- a/crates/trigger-http2/src/outbound_http.rs +++ b/crates/trigger-http2/src/outbound_http.rs @@ -11,17 +11,17 @@ use spin_http::routes::RouteMatch; use spin_outbound_networking::parse_service_chaining_target; use wasmtime_wasi_http::types::IncomingResponse; -use crate::server::RequestHandler; +use crate::HttpServer; /// An outbound HTTP interceptor that handles service chaining requests. pub struct OutboundHttpInterceptor { - handler: Arc, + server: Arc, origin: SelfRequestOrigin, } impl OutboundHttpInterceptor { - pub fn new(handler: Arc, origin: SelfRequestOrigin) -> Self { - Self { handler, origin } + pub fn new(server: Arc, origin: SelfRequestOrigin) -> Self { + Self { server, origin } } } @@ -41,10 +41,10 @@ impl spin_factor_outbound_http::OutboundHttpInterceptor for OutboundHttpIntercep let route_match = RouteMatch::synthetic(&component_id, uri.path()); let req = std::mem::take(request); let between_bytes_timeout = config.between_bytes_timeout; - let server = self.handler.clone(); + let server = self.server.clone(); let resp_fut = async move { match server - .handle_trigger_route(req, &route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) + .handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) .await { Ok(resp) => Ok(Ok(IncomingResponse { diff --git a/crates/trigger-http2/src/server.rs b/crates/trigger-http2/src/server.rs index acea0eb9eb..c01327fda3 100644 --- a/crates/trigger-http2/src/server.rs +++ b/crates/trigger-http2/src/server.rs @@ -38,9 +38,18 @@ use crate::{ /// An HTTP server which runs Spin apps. pub struct HttpServer { + /// The address the server is listening on. + listen_addr: SocketAddr, + /// The TLS configuration for the server. tls_config: Option, + /// Request router. router: Router, - handler: Arc, + /// The app being triggered. + trigger_app: TriggerApp, + // Component ID -> component trigger config + component_trigger_configs: HashMap, + // Component ID -> handler type + component_handler_types: HashMap, } impl HttpServer { @@ -52,27 +61,32 @@ impl HttpServer { router: Router, component_trigger_configs: HashMap, ) -> anyhow::Result { + let component_handler_types = component_trigger_configs + .keys() + .map(|component_id| { + let component = trigger_app.get_component(component_id)?; + let handler_type = HandlerType::from_component(trigger_app.engine(), component)?; + Ok((component_id.clone(), handler_type)) + }) + .collect::>()?; Ok(Self { + listen_addr, tls_config, router, - handler: Arc::new(RequestHandler::new( - listen_addr, - trigger_app, - component_trigger_configs, - )?), + trigger_app, + component_trigger_configs, + component_handler_types, }) } /// Serve incoming requests over the provided [`TcpListener`]. pub async fn serve(self: Arc) -> anyhow::Result<()> { - let listener = TcpListener::bind(self.handler.listen_addr) - .await - .with_context(|| { - format!( - "Unable to listen on {listen_addr}", - listen_addr = self.handler.listen_addr - ) - })?; + let listener = TcpListener::bind(self.listen_addr).await.with_context(|| { + format!( + "Unable to listen on {listen_addr}", + listen_addr = self.listen_addr + ) + })?; if let Some(tls_config) = self.tls_config.clone() { self.serve_https(listener, tls_config).await?; } else { @@ -112,7 +126,7 @@ impl HttpServer { /// /// This method handles well known paths and routes requests to the handler when the router /// matches the requests path. - async fn handle( + pub async fn handle( self: &Arc, mut req: Request, server_scheme: Scheme, @@ -150,15 +164,70 @@ impl HttpServer { /// Handles a successful route match. pub async fn handle_trigger_route( self: &Arc, - req: Request, + mut req: Request, route_match: RouteMatch, server_scheme: Scheme, client_addr: SocketAddr, ) -> anyhow::Result> { - let res = self - .handler - .handle_trigger_route(req, &route_match, server_scheme, client_addr) - .await; + set_req_uri(&mut req, server_scheme.clone())?; + let app_id = self + .trigger_app + .app() + .get_metadata(APP_NAME_KEY)? + .unwrap_or_else(|| "".into()); + + let component_id = route_match.component_id(); + + spin_telemetry::metrics::monotonic_counter!( + spin.request_count = 1, + trigger_type = "http", + app_id = app_id, + component_id = component_id + ); + + let mut instance_builder = self.trigger_app.prepare(component_id)?; + + // Set up outbound HTTP request origin and service chaining + let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr)?; + instance_builder + .factor_builders() + .outbound_http() + .set_request_interceptor(OutboundHttpInterceptor::new(self.clone(), origin))?; + + // Prepare HTTP executor + let trigger_config = self.component_trigger_configs.get(component_id).unwrap(); + let handler_type = self.component_handler_types.get(component_id).unwrap(); + let executor = trigger_config + .executor + .as_ref() + .unwrap_or(&HttpExecutorType::Http); + + let res = match executor { + HttpExecutorType::Http => match handler_type { + HandlerType::Spin => { + SpinHttpExecutor + .execute(instance_builder, &route_match, req, client_addr) + .await + } + HandlerType::Wasi0_2 + | HandlerType::Wasi2023_11_10 + | HandlerType::Wasi2023_10_18 => { + WasiHttpExecutor { + handler_type: *handler_type, + } + .execute(instance_builder, &route_match, req, client_addr) + .await + } + }, + HttpExecutorType::Wagi(wagi_config) => { + let executor = WagiHttpExecutor { + wagi_config: wagi_config.clone(), + }; + executor + .execute(instance_builder, &route_match, req, client_addr) + .await + } + }; match res { Ok(res) => Ok(MatchedRoute::with_response_extension( res, @@ -174,7 +243,7 @@ impl HttpServer { /// Returns spin status information. fn app_info(&self, route: String) -> anyhow::Result> { - let info = AppInfo::new(self.handler.trigger_app.app()); + let info = AppInfo::new(self.trigger_app.app()); let body = serde_json::to_vec_pretty(&info)?; Ok(MatchedRoute::with_response_extension( Response::builder() @@ -278,7 +347,7 @@ impl HttpServer { println!("Available Routes:"); for (route, component_id) in self.router.routes() { println!(" {}: {}{}", component_id, base_url, route); - if let Some(component) = self.handler.trigger_app.app().get_component(component_id) { + if let Some(component) = self.trigger_app.app().get_component(component_id) { if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? { println!(" {}", description); } @@ -288,111 +357,6 @@ impl HttpServer { } } -/// Handles a routed HTTP trigger request. -pub struct RequestHandler { - /// The address the server is listening on. - pub(crate) listen_addr: SocketAddr, - /// The app being triggered. - trigger_app: TriggerApp, - // Component ID -> component trigger config - component_trigger_configs: HashMap, - // Component ID -> handler type - component_handler_types: HashMap, -} - -impl RequestHandler { - /// Create a new [`RequestHandler`] - pub fn new( - listen_addr: SocketAddr, - trigger_app: TriggerApp, - component_trigger_configs: HashMap, - ) -> anyhow::Result { - let component_handler_types = component_trigger_configs - .keys() - .map(|component_id| { - let component = trigger_app.get_component(component_id)?; - let handler_type = HandlerType::from_component(trigger_app.engine(), component)?; - Ok((component_id.clone(), handler_type)) - }) - .collect::>()?; - Ok(Self { - listen_addr, - trigger_app, - component_trigger_configs, - component_handler_types, - }) - } - - /// Handle a routed request. - pub async fn handle_trigger_route( - self: &Arc, - mut req: Request, - route_match: &RouteMatch, - server_scheme: Scheme, - client_addr: SocketAddr, - ) -> anyhow::Result> { - set_req_uri(&mut req, server_scheme.clone())?; - let app_id = self - .trigger_app - .app() - .get_metadata(APP_NAME_KEY)? - .unwrap_or_else(|| "".into()); - - let component_id = route_match.component_id(); - - spin_telemetry::metrics::monotonic_counter!( - spin.request_count = 1, - trigger_type = "http", - app_id = app_id, - component_id = component_id - ); - - let mut instance_builder = self.trigger_app.prepare(component_id)?; - - // Set up outbound HTTP request origin and service chaining - let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr)?; - instance_builder - .factor_builders() - .outbound_http() - .set_request_interceptor(OutboundHttpInterceptor::new(self.clone(), origin))?; - - // Prepare HTTP executor - let trigger_config = self.component_trigger_configs.get(component_id).unwrap(); - let handler_type = self.component_handler_types.get(component_id).unwrap(); - let executor = trigger_config - .executor - .as_ref() - .unwrap_or(&HttpExecutorType::Http); - - match executor { - HttpExecutorType::Http => match handler_type { - HandlerType::Spin => { - SpinHttpExecutor - .execute(instance_builder, route_match, req, client_addr) - .await - } - HandlerType::Wasi0_2 - | HandlerType::Wasi2023_11_10 - | HandlerType::Wasi2023_10_18 => { - WasiHttpExecutor { - handler_type: *handler_type, - } - .execute(instance_builder, route_match, req, client_addr) - .await - } - }, - HttpExecutorType::Wagi(wagi_config) => { - let executor = WagiHttpExecutor { - wagi_config: wagi_config.clone(), - }; - executor - .execute(instance_builder, route_match, req, client_addr) - .await - } - } - } -} - /// The incoming request's scheme and authority /// /// The incoming request's URI is relative to the server, so we need to set the scheme and authority. diff --git a/tests/conformance-tests/src/lib.rs b/tests/conformance-tests/src/lib.rs index bed1488a42..a030848196 100644 --- a/tests/conformance-tests/src/lib.rs +++ b/tests/conformance-tests/src/lib.rs @@ -1,5 +1,4 @@ use anyhow::Context as _; -use test_environment::http::Request; use testing_framework::runtimes::spin_cli::{SpinCli, SpinConfig}; /// Run a single conformance test against the supplied spin binary. @@ -57,11 +56,9 @@ pub fn run_test( let conformance_tests::config::Invocation::Http(mut invocation) = invocation; invocation.request.substitute_from_env(&mut env)?; let spin = env.runtime_mut(); - let actual = invocation.request.send(|request| { - let request = - Request::full(request.method, request.path, request.headers, request.body); - spin.make_http_request(request) - })?; + let actual = invocation + .request + .send(|request| spin.make_http_request(request))?; conformance_tests::assertions::assert_response(&invocation.response, &actual) .with_context(|| { diff --git a/tests/testing-framework/Cargo.toml b/tests/testing-framework/Cargo.toml index cc388246f3..2118f93b33 100644 --- a/tests/testing-framework/Cargo.toml +++ b/tests/testing-framework/Cargo.toml @@ -14,12 +14,12 @@ regex = "1.10.2" reqwest = { workspace = true } temp-dir = "0.1.11" test-environment = { workspace = true } -spin-factors-executor = { path = "../../crates/factors-executor" } spin-app = { path = "../../crates/app" } -spin-trigger-http2 = { path = "../../crates/trigger-http2" } +spin-factors-executor = { path = "../../crates/factors-executor" } spin-http = { path = "../../crates/http" } -spin-trigger2 = { path = "../../crates/trigger2" } spin-loader = { path = "../../crates/loader" } +spin-trigger2 = { path = "../../crates/trigger2" } +spin-trigger-http2 = { path = "../../crates/trigger-http2" } toml = "0.8.6" tokio = "1.23" wasmtime-wasi-http = { workspace = true } diff --git a/tests/testing-framework/src/runtimes/in_process_spin.rs b/tests/testing-framework/src/runtimes/in_process_spin.rs index 16ad654af9..7580842a25 100644 --- a/tests/testing-framework/src/runtimes/in_process_spin.rs +++ b/tests/testing-framework/src/runtimes/in_process_spin.rs @@ -3,7 +3,6 @@ use std::{path::PathBuf, sync::Arc}; use anyhow::Context as _; -use spin_http::routes::RouteMatch; use spin_trigger2::cli::{TriggerAppBuilder, TriggerAppOptions}; use spin_trigger_http2::{HttpServer, HttpTrigger}; use test_environment::{ @@ -54,12 +53,10 @@ impl InProcessSpin { } // TODO(rylev): convert body as well let req = builder.body(spin_http::body::empty()).unwrap(); - let route_match = RouteMatch::synthetic("test", "/"); let response = self .server - .handle_trigger_route( + .handle( req, - route_match, http::uri::Scheme::HTTP, (std::net::Ipv4Addr::LOCALHOST, 7000).into(), )