Skip to content

Commit

Permalink
Merge pull request #2817 from lann/intercept-spin
Browse files Browse the repository at this point in the history
Support interceptor in spin outbound http
  • Loading branch information
lann committed Sep 11, 2024
2 parents 880f4b8 + 24ab63b commit c0bfb61
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 75 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/factor-outbound-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ http = "1.1.0"
http-body-util = "0.1"
hyper = "1.4.1"
ip_network = "0.4"
reqwest = { version = "0.11", features = ["gzip"] }
reqwest = { version = "0.12", features = ["gzip"] }
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
spin-factors = { path = "../factors" }
Expand Down
107 changes: 107 additions & 0 deletions crates/factor-outbound-http/src/intercept.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use http::{Request, Response};
use http_body_util::{BodyExt, Full};
use spin_world::async_trait;
use wasmtime_wasi_http::{body::HyperOutgoingBody, HttpResult};

pub type HyperBody = HyperOutgoingBody;

/// An outbound HTTP request interceptor to be used with
/// [`InstanceState::set_request_interceptor`].
#[async_trait]
pub trait OutboundHttpInterceptor: Send + Sync {
/// Intercept an outgoing HTTP request.
///
/// If this method returns [`InterceptedResponse::Continue`], the (possibly
/// updated) request will be passed on to the default outgoing request
/// handler.
///
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
/// result will be returned as the result of the request, bypassing the
/// default handler. The `request` will also be dropped immediately.
async fn intercept(&self, request: InterceptRequest) -> HttpResult<InterceptOutcome>;
}

/// The type returned by an [`OutboundHttpInterceptor`].
pub enum InterceptOutcome {
/// The intercepted request will be passed on to the default outgoing
/// request handler.
Continue(InterceptRequest),
/// The given response will be returned as the result of the intercepted
/// request, bypassing the default handler.
Complete(Response<HyperBody>),
}

/// An intercepted outgoing HTTP request.
///
/// This is a wrapper that implements `DerefMut<Target = Request<()>>` for
/// inspection and modification of the request envelope. If the body needs to be
/// consumed, call [`Self::into_hyper_request`].
pub struct InterceptRequest {
inner: Request<()>,
body: InterceptBody,
}

enum InterceptBody {
Hyper(HyperBody),
Vec(Vec<u8>),
}

impl InterceptRequest {
pub fn into_hyper_request(self) -> Request<HyperBody> {
let (parts, ()) = self.inner.into_parts();
Request::from_parts(parts, self.body.into())
}

pub(crate) fn into_vec_request(self) -> Option<Request<Vec<u8>>> {
let InterceptBody::Vec(bytes) = self.body else {
return None;
};
let (parts, ()) = self.inner.into_parts();
Some(Request::from_parts(parts, bytes))
}
}

