Skip to content

Commit

Permalink
feat(async): upgrade to 1.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bit-ranger committed Jan 18, 2021
1 parent 773dcd8 commit 9879a8e
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 49 deletions.
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "MIT"

[dependencies]
common = {path="../common"}
async-std = { version = "~1.2.0", features = ["unstable"] }
async-std = { version = "1.8.0", features = ["unstable"] }
log = { version = "0.4.8", features = ["std"] }
time = "0.1.42"
getopts = "0.2.21"
44 changes: 22 additions & 22 deletions client/src/tunnel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::net::{Shutdown};
use async_std::net::TcpStream;
use async_std::sync::{channel, Receiver, Sender};
use async_std::channel::{bounded, Receiver, Sender};
use async_std::task;
use std::time::Duration;
use async_std::io::{Read, Write};
Expand All @@ -25,8 +25,8 @@ impl Tunnel {
pub async fn open_entry(&mut self) -> Entry {
let entry_id = self.entry_id_current;
self.entry_id_current += 1;
let (entry_sender, entry_receiver) = channel(999);
self.sender.send(Message::CS(Cs::EntryOpen(entry_id, entry_sender))).await;
let (entry_sender, entry_receiver) = bounded(999);
let _ = self.sender.send(Message::CS(Cs::EntryOpen(entry_id, entry_sender))).await;

Entry {
id: entry_id,
Expand All @@ -47,31 +47,31 @@ pub struct Entry {
impl Entry {

pub async fn write(&self, buf: Vec<u8>) {
self.tunnel_sender.send(Message::CS(Cs::Data(self.id, buf))).await;
let _ = self.tunnel_sender.send(Message::CS(Cs::Data(self.id, buf))).await;
}

pub async fn connect_ip4(&self, address: Vec<u8>) {
self.tunnel_sender.send(Message::CS(Cs::ConnectIp4(self.id, address))).await;
let _ = self.tunnel_sender.send(Message::CS(Cs::ConnectIp4(self.id, address))).await;
}

pub async fn connect_domain_name(&self, domain_name: Vec<u8>, port: u16) {
self.tunnel_sender
let _ = self.tunnel_sender
.send(Message::CS(Cs::ConnectDomainName(self.id, domain_name, port)))
.await;
}

pub async fn eof(&self) {
self.tunnel_sender.send(Message::CS(Cs::Eof(self.id))).await;
let _ = self.tunnel_sender.send(Message::CS(Cs::Eof(self.id))).await;
}

pub async fn close(&self) {
self.tunnel_sender.send(Message::CS(Cs::EntryClose(self.id))).await;
let _ = self.tunnel_sender.send(Message::CS(Cs::EntryClose(self.id))).await;
}

pub async fn read(&self) -> EntryMessage {
match self.entry_receiver.recv().await {
Some(msg) => msg,
None => EntryMessage::Close,
Ok(msg) => msg,
Err(_) => EntryMessage::Close,
}
}
}
Expand All @@ -80,7 +80,7 @@ pub struct TcpTunnel;

impl TcpTunnel {
pub fn new(config: &Config, tunnel_id: u32) -> Tunnel {
let (s, r) = channel(10000);
let (s, r) = bounded(10000);
let s1 = s.clone();
let config = config.clone();
task::spawn(async move {
Expand Down Expand Up @@ -141,7 +141,7 @@ impl TcpTunnel {
warn!("{}: tunnel broken", tunnel_id);

for (_, value) in entry_map.iter() {
value.sender.send(EntryMessage::Close).await;
let _ = value.sender.send(EntryMessage::Close).await;
}
}
}
Expand All @@ -165,7 +165,7 @@ async fn server_stream_to_tunnel<R: Read + Unpin>(
let op = op[0];

if op == sc::HEARTBEAT {
tunnel_sender.send(Message::SC(Sc::Heartbeat)).await;
let _ = tunnel_sender.send(Message::SC(Sc::Heartbeat)).await;
continue;
}

Expand All @@ -175,11 +175,11 @@ async fn server_stream_to_tunnel<R: Read + Unpin>(

match op {
sc::ENTRY_CLOSE => {
tunnel_sender.send(Message::SC(Sc::EntryClose(id))).await;
let _ = tunnel_sender.send(Message::SC(Sc::EntryClose(id))).await;
}

sc::EOF => {
tunnel_sender.send(Message::SC(Sc::Eof(id))).await;
let _ = tunnel_sender.send(Message::SC(Sc::Eof(id))).await;
}

sc::CONNECT_OK | sc::DATA => {
Expand All @@ -193,9 +193,9 @@ async fn server_stream_to_tunnel<R: Read + Unpin>(
let data = cryptor.decrypt(&buf);

if op == sc::CONNECT_OK {
tunnel_sender.send(Message::SC(Sc::ConnectOk(id, data))).await;
let _ = tunnel_sender.send(Message::SC(Sc::ConnectOk(id, data))).await;
} else {
tunnel_sender.send(Message::SC(Sc::Data(id, data))).await;
let _ = tunnel_sender.send(Message::SC(Sc::Data(id, data))).await;
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
match entry_map.get(&id) {
Some(value) => {
info!("{}.{}: client close {}:{}", tid, id, value.host, value.port);
value.sender.send(EntryMessage::Close).await;
let _ = value.sender.send(EntryMessage::Close).await;
server_stream.write_all(&pack_cs_entry_close(id)).await?;
}

Expand Down Expand Up @@ -343,7 +343,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
Some(entry) => {
info!("{}.{}: server close {}:{}", tid, id, entry.host, entry.port);

entry.sender.send(EntryMessage::Close).await;
let _ = entry.sender.send(EntryMessage::Close).await;
}

None => {
Expand All @@ -364,7 +364,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
tid, id, entry.host, entry.port
);

entry.sender.send(EntryMessage::Eof).await;
let _ = entry.sender.send(EntryMessage::Eof).await;
}

None => {
Expand All @@ -380,7 +380,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
Some(value) => {
info!("{}.{}: connect ok {}:{}", tid, id, value.host, value.port);

value.sender.send(EntryMessage::ConnectOk(buf)).await;
let _ = value.sender.send(EntryMessage::ConnectOk(buf)).await;
}

None => {
Expand All @@ -393,7 +393,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
Sc::Data(id, buf) => {
*alive_time = get_time();
if let Some(value) = entry_map.get(&id) {
value.sender.send(EntryMessage::Data(buf)).await;
let _ = value.sender.send(EntryMessage::Data(buf)).await;
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { version = "~1.2.0", features = ["unstable"] }
async-std = { version = "1.8.0", features = ["unstable"] }
futures-timer = "1.0.2"
log = { version = "0.4.8", features = ["std"] }
time = "0.1.42"
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"

[dependencies]
common = {path="../common"}
async-std = { version = "~1.2.0", features = ["unstable"] }
async-std = { version = "1.8.0", features = ["unstable"] }
time = "0.1.42"
getopts = "0.2.21"
log = { version = "0.4.8", features = ["std"] }
48 changes: 24 additions & 24 deletions server/src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::vec::Vec;
use async_std::io::{Read, Write};
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::sync::{channel, Receiver, Sender};
use async_std::channel::{bounded, Receiver, Sender};
use async_std::task;

use time::{get_time, Timespec};
Expand Down Expand Up @@ -41,13 +41,13 @@ impl TcpTunnel {
let tunnel_id:u32 = u32::from_be_bytes(tid_bytes_be);
info!("{}: tunnel connect ok", tunnel_id);

let (tunnel_sender, tunnel_receiver) = channel(10000);
let (tunnel_sender, tunnel_receiver) = bounded(10000);
let mut entry_map = EntryMap::new();

let r = async {
let cstt = client_stream_to_tunnel(config, tunnel_id, client_stream0, tunnel_sender.clone()).await;
warn!("{}: tunnel broken, cstt {:?}", tunnel_id, cstt.err());
tunnel_sender.send(Message::SC(Sc::CloseTunnel)).await;
let _ = tunnel_sender.send(Message::SC(Sc::CloseTunnel)).await;
let _ = client_stream.shutdown(Shutdown::Both);
};
let w = async {
Expand All @@ -59,7 +59,7 @@ impl TcpTunnel {
let _ = r.join(w).await;

for (_, value) in entry_map.iter() {
value.sender.send(EntryMessage::Close).await;
let _ = value.sender.send(EntryMessage::Close).await;
}
}
}
Expand All @@ -75,25 +75,25 @@ pub struct Entry {

impl Entry {
async fn connect_ok(&self, buf: Vec<u8>) {
self.tunnel_sender.send(Message::SC(Sc::ConnectOk(self.id, buf))).await;
let _ = self.tunnel_sender.send(Message::SC(Sc::ConnectOk(self.id, buf))).await;
}

async fn write(&self, buf: Vec<u8>) {
self.tunnel_sender.send(Message::SC(Sc::Data(self.id, buf))).await;
let _ = self.tunnel_sender.send(Message::SC(Sc::Data(self.id, buf))).await;
}

async fn eof(&self) {
self.tunnel_sender.send(Message::SC(Sc::Eof(self.id))).await;
let _ = self.tunnel_sender.send(Message::SC(Sc::Eof(self.id))).await;
}

async fn close(&self) {
self.tunnel_sender.send(Message::SC(Sc::EntryClose(self.id))).await;
let _ = self.tunnel_sender.send(Message::SC(Sc::EntryClose(self.id))).await;
}

async fn read(&self) -> EntryMessage {
match self.entry_receiver.recv().await {
Some(msg) => msg,
None => EntryMessage::Close,
Ok(msg) => msg,
Err(_)=> EntryMessage::Close,
}
}
}
Expand Down Expand Up @@ -237,7 +237,7 @@ async fn client_stream_to_tunnel<R: Read + Unpin>(
let op = op[0];

if op == cs::HEARTBEAT {
tunnel_sender.send(Message::CS(Cs::Heartbeat)).await;
let _ = tunnel_sender.send(Message::CS(Cs::Heartbeat)).await;
continue;
}

Expand All @@ -247,15 +247,15 @@ async fn client_stream_to_tunnel<R: Read + Unpin>(

match op {
cs::ENTRY_OPEN => {
tunnel_sender.send(Message::CS(Cs::EntryOpen(id))).await;
let _ = tunnel_sender.send(Message::CS(Cs::EntryOpen(id))).await;
}

cs::ENTRY_CLOSE => {
tunnel_sender.send(Message::CS(Cs::EntryClose(id))).await;
let _ = tunnel_sender.send(Message::CS(Cs::EntryClose(id))).await;
}

cs::EOF => {
tunnel_sender.send(Message::CS(Cs::Eof(id))).await;
let _ = tunnel_sender.send(Message::CS(Cs::Eof(id))).await;
}

cs::CONNECT_DOMAIN_NAME => {
Expand All @@ -270,7 +270,7 @@ async fn client_stream_to_tunnel<R: Read + Unpin>(
let domain_name = cryptor.decrypt(&buf[0..pos]);
let port = u16::from_be(unsafe { *(buf[pos..].as_ptr() as *const u16) });

tunnel_sender
let _ = tunnel_sender
.send(Message::CS(Cs::ConnectDomainName(id, domain_name, port)))
.await;
}
Expand All @@ -284,7 +284,7 @@ async fn client_stream_to_tunnel<R: Read + Unpin>(
client_stream.read_exact(&mut buf).await?;

let data = cryptor.decrypt(&buf);
tunnel_sender
let _ = tunnel_sender
.send(Message::CS(Cs::ConnectIp4(id, data)))
.await;
}
Expand All @@ -298,7 +298,7 @@ async fn client_stream_to_tunnel<R: Read + Unpin>(
client_stream.read_exact(&mut buf).await?;

let data = cryptor.decrypt(&buf);
tunnel_sender.send(Message::CS(Cs::Data(id, data))).await;
let _ = tunnel_sender.send(Message::CS(Cs::Data(id, data))).await;
}
}
}
Expand Down Expand Up @@ -363,7 +363,7 @@ async fn process_tunnel_message<W: Write + Unpin>(

Cs::EntryOpen(id) => {
*alive_time = get_time();
let (es, er) = channel(1000);
let (es, er) = bounded(1000);
entry_map.insert(id, EntryInternal { sender: es });

let entry = Entry {
Expand All @@ -382,7 +382,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
*alive_time = get_time();

if let Some(value) = entry_map.get(&id) {
value.sender.send(EntryMessage::Close).await;
let _ = value.sender.send(EntryMessage::Close).await;
};

entry_map.remove(&id);
Expand All @@ -392,15 +392,15 @@ async fn process_tunnel_message<W: Write + Unpin>(
*alive_time = get_time();

if let Some(value) = entry_map.get(&id) {
value.sender.send(EntryMessage::Eof).await;
let _ = value.sender.send(EntryMessage::Eof).await;
};
}

Cs::ConnectDomainName(id, domain_name, port) => {
*alive_time = get_time();

if let Some(value) = entry_map.get(&id) {
value
let _ = value
.sender
.send(EntryMessage::ConnectDomainName(domain_name, port))
.await;
Expand All @@ -411,7 +411,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
*alive_time = get_time();

if let Some(value) = entry_map.get(&id) {
value
let _ = value
.sender
.send(EntryMessage::ConnectIp(address))
.await;
Expand All @@ -422,7 +422,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
*alive_time = get_time();

if let Some(value) = entry_map.get(&id) {
value.sender.send(EntryMessage::Data(buf)).await;
let _ = value.sender.send(EntryMessage::Data(buf)).await;
};
}
}
Expand All @@ -432,7 +432,7 @@ async fn process_tunnel_message<W: Write + Unpin>(
match sc {
Sc::EntryClose(id) => {
if let Some(value) = entry_map.get(&id) {
value.sender.send(EntryMessage::Close).await;
let _ = value.sender.send(EntryMessage::Close).await;
client_stream.write_all(&pack_sc_entry_close(id)).await?;
};

Expand Down

0 comments on commit 9879a8e

Please sign in to comment.