From 49f3df86c1a191271ae562944b1ec7929b3cb85d Mon Sep 17 00:00:00 2001 From: Tim Vilgot Mikael Fredenberg Date: Wed, 19 Jul 2023 17:47:22 +0200 Subject: [PATCH] Replace `futures-util` with `futures-core` & `futures-sink` (#3) Although `futures-util` is used by a majority of downstream users, not depending upon it speeds up the building of tokio-websockets as it can be scheduled earlier. This removes the following transient dependencies: * `futures-task` * `pun-utils` * `slab` --- Cargo.toml | 4 +++- README.md | 2 +- src/client.rs | 10 +++++++--- src/proto.rs | 21 +++++++++++++++++---- src/server.rs | 7 +++++-- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ceaed2f8fa8..ec22668ec59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ license = "MIT" [dependencies] bytes = "1" -futures-util = { version = "0.3.14", default-features = false, features = ["sink"] } +futures-core = "0.3" +futures-sink = "0.3" tokio = { version = "1", default-features = false, features = ["net", "io-util", "rt"] } tokio-util = { version = "0.7.1", default-features = false, features = ["codec"] } @@ -56,6 +57,7 @@ rustls-webpki-roots = ["tokio-rustls", "ring", "webpki", "webpki-roots"] rustls-native-roots = ["tokio-rustls", "ring", "rustls-native-certs"] [dev-dependencies] +futures-util = { version = "0.3.14", default-features = false, features = ["sink"] } # For tests hyper = { version = "0.14", default-features = false, features = ["client", "http1", "tcp"] } # This is just a little bit of performance tuning diff --git a/README.md b/README.md index 9c03786aa0a..bc3cf226795 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ High performance, strict, tokio-util based websockets implementation. - Built with tokio-util, intended to be used with tokio from the ground up - Minimal dependencies: The base only requires: - - tokio, tokio-util, bytes, futures-util (which almost all tokio projects depend on) + - tokio, tokio-util, bytes, futures-core, futures-sink - SHA1 backend, e.g. sha1_smol (see [Feature flags](#feature-flags)) - Big selection of features to tailor dependencies to any project (see [Feature flags](#feature-flags)) - SIMD support: AVX2, SSE2 or NEON for frame (un)masking and accelerated UTF-8 validation diff --git a/src/client.rs b/src/client.rs index caa4c7be000..0ca51361fd0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,12 +8,14 @@ //! - By performing the handshake yourself and then using //! [`Builder::take_over`] to let it take over a websocket stream use std::{ + future::poll_fn, net::{SocketAddr, ToSocketAddrs}, + pin::Pin, str::FromStr, }; use base64::{engine::general_purpose, Engine}; -use futures_util::StreamExt; +use futures_core::Stream; use http::{header::HeaderName, HeaderMap, HeaderValue, Uri}; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, @@ -276,8 +278,10 @@ impl<'a> Builder<'a> { let request = build_request(uri, &key_base64, &self.headers); stream.write_all(&request).await?; - let (opt, framed) = upgrade_codec.framed(stream).into_future().await; - let res = opt.ok_or(Error::NoUpgradeResponse)??; + let mut framed = upgrade_codec.framed(stream); + let res = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx)) + .await + .ok_or(Error::NoUpgradeResponse)??; Ok(( WebsocketStream::from_framed( diff --git a/src/proto.rs b/src/proto.rs index dc7546410c4..cb91ffc4d66 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -4,6 +4,7 @@ use std::{ collections::VecDeque, fmt, + future::poll_fn, hint::unreachable_unchecked, mem::{replace, take}, pin::Pin, @@ -12,7 +13,8 @@ use std::{ }; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_util::{Sink, SinkExt, Stream, StreamExt}; +use futures_core::Stream; +use futures_sink::Sink; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::{Decoder, Encoder, Framed}; @@ -613,7 +615,7 @@ impl Default for Limits { /// A websocket stream that full messages can be read from and written to. /// -/// The stream implements [`futures_util::Sink`] and [`futures_util::Stream`]. +/// The stream implements [`futures_sink::Sink`] and [`futures_core::Stream`]. /// /// You must use a [`ClientBuilder`] or [`ServerBuilder`] to /// obtain a websocket stream. @@ -728,7 +730,7 @@ where }; loop { - let frame = match ready!(self.inner.poll_next_unpin(cx)) { + let frame = match ready!(Pin::new(&mut self.inner).poll_next(cx)) { Some(Ok(frame)) => frame, Some(Err(e)) => return Poll::Ready(Some(Err(e))), None => return Poll::Ready(None), @@ -862,7 +864,18 @@ where close_code: Option, reason: Option<&str>, ) -> Result<(), Error> { - self.send(Message::close(close_code, reason)).await + let mut item = Some(Message::close(close_code, reason)); + let mut this = Pin::new(self); + poll_fn(|cx| { + if item.is_some() { + ready!(this.as_mut().poll_ready(cx)?); + } + if let Some(item) = item.take() { + this.as_mut().start_send(item)?; + } + this.as_mut().poll_flush(cx) + }) + .await } } diff --git a/src/server.rs b/src/server.rs index 79c2e480c71..c513a613291 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,7 +5,9 @@ //! established stream, via [`Builder::accept`] //! - By performing the handshake yourself and then using [`Builder::serve`] //! to let it take over a websocket stream -use futures_util::StreamExt; +use std::{future::poll_fn, pin::Pin}; + +use futures_core::Stream; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{Decoder, Framed}; @@ -79,7 +81,8 @@ impl Builder { &self, stream: S, ) -> Result, Error> { - let (reply, framed) = client_request::Codec {}.framed(stream).into_future().await; + let mut framed = client_request::Codec {}.framed(stream); + let reply = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx)).await; let mut parts = framed.into_parts(); match reply {