Skip to content
This repository was archived by the owner on Jan 25, 2025. It is now read-only.

Commit 46f5241

Browse files
Disabled timeout for server (#81)
- server does not require connection mode in UPP_CASE anymore - client does not crash any more if upstream offline - server does no longer just abandon TCP connections after 60 seconds of inactivity
1 parent c89e7c2 commit 46f5241

File tree

8 files changed

+18
-40
lines changed

8 files changed

+18
-40
lines changed

zia-client/src/handler.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ impl<U: Upstream + Send> UdpHandler<U> {
3838
Entry::Vacant(vacant) => {
3939
info!("New socket at {}/udp, opening upstream connection...", addr);
4040

41-
let (sink, mut stream) = self.upstream.open().await?;
41+
let (sink, mut stream) = match self.upstream.open().await {
42+
Ok(conn) => conn,
43+
Err(err) => {
44+
warn!("Error while opening upstream connection: {err}");
45+
continue;
46+
}
47+
};
4248

4349
let known = self.known.clone();
4450
let socket = socket.clone();

zia-client/src/upstream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub(crate) async fn transmit(
1313
upstream: &Url,
1414
proxy: &Option<Url>,
1515
) -> anyhow::Result<()> {
16-
match upstream.scheme() {
16+
match upstream.scheme().to_lowercase().as_str() {
1717
"tcp" | "tcps" => tcp::transmit(socket, upstream, proxy).await,
1818
"ws" | "wss" => ws::transmit(socket, upstream, proxy).await,
1919
_ => Err(anyhow!("Unsupported upstream scheme {}", upstream.scheme())),

zia-common/src/forwarding/tcp.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::convert::{Infallible, TryFrom};
44
use std::mem;
55
use std::sync::Arc;
6-
use std::time::Duration;
76

87
use anyhow::Context;
98
use futures_util::future::select;
@@ -12,7 +11,7 @@ use tokio::io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
1211
use tokio::net::{TcpStream, UdpSocket};
1312
use tracing::error;
1413

15-
use crate::{maybe_timeout, Stream};
14+
use crate::{Stream};
1615

1716
/// A UDP datagram header has a 16 bit field containing an unsigned integer
1817
/// describing the length of the datagram (including the header itself).
@@ -29,14 +28,13 @@ const HEADER_LEN: usize = mem::size_of::<u16>();
2928
pub async fn process_udp_over_tcp(
3029
udp_socket: UdpSocket,
3130
tcp_stream: Stream<TcpStream>,
32-
tcp_recv_timeout: Option<Duration>,
3331
) {
3432
let udp_in = Arc::new(udp_socket);
3533
let udp_out = udp_in.clone();
3634
let (tcp_in, tcp_out) = split(tcp_stream);
3735

3836
let tcp2udp = tokio::spawn(async move {
39-
if let Err(error) = process_tcp2udp(tcp_in, udp_out, tcp_recv_timeout).await {
37+
if let Err(error) = process_tcp2udp(tcp_in, udp_out).await {
4038
error!("Error: {}", error);
4139
}
4240
});
@@ -58,16 +56,14 @@ pub async fn process_udp_over_tcp(
5856
/// Returns if the TCP socket is closed, or an IO error happens on either socket.
5957
async fn process_tcp2udp(
6058
mut tcp_in: ReadHalf<Stream<TcpStream>>,
61-
udp_out: Arc<UdpSocket>,
62-
tcp_recv_timeout: Option<Duration>,
59+
udp_out: Arc<UdpSocket>
6360
) -> anyhow::Result<()> {
6461
let mut buffer = datagram_buffer();
6562
// `buffer` has unprocessed data from the TCP socket up until this index.
6663
let mut unprocessed_i = 0;
6764
loop {
68-
let tcp_read_len = maybe_timeout(tcp_recv_timeout, tcp_in.read(&mut buffer[unprocessed_i..]))
65+
let tcp_read_len = tcp_in.read(&mut buffer[unprocessed_i..])
6966
.await
70-
.context("Timeout while reading from TCP")?
7167
.context("Failed reading from TCP")?;
7268
if tcp_read_len == 0 {
7369
break;

zia-common/src/forwarding/ws.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::convert::Infallible;
44
use std::mem;
55
use std::sync::Arc;
6-
use std::time::Duration;
76

87
use anyhow::Context;
98
use futures_util::future::select;
@@ -17,7 +16,7 @@ use tokio_tungstenite::tungstenite::Message;
1716
use tokio_tungstenite::WebSocketStream;
1817
use tracing::error;
1918

20-
use crate::{maybe_timeout, Stream};
19+
use crate::{Stream};
2120

2221
/// A UDP datagram header has a 16 bit field containing an unsigned integer
2322
/// describing the length of the datagram (including the header itself).
@@ -32,7 +31,6 @@ const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - mem::size_of::<u16>();
3231
pub async fn process_udp_over_ws(
3332
udp_socket: UdpSocket,
3433
ws_stream: WebSocketStream<Stream<TcpStream>>,
35-
ws_recv_timeout: Option<Duration>,
3634
) {
3735
let (mut ws_out, mut ws_in) = ws_stream.split();
3836

@@ -41,7 +39,7 @@ pub async fn process_udp_over_ws(
4139
let udp_out = udp_in.clone();
4240

4341
let ws2udp = async {
44-
if let Err(error) = process_ws2udp(&mut ws_in, udp_out, ws_recv_timeout).await {
42+
if let Err(error) = process_ws2udp(&mut ws_in, udp_out).await {
4543
error!("Error: {}", error);
4644
}
4745
};
@@ -77,11 +75,9 @@ pub async fn process_udp_over_ws(
7775
async fn process_ws2udp(
7876
ws_in: &mut SplitStream<WebSocketStream<Stream<TcpStream>>>,
7977
udp_out: Arc<UdpSocket>,
80-
ws_recv_timeout: Option<Duration>,
8178
) -> anyhow::Result<()> {
82-
while let Some(message) = maybe_timeout(ws_recv_timeout, ws_in.next())
79+
while let Some(message) = ws_in.next()
8380
.await
84-
.context("Timeout while reading from WS")?
8581
.transpose()
8682
.context("Failed reading from WS")?
8783
{

zia-common/src/lib.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,5 @@
1-
use std::future::Future;
2-
use std::time::Duration;
3-
4-
use tokio::time::error::Elapsed;
5-
use tokio::time::timeout;
6-
71
pub use crate::forwarding::*;
82
pub use crate::stream::*;
93

104
mod forwarding;
115
mod stream;
12-
13-
pub async fn maybe_timeout<F: Future>(
14-
duration: Option<Duration>,
15-
future: F,
16-
) -> Result<F::Output, Elapsed> {
17-
match duration {
18-
Some(duration) => timeout(duration, future).await,
19-
None => Ok(future.await),
20-
}
21-
}

zia-server/src/cfg.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@ pub(crate) struct ClientCfg {
1010
pub(crate) listen_addr: SocketAddr,
1111
#[arg(short, long, env = "ZIA_UPSTREAM")]
1212
pub(crate) upstream: String,
13-
#[arg(short, long, env = "ZIA_MODE", default_value = "WS", value_enum)]
13+
#[arg(short, long, env = "ZIA_MODE", default_value = "WS", value_enum, ignore_case(true))]
1414
pub(crate) mode: Mode,
1515
}
1616

1717
#[derive(ValueEnum, Clone)]
1818
pub(crate) enum Mode {
19-
#[value(rename_all = "SCREAMING_SNAKE_CASE")]
2019
Ws,
21-
#[value(rename_all = "SCREAMING_SNAKE_CASE")]
2220
Tcp,
2321
}
2422

zia-server/src/listener/tcp.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::net::SocketAddr;
2-
use std::time::Duration;
32

43
use tokio::net::{TcpStream, UdpSocket};
54
use tracing::{error, info};
@@ -48,7 +47,7 @@ impl TcpListener {
4847
downstream_addr
4948
);
5049

51-
process_udp_over_tcp(upstream, Plain(downstream), Some(Duration::from_secs(60))).await;
50+
process_udp_over_tcp(upstream, Plain(downstream)).await;
5251

5352
info!("Connection with downstream {} closed...", downstream_addr);
5453

zia-server/src/listener/ws.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::net::SocketAddr;
2-
use std::time::Duration;
32

43
use tokio::net::{TcpListener, TcpStream, UdpSocket};
54
use tracing::{error, info};
@@ -49,7 +48,7 @@ impl WsListener {
4948
downstream_addr
5049
);
5150

52-
process_udp_over_ws(upstream, downstream, Some(Duration::from_secs(60))).await;
51+
process_udp_over_ws(upstream, downstream).await;
5352

5453
info!("Connection with downstream {} closed...", downstream_addr);
5554

0 commit comments

Comments
 (0)