Skip to content

Commit

Permalink
upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
xerik committed Apr 3, 2020
1 parent 378141b commit 8d3aea2
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 193 deletions.
202 changes: 85 additions & 117 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ net2 = "^0.2"
tokio-io = "0.2.0-alpha.6"
tokio-net = "0.2.0-alpha.6"
radix_trie = "0.1.*"
structopt = "^0"
structopt = "0.3.12"
treebitmap = "0.3.1"
trust-dns = { version = "^0.17", default-features = false }
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
tokio = "0.2.0-alpha.6"
tokio = { version = "0.2.16", features = ["io-util", "io-driver", "rt-threaded" , "sync", "time", "tcp", "udp"] }

4 changes: 1 addition & 3 deletions asocks5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ bytes = "^0.4"
failure = "0.1.1"
log = "^0.4"
net2 = "^0.2"
tokio-io = "0.2.0-alpha.6"
tokio-net = "0.2.0-alpha.6"
futures-util-preview = "0.2.2"
futures-preview = { version = "0.3.0-alpha.16", features = ["compat"] }
tokio = "0.2.0-alpha.6"
tokio = { version = "0.2.16", features = ["io-util", "io-driver", "macros", "rt-threaded" , "sync", "time", "tcp", "udp"] }

[dev-dependencies]
structopt = "*"
Expand Down
40 changes: 14 additions & 26 deletions asocks5/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,24 @@ use futures::executor::LocalPool;
use futures::task::SpawnExt;

// the main function is actually no different from any ordinary tcp server
fn main() -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
println!("Async socks server");
let opt: Opt = Opt::from_args();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), opt.port);
println!("listening on {:?}", addr);
let mut executor = LocalPool::new();
let mut spawner = executor.spawner();
executor.run_until(async move {
let listner = tokio::net::TcpListener::bind(&addr).await.unwrap();
let _result = listner
.incoming()
.for_each(|s| {
match s {
Ok(s) => {
let peer = s.peer_addr().expect("socket peer address");
println!("got tcp connection from {:?}", peer);
if let Err(e) = spawner.spawn(async move {
if let Err(e) = handle_client(s).await {
eprintln!("error handling client {}", e);
};
}) {
eprintln!("spawn error {:?}", e);
}
}
Err(e) => eprintln!("listen error {}", e),
}
futures::future::ready(())
})
.await;
});

let mut listner = tokio::net::TcpListener::bind(&addr).await.unwrap();
loop {
let (s, _) = listner.accept().await.unwrap();
let peer = s.peer_addr().expect("socket peer address");
println!("got tcp connection from {:?}", peer);
tokio::spawn(async move {
if let Err(e) = handle_client(s).await {
eprintln!("error handling client {}", e);
};
});
}
Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions asocks5/src/client/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::net::SocketAddrV4;
use std::net::{self, SocketAddr};
use std::time::Duration;
use tokio::net::{TcpStream, UdpSocket};
use tokio_net::driver::Handle;

#[derive(Debug)]
pub struct Socks5Datagram {
Expand Down Expand Up @@ -47,7 +46,7 @@ impl Socks5Datagram {
}
};
let socket = net::UdpSocket::bind(&local)?;
let socket = UdpSocket::from_std(socket, &Handle::default())?;
let socket = UdpSocket::from_std(socket)?;

