From b045fffa976a819ed3e569dea24a56ab8e19bafc Mon Sep 17 00:00:00 2001 From: magine Date: Sun, 26 May 2024 15:45:48 +0800 Subject: [PATCH] Handle events inside PProxy --- src/lib.rs | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b761a62..68dc641 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ +use std::collections::HashMap; use std::net::SocketAddr; use litep2p::protocol::request_response::DialOptions; +use litep2p::types::RequestId; use litep2p::PeerId; use multiaddr::Multiaddr; use multiaddr::Protocol; @@ -42,6 +44,8 @@ pub enum PProxyCommandResponse { pub struct PProxy { command_rx: mpsc::Receiver<(PProxyCommand, oneshot::Sender)>, + #[allow(dead_code)] + tunnel_notifier: HashMap>, p2p_server: P2pServer, } @@ -56,6 +60,7 @@ impl PProxy { ( Self { command_rx, + tunnel_notifier: HashMap::new(), p2p_server: P2pServer::new(server_addr), }, PProxyHandle { command_tx }, @@ -70,7 +75,7 @@ impl PProxy { event = self.p2p_server.next_event() => match event { None => return, - Some(ref event) => if let Err (error) = self.handle_p2p_server_event(event).await { + Some(ref event) => if let Err(error) = self.handle_p2p_server_event(event).await { warn!("failed to handle event {:?}: {:?}", event, error); } @@ -78,17 +83,8 @@ impl PProxy { command = self.command_rx.recv() => match command { None => return, - Some((ref command, ref tx)) => match command { - PProxyCommand::AddPeer(request) => { - if let Err(error) = self.on_add_peer(request, tx).await { - warn!("failed to handle command {:?}: {:?}", command, error); - } - } - PProxyCommand::RequestHttpServer(request) => { - if let Err(error) = self.on_request_http_server(request, tx).await { - warn!("failed to handle command {:?}: {:?}", command, error); - } - } + Some((ref command, tx)) => if let Err(error) = self.handle_command(command, tx).await { + warn!("failed to handle command {:?}: {:?}", command, error); } } } @@ -100,11 +96,24 @@ impl PProxy { Ok(()) } + async fn handle_command( + &mut self, + command: &PProxyCommand, + tx: oneshot::Sender, + ) -> Result<()> { + match command { + PProxyCommand::AddPeer(request) => self.on_add_peer(request.clone(), tx).await, + PProxyCommand::RequestHttpServer(request) => { + self.on_request_http_server(request.clone(), tx).await + } + } + } + async fn on_add_peer( &mut self, - request: &AddPeerRequest, - _rx: &oneshot::Sender, - ) -> Result { + request: AddPeerRequest, + tx: oneshot::Sender, + ) -> Result<()> { let addr: Multiaddr = request .address .parse() @@ -123,15 +132,16 @@ impl PProxy { .litep2p .add_known_address(peer_id, vec![addr].into_iter()); - Ok(AddPeerResponse { + tx.send(PProxyCommandResponse::AddPeer(AddPeerResponse { peer_id: peer_id.to_string(), - }) + })) + .map_err(|_| Error::EssentialTaskClosed) } async fn on_request_http_server( &mut self, - request: &RequestHttpServerRequest, - _rx: &oneshot::Sender, + request: RequestHttpServerRequest, + _rx: oneshot::Sender, ) -> Result<()> { let peer_id = request .peer_id