From d0e25a4bbbc59bba16eeeeafe1bb3ce95becd7f6 Mon Sep 17 00:00:00 2001 From: tottoto Date: Thu, 14 Dec 2023 00:19:03 +0900 Subject: [PATCH] refactor(common): move common::watch module to http1 feature --- Cargo.toml | 8 ++++---- src/body/incoming.rs | 17 ++++++++++++++--- src/common/mod.rs | 5 +++++ src/common/watch.rs | 1 - src/error.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 67 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 03acfb806a..954c5c692b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,6 @@ include = [ [dependencies] bytes = "1" -futures-channel = "0.3" -futures-util = { version = "0.3", default-features = false } http = "1" http-body = "1" pin-project-lite = "0.2.4" @@ -30,6 +28,8 @@ tokio = { version = "1", features = ["sync"] } # Optional +futures-channel = { version = "0.3", optional = true } +futures-util = { version = "0.3", default-features = false, optional = true } h2 = { version = "0.4", optional = true } http-body-util = { version = "0.1", optional = true } httparse = { version = "1.8", optional = true } @@ -74,8 +74,8 @@ full = [ ] # HTTP versions -http1 = ["dep:httparse", "dep:itoa"] -http2 = ["dep:h2"] +http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"] +http2 = ["dep:futures-channel", "dep:futures-util", "dep:h2"] # Client/Server client = ["dep:want"] diff --git a/src/body/incoming.rs b/src/body/incoming.rs index cb7b591b60..bc9355eaa5 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -5,10 +5,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -use futures_channel::mpsc; -use futures_channel::oneshot; +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] +use futures_channel::{mpsc, oneshot}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use http::HeaderMap; use http_body::{Body, Frame, SizeHint}; @@ -17,11 +18,14 @@ use http_body::{Body, Frame, SizeHint}; any(feature = "client", feature = "server") ))] use super::DecodedLength; +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use crate::common::watch; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] type BodySender = mpsc::Sender>; +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] type TrailersSender = oneshot::Sender; /// A stream of `Bytes`, used when receiving bodies from the network. @@ -78,13 +82,16 @@ enum Kind { /// [`Body::channel()`]: struct.Body.html#method.channel /// [`Sender::abort()`]: struct.Sender.html#method.abort #[must_use = "Sender does nothing unless sent on"] +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] pub(crate) struct Sender { want_rx: watch::Receiver, data_tx: BodySender, trailers_tx: Option, } +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] const WANT_PENDING: usize = 1; +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] const WANT_READY: usize = 2; impl Incoming { @@ -331,6 +338,7 @@ impl fmt::Debug for Incoming { } } +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] impl Sender { /// Check to see if this `Sender` can send more data. pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { @@ -350,11 +358,13 @@ impl Sender { } } + #[cfg(test)] async fn ready(&mut self) -> crate::Result<()> { futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await } /// Send data on data channel when it is ready. + #[cfg(test)] #[allow(unused)] pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { self.ready().await?; @@ -392,7 +402,7 @@ impl Sender { .map_err(|err| err.into_inner().expect("just sent Ok")) } - #[allow(unused)] + #[cfg(test)] pub(crate) fn abort(mut self) { self.send_error(crate::Error::new_body_write_aborted()); } @@ -406,6 +416,7 @@ impl Sender { } } +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[derive(Debug)] diff --git a/src/common/mod.rs b/src/common/mod.rs index 4129673cdc..57b66e1f9f 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,3 +1,7 @@ +#[cfg(all( + any(feature = "client", feature = "server"), + any(feature = "http1", feature = "http2") +))] macro_rules! ready { ($e:expr) => { match $e { @@ -18,4 +22,5 @@ pub(crate) mod task; all(any(feature = "client", feature = "server"), feature = "http2"), ))] pub(crate) mod time; +#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] pub(crate) mod watch; diff --git a/src/common/watch.rs b/src/common/watch.rs index 3f9b2b651d..ba17d551cb 100644 --- a/src/common/watch.rs +++ b/src/common/watch.rs @@ -15,7 +15,6 @@ type Value = usize; pub(crate) const CLOSED: usize = 0; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { debug_assert!( initial != CLOSED, diff --git a/src/error.rs b/src/error.rs index a9a4e357a8..ae5bff3a4e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -50,6 +50,10 @@ pub(super) enum Kind { /// A pending item was dropped before ever being processed. Canceled, /// Indicates a channel (client or body sender) is closed. + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + all(feature = "http2", feature = "client") + ))] ChannelClosed, /// An `io::Error` that occurred while trying to read or write to a network stream. #[cfg(all( @@ -121,6 +125,10 @@ pub(super) enum User { ))] Body, /// The user aborted writing of the outgoing body. + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + feature = "ffi" + ))] BodyWriteAborted, /// Error from future of user's Service. #[cfg(any( @@ -192,6 +200,16 @@ impl Error { /// Returns true if a sender's channel is closed. pub fn is_closed(&self) -> bool { + #[cfg(not(any( + all(feature = "http1", any(feature = "client", feature = "server")), + all(feature = "http2", feature = "client") + )))] + return false; + + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + all(feature = "http2", feature = "client") + ))] matches!(self.inner.kind, Kind::ChannelClosed) } @@ -202,6 +220,16 @@ impl Error { /// Returns true if the body write was aborted. pub fn is_body_write_aborted(&self) -> bool { + #[cfg(not(any( + all(feature = "http1", any(feature = "client", feature = "server")), + feature = "ffi" + )))] + return false; + + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + feature = "ffi" + ))] matches!(self.inner.kind, Kind::User(User::BodyWriteAborted)) } @@ -280,6 +308,10 @@ impl Error { Error::new(Kind::Io).with(cause) } + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + all(feature = "http2", feature = "client") + ))] pub(super) fn new_closed() -> Error { Error::new(Kind::ChannelClosed) } @@ -300,6 +332,10 @@ impl Error { Error::new(Kind::BodyWrite).with(cause) } + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + feature = "ffi" + ))] pub(super) fn new_body_write_aborted() -> Error { Error::new(Kind::User(User::BodyWriteAborted)) } @@ -407,6 +443,10 @@ impl Error { Kind::IncompleteMessage => "connection closed before message completed", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] Kind::UnexpectedMessage => "received unexpected message from connection", + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + all(feature = "http2", feature = "client") + ))] Kind::ChannelClosed => "channel closed", Kind::Canceled => "operation was canceled", #[cfg(all(feature = "http1", feature = "server"))] @@ -436,6 +476,10 @@ impl Error { any(feature = "http1", feature = "http2") ))] Kind::User(User::Body) => "error from user's Body stream", + #[cfg(any( + all(feature = "http1", any(feature = "client", feature = "server")), + feature = "ffi" + ))] Kind::User(User::BodyWriteAborted) => "user body write aborted", #[cfg(any( all(any(feature = "client", feature = "server"), feature = "http1"),