Ok(Socks5Datagram {
socket,
Expand Down
3 changes: 1 addition & 2 deletions asocks5/src/heads/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::socks::TcpRequestHeader;
use crate::socks::TcpResponseHeader;
use bytes::BufMut;
use bytes::BytesMut;
use futures_util::io::AsyncReadExt;
use futures_util::io::AsyncWriteExt;

use std::net::Shutdown;
use std::net::SocketAddr;
use tokio::net::TcpStream;
Expand Down
2 changes: 0 additions & 2 deletions asocks5/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![feature(async_await)]

mod client;
mod codec;
mod consts;
Expand Down
32 changes: 24 additions & 8 deletions asocks5/src/socks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::u8;

use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use failure::Fail;

use crate::consts;
use crate::consts::Reply;
use crate::Command;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::prelude::*;

Expand Down Expand Up @@ -89,8 +89,8 @@ pub async fn read_socks_address(mut stream: &mut TcpStream) -> Result<Address, S
let mut buf = [0u8; 4 + 2];
stream.read_exact(&mut buf).await?;
let mut cursor = io::Cursor::new(buf);
let v4addr = Ipv4Addr::from(cursor.read_u32::<BigEndian>()?);
let port = cursor.read_u16::<BigEndian>()?;
let v4addr = Ipv4Addr::from(AsyncReadExt::read_u32(&mut cursor).await?);
let port = cursor.read_u16be()?;
let addr = Address::SocketAddress(SocketAddr::V4(SocketAddrV4::new(v4addr, port)));
Ok(addr)
}
Expand All @@ -101,7 +101,7 @@ pub async fn read_socks_address(mut stream: &mut TcpStream) -> Result<Address, S
let mut buf = [0u8; 2];
stream.read_exact(&mut buf).await?;
let mut cursor = io::Cursor::new(buf);
let port = cursor.read_u16::<BigEndian>()?;
let port = cursor.read_u16be()?;

let addr =
Address::SocketAddress(SocketAddr::V6(SocketAddrV6::new(v6addr, port, 0, 0)));
Expand All @@ -116,7 +116,7 @@ pub async fn read_socks_address(mut stream: &mut TcpStream) -> Result<Address, S
let mut raw_addr = [0u8; 257];
stream.read_exact(&mut raw_addr[..addr_len + 2]).await?;
let mut cursor = io::Cursor::new(&raw_addr[addr_len..]);
let port = cursor.read_u16::<BigEndian>()?;
let port = cursor.read_u16be()?;
let addr = String::from_utf8_lossy(&raw_addr[..addr_len]);
let addr = Address::DomainNameAddress(addr.into(), port);
Ok(addr)
Expand All @@ -134,8 +134,8 @@ pub async fn read_socks_socket_addr(mut stream: &mut TcpStream) -> Result<Socket
let mut buf = [0u8; 4 + 2];
stream.read_exact(&mut buf).await?;
let mut cursor = io::Cursor::new(buf);
let v4addr = Ipv4Addr::from(cursor.read_u32::<BigEndian>()?);
let port = cursor.read_u16::<BigEndian>()?;
let v4addr = Ipv4Addr::from(cursor.read_u32be()?);
let port = cursor.read_u16be()?;
let addr = SocketAddr::V4(SocketAddrV4::new(v4addr, port));
Ok(addr)
}
Expand All @@ -146,7 +146,7 @@ pub async fn read_socks_socket_addr(mut stream: &mut TcpStream) -> Result<Socket
let mut buf = [0u8; 2];
stream.read_exact(&mut buf).await?;
let mut cursor = io::Cursor::new(buf);
let port = cursor.read_u16::<BigEndian>()?;
let port = cursor.read_u16be()?;

let addr = SocketAddr::V6(SocketAddrV6::new(v6addr, port, 0, 0));
Ok(addr)
Expand Down Expand Up @@ -245,3 +245,19 @@ pub async fn read_handshake_request(mut s: &mut TcpStream) -> Result<HandshakeRe
pub struct HandshakeResponse {
pub chosen_method: u8,
}

pub(crate) trait CursorRead {
fn read_u16be(&mut self) -> Result<u16, io::Error>;
fn read_u32be(&mut self) -> Result<u32, io::Error>;
}

impl<T: AsRef<[u8]>> CursorRead for io::Cursor<T> {
fn read_u16be(&mut self) -> Result<u16, io::Error> {
use byteorder::ReadBytesExt;
self.read_u16::<BigEndian>()
}
fn read_u32be(&mut self) -> Result<u32, io::Error> {
use byteorder::ReadBytesExt;
self.read_u32::<BigEndian>()
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn run() -> Result<(), i32> {
return Err(100);
}
};
let rt = tokio::runtime::Runtime::new().unwrap();
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let dm = conf.domain_matcher.clone();
let dns = conf.dns.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/relay/forwarding/tcp/copy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::task::Context;
use futures::{Future, Poll as Poll1};
use futures_timer::Delay;
use std::pin::Pin;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite};

/// A future which will copy all data from a reader into a writer.
/// modified version of Copy from tokio
Expand Down
7 changes: 3 additions & 4 deletions src/relay/forwarding/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ use std::sync::Arc;

use asocks5::connect_socks_socket_addr;

use tokio::io::split;
use tokio::prelude::*;
use tokio_io::split::split;
use tokio_net::driver::Handle;

