diff --git a/Cargo.toml b/Cargo.toml index 1de8e682b8..ebd92d7780 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,15 +23,15 @@ include = [ bytes = "1" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } -http = "0.2.7" -http-body = "=1.0.0-rc.2" +http = "1" +http-body = "1" pin-project-lite = "0.2.4" tokio = { version = "1.13", features = ["sync"] } # Optional -h2 = { version = "0.3.9", optional = true } -http-body-util = { version = "=0.1.0-rc.3", optional = true } +h2 = { version = "0.4", optional = true } +http-body-util = { version = "0.1", optional = true } httparse = { version = "1.8", optional = true } httpdate = { version = "1.0", optional = true } itoa = { version = "1", optional = true } @@ -41,7 +41,7 @@ want = { version = "0.3", optional = true } [dev-dependencies] form_urlencoded = "1" -http-body-util = "=0.1.0-rc.3" +http-body-util = "0.1" pretty_env_logger = "0.5" spmc = "0.3" serde = { version = "1.0", features = ["derive"] } diff --git a/src/ffi/http_types.rs b/src/ffi/http_types.rs index 565ab095ce..a4d6b32a2c 100644 --- a/src/ffi/http_types.rs +++ b/src/ffi/http_types.rs @@ -20,12 +20,14 @@ pub struct hyper_response(pub(super) Response); /// An HTTP header map. /// /// These can be part of a request or response. +#[derive(Clone)] pub struct hyper_headers { pub(super) headers: HeaderMap, orig_casing: HeaderCaseMap, orig_order: OriginalHeaderOrder, } +#[derive(Clone)] pub(crate) struct OnInformational { func: hyper_request_on_informational_callback, data: UserDataPointer, diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 8dd8a5b9ab..664b6439d6 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -76,6 +76,7 @@ pub const HYPER_HTTP_VERSION_1_1: libc::c_int = 11; /// The HTTP/2 version. pub const HYPER_HTTP_VERSION_2: libc::c_int = 20; +#[derive(Clone)] struct UserDataPointer(*mut std::ffi::c_void); // We don't actually know anything about this pointer, it's up to the user diff --git a/src/upgrade.rs b/src/upgrade.rs index 6b7039f076..3b7735c56d 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -46,6 +46,7 @@ use std::future::Future; use std::io; use std::marker::Unpin; use std::pin::Pin; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use crate::rt::{Read, ReadBufCursor, Write}; @@ -69,8 +70,9 @@ pub struct Upgraded { /// A future for a possible HTTP upgrade. /// /// If no upgrade was available, or it doesn't succeed, yields an `Error`. +#[derive(Clone)] pub struct OnUpgrade { - rx: Option>>, + rx: Option>>>>, } /// The deconstructed parts of an [`Upgraded`] type. @@ -119,7 +121,12 @@ pub(super) struct Pending { ))] pub(super) fn pending() -> (Pending, OnUpgrade) { let (tx, rx) = oneshot::channel(); - (Pending { tx }, OnUpgrade { rx: Some(rx) }) + ( + Pending { tx }, + OnUpgrade { + rx: Some(Arc::new(Mutex::new(rx))), + }, + ) } // ===== impl Upgraded ===== @@ -219,13 +226,17 @@ impl OnUpgrade { impl Future for OnUpgrade { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.rx { - Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res { - Ok(Ok(upgraded)) => Ok(upgraded), - Ok(Err(err)) => Err(err), - Err(_oneshot_canceled) => Err(crate::Error::new_canceled().with(UpgradeExpected)), - }), + Some(ref rx) => Pin::new(&mut *rx.lock().unwrap()) + .poll(cx) + .map(|res| match res { + Ok(Ok(upgraded)) => Ok(upgraded), + Ok(Err(err)) => Err(err), + Err(_oneshot_canceled) => { + Err(crate::Error::new_canceled().with(UpgradeExpected)) + } + }), None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())), } } diff --git a/tests/client.rs b/tests/client.rs index b306016eea..92955af360 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -38,6 +38,7 @@ async fn tcp_connect(addr: &SocketAddr) -> std::io::Result> { TcpStream::connect(*addr).await.map(TokioIo::new) } +#[derive(Clone)] struct HttpInfo { remote_addr: SocketAddr, }