Skip to content

Commit c593a31

Browse files
committed
Handle events inside PProxy
1 parent cba8780 commit c593a31

File tree

3 files changed

+87
-34
lines changed

3 files changed

+87
-34
lines changed

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub enum Error {
1010
EssentialTaskClosed,
1111
#[error("Litep2p error: {0}")]
1212
Litep2p(#[from] litep2p::Error),
13+
#[error("Litep2p request response error: {0:?}")]
14+
Litep2pRequestResponseError(litep2p::protocol::request_response::RequestResponseError),
1315
#[error("Unexpected response type")]
1416
UnexpectedResponseType,
1517
}
@@ -25,3 +27,9 @@ impl From<tokio::sync::oneshot::error::RecvError> for Error {
2527
Error::EssentialTaskClosed
2628
}
2729
}
30+
31+
impl From<litep2p::protocol::request_response::RequestResponseError> for Error {
32+
fn from(err: litep2p::protocol::request_response::RequestResponseError) -> Self {
33+
Error::Litep2pRequestResponseError(err)
34+
}
35+
}

src/lib.rs

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
use std::collections::HashMap;
12
use std::net::SocketAddr;
23

34
use litep2p::protocol::request_response::DialOptions;
5+
use litep2p::protocol::request_response::RequestResponseEvent;
6+
use litep2p::types::RequestId;
47
use litep2p::PeerId;
58
use multiaddr::Multiaddr;
69
use multiaddr::Protocol;
710
use tokio::sync::mpsc;
811
use tokio::sync::oneshot;
9-
use tracing::trace;
1012
use tracing::warn;
1113

1214
use crate::command::proto::AddPeerRequest;
@@ -29,6 +31,9 @@ const DEFAULT_CHANNEL_SIZE: usize = 4096;
2931
/// Public result type used by the crate.
3032
pub type Result<T> = std::result::Result<T, error::Error>;
3133

