Skip to content

Commit

Permalink
Merge pull request #66 from tox-rs/split_tcp
Browse files Browse the repository at this point in the history
Split tcp server into library code
  • Loading branch information
kpp authored Mar 23, 2018
2 parents ccf6d00 + d408adc commit aca251a
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ clippy = { version = "<= 0.1.0", optional = true }
bytes = "0.4"
byteorder = "1"
futures = "0.1"
futures-timer = "0.1"
log = "0.4"
sodiumoxide = "0.0.16"
tokio = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
nom = "3.2"
cookie-factory = "0.2.2"
futures-timer = "0.1"

[dev-dependencies]
env_logger = "0.5"
Expand Down
113 changes: 58 additions & 55 deletions examples/tcp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

extern crate tox;
extern crate futures;
extern crate futures_timer;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_timer;

#[macro_use]
extern crate log;
Expand All @@ -31,15 +31,16 @@ extern crate env_logger;
use tox::toxcore::crypto_core::*;
use tox::toxcore::tcp::make_server_handshake;
use tox::toxcore::tcp::codec;
use tox::toxcore::tcp::server::{Server, Client};
use tox::toxcore::tcp::server::{Server, ServerProcessor};

use futures::prelude::*;
use futures::sync::mpsc;
use futures_timer::ext::FutureExt;

use tokio_io::AsyncRead;
use tokio::net::TcpListener;

use std::time;
use std::io::{Error, ErrorKind};

fn main() {
env_logger::init();
Expand Down Expand Up @@ -70,68 +71,70 @@ fn main() {
})?;
debug!("A new client connected from {}", addr);

