Skip to content

Commit

Permalink
Replace futures-util with futures-core & futures-sink (#3)
Browse files Browse the repository at this point in the history
Although `futures-util` is used by a majority of downstream users, not
depending upon it speeds up the building of tokio-websockets as it can
be scheduled earlier.

This removes the following transient dependencies:

* `futures-task`
* `pun-utils`
* `slab`
  • Loading branch information
vilgotf committed Jul 19, 2023
1 parent ca381bd commit 49f3df8
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ license = "MIT"

[dependencies]
bytes = "1"
futures-util = { version = "0.3.14", default-features = false, features = ["sink"] }
futures-core = "0.3"
futures-sink = "0.3"
tokio = { version = "1", default-features = false, features = ["net", "io-util", "rt"] }
tokio-util = { version = "0.7.1", default-features = false, features = ["codec"] }

Expand Down Expand Up @@ -56,6 +57,7 @@ rustls-webpki-roots = ["tokio-rustls", "ring", "webpki", "webpki-roots"]
rustls-native-roots = ["tokio-rustls", "ring", "rustls-native-certs"]

[dev-dependencies]
futures-util = { version = "0.3.14", default-features = false, features = ["sink"] }
# For tests
hyper = { version = "0.14", default-features = false, features = ["client", "http1", "tcp"] }
# This is just a little bit of performance tuning
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ High performance, strict, tokio-util based websockets implementation.

- Built with tokio-util, intended to be used with tokio from the ground up
- Minimal dependencies: The base only requires:
- tokio, tokio-util, bytes, futures-util (which almost all tokio projects depend on)
- tokio, tokio-util, bytes, futures-core, futures-sink
- SHA1 backend, e.g. sha1_smol (see [Feature flags](#feature-flags))
- Big selection of features to tailor dependencies to any project (see [Feature flags](#feature-flags))
- SIMD support: AVX2, SSE2 or NEON for frame (un)masking and accelerated UTF-8 validation
Expand Down
10 changes: 7 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
//! - By performing the handshake yourself and then using
//! [`Builder::take_over`] to let it take over a websocket stream
use std::{
future::poll_fn,
net::{SocketAddr, ToSocketAddrs},
pin::Pin,
str::FromStr,
};

use base64::{engine::general_purpose, Engine};
use futures_util::StreamExt;
use futures_core::Stream;
use http::{header::HeaderName, HeaderMap, HeaderValue, Uri};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
Expand Down Expand Up @@ -276,8 +278,10 @@ impl<'a> Builder<'a> {
let request = build_request(uri, &key_base64, &self.headers);
stream.write_all(&request).await?;

let (opt, framed) = upgrade_codec.framed(stream).into_future().await;
let res = opt.ok_or(Error::NoUpgradeResponse)??;
let mut framed = upgrade_codec.framed(stream);
let res = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx))
.await
.ok_or(Error::NoUpgradeResponse)??;

Ok((
WebsocketStream::from_framed(
Expand Down
21 changes: 17 additions & 4 deletions src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{
collections::VecDeque,
fmt,
future::poll_fn,
hint::unreachable_unchecked,
mem::{replace, take},
pin::Pin,
Expand All @@ -12,7 +13,8 @@ use std::{
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
use futures_core::Stream;
use futures_sink::Sink;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder, Framed};

Expand Down Expand Up @@ -613,7 +615,7 @@ impl Default for Limits {

/// A websocket stream that full messages can be read from and written to.
///
/// The stream implements [`futures_util::Sink`] and [`futures_util::Stream`].
/// The stream implements [`futures_sink::Sink`] and [`futures_core::Stream`].
///
/// You must use a [`ClientBuilder`] or [`ServerBuilder`] to
/// obtain a websocket stream.
Expand Down Expand Up @@ -728,7 +730,7 @@ where
};

loop {
let frame = match ready!(self.inner.poll_next_unpin(cx)) {
let frame = match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(frame)) => frame,
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
Expand Down Expand Up @@ -862,7 +864,18 @@ where
close_code: Option<CloseCode>,
reason: Option<&str>,
) -> Result<(), Error> {
self.send(Message::close(close_code, reason)).await
let mut item = Some(Message::close(close_code, reason));
let mut this = Pin::new(self);
poll_fn(|cx| {
if item.is_some() {
ready!(this.as_mut().poll_ready(cx)?);
}
if let Some(item) = item.take() {
this.as_mut().start_send(item)?;
}
this.as_mut().poll_flush(cx)
})
.await
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
//! established stream, via [`Builder::accept`]
//! - By performing the handshake yourself and then using [`Builder::serve`]
//! to let it take over a websocket stream
use futures_util::StreamExt;
use std::{future::poll_fn, pin::Pin};

use futures_core::Stream;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Framed};

Expand Down Expand Up @@ -79,7 +81,8 @@ impl Builder {
&self,
stream: S,
) -> Result<WebsocketStream<S>, Error> {
let (reply, framed) = client_request::Codec {}.framed(stream).into_future().await;
let mut framed = client_request::Codec {}.framed(stream);
let reply = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx)).await;
let mut parts = framed.into_parts();

match reply {
Expand Down

0 comments on commit 49f3df8

Please sign in to comment.