From 6544aae688339bb8716e509e5bd4bdab5c24d779 Mon Sep 17 00:00:00 2001 From: magine Date: Mon, 7 Oct 2024 20:35:02 +0800 Subject: [PATCH] refact: replace request response protocol with stream protocol --- Cargo.lock | 43 +--- Cargo.toml | 13 +- build.rs | 2 +- proto/tunnel_v1.proto | 17 -- src/error.rs | 41 +++- src/lib.rs | 382 +++++++------------------------ src/p2p/behaviour.rs | 16 +- src/p2p/codec.rs | 141 ------------ src/p2p/mod.rs | 4 +- src/{tunnel.rs => tunnel/mod.rs} | 156 +++---------- src/tunnel/protocol.rs | 135 +++++++++++ src/types.rs | 122 +++++++++- 12 files changed, 430 insertions(+), 642 deletions(-) delete mode 100644 proto/tunnel_v1.proto delete mode 100644 src/p2p/codec.rs rename src/{tunnel.rs => tunnel/mod.rs} (58%) create mode 100644 src/tunnel/protocol.rs diff --git a/Cargo.lock b/Cargo.lock index 1928f78..e577a17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,9 +432,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cc" @@ -665,14 +665,15 @@ dependencies = [ [[package]] name = "dephy-pproxy" -version = "0.3.2" +version = "0.4.0" dependencies = [ "async-trait", + "bytes", "clap", "futures", - "futures_ringbuf", "hex", "libp2p", + "libp2p-stream", "prost", "reqwest", "serde", @@ -996,18 +997,6 @@ dependencies = [ "slab", ] -[[package]] -name = "futures_ringbuf" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6628abb6eb1fc74beaeb20cd0670c43d158b0150f7689b38c3eaf663f99bdec7" -dependencies = [ - "futures", - "log", - "ringbuf", - "rustc_version", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -1561,7 +1550,6 @@ dependencies = [ "libp2p-noise", "libp2p-quic", "libp2p-relay", - "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -1773,23 +1761,18 @@ dependencies = [ ] [[package]] -name = "libp2p-request-response" -version = "0.27.0" +name = "libp2p-stream" +version = "0.2.0-alpha" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6" +checksum = "d454f6647bc9054e7fede2dc86e625786c4d1304bff7afc995285f53ef9091f0" dependencies = [ - "async-trait", "futures", - "futures-bounded", - "futures-timer", "libp2p-core", "libp2p-identity", "libp2p-swarm", "rand", - "smallvec", "tracing", "void", - "web-time", ] [[package]] @@ -2806,15 +2789,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "ringbuf" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "rtnetlink" version = "0.10.1" @@ -3348,6 +3322,7 @@ checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/Cargo.toml b/Cargo.toml index d4fbe13..dc6d62e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,20 +1,22 @@ [package] name = "dephy-pproxy" -version = "0.3.2" +version = "0.4.0" edition = "2021" [dependencies] async-trait = "0.1.81" +bytes = "1.7.2" clap = "4.5.4" futures = "0.3.30" hex = "0.4.3" -libp2p = { version = "0.54.1", features = ["tokio", "macros", "yamux", "noise", "tcp", "request-response", "relay", "secp256k1"] } +libp2p = { version = "0.54.1", features = ["tokio", "macros", "yamux", "noise", "tcp", "relay", "secp256k1"] } +libp2p-stream = "0.2.0-alpha" prost = "0.13.1" reqwest = { version = "0.12.5", features = ["json", "rustls-tls"], default-features = false } serde = { version = "1.0.207", features = ["derive"] } thiserror = "1.0.60" -tokio = { version = "1.37.0", features = ["rt-multi-thread"] } -tokio-util = "0.7.11" +tokio = { version = "1.37.0", features = ["io-util", "rt-multi-thread"] } +tokio-util = { version = "0.7.11", features = ["compat"] } tonic = "0.12.1" tonic-web = "0.12.1" tracing = "0.1.40" @@ -26,6 +28,3 @@ tonic-build = "0.12.1" [[bin]] name = "pproxy" path = "src/main.rs" - -[dev-dependencies] -futures_ringbuf = "0.4.0" diff --git a/build.rs b/build.rs index 20cb68b..73343f7 100644 --- a/build.rs +++ b/build.rs @@ -1,5 +1,5 @@ fn main() -> Result<(), Box> { - let protos = ["proto/command_v1.proto", "proto/tunnel_v1.proto"]; + let protos = ["proto/command_v1.proto"]; tonic_build::configure() .protoc_arg("--experimental_allow_proto3_optional") .compile(&protos, &["proto"])?; diff --git a/proto/tunnel_v1.proto b/proto/tunnel_v1.proto deleted file mode 100644 index 1680642..0000000 --- a/proto/tunnel_v1.proto +++ /dev/null @@ -1,17 +0,0 @@ -syntax = "proto3"; -package org.dephy.pproxy.tunnel.v1; - -enum TunnelCommand { - TUNNEL_COMMAND_UNSPECIFIED = 0; - TUNNEL_COMMAND_CONNECT = 1; - TUNNEL_COMMAND_CONNECT_RESP = 2; - TUNNEL_COMMAND_PACKAGE = 3; - TUNNEL_COMMAND_PACKAGE_RESP = 4; - TUNNEL_COMMAND_BREAK = 5; -} - -message Tunnel { - string tunnel_id = 1; - TunnelCommand command = 2; - optional bytes data = 3; -} diff --git a/src/error.rs b/src/error.rs index 192ab61..6f57493 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,8 +1,12 @@ use std::io::ErrorKind as IOErrorKind; +use crate::types::TunnelReply; + #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum Error { + #[error("Io error: {0}")] + IoError(#[from] std::io::Error), #[error("Multiaddr parse error: {0}")] MultiaddrParseError(String), #[error("SocketAddr parse error: {0}")] @@ -17,8 +21,12 @@ pub enum Error { EssentialTaskClosed, #[error("Libp2p swarm create error: {0}")] Libp2pSwarmCreateError(String), + #[error("Libp2p dial error: {0}")] + Libp2pDialError(#[from] libp2p::swarm::DialError), #[error("Libp2p transport error: {0}")] Libp2pTransportError(#[from] libp2p::core::transport::TransportError), + #[error("Libp2p open stream error")] + Libp2pOpenStreamError(#[from] libp2p_stream::OpenStreamError), #[error("Reqwest error: {0}")] ReqwestError(#[from] reqwest::Error), #[error("Protocol not support: {0}")] @@ -32,7 +40,9 @@ pub enum Error { #[error("Tunnel context not found: {0}")] TunnelContextNotFound(String), #[error("Tunnel error: {0:?}")] - Tunnel(TunnelError), + TunnelError(TunnelError), + #[error("Tunnel protocol error: {0}")] + TunnelProtocolError(#[from] TunnelProtocolError), #[error("Protobuf decode error: {0}")] ProtobufDecode(#[from] prost::DecodeError), #[error("Access denied, peer: {0}")] @@ -67,6 +77,33 @@ pub enum TunnelError { Unknown = u8::MAX, } +#[derive(Debug, thiserror::Error)] +pub enum TunnelProtocolError { + #[error("{0}")] + IoError(#[from] std::io::Error), + #[error("unsupported tunnel version {0:#x}")] + UnsupportedTunnelVersion(u8), + #[error("unsupported command {0:#x}")] + UnsupportedCommand(u8), + #[error("{0}")] + TunnelReply(TunnelReply), +} + +impl TunnelProtocolError { + /// Convert to `TunnelReply` for responding + pub fn as_reply(&self) -> TunnelReply { + match *self { + TunnelProtocolError::IoError(ref err) => match err.kind() { + std::io::ErrorKind::ConnectionRefused => TunnelReply::ConnectionRefused, + _ => TunnelReply::GeneralFailure, + }, + TunnelProtocolError::UnsupportedTunnelVersion(..) => TunnelReply::GeneralFailure, + TunnelProtocolError::UnsupportedCommand(..) => TunnelReply::CommandNotSupported, + TunnelProtocolError::TunnelReply(r) => r, + } + } +} + impl From> for Error { fn from(_: tokio::sync::mpsc::error::SendError) -> Self { Error::EssentialTaskClosed @@ -81,7 +118,7 @@ impl From for Error { impl From for Error { fn from(error: TunnelError) -> Self { - Error::Tunnel(error) + Error::TunnelError(error) } } diff --git a/src/lib.rs b/src/lib.rs index 4947436..a1f4967 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,15 +5,17 @@ use std::sync::Arc; use std::sync::Mutex; use futures::channel::oneshot; +use futures::AsyncWriteExt; use futures::StreamExt; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; -use libp2p::request_response; use libp2p::swarm::SwarmEvent; use libp2p::Multiaddr; use libp2p::PeerId; +use libp2p::Stream; use libp2p::Swarm; use tokio::sync::mpsc; +use tunnel::protocol::TcpResponseHeader; use crate::access::AccessClient; use crate::command::proto::AddPeerRequest; @@ -24,14 +26,17 @@ use crate::command::proto::CreateTunnelServerRequest; use crate::command::proto::CreateTunnelServerResponse; use crate::command::proto::ExpirePeerAccessRequest; use crate::command::proto::ExpirePeerAccessResponse; -use crate::error::TunnelError; +use crate::error::TunnelProtocolError; use crate::p2p::PProxyNetworkBehaviour; use crate::p2p::PProxyNetworkBehaviourEvent; -use crate::tunnel::proto; +use crate::p2p::PPROXY_PROTOCOL; +use crate::tunnel::protocol::TunnelRequestHeader; use crate::tunnel::tcp_connect_with_timeout; use crate::tunnel::Tunnel; use crate::tunnel::TunnelServer; -use crate::types::*; +use crate::types::TunnelCommand; +use crate::types::TunnelId; +use crate::types::TunnelReply; mod access; pub mod command; @@ -58,7 +63,6 @@ pub type Result = std::result::Result; type CommandNotification = Result; type CommandNotifier = oneshot::Sender; -type ChannelPackage = std::result::Result, TunnelError>; #[derive(Debug)] pub enum PProxyCommand { @@ -72,39 +76,25 @@ pub enum PProxyCommand { SendConnectCommand { peer_id: PeerId, tunnel_id: TunnelId, - tunnel_tx: mpsc::Sender, - }, - SendOutboundPackageCommand { - peer_id: PeerId, - tunnel_id: TunnelId, - data: Vec, }, ExpirePeerAccess { peer_id: PeerId, }, } +#[derive(Debug)] pub enum PProxyCommandResponse { AddPeer { peer_id: PeerId }, ConnectRelay { relaied_multiaddr: Multiaddr }, - SendConnectCommand {}, - SendOutboundPackageCommand {}, + SendConnectCommand { remote_stream: Stream }, ExpirePeerAccess {}, } -pub struct TunnelContext { - tx: mpsc::Sender, - outbound_sent_notifier: Option, -} - pub struct PProxy { - command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>, swarm: Swarm, + stream_control: libp2p_stream::Control, proxy_addr: Option, - outbound_ready_notifiers: HashMap, - inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>, - tunnel_ctx: HashMap<(PeerId, TunnelId), TunnelContext>, access_client: Option, } @@ -124,17 +114,15 @@ impl PProxy { let (command_tx, command_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let swarm = crate::p2p::new_swarm(keypair, listen_addr) .map_err(|e| Error::Libp2pSwarmCreateError(e.to_string()))?; + let stream_control = swarm.behaviour().stream.new_control(); let access_client = access_server_endpoint.map(AccessClient::new); Ok(( Self { - command_tx: command_tx.clone(), command_rx, swarm, + stream_control, proxy_addr, - outbound_ready_notifiers: HashMap::new(), - inbound_tunnels: HashMap::new(), - tunnel_ctx: HashMap::new(), access_client, }, PProxyHandle { @@ -146,17 +134,29 @@ impl PProxy { } pub async fn run(mut self) { + let mut incoming_streams = self + .stream_control + .accept(PPROXY_PROTOCOL) + .expect("Failed to accept incoming streams"); + loop { tokio::select! { // Events coming from the network have higher priority than user commands biased; event = self.swarm.select_next_some() => { - if let Err(error) = self.handle_p2p_server_event(event).await { + if let Err(error) = self.handle_p2p_server_event(event).await { tracing::warn!("failed to handle event: {:?}", error); } }, + stream = incoming_streams.next() => match stream { + None => return, + Some((peer_id, stream)) => if let Err(error) = self.handle_incoming_stream(peer_id, stream).await { + tracing::warn!("failed to handle incoming stream: {:?}", error); + } + }, + command = self.command_rx.recv() => match command { None => return, Some((command, tx)) => if let Err(error) = self.handle_command(command, tx).await { @@ -167,23 +167,20 @@ impl PProxy { } } - async fn dial_tunnel( + async fn handle_p2p_server_event( &mut self, - proxy_addr: SocketAddr, - peer_id: PeerId, - tunnel_id: TunnelId, + event: SwarmEvent, ) -> Result<()> { - let stream = tcp_connect_with_timeout(proxy_addr, LOCAL_TCP_TIMEOUT).await?; - - let mut tunnel = Tunnel::new(peer_id, tunnel_id, self.command_tx.clone()); - let (tunnel_tx, tunnel_rx) = mpsc::channel(1024); - tunnel.listen(stream, tunnel_rx).await?; + tracing::debug!("received SwarmEvent: {:?}", event); - self.inbound_tunnels.insert((peer_id, tunnel_id), tunnel); - self.tunnel_ctx.insert((peer_id, tunnel_id), TunnelContext { - tx: tunnel_tx, - outbound_sent_notifier: None, - }); + #[allow(clippy::single_match)] + match event { + SwarmEvent::NewListenAddr { mut address, .. } => { + address.push(Protocol::P2p(*self.swarm.local_peer_id())); + println!("Local node is listening on {address}"); + } + _ => {} + } Ok(()) } @@ -195,207 +192,42 @@ impl PProxy { ac.is_valid(peer_id).await } - async fn handle_tunnel_request( + // TODO: Should spawn? + async fn handle_incoming_stream( &mut self, peer_id: PeerId, - request: proto::Tunnel, - ) -> Result { - let tunnel_id = request - .tunnel_id - .parse() - .map_err(|_| Error::TunnelIdParseError(request.tunnel_id.clone()))?; - - match request.command() { - proto::TunnelCommand::Connect => { - tracing::info!("received connect command from peer: {:?}", peer_id); - if !self.is_tunnel_valid(&peer_id).await { - return Err(Error::AccessDenied(peer_id.to_string())); - } - - let Some(proxy_addr) = self.proxy_addr else { - return Err(Error::ProtocolNotSupport("No proxy_addr".to_string())); - }; - - let data = match self.dial_tunnel(proxy_addr, peer_id, tunnel_id).await { - Ok(_) => None, - Err(e) => { - tracing::warn!("failed to dial tunnel: {:?}", e); - Some(e.to_string().into_bytes()) - } - }; - - Ok(proto::Tunnel { - tunnel_id: tunnel_id.to_string(), - command: proto::TunnelCommand::ConnectResp.into(), - data, - }) - } - - proto::TunnelCommand::Package => { - // Note that only inbound package need access check. - if self.inbound_tunnels.contains_key(&(peer_id, tunnel_id)) - && !self.is_tunnel_valid(&peer_id).await - { - return Err(Error::AccessDenied(peer_id.to_string())); - } - - let Some(ctx) = self.tunnel_ctx.get(&(peer_id, tunnel_id)) else { - return Err(Error::ProtocolNotSupport( - "No tunnel for Package".to_string(), - )); - }; - - ctx.tx.send(Ok(request.data.unwrap_or_default())).await?; - - // Have to do this to close the response waiter in remote. - Ok(proto::Tunnel { - tunnel_id: tunnel_id.to_string(), - command: proto::TunnelCommand::PackageResp.into(), - data: None, - }) - } - - _ => Err(Error::ProtocolNotSupport( - "Wrong tunnel request command".to_string(), - )), - } - } - - async fn handle_p2p_server_event( - &mut self, - event: SwarmEvent, + mut remote_stream: Stream, ) -> Result<()> { - tracing::debug!("received SwarmEvent: {:?}", event); + tracing::info!("received channel stream from peer: {:?}", peer_id); - match event { - SwarmEvent::NewListenAddr { mut address, .. } => { - address.push(Protocol::P2p(*self.swarm.local_peer_id())); - println!("Local node is listening on {address}"); - } + let Some(proxy_addr) = self.proxy_addr else { + remote_stream.close().await?; + let e = Error::ProtocolNotSupport("No proxy_addr".to_string()); + return Err(e); + }; - SwarmEvent::ConnectionClosed { peer_id, .. } => { - self.inbound_tunnels.retain(|(p, _), _| p != &peer_id); - self.tunnel_ctx.retain(|(p, _), _| p != &peer_id); + match TunnelRequestHeader::read_from(&mut remote_stream).await { + Err(e) => { + remote_stream.close().await?; + return Err(e.into()); } - SwarmEvent::Behaviour(PProxyNetworkBehaviourEvent::RequestResponse( - request_response::Event::Message { peer, message }, - )) => match message { - request_response::Message::Request { - request, channel, .. - } => { - let tunnel_id = request.tunnel_id.clone(); - let resp = match self.handle_tunnel_request(peer, request).await { - Ok(resp) => resp, - Err(e) => { - if let Ok(tunnel_id) = tunnel_id.parse() { - self.inbound_tunnels.remove(&(peer, tunnel_id)); - self.tunnel_ctx.remove(&(peer, tunnel_id)); - } - proto::Tunnel { - tunnel_id, - command: proto::TunnelCommand::Break.into(), - data: Some(e.to_string().as_bytes().to_vec()), - } - } - }; - - self.swarm - .behaviour_mut() - .request_response - .send_response(channel, resp) - .map_err(|_| Error::EssentialTaskClosed)?; + Ok(header) => { + if !self.is_tunnel_valid(&peer_id).await { + remote_stream.close().await?; + let e = Error::AccessDenied(peer_id.to_string()); + return Err(e); } - request_response::Message::Response { - request_id, - response, - } => { - match response.command() { - proto::TunnelCommand::ConnectResp => { - let tx = self - .outbound_ready_notifiers - .remove(&request_id) - .ok_or_else(|| { - Error::TunnelNotWaiting(format!( - "peer {}, tunnel {} ready but not waiting", - peer, response.tunnel_id - )) - })?; - - match response.data { - None => tx.send(Ok(PProxyCommandResponse::SendConnectCommand {})), - Some(data) => tx.send(Err(Error::TunnelDialFailed( - String::from_utf8(data) - .unwrap_or("Unknown (decode failed)".to_string()), - ))), - } - .map_err(|_| Error::EssentialTaskClosed)?; - } - - proto::TunnelCommand::PackageResp => { - let tunnel_id = response.tunnel_id.parse().map_err(|_| { - Error::TunnelIdParseError(response.tunnel_id.clone()) - })?; - - let Some(ctx) = self.tunnel_ctx.get_mut(&(peer, tunnel_id)) else { - return Err(Error::ProtocolNotSupport( - "No ctx for Package".to_string(), - )); - }; - - let Some(notifier) = ctx.outbound_sent_notifier.take() else { - return Err(Error::ProtocolNotSupport( - "No notifier for Package".to_string(), - )); - }; - - notifier - .send(Ok(PProxyCommandResponse::SendOutboundPackageCommand {})) - .map_err(|_| Error::EssentialTaskClosed)?; - } - - proto::TunnelCommand::Break => { - let tunnel_id = response.tunnel_id.parse().map_err(|_| { - Error::TunnelIdParseError(response.tunnel_id.clone()) - })?; - - // Terminat connecting tunnel - if let Some(tx) = self.outbound_ready_notifiers.remove(&request_id) { - tx.send(Err(Error::Tunnel(TunnelError::ConnectionAborted))) - .map_err(|_| Error::EssentialTaskClosed)? - } - - // Terminat connected tunnel - if let Some(ctx) = self.tunnel_ctx.remove(&(peer, tunnel_id)) { - ctx.tx.send(Err(TunnelError::ConnectionAborted)).await? - }; - } - - _ => { - return Err(Error::ProtocolNotSupport( - "Wrong tunnel response command".to_string(), - )); - } - } - } - }, + let local_stream = tcp_connect_with_timeout(proxy_addr, LOCAL_TCP_TIMEOUT).await?; + let mut tunnel = Tunnel::new(peer_id, header.id); - SwarmEvent::Behaviour(PProxyNetworkBehaviourEvent::RequestResponse( - request_response::Event::OutboundFailure { - request_id, error, .. - }, - )) => { - // Tell tunnel dial failed - if let Some(tx) = self.outbound_ready_notifiers.remove(&request_id) { - tx.send(Err(Error::TunnelDialFailed(error.to_string()))) - .map_err(|_| Error::EssentialTaskClosed)? - } + TcpResponseHeader::new(TunnelReply::Succeeded, header.id) + .write_to(&mut remote_stream) + .await?; - // TODO: Should also tell tunnel sent failed. But cannot get tunnel_id here. + tunnel.listen(local_stream, remote_stream).await?; } - - _ => {} } Ok(()) @@ -407,21 +239,8 @@ impl PProxy { self.on_add_peer(multiaddr, peer_id, tx).await } PProxyCommand::ConnectRelay { multiaddr } => self.on_connect_relay(multiaddr, tx).await, - PProxyCommand::SendConnectCommand { - peer_id, - tunnel_id, - tunnel_tx, - } => { - self.on_send_connect_command(peer_id, tunnel_id, tunnel_tx, tx) - .await - } - PProxyCommand::SendOutboundPackageCommand { - peer_id, - tunnel_id, - data, - } => { - self.on_send_outbound_package_command(peer_id, tunnel_id, data, tx) - .await + PProxyCommand::SendConnectCommand { peer_id, tunnel_id } => { + self.on_send_connect_command(peer_id, tunnel_id, tx).await } PProxyCommand::ExpirePeerAccess { peer_id } => { self.on_expire_peer_access(peer_id, tx).await @@ -435,8 +254,7 @@ impl PProxy { peer_id: PeerId, tx: CommandNotifier, ) -> Result<()> { - self.swarm.add_peer_address(peer_id, multiaddr); - + self.swarm.dial(multiaddr)?; tx.send(Ok(PProxyCommandResponse::AddPeer { peer_id })) .map_err(|_| Error::EssentialTaskClosed) } @@ -458,60 +276,34 @@ impl PProxy { &mut self, peer_id: PeerId, tunnel_id: TunnelId, - tunnel_tx: mpsc::Sender, tx: CommandNotifier, ) -> Result<()> { - self.tunnel_ctx.insert((peer_id, tunnel_id), TunnelContext { - tx: tunnel_tx, - outbound_sent_notifier: None, - }); - - let request = proto::Tunnel { - tunnel_id: tunnel_id.to_string(), - command: proto::TunnelCommand::Connect.into(), - data: None, - }; - - tracing::info!("send connect command to peer_id: {:?}", peer_id); - let request_id = self - .swarm - .behaviour_mut() - .request_response - .send_request(&peer_id, request); - - self.outbound_ready_notifiers.insert(request_id, tx); - - Ok(()) - } + let mut remote_stream = self + .stream_control + .open_stream(peer_id, PPROXY_PROTOCOL) + .await?; - async fn on_send_outbound_package_command( - &mut self, - peer_id: PeerId, - tunnel_id: TunnelId, - data: Vec, - tx: CommandNotifier, - ) -> Result<()> { - let request = proto::Tunnel { - tunnel_id: tunnel_id.to_string(), - command: proto::TunnelCommand::Package.into(), - data: Some(data), - }; + TunnelRequestHeader::new(TunnelCommand::TcpConnect, tunnel_id) + .write_to(&mut remote_stream) + .await?; - let Some(ctx) = self.tunnel_ctx.get_mut(&(peer_id, tunnel_id)) else { - let err_msg = "No ctx for outbound package"; + let resp = TcpResponseHeader::read_from(&mut remote_stream).await?; - tx.send(Err(Error::TunnelContextNotFound(err_msg.to_string()))) + match resp.reply { + TunnelReply::Succeeded => { + tx.send(Ok(PProxyCommandResponse::SendConnectCommand { + remote_stream, + })) .map_err(|_| Error::EssentialTaskClosed)?; - - return Err(Error::TunnelContextNotFound(err_msg.to_string())); - }; - - ctx.outbound_sent_notifier = Some(tx); - - self.swarm - .behaviour_mut() - .request_response - .send_request(&peer_id, request); + } + e => { + remote_stream.close().await?; + tx.send(Err(Error::TunnelProtocolError( + TunnelProtocolError::TunnelReply(e), + ))) + .map_err(|_| Error::EssentialTaskClosed)?; + } + } Ok(()) } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 8bcf2a7..9b69375 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -1,30 +1,20 @@ use libp2p::identity::Keypair; use libp2p::relay; -use libp2p::request_response; use libp2p::swarm::NetworkBehaviour; -use libp2p::StreamProtocol; - -use crate::p2p::codec::Codec; #[derive(NetworkBehaviour)] pub(crate) struct PProxyNetworkBehaviour { - pub(crate) request_response: request_response::Behaviour, + pub(crate) stream: libp2p_stream::Behaviour, pub(crate) relay: relay::Behaviour, pub(crate) relay_client: relay::client::Behaviour, } impl PProxyNetworkBehaviour { pub fn new(key: &Keypair, relay_client: relay::client::Behaviour) -> Self { - let request_response = request_response::Behaviour::new( - [( - StreamProtocol::new("/pproxy/1.0.0"), - request_response::ProtocolSupport::Full, - )], - request_response::Config::default(), - ); + let stream = libp2p_stream::Behaviour::new(); let relay = relay::Behaviour::new(key.public().to_peer_id(), Default::default()); Self { - request_response, + stream, relay, relay_client, } diff --git a/src/p2p/codec.rs b/src/p2p/codec.rs deleted file mode 100644 index b52e658..0000000 --- a/src/p2p/codec.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::io; - -use async_trait::async_trait; -use futures::prelude::*; -use libp2p::swarm::StreamProtocol; -use prost::Message; - -use crate::tunnel::proto; - -/// Max request size in bytes -const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024; -/// Max response size in bytes -const RESPONSE_SIZE_MAXIMUM: u64 = 10 * 1024 * 1024; - -#[derive(Clone, Default)] -pub struct Codec; - -#[async_trait] -impl libp2p::request_response::Codec for Codec { - type Protocol = StreamProtocol; - type Request = proto::Tunnel; - type Response = proto::Tunnel; - - async fn read_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut vec = Vec::new(); - - io.take(REQUEST_SIZE_MAXIMUM).read_to_end(&mut vec).await?; - - proto::Tunnel::decode(vec.as_slice()).map_err(decode_into_io_error) - } - - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut vec = Vec::new(); - - io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?; - - proto::Tunnel::decode(vec.as_slice()).map_err(decode_into_io_error) - } - - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let data = req.encode_to_vec(); - io.write_all(data.as_ref()).await?; - Ok(()) - } - - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - resp: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let data = resp.encode_to_vec(); - io.write_all(data.as_ref()).await?; - Ok(()) - } -} - -fn decode_into_io_error(err: prost::DecodeError) -> io::Error { - io::Error::new(io::ErrorKind::InvalidData, err) -} - -#[cfg(test)] -mod tests { - use futures::AsyncWriteExt; - use futures_ringbuf::Endpoint; - use libp2p::request_response::Codec as _; - use libp2p::swarm::StreamProtocol; - - use super::*; - use crate::proto; - - #[tokio::test] - async fn test_codec() { - let expected_request = proto::Tunnel { - tunnel_id: "1".to_string(), - command: proto::TunnelCommand::Connect.into(), - data: None, - }; - let expected_response = proto::Tunnel { - tunnel_id: "1".to_string(), - command: proto::TunnelCommand::ConnectResp.into(), - data: None, - }; - let protocol = StreamProtocol::new("/test_pproxy/1"); - - let (mut a, mut b) = Endpoint::pair(124, 124); - Codec - .write_request(&protocol, &mut a, expected_request.clone()) - .await - .expect("Should write request"); - a.close().await.unwrap(); - - let actual_request = Codec - .read_request(&protocol, &mut b) - .await - .expect("Should read request"); - b.close().await.unwrap(); - - assert_eq!(actual_request, expected_request); - - let (mut a, mut b) = Endpoint::pair(124, 124); - Codec - .write_response(&protocol, &mut a, expected_response.clone()) - .await - .expect("Should write response"); - a.close().await.unwrap(); - - let actual_response = Codec - .read_response(&protocol, &mut b) - .await - .expect("Should read response"); - b.close().await.unwrap(); - - assert_eq!(actual_response, expected_response); - } -} diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 468b3a3..bab0d63 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -6,12 +6,14 @@ use libp2p::noise; use libp2p::swarm::Swarm; use libp2p::tcp; use libp2p::yamux; +use libp2p::StreamProtocol; pub(crate) use crate::p2p::behaviour::PProxyNetworkBehaviour; pub(crate) use crate::p2p::behaviour::PProxyNetworkBehaviourEvent; mod behaviour; -mod codec; + +pub const PPROXY_PROTOCOL: StreamProtocol = StreamProtocol::new("/pproxy/1.0.0"); pub(crate) fn new_swarm( keypair: Keypair, diff --git a/src/tunnel.rs b/src/tunnel/mod.rs similarity index 58% rename from src/tunnel.rs rename to src/tunnel/mod.rs index 184ef50..b491ae6 100644 --- a/src/tunnel.rs +++ b/src/tunnel/mod.rs @@ -7,22 +7,22 @@ use std::time::Duration; use futures::channel::oneshot; use libp2p::PeerId; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; +use libp2p::Stream; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::time::timeout; +use tokio_util::compat::Compat; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::sync::CancellationToken; use crate::error::TunnelError; use crate::types::TunnelId; use crate::CommandNotifier; use crate::PProxyCommand; +use crate::PProxyCommandResponse; -pub mod proto { - tonic::include_proto!("org.dephy.pproxy.tunnel.v1"); -} +pub mod protocol; pub struct TunnelServer { peer_id: PeerId, @@ -43,17 +43,13 @@ pub struct TunnelServerListener { pub struct Tunnel { peer_id: PeerId, tunnel_id: TunnelId, - pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, listener_cancel_token: Option, listener: Option>, } pub struct TunnelListener { - peer_id: PeerId, - tunnel_id: TunnelId, local_stream: TcpStream, - remote_stream_rx: mpsc::Receiver, TunnelError>>, - pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, + remote_stream: Compat, cancel_token: CancellationToken, } @@ -70,7 +66,7 @@ impl Drop for TunnelServer { }); } - tracing::info!("TunnelServer {} dropped", self.peer_id); + tracing::info!("TunnelServer to {} dropped", self.peer_id); } } @@ -146,7 +142,7 @@ impl TunnelServerListener { } fn next_tunnel_id(&mut self) -> TunnelId { - TunnelId::from(self.next_tunnel_id.fetch_add(1usize, Ordering::Relaxed)) + TunnelId::from(self.next_tunnel_id.fetch_add(1usize, Ordering::Relaxed) as u64) } fn cancel_token(&self) -> CancellationToken { @@ -165,17 +161,15 @@ impl TunnelServerListener { tracing::debug!("Received new connection from: {address}"); let tunnel_id = self.next_tunnel_id(); - let mut tunnel = Tunnel::new(self.peer_id, tunnel_id, self.pproxy_command_tx.clone()); + let mut tunnel = Tunnel::new(self.peer_id, tunnel_id); let (tx, rx) = oneshot::channel(); - let (tunnel_tx, tunnel_rx) = mpsc::channel(1024); if let Err(e) = self .pproxy_command_tx .send(( PProxyCommand::SendConnectCommand { peer_id: self.peer_id, tunnel_id, - tunnel_tx, }, tx, )) @@ -194,29 +188,30 @@ impl TunnelServerListener { tracing::error!("Send connect command channel failed: {e:?}"); continue; } - Ok(Ok(_resp)) => {} + Ok(Ok(resp)) => match resp { + PProxyCommandResponse::SendConnectCommand { remote_stream } => { + if let Err(e) = tunnel.listen(stream, remote_stream).await { + tracing::error!("Tunnel listen failed: {e:?}"); + continue; + }; + } + other_resp => { + tracing::error!("Send connect command channel got invalid pproxy command response {other_resp:?}"); + continue; + } + }, } - if let Err(e) = tunnel.listen(stream, tunnel_rx).await { - tracing::error!("Tunnel listen failed: {e:?}"); - continue; - }; - self.tunnels.insert(tunnel_id, tunnel); } } } impl Tunnel { - pub fn new( - peer_id: PeerId, - tunnel_id: TunnelId, - pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, - ) -> Self { + pub fn new(peer_id: PeerId, tunnel_id: TunnelId) -> Self { Self { peer_id, tunnel_id, - pproxy_command_tx, listener: None, listener_cancel_token: None, } @@ -225,20 +220,13 @@ impl Tunnel { pub async fn listen( &mut self, local_stream: TcpStream, - remote_stream_rx: mpsc::Receiver, TunnelError>>, + remote_stream: Stream, ) -> Result<(), TunnelError> { if self.listener.is_some() { return Err(TunnelError::AlreadyListened); } - let mut listener = TunnelListener::new( - self.peer_id, - self.tunnel_id, - local_stream, - remote_stream_rx, - self.pproxy_command_tx.clone(), - ) - .await; + let mut listener = TunnelListener::new(local_stream, remote_stream).await; let listener_cancel_token = listener.cancel_token(); let listener_handler = tokio::spawn(Box::pin(async move { listener.listen().await })); @@ -250,19 +238,11 @@ impl Tunnel { } impl TunnelListener { - async fn new( - peer_id: PeerId, - tunnel_id: TunnelId, - local_stream: TcpStream, - remote_stream_rx: mpsc::Receiver, TunnelError>>, - pproxy_command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>, - ) -> Self { + async fn new(local_stream: TcpStream, remote_stream: Stream) -> Self { + let remote_stream = remote_stream.compat(); Self { - peer_id, - tunnel_id, local_stream, - remote_stream_rx, - pproxy_command_tx, + remote_stream, cancel_token: CancellationToken::new(), } } @@ -272,85 +252,9 @@ impl TunnelListener { } async fn listen(&mut self) { - let (mut local_read, mut local_write) = self.local_stream.split(); - - let listen_local = async { - loop { - if self.cancel_token.is_cancelled() { - break TunnelError::ConnectionClosed; - } - - let mut buf = [0u8; 30000]; - match local_read.read(&mut buf).await { - Err(e) => { - break e.kind().into(); - } - Ok(0) => { - break TunnelError::ConnectionClosed; - } - Ok(n) => { - tracing::debug!("Received {} bytes from local stream", n); - let (tx, rx) = oneshot::channel(); - let data = buf[..n].to_vec(); - let command = PProxyCommand::SendOutboundPackageCommand { - peer_id: self.peer_id, - tunnel_id: self.tunnel_id, - data, - }; - if let Err(e) = self.pproxy_command_tx.send((command, tx)).await { - tracing::error!("Send tcp package channel tx failed: {e:?}"); - break TunnelError::DataSendFailed; - }; - - match rx.await { - Err(e) => { - tracing::error!("Send tcp package channel rx failed: {e:?}"); - break TunnelError::DataSendFailed; - } - Ok(Err(e)) => { - tracing::error!("Send tcp package channel failed: {e:?}"); - break TunnelError::DataSendFailed; - } - Ok(Ok(_resp)) => {} - } - } - } - } - }; - - let listen_remote = async { - loop { - if self.cancel_token.is_cancelled() { - break TunnelError::ConnectionClosed; - } - - let Some(data) = self.remote_stream_rx.recv().await else { - break TunnelError::ConnectionClosed; - }; - - match data { - Err(e) => { - break e; - } - Ok(body) => { - tracing::debug!("Received {} bytes from local stream", body.len()); - if let Err(e) = local_write.write_all(&body).await { - tracing::error!("Write to local stream failed: {e:?}"); - break e.kind().into(); - } - } - } - } - }; - - tokio::select! { - defeat = listen_local => { - tracing::info!("Local stream closed: {defeat:?}"); - }, - defeat = listen_remote => { - tracing::info!("Remote stream closed: {defeat:?}"); - } - } + tokio::io::copy_bidirectional(&mut self.local_stream, &mut self.remote_stream) + .await + .unwrap(); } } diff --git a/src/tunnel/protocol.rs b/src/tunnel/protocol.rs new file mode 100644 index 0000000..088268f --- /dev/null +++ b/src/tunnel/protocol.rs @@ -0,0 +1,135 @@ +use std::io; + +use bytes::BufMut; +use bytes::BytesMut; +use futures::AsyncRead; +use futures::AsyncReadExt; +use futures::AsyncWrite; +use futures::AsyncWriteExt; + +use crate::error::TunnelProtocolError; +use crate::types::consts; +use crate::types::TunnelCommand; +use crate::types::TunnelId; +use crate::types::TunnelReply; + +/// Request header when connected to the tunnel server. +/// +/// ```plain +/// +-----+-----+------+ +/// | VER | CMD | ID | +/// +-----+-----+------+ +/// | 1 | 1 | 8 | +/// +-----+-----+------+ +/// ``` +#[derive(Clone, Debug)] +pub struct TunnelRequestHeader { + pub command: TunnelCommand, + pub id: TunnelId, +} + +impl TunnelRequestHeader { + /// Creates a request header + pub fn new(command: TunnelCommand, id: TunnelId) -> TunnelRequestHeader { + TunnelRequestHeader { command, id } + } + + /// Read from a reader + pub async fn read_from(r: &mut R) -> Result + where R: AsyncRead + Unpin { + let mut buf = [0u8; 10]; + r.read_exact(&mut buf).await?; + + let ver = buf[0]; + if ver != consts::TUNNEL_VERSION { + return Err(TunnelProtocolError::UnsupportedTunnelVersion(ver)); + } + + let cmd = buf[1]; + let command = match TunnelCommand::from_u8(cmd) { + Some(c) => c, + None => return Err(TunnelProtocolError::UnsupportedCommand(cmd)), + }; + + let id = TunnelId::from_be_bytes(buf[2..].try_into().unwrap()); + + Ok(TunnelRequestHeader { command, id }) + } + + /// Write data into a writer + pub async fn write_to(&self, w: &mut W) -> io::Result<()> + where W: AsyncWrite + Unpin { + let mut buf = BytesMut::with_capacity(10); + self.write_to_buf(&mut buf); + w.write_all(&buf).await + } + + /// Writes to buffer + pub fn write_to_buf(&self, buf: &mut B) { + let TunnelRequestHeader { + ref command, + ref id, + } = *self; + + buf.put_slice(&[consts::TUNNEL_VERSION, command.as_u8()]); + buf.put_slice(&id.to_be_bytes()) + } +} + +/// Response header for TunnelRequestHeader with TcpConnect command. +/// +/// ```plain +/// +-----+-----+------+ +/// | VER | REP | ID | +/// +-----+-----+------+ +/// | 1 | 1 | 8 | +/// +-----+-----+------+ +/// ``` +#[derive(Clone, Debug)] +pub struct TcpResponseHeader { + pub reply: TunnelReply, + pub id: TunnelId, +} + +impl TcpResponseHeader { + /// Creates a response header + pub fn new(reply: TunnelReply, id: TunnelId) -> TcpResponseHeader { + TcpResponseHeader { reply, id } + } + + /// Read from a reader + pub async fn read_from(r: &mut R) -> Result + where R: AsyncRead + Unpin { + let mut buf = [0u8; 10]; + r.read_exact(&mut buf).await?; + + let ver = buf[0]; + let reply_code = buf[1]; + + if ver != consts::TUNNEL_VERSION { + return Err(TunnelProtocolError::UnsupportedTunnelVersion(ver)); + } + + let id = TunnelId::from_be_bytes(buf[2..].try_into().unwrap()); + + Ok(TcpResponseHeader { + reply: TunnelReply::from_u8(reply_code), + id, + }) + } + + /// Write to a writer + pub async fn write_to(&self, w: &mut W) -> io::Result<()> + where W: AsyncWrite + Unpin { + let mut buf = BytesMut::with_capacity(10); + self.write_to_buf(&mut buf); + w.write_all(&buf).await + } + + /// Writes to buffer + pub fn write_to_buf(&self, buf: &mut B) { + let TcpResponseHeader { ref reply, ref id } = *self; + buf.put_slice(&[consts::TUNNEL_VERSION, reply.as_u8()]); + buf.put_slice(&id.to_be_bytes()); + } +} diff --git a/src/types.rs b/src/types.rs index 498302d..d5fd54d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,17 +1,42 @@ -use std::fmt::Display; +use std::fmt; use std::str::FromStr; +pub mod consts { + pub const TUNNEL_VERSION: u8 = 0x01; + + pub const TUNNEL_CMD_TCP_CONNECT: u8 = 0x01; + pub const TUNNEL_CMD_UDP_CONNECT: u8 = 0x02; // Not supported + + pub const TUNNEL_REPLY_SUCCEEDED: u8 = 0x00; + pub const TUNNEL_REPLY_GENERAL_FAILURE: u8 = 0x01; + pub const TUNNEL_REPLY_CONNECTION_NOT_ALLOWED: u8 = 0x02; + pub const TUNNEL_REPLY_NETWORK_UNREACHABLE: u8 = 0x03; + pub const TUNNEL_REPLY_HOST_UNREACHABLE: u8 = 0x04; + pub const TUNNEL_REPLY_CONNECTION_REFUSED: u8 = 0x05; + pub const TUNNEL_REPLY_TTL_EXPIRED: u8 = 0x06; + pub const TUNNEL_REPLY_COMMAND_NOT_SUPPORTED: u8 = 0x07; + pub const TUNNEL_REPLY_ADDRESS_TYPE_NOT_SUPPORTED: u8 = 0x08; +} + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub struct TunnelId(usize); +pub struct TunnelId(u64); impl TunnelId { - pub fn from>(value: T) -> Self { + pub fn from>(value: T) -> Self { TunnelId(value.into()) } + + pub fn from_be_bytes(value: [u8; 8]) -> Self { + TunnelId(u64::from_be_bytes(value)) + } + + pub fn to_be_bytes(self) -> [u8; 8] { + self.0.to_be_bytes() + } } -impl Display for TunnelId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for TunnelId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } } @@ -22,3 +47,90 @@ impl FromStr for TunnelId { s.parse().map(TunnelId) } } + +#[derive(Clone, Debug, Copy)] +pub enum TunnelCommand { + TcpConnect, + UdpConnect, // Not supported +} + +impl TunnelCommand { + pub fn as_u8(self) -> u8 { + match self { + TunnelCommand::TcpConnect => consts::TUNNEL_CMD_TCP_CONNECT, + TunnelCommand::UdpConnect => consts::TUNNEL_CMD_UDP_CONNECT, + } + } + + pub fn from_u8(code: u8) -> Option { + match code { + consts::TUNNEL_CMD_TCP_CONNECT => Some(TunnelCommand::TcpConnect), + consts::TUNNEL_CMD_UDP_CONNECT => Some(TunnelCommand::UdpConnect), + _ => None, + } + } +} + +#[derive(Clone, Debug, Copy)] +pub enum TunnelReply { + Succeeded, + GeneralFailure, + ConnectionNotAllowed, + NetworkUnreachable, + HostUnreachable, + ConnectionRefused, + TtlExpired, + CommandNotSupported, + AddressTypeNotSupported, + + OtherReply(u8), +} + +impl TunnelReply { + pub fn as_u8(self) -> u8 { + match self { + TunnelReply::Succeeded => consts::TUNNEL_REPLY_SUCCEEDED, + TunnelReply::GeneralFailure => consts::TUNNEL_REPLY_GENERAL_FAILURE, + TunnelReply::ConnectionNotAllowed => consts::TUNNEL_REPLY_CONNECTION_NOT_ALLOWED, + TunnelReply::NetworkUnreachable => consts::TUNNEL_REPLY_NETWORK_UNREACHABLE, + TunnelReply::HostUnreachable => consts::TUNNEL_REPLY_HOST_UNREACHABLE, + TunnelReply::ConnectionRefused => consts::TUNNEL_REPLY_CONNECTION_REFUSED, + TunnelReply::TtlExpired => consts::TUNNEL_REPLY_TTL_EXPIRED, + TunnelReply::CommandNotSupported => consts::TUNNEL_REPLY_COMMAND_NOT_SUPPORTED, + TunnelReply::AddressTypeNotSupported => consts::TUNNEL_REPLY_ADDRESS_TYPE_NOT_SUPPORTED, + TunnelReply::OtherReply(c) => c, + } + } + + pub fn from_u8(code: u8) -> TunnelReply { + match code { + consts::TUNNEL_REPLY_SUCCEEDED => TunnelReply::Succeeded, + consts::TUNNEL_REPLY_GENERAL_FAILURE => TunnelReply::GeneralFailure, + consts::TUNNEL_REPLY_CONNECTION_NOT_ALLOWED => TunnelReply::ConnectionNotAllowed, + consts::TUNNEL_REPLY_NETWORK_UNREACHABLE => TunnelReply::NetworkUnreachable, + consts::TUNNEL_REPLY_HOST_UNREACHABLE => TunnelReply::HostUnreachable, + consts::TUNNEL_REPLY_CONNECTION_REFUSED => TunnelReply::ConnectionRefused, + consts::TUNNEL_REPLY_TTL_EXPIRED => TunnelReply::TtlExpired, + consts::TUNNEL_REPLY_COMMAND_NOT_SUPPORTED => TunnelReply::CommandNotSupported, + consts::TUNNEL_REPLY_ADDRESS_TYPE_NOT_SUPPORTED => TunnelReply::AddressTypeNotSupported, + _ => TunnelReply::OtherReply(code), + } + } +} + +impl fmt::Display for TunnelReply { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + TunnelReply::Succeeded => write!(f, "Succeeded"), + TunnelReply::AddressTypeNotSupported => write!(f, "Address type not supported"), + TunnelReply::CommandNotSupported => write!(f, "Command not supported"), + TunnelReply::ConnectionNotAllowed => write!(f, "Connection not allowed"), + TunnelReply::ConnectionRefused => write!(f, "Connection refused"), + TunnelReply::GeneralFailure => write!(f, "General failure"), + TunnelReply::HostUnreachable => write!(f, "Host unreachable"), + TunnelReply::NetworkUnreachable => write!(f, "Network unreachable"), + TunnelReply::OtherReply(u) => write!(f, "Other reply ({u})"), + TunnelReply::TtlExpired => write!(f, "TTL expired"), + } + } +}