let server_inner_c = server_inner.clone();
let register_client = make_server_handshake(socket, server_sk.clone())
.map_err(|e| {
error!("handshake error: {}", e);
e
})
.and_then(move |(socket, channel, client_pk)| {
.map_err(|e|
Error::new(ErrorKind::Other,
format!("Handshake error {:?}", e))
)
.map(|(socket, channel, client_pk)| {
debug!("Handshake for client {:?} complited", &client_pk);
let (tx, rx) = mpsc::unbounded();
server_inner_c.insert(Client::new(tx, &client_pk, addr.ip(), addr.port()));

Ok((socket, channel, client_pk, rx))
(socket, channel, client_pk)
});

let server_inner_c = server_inner.clone();
let process_connection = register_client
.and_then(move |(socket, channel, client_pk, rx)| {
let server_inner_c_c = server_inner_c.clone();
let secure_socket = socket.framed(codec::Codec::new(channel));
let (to_client, from_client) = secure_socket.split();

// reader = for each Packet from client process it
let reader = from_client.for_each(move |packet| {
debug!("Handle {:?} => {:?}", client_pk, packet);
server_inner_c.handle_packet(&client_pk, packet)
});

let writer_timer = tokio_timer::wheel()
.tick_duration(time::Duration::from_secs(1))
.build()
;
// writer = for each Packet from rx send it to client
let writer = rx
.map_err(|()| unreachable!("rx can't fail"))
.fold(to_client, move |to_client, packet| {
debug!("Send {:?} => {:?}", client_pk, packet);
let sending_future = to_client.send(packet);
let duration = time::Duration::from_secs(30);
let timeout = writer_timer.timeout(sending_future, duration);
timeout
let process = register_client.and_then(move |(socket, channel, client_pk)| {
let secure_socket = socket.framed(codec::Codec::new(channel));
let (to_client, from_client) = secure_socket.split();
let ServerProcessor { to_server_tx, to_client_rx, processor } =
ServerProcessor::create(
server_inner_c,
client_pk.clone(),
addr.ip(),
addr.port()
);

// writer = for each Packet from to_client_rx send it to client
let writer = to_client_rx
.map_err(|()| Error::from(ErrorKind::UnexpectedEof))
.fold(to_client, move |to_client, packet| {
debug!("Send {:?} => {:?}", client_pk, packet);
to_client.send(packet)
.timeout(time::Duration::from_secs(30))
})
// drop to_client when to_client_rx stream is exhausted
.map(|_to_client| ())
.map_err(|_|
Error::new(ErrorKind::Other,
format!("Writer ended with error"))
);

// reader = for each Packet from client send it to server processor
let reader = from_client
.forward(to_server_tx
.sink_map_err(|e|
Error::new(ErrorKind::Other,
format!("Could not forward message from client to server {:?}", e))
)
)
.map(|(_from_client, _sink_err)| ())
.map_err(|_|
Error::new(ErrorKind::Other,
format!("Reader ended with error"))
);

processor
.select(reader)
.map_err(move |(err, _select_next)| {
err
})
// drop to_client when rx stream is exhausted
.map(|_to_client| ());

// TODO ping request = each 30s send PingRequest to client

reader.select(writer)
.map(|_| ())
.select(writer)
.map_err(move |(err, _select_next)| {
error!("Processing client {:?} ended with error: {:?}", &client_pk, err);
err
})
.then(move |r_processing| {
debug!("shutdown PK {:?}", &client_pk);
server_inner_c_c.shutdown_client(&client_pk)
.then(move |r_shutdown| r_processing.and(r_shutdown))
})
});
tokio::spawn(process_connection.then(|r| {
debug!("end of processing with result {:?}", r);
Ok(())
}));
});

tokio::spawn( process.map(|_| ()).map_err(|_| ()) );

Ok(())
})
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Repo: https://github.com/tox-rs/tox
extern crate bytes;
extern crate byteorder;
extern crate futures;
extern crate futures_timer;
#[macro_use]
extern crate log;
#[macro_use]
Expand Down
2 changes: 2 additions & 0 deletions src/toxcore/tcp/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

mod client;
mod server;
mod processor;

pub use self::client::Client;
pub use self::server::Server;
pub use self::processor::ServerProcessor;
83 changes: 83 additions & 0 deletions src/toxcore/tcp/server/processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright (C) 2013 Tox project All Rights Reserved.
Copyright © 2018 Roman Proskuryakov <humbug@deeptown.org>
This file is part of Tox.
Tox is libre software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Tox is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Tox. If not, see <http://www.gnu.org/licenses/>.
*/

/*! The implementation of TCP Relay ServerProcessor
*/

use toxcore::tcp::packet::*;
use toxcore::tcp::server::{Server, Client};
use toxcore::crypto_core::PublicKey;

use futures::prelude::*;
use futures::sync::mpsc;

use tokio_io::IoFuture;

use std::net::IpAddr;
use std::io::{Error, ErrorKind};

/** `ServerProcessor` helps you to manage incoming clients, handle packets
and shutdown the connection gracefully
*/
pub struct ServerProcessor {
/// Push all `Packet`'s received via network from client into this channel
pub to_server_tx: mpsc::UnboundedSender<Packet>,
/// Send all `Packet`'s from this channel to client via network
pub to_client_rx: mpsc::UnboundedReceiver<Packet>,
/// Run this future to process connection
pub processor: IoFuture<()>
}

impl ServerProcessor {
/** Create `ServerProcessor` for the given server and connection
*/
pub fn create(server: Server, client_pk: PublicKey, addr: IpAddr, port: u16) -> ServerProcessor {
// Push all `Packet`'s received via network from client into this channel
let (to_server_tx, to_server_rx) = mpsc::unbounded();
// Send all `Packet`'s from this channel to client via network
let (to_client_tx, to_client_rx) = mpsc::unbounded();

server.insert(Client::new(to_client_tx, &client_pk, addr, port));

let client_pk_c = client_pk.clone();
let server_c = server.clone();
// processor = for each Packet from client process it
let processor = to_server_rx
.map_err(|()| Error::from(ErrorKind::UnexpectedEof))
.for_each(move |packet| {
debug!("Handle {:?} => {:?}", client_pk_c, packet);
server_c.handle_packet(&client_pk, packet)
});

// TODO ping request = each 30s send PingRequest to client

let client_pk_c = client_pk.clone();
let server_c = server.clone();
let processor = processor
.then(move |r_processing| {
debug!("shutdown PK {:?}", &client_pk_c);
server_c.shutdown_client(&client_pk_c)
.then(move |r_shutdown| r_processing.and(r_shutdown))
});

let processor = Box::new(processor);
ServerProcessor { to_server_tx, to_client_rx, processor }
}
}

0 comments on commit aca251a

Please sign in to comment.