Skip to content

Commit ca254ac

Browse files
committed
feat: wiat for response before sending next package
1 parent 57a9746 commit ca254ac

File tree

4 files changed

+74
-41
lines changed

4 files changed

+74
-41
lines changed

proto/tunnel_v1.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ enum TunnelCommand {
66
TUNNEL_COMMAND_CONNECT = 1;
77
TUNNEL_COMMAND_CONNECT_RESP = 2;
88
TUNNEL_COMMAND_PACKAGE = 3;
9-
TUNNEL_COMMAND_BREAK = 4;
9+
TUNNEL_COMMAND_PACKAGE_RESP = 4;
10+
TUNNEL_COMMAND_BREAK = 5;
1011
}
1112

1213
message Tunnel {

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub enum Error {
2929
TunnelNotWaiting(String),
3030
#[error("Tunnel dial failed: {0}")]
3131
TunnelDialFailed(String),
32+
#[error("")]
33+
TunnelContextNotFound(String),
3234
#[error("Tunnel error: {0:?}")]
3335
Tunnel(TunnelError),
3436
#[error("Protobuf decode error: {0}")]

src/lib.rs

Lines changed: 65 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,19 @@ pub enum PProxyCommandResponse {
9292
ExpirePeerAccess {},
9393
}
9494

95+
pub struct TunnelContext {
96+
tx: mpsc::Sender<ChannelPackage>,
97+
outbound_sent_notifier: Option<CommandNotifier>,
98+
}
99+
95100
pub struct PProxy {
96101
command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
97102
command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>,
98103
swarm: Swarm<PProxyNetworkBehaviour>,
99104
proxy_addr: Option<SocketAddr>,
100105
outbound_ready_notifiers: HashMap<request_response::OutboundRequestId, CommandNotifier>,
101106
inbound_tunnels: HashMap<(PeerId, TunnelId), Tunnel>,
102-
tunnel_txs: HashMap<(PeerId, TunnelId), mpsc::Sender<ChannelPackage>>,
107+
tunnel_ctx: HashMap<(PeerId, TunnelId), TunnelContext>,
103108
access_client: Option<AccessClient>,
104109
}
105110

@@ -129,7 +134,7 @@ impl PProxy {
129134
proxy_addr,
130135
outbound_ready_notifiers: HashMap::new(),
131136
inbound_tunnels: HashMap::new(),
132-
tunnel_txs: HashMap::new(),
137+
tunnel_ctx: HashMap::new(),
133138
access_client,
134139
},
135140
PProxyHandle {
@@ -175,7 +180,10 @@ impl PProxy {
175180
tunnel.listen(stream, tunnel_rx).await?;
176181

177182
self.inbound_tunnels.insert((peer_id, tunnel_id), tunnel);
178-
self.tunnel_txs.insert((peer_id, tunnel_id), tunnel_tx);
183+
self.tunnel_ctx.insert((peer_id, tunnel_id), TunnelContext {
184+
tx: tunnel_tx,
185+
outbound_sent_notifier: None,
186+
});
179187

180188
Ok(())
181189
}
@@ -191,7 +199,7 @@ impl PProxy {
191199
&mut self,
192200
peer_id: PeerId,
193201
request: proto::Tunnel,
194-
) -> Result<Option<proto::Tunnel>> {
202+
) -> Result<proto::Tunnel> {
195203
let tunnel_id = request
196204
.tunnel_id
197205
.parse()
@@ -216,11 +224,11 @@ impl PProxy {
216224
}
217225
};
218226

219-
Ok(Some(proto::Tunnel {
227+
Ok(proto::Tunnel {
220228
tunnel_id: tunnel_id.to_string(),
221229
command: proto::TunnelCommand::ConnectResp.into(),
222230
data,
223-
}))
231+
})
224232
}
225233

226234
proto::TunnelCommand::Package => {
@@ -231,16 +239,20 @@ impl PProxy {
231239
return Err(Error::AccessDenied(peer_id.to_string()));
232240
}
233241

234-
let Some(tx) = self.tunnel_txs.get(&(peer_id, tunnel_id)) else {
242+
let Some(ctx) = self.tunnel_ctx.get(&(peer_id, tunnel_id)) else {
235243
return Err(Error::ProtocolNotSupport(
236244
"No tunnel for Package".to_string(),
237245
));
238246
};
239247

240-
tx.send(Ok(request.data.unwrap_or_default())).await?;
248+
ctx.tx.send(Ok(request.data.unwrap_or_default())).await?;
241249

242250
// Have to do this to close the response waiter in remote.
243-
Ok(None)
251+
Ok(proto::Tunnel {
252+
tunnel_id: tunnel_id.to_string(),
253+
command: proto::TunnelCommand::PackageResp.into(),
254+
data: None,
255+
})
244256
}
245257

246258
_ => Err(Error::ProtocolNotSupport(
@@ -263,7 +275,7 @@ impl PProxy {
263275

264276
SwarmEvent::ConnectionClosed { peer_id, .. } => {
265277
self.inbound_tunnels.retain(|(p, _), _| p != &peer_id);
266-
self.tunnel_txs.retain(|(p, _), _| p != &peer_id);
278+
self.tunnel_ctx.retain(|(p, _), _| p != &peer_id);
267279
}
268280

269281
SwarmEvent::Behaviour(PProxyNetworkBehaviourEvent::RequestResponse(
@@ -278,13 +290,13 @@ impl PProxy {
278290
Err(e) => {
279291
if let Ok(tunnel_id) = tunnel_id.parse() {
280292
self.inbound_tunnels.remove(&(peer, tunnel_id));
281-
self.tunnel_txs.remove(&(peer, tunnel_id));
293+
self.tunnel_ctx.remove(&(peer, tunnel_id));
282294
}
283-
Some(proto::Tunnel {
295+
proto::Tunnel {
284296
tunnel_id,
285297
command: proto::TunnelCommand::Break.into(),
286298
data: Some(e.to_string().as_bytes().to_vec()),
287-
})
299+
}
288300
}
289301
};
290302

@@ -299,11 +311,6 @@ impl PProxy {
299311
request_id,
300312
response,
301313
} => {
302-
// This is response of TunnelCommand::Package
303-
let Some(response) = response else {
304-
return Ok(());
305-
};
306-
307314
match response.command() {
308315
proto::TunnelCommand::ConnectResp => {
309316
let tx = self
@@ -326,6 +333,28 @@ impl PProxy {
326333
.map_err(|_| Error::EssentialTaskClosed)?;
327334
}
328335

336+
proto::TunnelCommand::PackageResp => {
337+
let tunnel_id = response.tunnel_id.parse().map_err(|_| {
338+
Error::TunnelIdParseError(response.tunnel_id.clone())
339+
})?;
340+
341+
let Some(ctx) = self.tunnel_ctx.get_mut(&(peer, tunnel_id)) else {
342+
return Err(Error::ProtocolNotSupport(
343+
"No ctx for Package".to_string(),
344+
));
345+
};
346+
347+
let Some(notifier) = ctx.outbound_sent_notifier.take() else {
348+
return Err(Error::ProtocolNotSupport(
349+
"No notifier for Package".to_string(),
350+
));
351+
};
352+
353+
notifier
354+
.send(Ok(PProxyCommandResponse::SendOutboundPackageCommand {}))
355+
.map_err(|_| Error::EssentialTaskClosed)?;
356+
}
357+
329358
proto::TunnelCommand::Break => {
330359
let tunnel_id = response.tunnel_id.parse().map_err(|_| {
331360
Error::TunnelIdParseError(response.tunnel_id.clone())
@@ -338,8 +367,8 @@ impl PProxy {
338367
}
339368

340369
// Terminat connected tunnel
341-
if let Some(tx) = self.tunnel_txs.remove(&(peer, tunnel_id)) {
342-
tx.send(Err(TunnelError::ConnectionAborted)).await?
370+
if let Some(ctx) = self.tunnel_ctx.remove(&(peer, tunnel_id)) {
371+
ctx.tx.send(Err(TunnelError::ConnectionAborted)).await?
343372
};
344373
}
345374

@@ -432,7 +461,10 @@ impl PProxy {
432461
tunnel_tx: mpsc::Sender<ChannelPackage>,
433462
tx: CommandNotifier,
434463
) -> Result<()> {
435-
self.tunnel_txs.insert((peer_id, tunnel_id), tunnel_tx);
464+
self.tunnel_ctx.insert((peer_id, tunnel_id), TunnelContext {
465+
tx: tunnel_tx,
466+
outbound_sent_notifier: None,
467+
});
436468

437469
let request = proto::Tunnel {
438470
tunnel_id: tunnel_id.to_string(),
@@ -465,13 +497,23 @@ impl PProxy {
465497
data: Some(data),
466498
};
467499

500+
let Some(ctx) = self.tunnel_ctx.get_mut(&(peer_id, tunnel_id)) else {
501+
let err_msg = "No ctx for outbound package";
502+
503+
tx.send(Err(Error::TunnelContextNotFound(err_msg.to_string())))
504+
.map_err(|_| Error::EssentialTaskClosed)?;
505+
506+
return Err(Error::TunnelContextNotFound(err_msg.to_string()));
507+
};
508+
509+
ctx.outbound_sent_notifier = Some(tx);
510+
468511
self.swarm
469512
.behaviour_mut()
470513
.request_response
471514
.send_request(&peer_id, request);
472515

473-
tx.send(Ok(PProxyCommandResponse::SendOutboundPackageCommand {}))
474-
.map_err(|_| Error::EssentialTaskClosed)
516+
Ok(())
475517
}
476518

477519
async fn on_expire_peer_access(&mut self, peer_id: PeerId, tx: CommandNotifier) -> Result<()> {

src/p2p/codec.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct Codec;
1919
impl libp2p::request_response::Codec for Codec {
2020
type Protocol = StreamProtocol;
2121
type Request = proto::Tunnel;
22-
type Response = Option<proto::Tunnel>;
22+
type Response = proto::Tunnel;
2323

2424
async fn read_request<T>(
2525
&mut self,
@@ -48,13 +48,7 @@ impl libp2p::request_response::Codec for Codec {
4848

4949
io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?;
5050

51-
if vec.is_empty() {
52-
return Ok(None);
53-
}
54-
55-
proto::Tunnel::decode(vec.as_slice())
56-
.map(Some)
57-
.map_err(decode_into_io_error)
51+
proto::Tunnel::decode(vec.as_slice()).map_err(decode_into_io_error)
5852
}
5953

6054
async fn write_request<T>(
@@ -80,14 +74,8 @@ impl libp2p::request_response::Codec for Codec {
8074
where
8175
T: AsyncWrite + Unpin + Send,
8276
{
83-
let mut data = vec![];
84-
85-
if let Some(resp) = resp {
86-
data.extend_from_slice(resp.encode_to_vec().as_slice());
87-
};
88-
77+
let data = resp.encode_to_vec();
8978
io.write_all(data.as_ref()).await?;
90-
9179
Ok(())
9280
}
9381
}
@@ -137,7 +125,7 @@ mod tests {
137125

138126
let (mut a, mut b) = Endpoint::pair(124, 124);
139127
Codec
140-
.write_response(&protocol, &mut a, Some(expected_response.clone()))
128+
.write_response(&protocol, &mut a, expected_response.clone())
141129
.await
142130
.expect("Should write response");
143131
a.close().await.unwrap();
@@ -148,6 +136,6 @@ mod tests {
148136
.expect("Should read response");
149137
b.close().await.unwrap();
150138

151-
assert_eq!(actual_response, Some(expected_response));
139+
assert_eq!(actual_response, expected_response);
152140
}
153141
}

0 commit comments

Comments
 (0)