Skip to content

Commit

Permalink
Merge pull request snapview#38 from Eroc33/master
Browse files Browse the repository at this point in the history
Port to tokio reform
  • Loading branch information
daniel-abramov authored May 25, 2018
2 parents 6ac55ac + e201263 commit 336bb25
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 46 deletions.
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ version = "0.5.1"

[features]
default = ["connect", "tls"]
connect = ["tokio-dns-unofficial", "tokio-core", "stream"]
connect = ["tokio-dns-unofficial", "tokio", "stream"]
tls = ["tokio-tls", "native-tls", "stream", "tungstenite/tls"]
stream = ["bytes"]

[dependencies]
futures = "0.1.17"
tokio-io = "0.1.2"

[dependencies.tungstenite]
version = "0.5.3"
Expand All @@ -34,11 +33,11 @@ version = "0.1.5"

[dependencies.tokio-dns-unofficial]
optional = true
version = "0.1.1"
version = "0.3.0"

[dependencies.tokio-core]
[dependencies.tokio]
optional = true
version = "0.1.9"
version = "0.1.6"

[dependencies.tokio-tls]
optional = true
Expand Down
13 changes: 4 additions & 9 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
//! You can use this example together with the `server` example.
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate tungstenite;
extern crate url;
Expand All @@ -22,7 +22,6 @@ use std::thread;

use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use tokio_core::reactor::Core;
use tungstenite::protocol::Message;

