From 5d93193f03ba45df3f180426a3fc9a27ca6ab5f9 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Fri, 23 Mar 2018 11:17:59 +0300 Subject: [PATCH 1/3] chore(deps): add futures-timer as dep --- Cargo.toml | 2 +- src/lib.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 244c4b14a..cf961ab37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ clippy = { version = "<= 0.1.0", optional = true } bytes = "0.4" byteorder = "1" futures = "0.1" +futures-timer = "0.1" log = "0.4" sodiumoxide = "0.0.16" tokio = "0.1" @@ -39,7 +40,6 @@ tokio-core = "0.1" tokio-io = "0.1" nom = "3.2" cookie-factory = "0.2.2" -futures-timer = "0.1" [dev-dependencies] env_logger = "0.5" diff --git a/src/lib.rs b/src/lib.rs index 1bf58b6cb..1e71f0d6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ Repo: https://github.com/tox-rs/tox extern crate bytes; extern crate byteorder; extern crate futures; +extern crate futures_timer; #[macro_use] extern crate log; #[macro_use] From cdcdcf6096a3bc5f1f185671e0204049c5ef2b98 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Fri, 23 Mar 2018 11:18:32 +0300 Subject: [PATCH 2/3] feat(tcp): move example code into library --- src/toxcore/tcp/server/mod.rs | 2 + src/toxcore/tcp/server/processor.rs | 83 +++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 src/toxcore/tcp/server/processor.rs diff --git a/src/toxcore/tcp/server/mod.rs b/src/toxcore/tcp/server/mod.rs index 9ee120650..e92258db2 100644 --- a/src/toxcore/tcp/server/mod.rs +++ b/src/toxcore/tcp/server/mod.rs @@ -23,6 +23,8 @@ mod client; mod server; +mod processor; pub use self::client::Client; pub use self::server::Server; +pub use self::processor::ServerProcessor; diff --git a/src/toxcore/tcp/server/processor.rs b/src/toxcore/tcp/server/processor.rs new file mode 100644 index 000000000..24958aece --- /dev/null +++ b/src/toxcore/tcp/server/processor.rs @@ -0,0 +1,83 @@ +/* + Copyright (C) 2013 Tox project All Rights Reserved. + Copyright © 2018 Roman Proskuryakov + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + +/*! The implementation of TCP Relay ServerProcessor +*/ + +use toxcore::tcp::packet::*; +use toxcore::tcp::server::{Server, Client}; +use toxcore::crypto_core::PublicKey; + +use futures::prelude::*; +use futures::sync::mpsc; + +use tokio_io::IoFuture; + +use std::net::IpAddr; +use std::io::{Error, ErrorKind}; + +/** `ServerProcessor` helps you to manage incoming clients, handle packets + and shutdown the connection gracefully +*/ +pub struct ServerProcessor { + /// Push all `Packet`'s received via network from client into this channel + pub to_server_tx: mpsc::UnboundedSender, + /// Send all `Packet`'s from this channel to client via network + pub to_client_rx: mpsc::UnboundedReceiver, + /// Run this future to process connection + pub processor: IoFuture<()> +} + +impl ServerProcessor { + /** Create `ServerProcessor` for the given server and connection + */ + pub fn create(server: Server, client_pk: PublicKey, addr: IpAddr, port: u16) -> ServerProcessor { + // Push all `Packet`'s received via network from client into this channel + let (to_server_tx, to_server_rx) = mpsc::unbounded(); + // Send all `Packet`'s from this channel to client via network + let (to_client_tx, to_client_rx) = mpsc::unbounded(); + + server.insert(Client::new(to_client_tx, &client_pk, addr, port)); + + let client_pk_c = client_pk.clone(); + let server_c = server.clone(); + // processor = for each Packet from client process it + let processor = to_server_rx + .map_err(|()| Error::from(ErrorKind::UnexpectedEof)) + .for_each(move |packet| { + debug!("Handle {:?} => {:?}", client_pk_c, packet); + server_c.handle_packet(&client_pk, packet) + }); + + // TODO ping request = each 30s send PingRequest to client + + let client_pk_c = client_pk.clone(); + let server_c = server.clone(); + let processor = processor + .then(move |r_processing| { + debug!("shutdown PK {:?}", &client_pk_c); + server_c.shutdown_client(&client_pk_c) + .then(move |r_shutdown| r_processing.and(r_shutdown)) + }); + + let processor = Box::new(processor); + ServerProcessor { to_server_tx, to_client_rx, processor } + } +} From d408adc2a8292fef4f3ef9a8fddd3ddf9f582f4d Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Fri, 23 Mar 2018 11:18:56 +0300 Subject: [PATCH 3/3] feat(tcp): use library code in tcp server example --- examples/tcp_server.rs | 113 +++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 55 deletions(-) diff --git a/examples/tcp_server.rs b/examples/tcp_server.rs index ae4f619bc..cdb8cacde 100644 --- a/examples/tcp_server.rs +++ b/examples/tcp_server.rs @@ -20,9 +20,9 @@ extern crate tox; extern crate futures; +extern crate futures_timer; extern crate tokio; extern crate tokio_io; -extern crate tokio_timer; #[macro_use] extern crate log; @@ -31,15 +31,16 @@ extern crate env_logger; use tox::toxcore::crypto_core::*; use tox::toxcore::tcp::make_server_handshake; use tox::toxcore::tcp::codec; -use tox::toxcore::tcp::server::{Server, Client}; +use tox::toxcore::tcp::server::{Server, ServerProcessor}; use futures::prelude::*; -use futures::sync::mpsc; +use futures_timer::ext::FutureExt; use tokio_io::AsyncRead; use tokio::net::TcpListener; use std::time; +use std::io::{Error, ErrorKind}; fn main() { env_logger::init(); @@ -70,68 +71,70 @@ fn main() { })?; debug!("A new client connected from {}", addr); - let server_inner_c = server_inner.clone(); let register_client = make_server_handshake(socket, server_sk.clone()) - .map_err(|e| { - error!("handshake error: {}", e); - e - }) - .and_then(move |(socket, channel, client_pk)| { + .map_err(|e| + Error::new(ErrorKind::Other, + format!("Handshake error {:?}", e)) + ) + .map(|(socket, channel, client_pk)| { debug!("Handshake for client {:?} complited", &client_pk); - let (tx, rx) = mpsc::unbounded(); - server_inner_c.insert(Client::new(tx, &client_pk, addr.ip(), addr.port())); - - Ok((socket, channel, client_pk, rx)) + (socket, channel, client_pk) }); let server_inner_c = server_inner.clone(); - let process_connection = register_client - .and_then(move |(socket, channel, client_pk, rx)| { - let server_inner_c_c = server_inner_c.clone(); - let secure_socket = socket.framed(codec::Codec::new(channel)); - let (to_client, from_client) = secure_socket.split(); - - // reader = for each Packet from client process it - let reader = from_client.for_each(move |packet| { - debug!("Handle {:?} => {:?}", client_pk, packet); - server_inner_c.handle_packet(&client_pk, packet) - }); - - let writer_timer = tokio_timer::wheel() - .tick_duration(time::Duration::from_secs(1)) - .build() - ; - // writer = for each Packet from rx send it to client - let writer = rx - .map_err(|()| unreachable!("rx can't fail")) - .fold(to_client, move |to_client, packet| { - debug!("Send {:?} => {:?}", client_pk, packet); - let sending_future = to_client.send(packet); - let duration = time::Duration::from_secs(30); - let timeout = writer_timer.timeout(sending_future, duration); - timeout + let process = register_client.and_then(move |(socket, channel, client_pk)| { + let secure_socket = socket.framed(codec::Codec::new(channel)); + let (to_client, from_client) = secure_socket.split(); + let ServerProcessor { to_server_tx, to_client_rx, processor } = + ServerProcessor::create( + server_inner_c, + client_pk.clone(), + addr.ip(), + addr.port() + ); + + // writer = for each Packet from to_client_rx send it to client + let writer = to_client_rx + .map_err(|()| Error::from(ErrorKind::UnexpectedEof)) + .fold(to_client, move |to_client, packet| { + debug!("Send {:?} => {:?}", client_pk, packet); + to_client.send(packet) + .timeout(time::Duration::from_secs(30)) + }) + // drop to_client when to_client_rx stream is exhausted + .map(|_to_client| ()) + .map_err(|_| + Error::new(ErrorKind::Other, + format!("Writer ended with error")) + ); + + // reader = for each Packet from client send it to server processor + let reader = from_client + .forward(to_server_tx + .sink_map_err(|e| + Error::new(ErrorKind::Other, + format!("Could not forward message from client to server {:?}", e)) + ) + ) + .map(|(_from_client, _sink_err)| ()) + .map_err(|_| + Error::new(ErrorKind::Other, + format!("Reader ended with error")) + ); + + processor + .select(reader) + .map_err(move |(err, _select_next)| { + err }) - // drop to_client when rx stream is exhausted - .map(|_to_client| ()); - - // TODO ping request = each 30s send PingRequest to client - - reader.select(writer) .map(|_| ()) + .select(writer) .map_err(move |(err, _select_next)| { - error!("Processing client {:?} ended with error: {:?}", &client_pk, err); err }) - .then(move |r_processing| { - debug!("shutdown PK {:?}", &client_pk); - server_inner_c_c.shutdown_client(&client_pk) - .then(move |r_shutdown| r_processing.and(r_shutdown)) - }) - }); - tokio::spawn(process_connection.then(|r| { - debug!("end of processing with result {:?}", r); - Ok(()) - })); + }); + + tokio::spawn( process.map(|_| ()).map_err(|_| ()) ); Ok(()) })