Skip to content

Commit

Permalink
refactor(common): remove common re-export
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Nov 6, 2023
1 parent 06137bc commit 0264e98
Show file tree
Hide file tree
Showing 30 changed files with 245 additions and 222 deletions.
20 changes: 11 additions & 9 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_channel::mpsc;
Expand All @@ -15,10 +18,9 @@ use http_body::{Body as HttpBody, SizeHint};
use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
use crate::common::watch;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{task, watch, Pin, Poll};
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping;

Expand Down Expand Up @@ -239,7 +241,7 @@ impl Body {
.get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
}

fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
match self.take_delayed_eof() {
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
Expand Down Expand Up @@ -292,7 +294,7 @@ impl Body {
}
}

fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
match self.kind {
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
Kind::Chan {
Expand Down Expand Up @@ -367,14 +369,14 @@ impl HttpBody for Body {

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.poll_eof(cx)
}

fn poll_trailers(
#[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
#[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
#[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind {
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Expand Down Expand Up @@ -470,7 +472,7 @@ impl fmt::Debug for Body {
impl Stream for Body {
type Item = crate::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
HttpBody::poll_data(self, cx)
}
}
Expand Down Expand Up @@ -550,15 +552,15 @@ impl From<Cow<'static, str>> for Body {

impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
.poll_ready(cx)
.map_err(|_| crate::Error::new_closed())
}

fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
match self.want_rx.load(cx) {
WANT_READY => Poll::Ready(Ok(())),
WANT_PENDING => Poll::Pending,
Expand Down
17 changes: 9 additions & 8 deletions src/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::marker::Unpin;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use futures_channel::oneshot;
Expand All @@ -12,10 +16,7 @@ use tracing::{debug, trace, warn};

use crate::body::{Body, HttpBody};
use crate::client::connect::CaptureConnectionExtension;
use crate::common::{
exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin,
Poll,
};
use crate::common::{exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, Lazy};
#[cfg(feature = "http2")]
use crate::ext::Protocol;
use crate::rt::Executor;
Expand Down Expand Up @@ -553,7 +554,7 @@ where
type Error = crate::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

Expand All @@ -573,7 +574,7 @@ where
type Error = crate::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

Expand Down Expand Up @@ -628,7 +629,7 @@ impl fmt::Debug for ResponseFuture {
impl Future for ResponseFuture {
type Output = crate::Result<Response<Body>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.get_mut().as_mut().poll(cx)
}
}
Expand All @@ -650,7 +651,7 @@ enum PoolTx<B> {
}

impl<B> PoolClient<B> {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
match self.tx {
PoolTx::Http1(ref mut tx) => tx.poll_ready(cx),
#[cfg(feature = "http2")]
Expand Down
21 changes: 11 additions & 10 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ pub mod http2;

use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
#[cfg(not(all(feature = "http1", feature = "http2")))]
use std::marker::PhantomData;
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(all(feature = "runtime", feature = "http2"))]
use std::time::Duration;

Expand All @@ -77,12 +81,9 @@ use tracing::{debug, trace};

use super::dispatch;
use crate::body::HttpBody;
use crate::common::exec::{BoxSendFuture, Exec};
#[cfg(not(all(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::proto;
use crate::rt::Executor;
#[cfg(feature = "http1")]
Expand Down Expand Up @@ -257,7 +258,7 @@ impl<B> SendRequest<B> {
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.dispatch.poll_ready(cx)
}

Expand Down Expand Up @@ -381,7 +382,7 @@ where
type Error = crate::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}

Expand Down Expand Up @@ -502,7 +503,7 @@ where
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
match *self.inner.as_mut().expect("already upgraded") {
#[cfg(feature = "http1")]
ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
Expand Down Expand Up @@ -554,7 +555,7 @@ where
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
#[cfg(feature = "http1")]
Expand Down Expand Up @@ -1067,7 +1068,7 @@ impl Builder {
impl Future for ResponseFuture {
type Output = crate::Result<Response<Body>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner {
ResponseFutureState::Waiting(ref mut rx) => {
Pin::new(rx).poll(cx).map(|res| match res {
Expand Down Expand Up @@ -1101,7 +1102,7 @@ where
{
type Output = crate::Result<proto::Dispatched>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
#[cfg(feature = "http1")]
ProtoClientProj::H1 { h1 } => h1.poll(cx),
Expand Down
18 changes: 10 additions & 8 deletions src/client/connect/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::future::Future;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
use std::pin::Pin;
use std::str::FromStr;
use std::task::{self, Poll};
use std::task::{Context, Poll};
use std::{fmt, io, vec};

use tokio::task::JoinHandle;
Expand Down Expand Up @@ -113,7 +113,7 @@ impl Service<Name> for GaiResolver {
type Error = io::Error;
type Future = GaiFuture;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

Expand All @@ -138,7 +138,7 @@ impl fmt::Debug for GaiResolver {
impl Future for GaiFuture {
type Output = Result<GaiAddrs, io::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner).poll(cx).map(|res| match res {
Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }),
Ok(Err(err)) => Err(err),
Expand Down Expand Up @@ -286,7 +286,7 @@ impl Service<Name> for TokioThreadpoolGaiResolver {
type Error = io::Error;
type Future = TokioThreadpoolGaiFuture;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
Expand All @@ -299,7 +299,7 @@ impl Service<Name> for TokioThreadpoolGaiResolver {
impl Future for TokioThreadpoolGaiFuture {
type Output = Result<GaiAddrs, io::Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(tokio_executor::threadpool::blocking(|| (
self.name.as_str(),
0
Expand All @@ -318,8 +318,10 @@ impl Future for TokioThreadpoolGaiFuture {
*/

mod sealed {
use std::future::Future;
use std::task::{Context, Poll};

use super::{Name, SocketAddr};
use crate::common::{task, Future, Poll};
use tower_service::Service;

// "Trait alias" for `Service<Name, Response = Addrs>`
Expand All @@ -328,7 +330,7 @@ mod sealed {
type Error: Into<Box<dyn std::error::Error + Send + Sync>>;
type Future: Future<Output = Result<Self::Addrs, Self::Error>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn resolve(&mut self, name: Name) -> Self::Future;
}

Expand All @@ -342,7 +344,7 @@ mod sealed {
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(self, cx)
}

Expand Down
3 changes: 2 additions & 1 deletion src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,13 @@ where
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) mod sealed {
use std::error::Error as StdError;
use std::future::Future;
use std::marker::Unpin;

use ::http::Uri;
use tokio::io::{AsyncRead, AsyncWrite};

use super::Connection;
use crate::common::{Future, Unpin};

/// Connect to a destination, returning an IO transport.
///
Expand Down
17 changes: 7 additions & 10 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#[cfg(feature = "http2")]
use std::future::Future;
use std::marker::Unpin;
#[cfg(feature = "http2")]
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot};

#[cfg(feature = "http2")]
use crate::common::Pin;
use crate::common::{task, Poll};

pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;

Expand Down Expand Up @@ -53,7 +53,7 @@ pub(crate) struct UnboundedSender<T, U> {
}

impl<T, U> Sender<T, U> {
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.giver
.poll_want(cx)
.map_err(|_| crate::Error::new_closed())
Expand Down Expand Up @@ -155,10 +155,7 @@ pub(crate) struct Receiver<T, U> {
}

impl<T, U> Receiver<T, U> {
pub(crate) fn poll_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<(T, Callback<T, U>)>> {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
match self.inner.poll_recv(cx) {
Poll::Ready(item) => {
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
Expand Down Expand Up @@ -245,7 +242,7 @@ impl<T, U> Callback<T, U> {
}
}

pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match *self {
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
Expand Down
Loading

0 comments on commit 0264e98

Please sign in to comment.