From 24ab63b5c20b233769e40cfb86e0905e7aed6c0f Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Mon, 9 Sep 2024 14:07:41 -0400 Subject: [PATCH] Support interceptor in spin outbound http Signed-off-by: Lann Martin --- Cargo.lock | 3 +- crates/factor-outbound-http/Cargo.toml | 2 +- crates/factor-outbound-http/src/intercept.rs | 107 +++++++++++++++++ crates/factor-outbound-http/src/lib.rs | 29 +---- crates/factor-outbound-http/src/spin.rs | 116 ++++++++++++++----- crates/factor-outbound-http/src/wasi.rs | 11 +- crates/factors-executor/src/lib.rs | 2 - crates/trigger-http/src/outbound_http.rs | 12 +- examples/spin-timer/Cargo.lock | 49 +++++++- 9 files changed, 256 insertions(+), 75 deletions(-) create mode 100644 crates/factor-outbound-http/src/intercept.rs diff --git a/Cargo.lock b/Cargo.lock index 42be97e2f..6deab9185 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6132,6 +6132,7 @@ version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "encoding_rs", @@ -7314,7 +7315,7 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "ip_network", - "reqwest 0.11.27", + "reqwest 0.12.7", "rustls 0.23.7", "spin-factor-outbound-networking", "spin-factor-variables", diff --git a/crates/factor-outbound-http/Cargo.toml b/crates/factor-outbound-http/Cargo.toml index f65c39eb6..60ad1f2ab 100644 --- a/crates/factor-outbound-http/Cargo.toml +++ b/crates/factor-outbound-http/Cargo.toml @@ -10,7 +10,7 @@ http = "1.1.0" http-body-util = "0.1" hyper = "1.4.1" ip_network = "0.4" -reqwest = { version = "0.11", features = ["gzip"] } +reqwest = { version = "0.12", features = ["gzip"] } rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } diff --git a/crates/factor-outbound-http/src/intercept.rs b/crates/factor-outbound-http/src/intercept.rs new file mode 100644 index 000000000..3b583c076 --- /dev/null +++ b/crates/factor-outbound-http/src/intercept.rs @@ -0,0 +1,107 @@ +use http::{Request, Response}; +use http_body_util::{BodyExt, Full}; +use spin_world::async_trait; +use wasmtime_wasi_http::{body::HyperOutgoingBody, HttpResult}; + +pub type HyperBody = HyperOutgoingBody; + +/// An outbound HTTP request interceptor to be used with +/// [`InstanceState::set_request_interceptor`]. +#[async_trait] +pub trait OutboundHttpInterceptor: Send + Sync { + /// Intercept an outgoing HTTP request. + /// + /// If this method returns [`InterceptedResponse::Continue`], the (possibly + /// updated) request will be passed on to the default outgoing request + /// handler. + /// + /// If this method returns [`InterceptedResponse::Intercepted`], the inner + /// result will be returned as the result of the request, bypassing the + /// default handler. The `request` will also be dropped immediately. + async fn intercept(&self, request: InterceptRequest) -> HttpResult; +} + +/// The type returned by an [`OutboundHttpInterceptor`]. +pub enum InterceptOutcome { + /// The intercepted request will be passed on to the default outgoing + /// request handler. + Continue(InterceptRequest), + /// The given response will be returned as the result of the intercepted + /// request, bypassing the default handler. + Complete(Response), +} + +/// An intercepted outgoing HTTP request. +/// +/// This is a wrapper that implements `DerefMut>` for +/// inspection and modification of the request envelope. If the body needs to be +/// consumed, call [`Self::into_hyper_request`]. +pub struct InterceptRequest { + inner: Request<()>, + body: InterceptBody, +} + +enum InterceptBody { + Hyper(HyperBody), + Vec(Vec), +} + +impl InterceptRequest { + pub fn into_hyper_request(self) -> Request { + let (parts, ()) = self.inner.into_parts(); + Request::from_parts(parts, self.body.into()) + } + + pub(crate) fn into_vec_request(self) -> Option>> { + let InterceptBody::Vec(bytes) = self.body else { + return None; + }; + let (parts, ()) = self.inner.into_parts(); + Some(Request::from_parts(parts, bytes)) + } +} + +impl std::ops::Deref for InterceptRequest { + type Target = Request<()>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for InterceptRequest { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl From> for InterceptRequest { + fn from(req: Request) -> Self { + let (parts, body) = req.into_parts(); + Self { + inner: Request::from_parts(parts, ()), + body: InterceptBody::Hyper(body), + } + } +} + +impl From>> for InterceptRequest { + fn from(req: Request>) -> Self { + let (parts, body) = req.into_parts(); + Self { + inner: Request::from_parts(parts, ()), + body: InterceptBody::Vec(body), + } + } +} + +impl From for HyperBody { + fn from(body: InterceptBody) -> Self { + match body { + InterceptBody::Hyper(body) => body, + InterceptBody::Vec(bytes) => { + Full::new(bytes.into()).map_err(|err| match err {}).boxed() + } + } + } +} diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index 0d576f044..3feb70508 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -1,3 +1,4 @@ +pub mod intercept; mod spin; mod wasi; pub mod wasi_2023_10_18; @@ -10,13 +11,13 @@ use http::{ uri::{Authority, Parts, PathAndQuery, Scheme}, HeaderValue, Uri, }; +use intercept::OutboundHttpInterceptor; use spin_factor_outbound_networking::{ ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor, }; use spin_factors::{ anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, }; -use spin_world::async_trait; use wasmtime_wasi_http::WasiHttpCtx; pub use wasmtime_wasi_http::{ @@ -176,29 +177,3 @@ impl std::fmt::Display for SelfRequestOrigin { write!(f, "{}://{}", self.scheme, self.authority) } } - -/// An outbound HTTP request interceptor to be used with -/// [`InstanceState::set_request_interceptor`]. -#[async_trait] -pub trait OutboundHttpInterceptor: Send + Sync { - /// Intercept an outgoing HTTP request. - /// - /// If this method returns [`InterceptedResponse::Continue`], the (possibly - /// updated) request will be passed on to the default outgoing request - /// handler. - /// - /// If this method returns [`InterceptedResponse::Intercepted`], the inner - /// result will be returned as the result of the request, bypassing the - /// default handler. The `request` will also be dropped immediately. - async fn intercept(&self, request: &mut Request) -> HttpResult; -} - -/// The type returned by an [`OutboundHttpInterceptor`]. -pub enum InterceptOutcome { - /// The intercepted request will be passed on to the default outgoing - /// request handler. - Continue, - /// The given response will be returned as the result of the intercepted - /// request, bypassing the default handler. - Complete(Response), -} diff --git a/crates/factor-outbound-http/src/spin.rs b/crates/factor-outbound-http/src/spin.rs index 633df727d..f4d2e71b6 100644 --- a/crates/factor-outbound-http/src/spin.rs +++ b/crates/factor-outbound-http/src/spin.rs @@ -1,3 +1,4 @@ +use http_body_util::BodyExt; use spin_world::{ async_trait, v1::{ @@ -7,6 +8,8 @@ use spin_world::{ }; use tracing::{field::Empty, instrument, Level, Span}; +use crate::intercept::InterceptOutcome; + #[async_trait] impl spin_http::Host for crate::InstanceState { #[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO), @@ -19,7 +22,11 @@ impl spin_http::Host for crate::InstanceState { let uri = req.uri; tracing::trace!("Sending outbound HTTP to {uri:?}"); - let abs_url = if !uri.starts_with('/') { + if !req.params.is_empty() { + tracing::warn!("HTTP params field is deprecated"); + } + + let req_url = if !uri.starts_with('/') { // Absolute URI let is_allowed = self .allowed_hosts @@ -29,7 +36,7 @@ impl spin_http::Host for crate::InstanceState { if !is_allowed { return Err(HttpError::DestinationNotAllowed); } - uri + uri.parse().map_err(|_| HttpError::InvalidUrl)? } else { // Relative URI ("self" request) let is_allowed = self @@ -47,36 +54,51 @@ impl spin_http::Host for crate::InstanceState { ); return Err(HttpError::InvalidUrl); }; - format!("{origin}{uri}") + let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?; + origin.clone().into_uri(Some(path_and_query)) }; - let req_url = reqwest::Url::parse(&abs_url).map_err(|_| HttpError::InvalidUrl)?; - - if !req.params.is_empty() { - tracing::warn!("HTTP params field is deprecated"); - } - - // Allow reuse of Client's internal connection pool for multiple requests - // in a single component execution - let client = self.spin_http_client.get_or_insert_with(Default::default); + // Build an http::Request for OutboundHttpInterceptor let mut req = { - let mut builder = client.request(reqwest_method(req.method), req_url); + let mut builder = http::Request::builder() + .method(hyper_method(req.method)) + .uri(&req_url); for (key, val) in req.headers { builder = builder.header(key, val); } - builder - .body(req.body.unwrap_or_default()) - .build() - .map_err(|err| { - tracing::error!("Error building outbound request: {err}"); - HttpError::RuntimeError - })? - }; + builder.body(req.body.unwrap_or_default()) + } + .map_err(|err| { + tracing::error!("Error building outbound request: {err}"); + HttpError::RuntimeError + })?; + spin_telemetry::inject_trace_context(req.headers_mut()); + if let Some(interceptor) = &self.request_interceptor { + let intercepted_request = std::mem::take(&mut req).into(); + match interceptor.intercept(intercepted_request).await { + Ok(InterceptOutcome::Continue(intercepted_request)) => { + req = intercepted_request.into_vec_request().unwrap(); + } + Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await, + Err(err) => { + tracing::error!("Error in outbound HTTP interceptor: {err}"); + return Err(HttpError::RuntimeError); + } + } + } + + // Convert http::Request to reqwest::Request + let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?; + + // Allow reuse of Client's internal connection pool for multiple requests + // in a single component execution + let client = self.spin_http_client.get_or_insert_with(Default::default); + let resp = client.execute(req).await.map_err(log_reqwest_error)?; - tracing::trace!("Returning response from outbound request to {abs_url}"); + tracing::trace!("Returning response from outbound request to {req_url}"); span.record("http.response.status_code", resp.status().as_u16()); response_from_reqwest(resp).await } @@ -111,18 +133,52 @@ fn record_request_fields(span: &Span, req: &Request) { } } -fn reqwest_method(m: Method) -> reqwest::Method { +fn hyper_method(m: Method) -> http::Method { match m { - Method::Get => reqwest::Method::GET, - Method::Post => reqwest::Method::POST, - Method::Put => reqwest::Method::PUT, - Method::Delete => reqwest::Method::DELETE, - Method::Patch => reqwest::Method::PATCH, - Method::Head => reqwest::Method::HEAD, - Method::Options => reqwest::Method::OPTIONS, + Method::Get => http::Method::GET, + Method::Post => http::Method::POST, + Method::Put => http::Method::PUT, + Method::Delete => http::Method::DELETE, + Method::Patch => http::Method::PATCH, + Method::Head => http::Method::HEAD, + Method::Options => http::Method::OPTIONS, } } +async fn response_from_hyper(mut resp: crate::Response) -> Result { + let status = resp.status().as_u16(); + + let headers = resp + .headers() + .into_iter() + .map(|(key, val)| { + Ok(( + key.to_string(), + val.to_str() + .map_err(|_| { + tracing::error!("Non-ascii response header {key} = {val:?}"); + HttpError::RuntimeError + })? + .to_string(), + )) + }) + .collect::, _>>()?; + + let body = resp + .body_mut() + .collect() + .await + .map_err(|_| HttpError::RuntimeError)? + .to_bytes() + .to_vec(); + + Ok(Response { + status, + headers: Some(headers), + body: Some(body), + }) +} + fn log_reqwest_error(err: reqwest::Error) -> HttpError { let error_desc = if err.is_timeout() { "timeout error" diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index 67fabc4a1..a8602498f 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -18,8 +18,8 @@ use wasmtime_wasi_http::{ }; use crate::{ - wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor, - OutboundHttpInterceptor, SelfRequestOrigin, + intercept::{InterceptOutcome, OutboundHttpInterceptor}, + wasi_2023_10_18, wasi_2023_11_10, InstanceState, OutboundHttpFactor, SelfRequestOrigin, }; pub(crate) fn add_to_linker( @@ -133,8 +133,11 @@ async fn send_request_impl( spin_telemetry::inject_trace_context(&mut request); if let Some(interceptor) = request_interceptor { - match interceptor.intercept(&mut request).await? { - InterceptOutcome::Continue => (), + let intercept_request = std::mem::take(&mut request).into(); + match interceptor.intercept(intercept_request).await? { + InterceptOutcome::Continue(req) => { + request = req.into_hyper_request(); + } InterceptOutcome::Complete(resp) => { let resp = IncomingResponse { resp, diff --git a/crates/factors-executor/src/lib.rs b/crates/factors-executor/src/lib.rs index d7769822b..eb0b8fd6d 100644 --- a/crates/factors-executor/src/lib.rs +++ b/crates/factors-executor/src/lib.rs @@ -145,9 +145,7 @@ impl FactorsExecutorApp { .with_context(|| format!("no such component {component_id:?}"))?; Ok(instance_pre.component()) } -} -impl FactorsExecutorApp { /// Returns an instance builder for the given component ID. pub fn prepare(&self, component_id: &str) -> anyhow::Result> { let app_component = self diff --git a/crates/trigger-http/src/outbound_http.rs b/crates/trigger-http/src/outbound_http.rs index f5cc556d2..62b2933d7 100644 --- a/crates/trigger-http/src/outbound_http.rs +++ b/crates/trigger-http/src/outbound_http.rs @@ -5,7 +5,7 @@ use std::{ use http::uri::Scheme; use spin_core::async_trait; -use spin_factor_outbound_http::{InterceptOutcome, Request}; +use spin_factor_outbound_http::intercept::{self, InterceptOutcome, InterceptRequest}; use spin_factor_outbound_networking::parse_service_chaining_target; use spin_factors::RuntimeFactors; use spin_http::routes::RouteMatch; @@ -27,13 +27,11 @@ impl OutboundHttpInterceptor { const CHAINED_CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); #[async_trait] -impl spin_factor_outbound_http::OutboundHttpInterceptor - for OutboundHttpInterceptor -{ - async fn intercept(&self, request: &mut Request) -> HttpResult { +impl intercept::OutboundHttpInterceptor for OutboundHttpInterceptor { + async fn intercept(&self, request: InterceptRequest) -> HttpResult { // Handle service chaining requests if let Some(component_id) = parse_service_chaining_target(request.uri()) { - let req = std::mem::take(request); + let req = request.into_hyper_request(); let route_match = RouteMatch::synthetic(&component_id, req.uri().path()); let resp = self .server @@ -42,7 +40,7 @@ impl spin_factor_outbound_http::OutboundHttpInterceptor .map_err(HttpError::trap)?; Ok(InterceptOutcome::Complete(resp)) } else { - Ok(InterceptOutcome::Continue) + Ok(InterceptOutcome::Continue(request)) } } } diff --git a/examples/spin-timer/Cargo.lock b/examples/spin-timer/Cargo.lock index 3f71b9d0e..25ac6d320 100644 --- a/examples/spin-timer/Cargo.lock +++ b/examples/spin-timer/Cargo.lock @@ -1745,6 +1745,23 @@ dependencies = [ "webpki-roots 0.26.3", ] +[[package]] +name = "hyper-rustls" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.12", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tower-service", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -3142,7 +3159,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", - "system-configuration", + "system-configuration 0.5.1", "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", @@ -3162,14 +3179,18 @@ version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", + "encoding_rs", "futures-core", "futures-util", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", "hyper 1.4.1", + "hyper-rustls 0.27.2", "hyper-tls 0.6.0", "hyper-util", "ipnet", @@ -3185,6 +3206,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", + "system-configuration 0.6.0", "tokio", "tokio-native-tls", "tokio-util", @@ -3820,7 +3842,7 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "ip_network", - "reqwest 0.11.27", + "reqwest 0.12.7", "rustls 0.23.12", "spin-factor-outbound-networking", "spin-factors", @@ -4355,7 +4377,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation", - "system-configuration-sys", + "system-configuration-sys 0.5.0", +] + +[[package]] +name = "system-configuration" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" +dependencies = [ + "bitflags 2.4.2", + "core-foundation", + "system-configuration-sys 0.6.0", ] [[package]] @@ -4368,6 +4401,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "system-interface" version = "0.27.2"