Skip to content

Commit 86f7263

Browse files
committed
feat: close immediately if proxy server down
1 parent 2da051e commit 86f7263

File tree

3 files changed

+42
-13
lines changed

3 files changed

+42
-13
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dephy-pproxy"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
edition = "2021"
55

66
[dependencies]

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub enum Error {
2525
UnexpectedResponseType,
2626
#[error("Tunnel not waiting")]
2727
TunnelNotWaiting(String),
28+
#[error("Tunnel dial failed: {0}")]
29+
TunnelDialFailed(String),
2830
#[error("Tunnel error: {0:?}")]
2931
Tunnel(TunnelError),
3032
#[error("Protobuf decode error: {0}")]

src/lib.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION");
3838
/// Default channel size.
3939
const DEFAULT_CHANNEL_SIZE: usize = 4096;
4040

41-
/// Timeout for proxied TCP connections
42-
pub const TCP_SERVER_TIMEOUT: u64 = 30;
41+
/// Timeout for local TCP server.
42+
pub const LOCAL_TCP_TIMEOUT: u64 = 5;
43+
44+
/// Timeout for remote TCP server.
45+
pub const REMOTE_TCP_TIMEOUT: u64 = 30;
4346

4447
/// Public result type error type used by the crate.
4548
pub use crate::error::Error;
@@ -154,6 +157,24 @@ impl PProxy {
154157
}
155158
}
156159

160+
async fn dial_tunnel(
161+
&mut self,
162+
proxy_addr: SocketAddr,
163+
peer_id: PeerId,
164+
tunnel_id: TunnelId,
165+
) -> Result<()> {
166+
let stream = tcp_connect_with_timeout(proxy_addr, LOCAL_TCP_TIMEOUT).await?;
167+
168+
let mut tunnel = Tunnel::new(peer_id, tunnel_id, self.command_tx.clone());
169+
let (tunnel_tx, tunnel_rx) = mpsc::channel(1024);
170+
tunnel.listen(stream, tunnel_rx).await?;
171+
172+
self.inbound_tunnels.insert((peer_id, tunnel_id), tunnel);
173+
self.tunnel_txs.insert((peer_id, tunnel_id), tunnel_tx);
174+
175+
Ok(())
176+
}
177+
157178
async fn handle_p2p_server_event(&mut self, event: P2pServerEvent) -> Result<()> {
158179
tracing::debug!("received P2pServerEvent: {:?}", event);
159180
#[allow(clippy::single_match)]
@@ -182,18 +203,18 @@ impl PProxy {
182203
.parse()
183204
.map_err(|_| Error::TunnelIdParseError(msg.tunnel_id))?;
184205

185-
let stream = tcp_connect_with_timeout(proxy_addr, 60).await?;
186-
let mut tunnel = Tunnel::new(peer, tunnel_id, self.command_tx.clone());
187-
let (tunnel_tx, tunnel_rx) = mpsc::channel(1024);
188-
tunnel.listen(stream, tunnel_rx).await?;
189-
190-
self.inbound_tunnels.insert((peer, tunnel_id), tunnel);
191-
self.tunnel_txs.insert((peer, tunnel_id), tunnel_tx);
206+
let data = match self.dial_tunnel(proxy_addr, peer, tunnel_id).await {
207+
Ok(_) => None,
208+
Err(e) => {
209+
tracing::warn!("failed to dial tunnel: {:?}", e);
210+
Some(e.to_string().into_bytes())
211+
}
212+
};
192213

193214
let response = proto::Tunnel {
194215
tunnel_id: tunnel_id.to_string(),
195216
command: proto::TunnelCommand::ConnectResp.into(),
196-
data: None,
217+
data,
197218
};
198219

199220
self.p2p_server
@@ -254,8 +275,14 @@ impl PProxy {
254275
))
255276
})?;
256277

257-
tx.send(Ok(PProxyCommandResponse::SendConnectCommand {}))
258-
.map_err(|_| Error::EssentialTaskClosed)?;
278+
match msg.data {
279+
None => tx.send(Ok(PProxyCommandResponse::SendConnectCommand {})),
280+
Some(data) => tx.send(Err(Error::TunnelDialFailed(
281+
String::from_utf8(data)
282+
.unwrap_or("Unknown (decode failed)".to_string()),
283+
))),
284+
}
285+
.map_err(|_| Error::EssentialTaskClosed)?;
259286
}
260287

261288
_ => {

0 commit comments

Comments
 (0)