34+
type CommandNotification = Result<PProxyCommandResponse>;
35+
type CommandNotifier = oneshot::Sender<CommandNotification>;
36+
3237
#[derive(Debug)]
3338
pub enum PProxyCommand {
3439
AddPeer(AddPeerRequest),
@@ -41,12 +46,13 @@ pub enum PProxyCommandResponse {
4146
}
4247

4348
pub struct PProxy {
44-
command_rx: mpsc::Receiver<(PProxyCommand, oneshot::Sender<PProxyCommandResponse>)>,
49+
command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>,
50+
tunnel_notifier: HashMap<RequestId, CommandNotifier>,
4551
p2p_server: P2pServer,
4652
}
4753

4854
pub struct PProxyHandle {
49-
command_tx: mpsc::Sender<(PProxyCommand, oneshot::Sender<PProxyCommandResponse>)>,
55+
command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
5056
}
5157

5258
impl PProxy {
@@ -56,6 +62,7 @@ impl PProxy {
5662
(
5763
Self {
5864
command_rx,
65+
tunnel_notifier: HashMap::new(),
5966
p2p_server: P2pServer::new(server_addr),
6067
},
6168
PProxyHandle { command_tx },
@@ -70,41 +77,75 @@ impl PProxy {
7077

7178
event = self.p2p_server.next_event() => match event {
7279
None => return,
73-
Some(ref event) => if let Err (error) = self.handle_p2p_server_event(event).await {
80+
Some(ref event) => if let Err(error) = self.handle_p2p_server_event(event).await {
7481
warn!("failed to handle event {:?}: {:?}", event, error);
7582

7683
}
7784
},
7885

7986
command = self.command_rx.recv() => match command {
8087
None => return,
81-
Some((ref command, ref tx)) => match command {
82-
PProxyCommand::AddPeer(request) => {
83-
if let Err(error) = self.on_add_peer(request, tx).await {
84-
warn!("failed to handle command {:?}: {:?}", command, error);
85-
}
86-
}
87-
PProxyCommand::RequestHttpServer(request) => {
88-
if let Err(error) = self.on_request_http_server(request, tx).await {
89-
warn!("failed to handle command {:?}: {:?}", command, error);
90-
}
91-
}
88+
Some((ref command, tx)) => if let Err(error) = self.handle_command(command, tx).await {
89+
warn!("failed to handle command {:?}: {:?}", command, error);
9290
}
9391
}
9492
}
9593
}
9694
}
9795

9896
async fn handle_p2p_server_event(&mut self, event: &P2pServerEvent) -> Result<()> {
99-
trace!("handle P2pServerEvent: {:?}", event);
97+
match event {
98+
P2pServerEvent::TunnelEvent(RequestResponseEvent::RequestReceived {
99+
request_id,
100+
..
101+
}) => self
102+
.p2p_server
103+
.tunnel_handle
104+
.send_response(*request_id, vec![1, 2, 3, 4]),
105+
106+
P2pServerEvent::TunnelEvent(RequestResponseEvent::RequestFailed {
107+
request_id,
108+
error,
109+
..
110+
}) => {
111+
let Some(tx) = self.tunnel_notifier.remove(request_id) else {
112+
todo!();
113+
};
114+
tx.send(Err(Error::Litep2pRequestResponseError(error.clone())))
115+
.map_err(|_| Error::EssentialTaskClosed)?;
116+
}
117+
118+
P2pServerEvent::TunnelEvent(RequestResponseEvent::ResponseReceived {
119+
request_id,
120+
response,
121+
..
122+
}) => {
123+
let Some(tx) = self.tunnel_notifier.remove(request_id) else {
124+
todo!();
125+
};
126+
tx.send(Ok(PProxyCommandResponse::RequestHttpServer(
127+
RequestHttpServerResponse {
128+
data: response.clone(),
129+
},
130+
)))
131+
.map_err(|_| Error::EssentialTaskClosed)?;
132+
}
133+
_ => {}
134+
}
135+
100136
Ok(())
101137
}
102138

103-
async fn on_add_peer(
104-
&mut self,
105-
request: &AddPeerRequest,
106-
_rx: &oneshot::Sender<PProxyCommandResponse>,
107-
) -> Result<AddPeerResponse> {
139+
async fn handle_command(&mut self, command: &PProxyCommand, tx: CommandNotifier) -> Result<()> {
140+
match command {
141+
PProxyCommand::AddPeer(request) => self.on_add_peer(request.clone(), tx).await,
142+
PProxyCommand::RequestHttpServer(request) => {
143+
self.on_request_http_server(request.clone(), tx).await
144+
}
145+
}
146+
}
147+
148+
async fn on_add_peer(&mut self, request: AddPeerRequest, tx: CommandNotifier) -> Result<()> {
108149
let addr: Multiaddr = request
109150
.address
110151
.parse()
@@ -123,27 +164,30 @@ impl PProxy {
123164
.litep2p
124165
.add_known_address(peer_id, vec![addr].into_iter());
125166

126-
Ok(AddPeerResponse {
167+
tx.send(Ok(PProxyCommandResponse::AddPeer(AddPeerResponse {
127168
peer_id: peer_id.to_string(),
128-
})
169+
})))
170+
.map_err(|_| Error::EssentialTaskClosed)
129171
}
130172

131173
async fn on_request_http_server(
132174
&mut self,
133-
request: &RequestHttpServerRequest,
134-
_rx: &oneshot::Sender<PProxyCommandResponse>,
175+
request: RequestHttpServerRequest,
176+
rx: CommandNotifier,
135177
) -> Result<()> {
136178
let peer_id = request
137179
.peer_id
138180
.parse()
139181
.map_err(|_| Error::PeerIdParseError(request.peer_id.clone()))?;
140182

141-
let _request_id = self
183+
let request_id = self
142184
.p2p_server
143185
.tunnel_handle
144186
.send_request(peer_id, request.data.clone(), DialOptions::Dial)
145187
.await?;
146188

189+
self.tunnel_notifier.insert(request_id, rx);
190+
147191
Ok(())
148192
}
149193
}
@@ -156,7 +200,7 @@ impl PProxyHandle {
156200
.send((PProxyCommand::AddPeer(request), tx))
157201
.await?;
158202

159-
let response = rx.await?;
203+
let response = rx.await??;
160204

161205
match response {
162206
PProxyCommandResponse::AddPeer(response) => Ok(response),
@@ -174,7 +218,7 @@ impl PProxyHandle {
174218
.send((PProxyCommand::RequestHttpServer(request), tx))
175219
.await?;
176220

177-
let response = rx.await?;
221+
let response = rx.await??;
178222

179223
match response {
180224
PProxyCommandResponse::RequestHttpServer(response) => Ok(response),

src/server.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use futures::StreamExt;
55
use litep2p::config::ConfigBuilder;
66
use litep2p::protocol::request_response::Config as RequestResponseConfig;
77
use litep2p::protocol::request_response::ConfigBuilder as RequestResponseConfigBuilder;
8+
use litep2p::protocol::request_response::RequestResponseEvent;
89
use litep2p::protocol::request_response::RequestResponseHandle;
910
use litep2p::transport::quic::config::Config as QuicConfig;
1011
use litep2p::types::protocol::ProtocolName;
@@ -14,8 +15,8 @@ use litep2p::Litep2pEvent;
1415
#[derive(Debug)]
1516
pub enum P2pServerEvent {
1617
Litep2p(Litep2pEvent),
17-
ControlEvent,
18-
TunnelEvent,
18+
ControlEvent(RequestResponseEvent),
19+
TunnelEvent(RequestResponseEvent),
1920
}
2021

2122
pub struct P2pServer {
@@ -74,11 +75,11 @@ impl P2pServer {
7475
ev = self.litep2p.next_event() => {
7576
ev.map(P2pServerEvent::Litep2p)
7677
}
77-
_ev = self.control_handle.next() => {
78-
Some(P2pServerEvent::ControlEvent)
78+
ev = self.control_handle.next() => {
79+
ev.map(P2pServerEvent::ControlEvent)
7980
}
80-
_ev = self.tunnel_handle.next() => {
81-
Some(P2pServerEvent::TunnelEvent)
81+
ev = self.tunnel_handle.next() => {
82+
ev.map(P2pServerEvent::TunnelEvent)
8283
}
8384
}
8485
}

0 commit comments

Comments
 (0)