use tokio_tungstenite::connect_async;
Expand All @@ -35,10 +34,6 @@ fn main() {

let url = url::Url::parse(&connect_addr).unwrap();

// Create the event loop and initiate the connection to the remote server.
let mut core = Core::new().unwrap();
let handle = core.handle();

// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data from stdin and then send it to the event loop over a standard
Expand All @@ -63,7 +58,7 @@ fn main() {
// finishes. If we don't have any more data to read or we won't receive any
// more work from the remote then we can exit.
let mut stdout = io::stdout();
let client = connect_async(url, handle.remote().clone()).and_then(|(ws_stream, _)| {
let client = connect_async(url).and_then(move |(ws_stream, _)| {
println!("WebSocket handshake has been successfully completed");

// `sink` is the stream of messages going out.
Expand All @@ -73,7 +68,7 @@ fn main() {
// We forward all messages, composed out of the data, entered to
// the stdin, to the `sink`.
let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(|message| {
let write_stdout = stream.for_each(move |message| {
stdout.write_all(&message.into_data()).unwrap();
Ok(())
});
Expand All @@ -88,7 +83,7 @@ fn main() {
});

// And now that we've got our client, we execute it in the event loop!
core.run(client).unwrap();
tokio::runtime::run(client.map_err(|_e| ()));
}

// Our helper method which will read data from stdin and send it along the
Expand Down
29 changes: 13 additions & 16 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
//! messages.
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate tungstenite;

use std::cell::RefCell;
use std::collections::HashMap;
use std::env;
use std::io::{Error, ErrorKind};
use std::rc::Rc;
use std::sync::{Arc,Mutex};

use futures::stream::Stream;
use futures::Future;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio::net::TcpListener;
use tungstenite::protocol::Message;

use tokio_tungstenite::accept_async;
Expand All @@ -41,24 +39,23 @@ fn main() {
let addr = addr.parse().unwrap();

// Create the event loop and TCP listener we'll accept connections on.
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);

// This is a single-threaded server, so we can just use Rc and RefCell to
// store the map of all connections we know about.
let connections = Rc::new(RefCell::new(HashMap::new()));
let connections = Arc::new(Mutex::new(HashMap::new()));

let srv = socket.incoming().for_each(|(stream, addr)| {
let srv = socket.incoming().for_each(move |stream| {

let addr = stream.peer_addr().expect("connected streams should have a peer address");

// We have to clone both of these values, because the `and_then`
// function below constructs a new future, `and_then` requires
// `FnOnce`, so we construct a move closure to move the
// environment inside the future (AndThen future may overlive our
// `for_each` future).
let connections_inner = connections.clone();
let handle_inner = handle.clone();

accept_async(stream).and_then(move |ws_stream| {
println!("New WebSocket connection: {}", addr);
Expand All @@ -67,7 +64,7 @@ fn main() {
// send us messages. Then register our address with the stream to send
// data to us.
let (tx, rx) = futures::sync::mpsc::unbounded();
connections_inner.borrow_mut().insert(addr, tx);
connections_inner.lock().unwrap().insert(addr, tx);

// Let's split the WebSocket stream, so we can work with the
// reading and writing halves separately.
Expand All @@ -81,7 +78,7 @@ fn main() {

// For each open connection except the sender, send the
// string via the channel.
let mut conns = connections.borrow_mut();
let mut conns = connections.lock().unwrap();
let iter = conns.iter_mut()
.filter(|&(&k, _)| k != addr)
.map(|(_, v)| v);
Expand All @@ -105,8 +102,8 @@ fn main() {
let connection = ws_reader.map(|_| ()).map_err(|_| ())
.select(ws_writer.map(|_| ()).map_err(|_| ()));

handle_inner.spawn(connection.then(move |_| {
connections_inner.borrow_mut().remove(&addr);
tokio::spawn(connection.then(move |_| {
connections_inner.lock().unwrap().remove(&addr);
println!("Connection {} closed.", addr);
Ok(())
}));
Expand All @@ -119,5 +116,5 @@ fn main() {
});

// Execute server.
core.run(srv).unwrap();
tokio::runtime::run(srv.map_err(|_e| ()));
}
23 changes: 10 additions & 13 deletions src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
//! Connection helper.
extern crate tokio_dns;
extern crate tokio_core;

use std::io::Result as IoResult;

use self::tokio_core::net::TcpStream;
use self::tokio_core::reactor::Remote;
use self::tokio_dns::tcp_connect;
use tokio::net::TcpStream;

use futures::{future, Future};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};

use tungstenite::Error;
use tungstenite::client::url_mode;
Expand All @@ -36,7 +33,7 @@ mod encryption {
use std::io::{Read, Write, Result as IoResult};

use futures::{future, Future};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};

use tungstenite::Error;
use tungstenite::stream::Mode;
Expand All @@ -53,9 +50,9 @@ mod encryption {
}

pub fn wrap_stream<S>(socket: S, domain: String, mode: Mode)
-> Box<Future<Item=AutoStream<S>, Error=Error>>
-> Box<Future<Item=AutoStream<S>, Error=Error> + Send>
where
S: 'static + AsyncRead + AsyncWrite,
S: 'static + AsyncRead + AsyncWrite + Send,
{
match mode {
Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))),
Expand Down Expand Up @@ -106,10 +103,10 @@ fn domain(request: &Request) -> Result<String, Error> {
/// Creates a WebSocket handshake from a request and a stream,
/// upgrading the stream to TLS if required.
pub fn client_async_tls<R, S>(request: R, stream: S)
-> Box<Future<Item=(WebSocketStream<AutoStream<S>>, Response), Error=Error>>
-> Box<Future<Item=(WebSocketStream<AutoStream<S>>, Response), Error=Error> + Send>
where
R: Into<Request<'static>>,
S: 'static + AsyncRead + AsyncWrite + NoDelay,
S: 'static + AsyncRead + AsyncWrite + NoDelay + Send,
{
let request: Request = request.into();

Expand All @@ -134,8 +131,8 @@ where
}

/// Connect to a given URL.
pub fn connect_async<R>(request: R, handle: Remote)
-> Box<Future<Item=(WebSocketStream<AutoStream<TcpStream>>, Response), Error=Error>>
pub fn connect_async<R>(request: R)
-> Box<Future<Item=(WebSocketStream<AutoStream<TcpStream>>, Response), Error=Error> + Send>
where
R: Into<Request<'static>>
{
Expand All @@ -147,6 +144,6 @@ where
};
let port = request.url.port_or_known_default().expect("Bug: port unknown");

Box::new(tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into())
Box::new(tokio_dns::TcpStream::connect((domain.as_str(), port)).map_err(|e| e.into())
.and_then(move |socket| client_async_tls(request, socket)))
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
unused_import_braces)]

extern crate futures;
extern crate tokio_io;
extern crate tokio;

pub extern crate tungstenite;

Expand All @@ -29,7 +29,7 @@ pub mod stream;
use std::io::ErrorKind;

use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};

use tungstenite::handshake::client::{ClientHandshake, Response, Request};
use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback};
Expand Down
2 changes: 1 addition & 1 deletion src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::io::{Read, Write, Result as IoResult, Error as IoError};

use self::bytes::{Buf, BufMut};
use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};

/// Trait to switch TCP_NODELAY.
pub trait NoDelay {
Expand Down

0 comments on commit 336bb25

Please sign in to comment.