Skip to content

Commit

Permalink
Handle events inside PProxy
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed May 26, 2024
1 parent cba8780 commit b045fff
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::net::SocketAddr;

use litep2p::protocol::request_response::DialOptions;
use litep2p::types::RequestId;
use litep2p::PeerId;
use multiaddr::Multiaddr;
use multiaddr::Protocol;
Expand Down Expand Up @@ -42,6 +44,8 @@ pub enum PProxyCommandResponse {

pub struct PProxy {
command_rx: mpsc::Receiver<(PProxyCommand, oneshot::Sender<PProxyCommandResponse>)>,
#[allow(dead_code)]
tunnel_notifier: HashMap<RequestId, oneshot::Sender<PProxyCommandResponse>>,
p2p_server: P2pServer,
}

Expand All @@ -56,6 +60,7 @@ impl PProxy {
(
Self {
command_rx,
tunnel_notifier: HashMap::new(),
p2p_server: P2pServer::new(server_addr),
},
PProxyHandle { command_tx },
Expand All @@ -70,25 +75,16 @@ impl PProxy {

event = self.p2p_server.next_event() => match event {
None => return,
Some(ref event) => if let Err (error) = self.handle_p2p_server_event(event).await {
Some(ref event) => if let Err(error) = self.handle_p2p_server_event(event).await {
warn!("failed to handle event {:?}: {:?}", event, error);

}
},

command = self.command_rx.recv() => match command {
None => return,
Some((ref command, ref tx)) => match command {
PProxyCommand::AddPeer(request) => {
if let Err(error) = self.on_add_peer(request, tx).await {
warn!("failed to handle command {:?}: {:?}", command, error);
}
}
PProxyCommand::RequestHttpServer(request) => {
if let Err(error) = self.on_request_http_server(request, tx).await {
warn!("failed to handle command {:?}: {:?}", command, error);
}
}
Some((ref command, tx)) => if let Err(error) = self.handle_command(command, tx).await {
warn!("failed to handle command {:?}: {:?}", command, error);
}
}
}
Expand All @@ -100,11 +96,24 @@ impl PProxy {
Ok(())
}

async fn handle_command(
&mut self,
command: &PProxyCommand,
tx: oneshot::Sender<PProxyCommandResponse>,
) -> Result<()> {
match command {
PProxyCommand::AddPeer(request) => self.on_add_peer(request.clone(), tx).await,
PProxyCommand::RequestHttpServer(request) => {
self.on_request_http_server(request.clone(), tx).await
}
}
}

async fn on_add_peer(
&mut self,
request: &AddPeerRequest,
_rx: &oneshot::Sender<PProxyCommandResponse>,
) -> Result<AddPeerResponse> {
request: AddPeerRequest,
tx: oneshot::Sender<PProxyCommandResponse>,
) -> Result<()> {
let addr: Multiaddr = request
.address
.parse()
Expand All @@ -123,15 +132,16 @@ impl PProxy {
.litep2p
.add_known_address(peer_id, vec![addr].into_iter());

Ok(AddPeerResponse {
tx.send(PProxyCommandResponse::AddPeer(AddPeerResponse {
peer_id: peer_id.to_string(),
})
}))
.map_err(|_| Error::EssentialTaskClosed)
}

async fn on_request_http_server(
&mut self,
request: &RequestHttpServerRequest,
_rx: &oneshot::Sender<PProxyCommandResponse>,
request: RequestHttpServerRequest,
_rx: oneshot::Sender<PProxyCommandResponse>,
) -> Result<()> {
let peer_id = request
.peer_id
Expand Down

0 comments on commit b045fff

Please sign in to comment.