From 99ca6163da14e9547377580102f2c689339d2b45 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Wed, 5 Jun 2024 10:29:27 +0200 Subject: [PATCH] Make Future implementation on Connection unconditional on executor being Send + Sync. --- src/client/conn/http2.rs | 218 ++++++++++++++++++++++++++++++++++++++- src/proto/h2/client.rs | 2 +- 2 files changed, 218 insertions(+), 2 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 6d967713d8..e599bb5556 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -230,7 +230,7 @@ where B::Data: Send, E: Unpin, B::Error: Into>, - E: Http2ClientConnExec + 'static + Send + Sync + Unpin, + E: Http2ClientConnExec + Unpin, { type Output = crate::Result<()>; @@ -457,3 +457,219 @@ where } } } + +#[cfg(test)] +mod tests { + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_sync_executor_of_non_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor; + + impl crate::rt::Executor for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty, + >(LocalTokioExecutor, io) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn not_send_not_sync_executor_of_not_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor { + _x: std::marker::PhantomData>, + } + + impl crate::rt::Executor for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>( + LocalTokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_not_sync_executor_of_not_send_futures() { + #[derive(Clone)] + struct LocalTokioExecutor { + _x: std::marker::PhantomData>, + } + + impl crate::rt::Executor for LocalTokioExecutor + where + F: std::future::Future + 'static, // not requiring `Send` + { + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>( + LocalTokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor; + + impl crate::rt::Executor for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = crate::client::conn::http2::handshake::< + _, + _, + http_body_util::Empty, + >(TokioExecutor, io) + .await + .unwrap(); + + tokio::task::spawn(async move { + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn not_send_not_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor { + // !Send, !Sync + _x: std::marker::PhantomData>, + } + + impl crate::rt::Executor for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>( + TokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + // can't use spawn here because when executor is !Send + conn.await.unwrap(); + }); + } + } + + #[tokio::test] + #[ignore] // only compilation is checked + async fn send_not_sync_executor_of_send_futures() { + #[derive(Clone)] + struct TokioExecutor { + // !Sync + _x: std::marker::PhantomData>, + } + + impl crate::rt::Executor for TokioExecutor + where + F: std::future::Future + 'static + Send, + F::Output: Send + 'static, + { + fn execute(&self, fut: F) { + tokio::task::spawn(fut); + } + } + + #[allow(unused)] + async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { + let (_sender, conn) = + crate::client::conn::http2::handshake::<_, _, http_body_util::Empty>( + TokioExecutor { + _x: Default::default(), + }, + io, + ) + .await + .unwrap(); + + tokio::task::spawn_local(async move { + // can't use spawn here because when executor is !Send + conn.await.unwrap(); + }); + } + } +} diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 7cb6c6ed5b..c735b614cd 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -611,7 +611,7 @@ where B: Body + 'static + Unpin, B::Data: Send, B::Error: Into>, - E: Http2ClientConnExec + 'static + Send + Sync + Unpin, + E: Http2ClientConnExec + Unpin, T: Read + Write + Unpin, { type Output = crate::Result;