pub async fn handle_incoming_tcp(
mut client_stream: TcpStream,
Expand Down Expand Up @@ -56,7 +55,7 @@ async fn carry_out(
data: Bytes,
a: SocketAddr,
r: RoutingAction,
client_stream: TcpStream,
mut client_stream: TcpStream,
pr: TcpProtocol,
) -> Result<(), Error> {
let mut s = match r {
Expand All @@ -67,7 +66,7 @@ async fn carry_out(
RoutingAction::Named(ref g) => match g.val().addr() {
EgressAddr::From(ip) => {
let x = bind_tcp_socket(ip)?;
tokio::net::TcpStream::connect_std(x, &a, &Handle::default())
tokio::net::TcpStream::connect_std(x, &a)
.await
.map_err(|e| {
format_err!(
Expand Down
28 changes: 14 additions & 14 deletions src/relay/listen/socks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ pub async fn listen_socks(
resolver: Arc<AsyncResolver>,
router: Arc<TcpRouter>,
) -> Result<(), Error> {
let l = tokio::net::TcpListener::bind(addr).await?;
let l = l.incoming();
let f = l.for_each(move |s| {
match s {
Ok(s) => {
let r1 = resolver.clone();
let rt1 = router.clone();
tokio::spawn(async move {
let _r = handle_client(s, r1, rt1).await.map_err(|e| {
error!("error handling client {}", e);
let mut l = tokio::net::TcpListener::bind(addr).await?;
tokio::spawn(async move {
loop {
let s = l.accept().await;
match s {
Ok((s, _addr)) => {
let r1 = resolver.clone();
let rt1 = router.clone();
tokio::spawn(async move {
let _r = handle_client(s, r1, rt1).await.map_err(|e| {
error!("error handling client {}", e);
});
});
});
}
Err(e) => error!("error incoming {:?}", e),
}
Err(e) => error!("error incoming {:?}", e),
}
ready(())
});
tokio::spawn(f);
Ok(())
}

Expand Down
18 changes: 17 additions & 1 deletion src/resolver/client/socks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl SockGetterAsync {
stream.read_exact(&mut b).await?;
trace!("Read reply length {:?}", b);
let mut rdr = io::Cursor::new(b);
let len = rdr.read_u16::<BigEndian>().expect("read u16");
let len = rdr.read_u16be().expect("read u16");
trace!("Reply length is {}", len);
let mut buf = data;
buf.resize(len as usize, 0);
Expand All @@ -90,3 +90,19 @@ fn ns_sock_addr(ns: &NameServerRemote) -> SocketAddr {
NameServerRemote::Tcp(a) => *a,
}
}

pub(crate) trait CursorRead {
fn read_u16be(&mut self) -> Result<u16, io::Error>;
fn read_u32be(&mut self) -> Result<u32, io::Error>;
}

impl<T: AsRef<[u8]>> CursorRead for io::Cursor<T> {
fn read_u16be(&mut self) -> Result<u16, io::Error> {
use byteorder::ReadBytesExt;
self.read_u16::<BigEndian>()
}
fn read_u32be(&mut self) -> Result<u32, io::Error> {
use byteorder::ReadBytesExt;
self.read_u32::<BigEndian>()
}
}
2 changes: 1 addition & 1 deletion src/resolver/client/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio_net::driver::Handle;
pub async fn udp_get(addr: &SocketAddr, data: Vec<u8>) -> io::Result<Vec<u8>> {
let uss = UdpSocketStd::bind("0.0.0.0:0")?;
uss.set_read_timeout(Some(Duration::from_secs(TIMEOUT)))?;
let mut ust = UdpSocketTokio::from_std(uss, &Handle::default())?;
let mut ust = UdpSocketTokio::from_std(uss)?;
ust.send_to(&data, addr).await?;
let mut buf = data;
buf.resize(998, 0);
Expand Down
2 changes: 1 addition & 1 deletion src/resolver/dnsclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn ns_sock_addr(ns: &NameServerRemote) -> SocketAddr {
pub async fn udp_bind_get(addr: SocketAddr, ip: IpAddr, data: Vec<u8>) -> io::Result<Vec<u8>> {
let s = StdUdpSocket::bind(SocketAddr::from((ip, 0)))?;
s.set_read_timeout(Some(Duration::from_secs(TIMEOUT)))?;
let mut s = UdpSocket::from_std(s, &Handle::default())?;
let mut s = UdpSocket::from_std(s)?;
s.send_to(&data, &addr).await?;
let mut buf = vec![0; 998];
let (nb, _a) = s.recv_from(&mut buf).await?;
Expand Down
10 changes: 3 additions & 7 deletions src/resolver/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ use super::dnsclient::DnsClient;
use crate::conf::NameServer;

use failure::Error;
use failure::_core::time::Duration;
use std::net::IpAddr;
use std::str::FromStr;
use tokio::time::timeout;
use trust_dns::op::Message;
use trust_dns::op::Query;
use trust_dns::proto::rr::Name;
use trust_dns::rr::RecordType;
use trust_dns::serialize::binary::{BinDecodable, BinDecoder};

use failure::_core::time::Duration;
use tokio::future::FutureExt;

pub struct AsyncResolver {
client: DnsClient,
}
Expand All @@ -23,10 +22,7 @@ impl AsyncResolver {
}

pub async fn resolve(&self, name: &str) -> Result<Vec<IpAddr>, Error> {
let result = self
.resolve_eternal(name)
.timeout(Duration::from_secs(10))
.await?;
let result = timeout(Duration::from_secs(10), self.resolve_eternal(name)).await?;
result
}
async fn resolve_eternal(&self, name: &str) -> Result<Vec<IpAddr>, Error> {
Expand Down
2 changes: 1 addition & 1 deletion src/resolver/serve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn serve(conf: DnsProxy, matcher: Arc<DomainMatcher>) -> Result<(), Error> {
}

async fn recv_req(sock: &UdpSocketStd) -> io::Result<(UdpSocket, Vec<u8>, usize, SocketAddr)> {
let mut sock_tok = UdpSocket::from_std(sock.try_clone()?, &Handle::default())?;
let mut sock_tok = UdpSocket::from_std(sock.try_clone()?)?;
let mut buf = vec![0; 998];
let (n, a) = sock_tok.recv_from(&mut buf).await?;
Ok((sock_tok, buf, n, a))
Expand Down

0 comments on commit 8d3aea2

Please sign in to comment.