Skip to content

Commit

Permalink
refactor(common): move common::watch module to http1 feature
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored and seanmonstar committed Dec 13, 2023
1 parent be2024a commit e303f5a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 8 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ include = [

[dependencies]
bytes = "1"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "1"
http-body = "1"
pin-project-lite = "0.2.4"
tokio = { version = "1", features = ["sync"] }

# Optional

futures-channel = { version = "0.3", optional = true }
futures-util = { version = "0.3", default-features = false, optional = true }
h2 = { version = "0.4", optional = true }
http-body-util = { version = "0.1", optional = true }
httparse = { version = "1.8", optional = true }
Expand Down Expand Up @@ -74,8 +74,8 @@ full = [
]

# HTTP versions
http1 = ["dep:httparse", "dep:itoa"]
http2 = ["dep:h2"]
http1 = ["dep:futures-channel", "dep:futures-util", "dep:httparse", "dep:itoa"]
http2 = ["dep:futures-channel", "dep:futures-util", "dep:h2"]

# Client/Server
client = ["dep:want"]
Expand Down
17 changes: 14 additions & 3 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use futures_channel::{mpsc, oneshot};
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use http::HeaderMap;
use http_body::{Body, Frame, SizeHint};

Expand All @@ -17,11 +18,14 @@ use http_body::{Body, Frame, SizeHint};
any(feature = "client", feature = "server")
))]
use super::DecodedLength;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use crate::common::watch;
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping;

#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type TrailersSender = oneshot::Sender<HeaderMap>;

/// A stream of `Bytes`, used when receiving bodies from the network.
Expand Down Expand Up @@ -78,13 +82,16 @@ enum Kind {
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on"]
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) struct Sender {
want_rx: watch::Receiver,
data_tx: BodySender,
trailers_tx: Option<TrailersSender>,
}

#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;

impl Incoming {
Expand Down Expand Up @@ -331,6 +338,7 @@ impl fmt::Debug for Incoming {
}
}

#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
/// Check to see if this `Sender` can send more data.
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
Expand All @@ -350,11 +358,13 @@ impl Sender {
}
}

#[cfg(test)]
async fn ready(&mut self) -> crate::Result<()> {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
}

/// Send data on data channel when it is ready.
#[cfg(test)]
#[allow(unused)]
pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
self.ready().await?;
Expand Down Expand Up @@ -392,7 +402,7 @@ impl Sender {
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

#[allow(unused)]
#[cfg(test)]
pub(crate) fn abort(mut self) {
self.send_error(crate::Error::new_body_write_aborted());
}
Expand All @@ -406,6 +416,7 @@ impl Sender {
}
}

#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[derive(Debug)]
Expand Down
5 changes: 5 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#[cfg(all(
any(feature = "client", feature = "server"),
any(feature = "http1", feature = "http2")
))]
macro_rules! ready {
($e:expr) => {
match $e {
Expand All @@ -18,4 +22,5 @@ pub(crate) mod task;
all(any(feature = "client", feature = "server"), feature = "http2"),
))]
pub(crate) mod time;
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
pub(crate) mod watch;
1 change: 0 additions & 1 deletion src/common/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Value = usize;

pub(crate) const CLOSED: usize = 0;

#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
debug_assert!(
initial != CLOSED,
Expand Down
44 changes: 44 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ pub(super) enum Kind {
/// A pending item was dropped before ever being processed.
Canceled,
/// Indicates a channel (client or body sender) is closed.
#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
all(feature = "http2", feature = "client")
))]
ChannelClosed,
/// An `io::Error` that occurred while trying to read or write to a network stream.
#[cfg(all(
Expand Down Expand Up @@ -121,6 +125,10 @@ pub(super) enum User {
))]
Body,
/// The user aborted writing of the outgoing body.
#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
feature = "ffi"
))]
BodyWriteAborted,
/// Error from future of user's Service.
#[cfg(any(
Expand Down Expand Up @@ -192,6 +200,16 @@ impl Error {

/// Returns true if a sender's channel is closed.
pub fn is_closed(&self) -> bool {
#[cfg(not(any(
all(feature = "http1", any(feature = "client", feature = "server")),
all(feature = "http2", feature = "client")
)))]
return false;

#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
all(feature = "http2", feature = "client")
))]
matches!(self.inner.kind, Kind::ChannelClosed)
}

Expand All @@ -202,6 +220,16 @@ impl Error {

/// Returns true if the body write was aborted.
pub fn is_body_write_aborted(&self) -> bool {
#[cfg(not(any(
all(feature = "http1", any(feature = "client", feature = "server")),
feature = "ffi"
)))]
return false;

#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
feature = "ffi"
))]
matches!(self.inner.kind, Kind::User(User::BodyWriteAborted))
}

Expand Down Expand Up @@ -280,6 +308,10 @@ impl Error {
Error::new(Kind::Io).with(cause)
}

#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
all(feature = "http2", feature = "client")
))]
pub(super) fn new_closed() -> Error {
Error::new(Kind::ChannelClosed)
}
Expand All @@ -300,6 +332,10 @@ impl Error {
Error::new(Kind::BodyWrite).with(cause)
}

#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
feature = "ffi"
))]
pub(super) fn new_body_write_aborted() -> Error {
Error::new(Kind::User(User::BodyWriteAborted))
}
Expand Down Expand Up @@ -407,6 +443,10 @@ impl Error {
Kind::IncompleteMessage => "connection closed before message completed",
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
Kind::UnexpectedMessage => "received unexpected message from connection",
#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
all(feature = "http2", feature = "client")
))]
Kind::ChannelClosed => "channel closed",
Kind::Canceled => "operation was canceled",
#[cfg(all(feature = "http1", feature = "server"))]
Expand Down Expand Up @@ -436,6 +476,10 @@ impl Error {
any(feature = "http1", feature = "http2")
))]
Kind::User(User::Body) => "error from user's Body stream",
#[cfg(any(
all(feature = "http1", any(feature = "client", feature = "server")),
feature = "ffi"
))]
Kind::User(User::BodyWriteAborted) => "user body write aborted",
#[cfg(any(
all(any(feature = "client", feature = "server"), feature = "http1"),
Expand Down

0 comments on commit e303f5a

Please sign in to comment.