From 9879a8e1fea130bddcc5281d3d7c0ff361103581 Mon Sep 17 00:00:00 2001 From: bit-ranger Date: Mon, 18 Jan 2021 17:16:10 +0800 Subject: [PATCH] feat(async): upgrade to 1.8.0 --- client/Cargo.toml | 2 +- client/src/tunnel.rs | 44 ++++++++++++++++++++-------------------- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/tunnel.rs | 48 ++++++++++++++++++++++---------------------- 5 files changed, 49 insertions(+), 49 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 5f454de..c7b1b8f 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT" [dependencies] common = {path="../common"} -async-std = { version = "~1.2.0", features = ["unstable"] } +async-std = { version = "1.8.0", features = ["unstable"] } log = { version = "0.4.8", features = ["std"] } time = "0.1.42" getopts = "0.2.21" diff --git a/client/src/tunnel.rs b/client/src/tunnel.rs index df3714d..4061c19 100644 --- a/client/src/tunnel.rs +++ b/client/src/tunnel.rs @@ -1,6 +1,6 @@ use std::net::{Shutdown}; use async_std::net::TcpStream; -use async_std::sync::{channel, Receiver, Sender}; +use async_std::channel::{bounded, Receiver, Sender}; use async_std::task; use std::time::Duration; use async_std::io::{Read, Write}; @@ -25,8 +25,8 @@ impl Tunnel { pub async fn open_entry(&mut self) -> Entry { let entry_id = self.entry_id_current; self.entry_id_current += 1; - let (entry_sender, entry_receiver) = channel(999); - self.sender.send(Message::CS(Cs::EntryOpen(entry_id, entry_sender))).await; + let (entry_sender, entry_receiver) = bounded(999); + let _ = self.sender.send(Message::CS(Cs::EntryOpen(entry_id, entry_sender))).await; Entry { id: entry_id, @@ -47,31 +47,31 @@ pub struct Entry { impl Entry { pub async fn write(&self, buf: Vec) { - self.tunnel_sender.send(Message::CS(Cs::Data(self.id, buf))).await; + let _ = self.tunnel_sender.send(Message::CS(Cs::Data(self.id, buf))).await; } pub async fn connect_ip4(&self, address: Vec) { - self.tunnel_sender.send(Message::CS(Cs::ConnectIp4(self.id, address))).await; + let _ = self.tunnel_sender.send(Message::CS(Cs::ConnectIp4(self.id, address))).await; } pub async fn connect_domain_name(&self, domain_name: Vec, port: u16) { - self.tunnel_sender + let _ = self.tunnel_sender .send(Message::CS(Cs::ConnectDomainName(self.id, domain_name, port))) .await; } pub async fn eof(&self) { - self.tunnel_sender.send(Message::CS(Cs::Eof(self.id))).await; + let _ = self.tunnel_sender.send(Message::CS(Cs::Eof(self.id))).await; } pub async fn close(&self) { - self.tunnel_sender.send(Message::CS(Cs::EntryClose(self.id))).await; + let _ = self.tunnel_sender.send(Message::CS(Cs::EntryClose(self.id))).await; } pub async fn read(&self) -> EntryMessage { match self.entry_receiver.recv().await { - Some(msg) => msg, - None => EntryMessage::Close, + Ok(msg) => msg, + Err(_) => EntryMessage::Close, } } } @@ -80,7 +80,7 @@ pub struct TcpTunnel; impl TcpTunnel { pub fn new(config: &Config, tunnel_id: u32) -> Tunnel { - let (s, r) = channel(10000); + let (s, r) = bounded(10000); let s1 = s.clone(); let config = config.clone(); task::spawn(async move { @@ -141,7 +141,7 @@ impl TcpTunnel { warn!("{}: tunnel broken", tunnel_id); for (_, value) in entry_map.iter() { - value.sender.send(EntryMessage::Close).await; + let _ = value.sender.send(EntryMessage::Close).await; } } } @@ -165,7 +165,7 @@ async fn server_stream_to_tunnel( let op = op[0]; if op == sc::HEARTBEAT { - tunnel_sender.send(Message::SC(Sc::Heartbeat)).await; + let _ = tunnel_sender.send(Message::SC(Sc::Heartbeat)).await; continue; } @@ -175,11 +175,11 @@ async fn server_stream_to_tunnel( match op { sc::ENTRY_CLOSE => { - tunnel_sender.send(Message::SC(Sc::EntryClose(id))).await; + let _ = tunnel_sender.send(Message::SC(Sc::EntryClose(id))).await; } sc::EOF => { - tunnel_sender.send(Message::SC(Sc::Eof(id))).await; + let _ = tunnel_sender.send(Message::SC(Sc::Eof(id))).await; } sc::CONNECT_OK | sc::DATA => { @@ -193,9 +193,9 @@ async fn server_stream_to_tunnel( let data = cryptor.decrypt(&buf); if op == sc::CONNECT_OK { - tunnel_sender.send(Message::SC(Sc::ConnectOk(id, data))).await; + let _ = tunnel_sender.send(Message::SC(Sc::ConnectOk(id, data))).await; } else { - tunnel_sender.send(Message::SC(Sc::Data(id, data))).await; + let _ = tunnel_sender.send(Message::SC(Sc::Data(id, data))).await; } } @@ -314,7 +314,7 @@ async fn process_tunnel_message( match entry_map.get(&id) { Some(value) => { info!("{}.{}: client close {}:{}", tid, id, value.host, value.port); - value.sender.send(EntryMessage::Close).await; + let _ = value.sender.send(EntryMessage::Close).await; server_stream.write_all(&pack_cs_entry_close(id)).await?; } @@ -343,7 +343,7 @@ async fn process_tunnel_message( Some(entry) => { info!("{}.{}: server close {}:{}", tid, id, entry.host, entry.port); - entry.sender.send(EntryMessage::Close).await; + let _ = entry.sender.send(EntryMessage::Close).await; } None => { @@ -364,7 +364,7 @@ async fn process_tunnel_message( tid, id, entry.host, entry.port ); - entry.sender.send(EntryMessage::Eof).await; + let _ = entry.sender.send(EntryMessage::Eof).await; } None => { @@ -380,7 +380,7 @@ async fn process_tunnel_message( Some(value) => { info!("{}.{}: connect ok {}:{}", tid, id, value.host, value.port); - value.sender.send(EntryMessage::ConnectOk(buf)).await; + let _ = value.sender.send(EntryMessage::ConnectOk(buf)).await; } None => { @@ -393,7 +393,7 @@ async fn process_tunnel_message( Sc::Data(id, buf) => { *alive_time = get_time(); if let Some(value) = entry_map.get(&id) { - value.sender.send(EntryMessage::Data(buf)).await; + let _ = value.sender.send(EntryMessage::Data(buf)).await; }; } } diff --git a/common/Cargo.toml b/common/Cargo.toml index d6bcd46..54b2c74 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-std = { version = "~1.2.0", features = ["unstable"] } +async-std = { version = "1.8.0", features = ["unstable"] } futures-timer = "1.0.2" log = { version = "0.4.8", features = ["std"] } time = "0.1.42" diff --git a/server/Cargo.toml b/server/Cargo.toml index ca01f9d..82edbed 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] common = {path="../common"} -async-std = { version = "~1.2.0", features = ["unstable"] } +async-std = { version = "1.8.0", features = ["unstable"] } time = "0.1.42" getopts = "0.2.21" log = { version = "0.4.8", features = ["std"] } \ No newline at end of file diff --git a/server/src/tunnel.rs b/server/src/tunnel.rs index 2e6fc19..b6ead37 100644 --- a/server/src/tunnel.rs +++ b/server/src/tunnel.rs @@ -7,7 +7,7 @@ use std::vec::Vec; use async_std::io::{Read, Write}; use async_std::net::TcpStream; use async_std::prelude::*; -use async_std::sync::{channel, Receiver, Sender}; +use async_std::channel::{bounded, Receiver, Sender}; use async_std::task; use time::{get_time, Timespec}; @@ -41,13 +41,13 @@ impl TcpTunnel { let tunnel_id:u32 = u32::from_be_bytes(tid_bytes_be); info!("{}: tunnel connect ok", tunnel_id); - let (tunnel_sender, tunnel_receiver) = channel(10000); + let (tunnel_sender, tunnel_receiver) = bounded(10000); let mut entry_map = EntryMap::new(); let r = async { let cstt = client_stream_to_tunnel(config, tunnel_id, client_stream0, tunnel_sender.clone()).await; warn!("{}: tunnel broken, cstt {:?}", tunnel_id, cstt.err()); - tunnel_sender.send(Message::SC(Sc::CloseTunnel)).await; + let _ = tunnel_sender.send(Message::SC(Sc::CloseTunnel)).await; let _ = client_stream.shutdown(Shutdown::Both); }; let w = async { @@ -59,7 +59,7 @@ impl TcpTunnel { let _ = r.join(w).await; for (_, value) in entry_map.iter() { - value.sender.send(EntryMessage::Close).await; + let _ = value.sender.send(EntryMessage::Close).await; } } } @@ -75,25 +75,25 @@ pub struct Entry { impl Entry { async fn connect_ok(&self, buf: Vec) { - self.tunnel_sender.send(Message::SC(Sc::ConnectOk(self.id, buf))).await; + let _ = self.tunnel_sender.send(Message::SC(Sc::ConnectOk(self.id, buf))).await; } async fn write(&self, buf: Vec) { - self.tunnel_sender.send(Message::SC(Sc::Data(self.id, buf))).await; + let _ = self.tunnel_sender.send(Message::SC(Sc::Data(self.id, buf))).await; } async fn eof(&self) { - self.tunnel_sender.send(Message::SC(Sc::Eof(self.id))).await; + let _ = self.tunnel_sender.send(Message::SC(Sc::Eof(self.id))).await; } async fn close(&self) { - self.tunnel_sender.send(Message::SC(Sc::EntryClose(self.id))).await; + let _ = self.tunnel_sender.send(Message::SC(Sc::EntryClose(self.id))).await; } async fn read(&self) -> EntryMessage { match self.entry_receiver.recv().await { - Some(msg) => msg, - None => EntryMessage::Close, + Ok(msg) => msg, + Err(_)=> EntryMessage::Close, } } } @@ -237,7 +237,7 @@ async fn client_stream_to_tunnel( let op = op[0]; if op == cs::HEARTBEAT { - tunnel_sender.send(Message::CS(Cs::Heartbeat)).await; + let _ = tunnel_sender.send(Message::CS(Cs::Heartbeat)).await; continue; } @@ -247,15 +247,15 @@ async fn client_stream_to_tunnel( match op { cs::ENTRY_OPEN => { - tunnel_sender.send(Message::CS(Cs::EntryOpen(id))).await; + let _ = tunnel_sender.send(Message::CS(Cs::EntryOpen(id))).await; } cs::ENTRY_CLOSE => { - tunnel_sender.send(Message::CS(Cs::EntryClose(id))).await; + let _ = tunnel_sender.send(Message::CS(Cs::EntryClose(id))).await; } cs::EOF => { - tunnel_sender.send(Message::CS(Cs::Eof(id))).await; + let _ = tunnel_sender.send(Message::CS(Cs::Eof(id))).await; } cs::CONNECT_DOMAIN_NAME => { @@ -270,7 +270,7 @@ async fn client_stream_to_tunnel( let domain_name = cryptor.decrypt(&buf[0..pos]); let port = u16::from_be(unsafe { *(buf[pos..].as_ptr() as *const u16) }); - tunnel_sender + let _ = tunnel_sender .send(Message::CS(Cs::ConnectDomainName(id, domain_name, port))) .await; } @@ -284,7 +284,7 @@ async fn client_stream_to_tunnel( client_stream.read_exact(&mut buf).await?; let data = cryptor.decrypt(&buf); - tunnel_sender + let _ = tunnel_sender .send(Message::CS(Cs::ConnectIp4(id, data))) .await; } @@ -298,7 +298,7 @@ async fn client_stream_to_tunnel( client_stream.read_exact(&mut buf).await?; let data = cryptor.decrypt(&buf); - tunnel_sender.send(Message::CS(Cs::Data(id, data))).await; + let _ = tunnel_sender.send(Message::CS(Cs::Data(id, data))).await; } } } @@ -363,7 +363,7 @@ async fn process_tunnel_message( Cs::EntryOpen(id) => { *alive_time = get_time(); - let (es, er) = channel(1000); + let (es, er) = bounded(1000); entry_map.insert(id, EntryInternal { sender: es }); let entry = Entry { @@ -382,7 +382,7 @@ async fn process_tunnel_message( *alive_time = get_time(); if let Some(value) = entry_map.get(&id) { - value.sender.send(EntryMessage::Close).await; + let _ = value.sender.send(EntryMessage::Close).await; }; entry_map.remove(&id); @@ -392,7 +392,7 @@ async fn process_tunnel_message( *alive_time = get_time(); if let Some(value) = entry_map.get(&id) { - value.sender.send(EntryMessage::Eof).await; + let _ = value.sender.send(EntryMessage::Eof).await; }; } @@ -400,7 +400,7 @@ async fn process_tunnel_message( *alive_time = get_time(); if let Some(value) = entry_map.get(&id) { - value + let _ = value .sender .send(EntryMessage::ConnectDomainName(domain_name, port)) .await; @@ -411,7 +411,7 @@ async fn process_tunnel_message( *alive_time = get_time(); if let Some(value) = entry_map.get(&id) { - value + let _ = value .sender .send(EntryMessage::ConnectIp(address)) .await; @@ -422,7 +422,7 @@ async fn process_tunnel_message( *alive_time = get_time(); if let Some(value) = entry_map.get(&id) { - value.sender.send(EntryMessage::Data(buf)).await; + let _ = value.sender.send(EntryMessage::Data(buf)).await; }; } } @@ -432,7 +432,7 @@ async fn process_tunnel_message( match sc { Sc::EntryClose(id) => { if let Some(value) = entry_map.get(&id) { - value.sender.send(EntryMessage::Close).await; + let _ = value.sender.send(EntryMessage::Close).await; client_stream.write_all(&pack_sc_entry_close(id)).await?; };