impl std::ops::Deref for InterceptRequest {
type Target = Request<()>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl std::ops::DerefMut for InterceptRequest {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl From<Request<HyperBody>> for InterceptRequest {
fn from(req: Request<HyperBody>) -> Self {
let (parts, body) = req.into_parts();
Self {
inner: Request::from_parts(parts, ()),
body: InterceptBody::Hyper(body),
}
}
}

impl From<Request<Vec<u8>>> for InterceptRequest {
fn from(req: Request<Vec<u8>>) -> Self {
let (parts, body) = req.into_parts();
Self {
inner: Request::from_parts(parts, ()),
body: InterceptBody::Vec(body),
}
}
}

impl From<InterceptBody> for HyperBody {
fn from(body: InterceptBody) -> Self {
match body {
InterceptBody::Hyper(body) => body,
InterceptBody::Vec(bytes) => {
Full::new(bytes.into()).map_err(|err| match err {}).boxed()
}
}
}
}
29 changes: 2 additions & 27 deletions crates/factor-outbound-http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod intercept;
mod spin;
mod wasi;
pub mod wasi_2023_10_18;
Expand All @@ -10,13 +11,13 @@ use http::{
uri::{Authority, Parts, PathAndQuery, Scheme},
HeaderValue, Uri,
};
use intercept::OutboundHttpInterceptor;
use spin_factor_outbound_networking::{
ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor,
};
use spin_factors::{
anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
};
use spin_world::async_trait;
use wasmtime_wasi_http::WasiHttpCtx;

pub use wasmtime_wasi_http::{
Expand Down Expand Up @@ -176,29 +177,3 @@ impl std::fmt::Display for SelfRequestOrigin {
write!(f, "{}://{}", self.scheme, self.authority)
}
}

/// An outbound HTTP request interceptor to be used with
/// [`InstanceState::set_request_interceptor`].
#[async_trait]
pub trait OutboundHttpInterceptor: Send + Sync {
/// Intercept an outgoing HTTP request.
///
/// If this method returns [`InterceptedResponse::Continue`], the (possibly
/// updated) request will be passed on to the default outgoing request
/// handler.
///
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
/// result will be returned as the result of the request, bypassing the
/// default handler. The `request` will also be dropped immediately.
async fn intercept(&self, request: &mut Request) -> HttpResult<InterceptOutcome>;
}

/// The type returned by an [`OutboundHttpInterceptor`].
pub enum InterceptOutcome {
/// The intercepted request will be passed on to the default outgoing
/// request handler.
Continue,
/// The given response will be returned as the result of the intercepted
/// request, bypassing the default handler.
Complete(Response),
}
116 changes: 86 additions & 30 deletions crates/factor-outbound-http/src/spin.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use http_body_util::BodyExt;
use spin_world::{
async_trait,
v1::{
Expand All @@ -7,6 +8,8 @@ use spin_world::{
};
use tracing::{field::Empty, instrument, Level, Span};

use crate::intercept::InterceptOutcome;

#[async_trait]
impl spin_http::Host for crate::InstanceState {
#[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO),
Expand All @@ -19,7 +22,11 @@ impl spin_http::Host for crate::InstanceState {
let uri = req.uri;
tracing::trace!("Sending outbound HTTP to {uri:?}");

let abs_url = if !uri.starts_with('/') {
if !req.params.is_empty() {
tracing::warn!("HTTP params field is deprecated");
}

let req_url = if !uri.starts_with('/') {
// Absolute URI
let is_allowed = self
.allowed_hosts
Expand All @@ -29,7 +36,7 @@ impl spin_http::Host for crate::InstanceState {
if !is_allowed {
return Err(HttpError::DestinationNotAllowed);
}
uri
uri.parse().map_err(|_| HttpError::InvalidUrl)?
} else {
// Relative URI ("self" request)
let is_allowed = self
Expand All @@ -47,36 +54,51 @@ impl spin_http::Host for crate::InstanceState {
);
return Err(HttpError::InvalidUrl);
};
format!("{origin}{uri}")
let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
origin.clone().into_uri(Some(path_and_query))
};
let req_url = reqwest::Url::parse(&abs_url).map_err(|_| HttpError::InvalidUrl)?;

if !req.params.is_empty() {
tracing::warn!("HTTP params field is deprecated");
}

// Allow reuse of Client's internal connection pool for multiple requests
// in a single component execution
let client = self.spin_http_client.get_or_insert_with(Default::default);

// Build an http::Request for OutboundHttpInterceptor
let mut req = {
let mut builder = client.request(reqwest_method(req.method), req_url);
let mut builder = http::Request::builder()
.method(hyper_method(req.method))
.uri(&req_url);
for (key, val) in req.headers {
builder = builder.header(key, val);
}
builder
.body(req.body.unwrap_or_default())
.build()
.map_err(|err| {
tracing::error!("Error building outbound request: {err}");
HttpError::RuntimeError
})?
};
builder.body(req.body.unwrap_or_default())
}
.map_err(|err| {
tracing::error!("Error building outbound request: {err}");
HttpError::RuntimeError
})?;

spin_telemetry::inject_trace_context(req.headers_mut());

if let Some(interceptor) = &self.request_interceptor {
let intercepted_request = std::mem::take(&mut req).into();
match interceptor.intercept(intercepted_request).await {
Ok(InterceptOutcome::Continue(intercepted_request)) => {
req = intercepted_request.into_vec_request().unwrap();
}
Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
Err(err) => {
tracing::error!("Error in outbound HTTP interceptor: {err}");
return Err(HttpError::RuntimeError);
}
}
}

// Convert http::Request to reqwest::Request
let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;

// Allow reuse of Client's internal connection pool for multiple requests
// in a single component execution
let client = self.spin_http_client.get_or_insert_with(Default::default);

let resp = client.execute(req).await.map_err(log_reqwest_error)?;

tracing::trace!("Returning response from outbound request to {abs_url}");
tracing::trace!("Returning response from outbound request to {req_url}");
span.record("http.response.status_code", resp.status().as_u16());
response_from_reqwest(resp).await
}
Expand Down Expand Up @@ -111,18 +133,52 @@ fn record_request_fields(span: &Span, req: &Request) {
}
}

fn reqwest_method(m: Method) -> reqwest::Method {
fn hyper_method(m: Method) -> http::Method {
match m {
Method::Get => reqwest::Method::GET,
Method::Post => reqwest::Method::POST,
Method::Put => reqwest::Method::PUT,
Method::Delete => reqwest::Method::DELETE,
Method::Patch => reqwest::Method::PATCH,
Method::Head => reqwest::Method::HEAD,
Method::Options => reqwest::Method::OPTIONS,
Method::Get => http::Method::GET,
Method::Post => http::Method::POST,
Method::Put => http::Method::PUT,
Method::Delete => http::Method::DELETE,
Method::Patch => http::Method::PATCH,
Method::Head => http::Method::HEAD,
Method::Options => http::Method::OPTIONS,
}
}

async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
let status = resp.status().as_u16();

let headers = resp
.headers()
.into_iter()
.map(|(key, val)| {
Ok((
key.to_string(),
val.to_str()
.map_err(|_| {
tracing::error!("Non-ascii response header {key} = {val:?}");
HttpError::RuntimeError
})?
.to_string(),
))
})
.collect::<Result<Vec<_>, _>>()?;

let body = resp
.body_mut()
.collect()
.await
.map_err(|_| HttpError::RuntimeError)?
.to_bytes()
.to_vec();

Ok(Response {
status,
headers: Some(headers),
body: Some(body),
})
}

fn log_reqwest_error(err: reqwest::Error) -> HttpError {
let error_desc = if err.is_timeout() {
"timeout error"
Expand Down
11 changes: 7 additions & 4 deletions crates/factor-outbound-http/src/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use wasmtime_wasi_http::{
};

use crate::{
wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor,
OutboundHttpInterceptor, SelfRequestOrigin,
intercept::{InterceptOutcome, OutboundHttpInterceptor},
wasi_2023_10_18, wasi_2023_11_10, InstanceState, OutboundHttpFactor, SelfRequestOrigin,
};

pub(crate) fn add_to_linker<T: Send + 'static>(
Expand Down Expand Up @@ -133,8 +133,11 @@ async fn send_request_impl(
spin_telemetry::inject_trace_context(&mut request);

if let Some(interceptor) = request_interceptor {
match interceptor.intercept(&mut request).await? {
InterceptOutcome::Continue => (),
let intercept_request = std::mem::take(&mut request).into();
match interceptor.intercept(intercept_request).await? {
InterceptOutcome::Continue(req) => {
request = req.into_hyper_request();
}
InterceptOutcome::Complete(resp) => {
let resp = IncomingResponse {
resp,
Expand Down
2 changes: 0 additions & 2 deletions crates/factors-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ impl<T: RuntimeFactors, U: Send + 'static> FactorsExecutorApp<T, U> {
.with_context(|| format!("no such component {component_id:?}"))?;
Ok(instance_pre.component())
}
}

impl<T: RuntimeFactors, U: Send + 'static> FactorsExecutorApp<T, U> {
/// Returns an instance builder for the given component ID.
pub fn prepare(&self, component_id: &str) -> anyhow::Result<FactorsInstanceBuilder<T, U>> {
let app_component = self
Expand Down
Loading

0 comments on commit c0bfb61

Please sign in to comment.