From 4819ed71e8eea03e82a432d161b9b880d3773111 Mon Sep 17 00:00:00 2001 From: shimomura-shunsuke Date: Sun, 16 Jun 2024 18:49:27 +0900 Subject: [PATCH 1/3] modify to return a error from Uplink::transmit when kble_gs is not connected to the satellite --- tmtc-c2a/src/kble_gs.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tmtc-c2a/src/kble_gs.rs b/tmtc-c2a/src/kble_gs.rs index 376a3065..9bc09a5c 100644 --- a/tmtc-c2a/src/kble_gs.rs +++ b/tmtc-c2a/src/kble_gs.rs @@ -9,7 +9,7 @@ use gaia_ccsds_c2a::{ }; use tokio::{ net::{TcpListener, ToSocketAddrs}, - sync::{broadcast, mpsc}, + sync::{broadcast, mpsc, oneshot}, }; use tracing::{error, info}; @@ -25,7 +25,7 @@ pub fn new() -> (Link, Socket) { } pub struct Socket { - cmd_rx: mpsc::Receiver>, + cmd_rx: mpsc::Receiver<(Vec, oneshot::Sender>)>, tlm_tx: broadcast::Sender>, } @@ -36,24 +36,29 @@ impl Socket { let accept_fut = listener.accept(); let leak_fut = async { loop { - self.cmd_rx.recv().await; + if let Some((_, resp_tx)) = self.cmd_rx.recv().await { + if let Err(e) = resp_tx.send(Err(anyhow!("kble socket to satellite is not ready"))){ + break e; + } + } } }; let (incoming, addr) = tokio::select! { - accept = accept_fut => accept?, - _ = leak_fut => unreachable!(), + accept = accept_fut => accept.map_err(|_| anyhow!("response receiver has gone"))?, + resp_res = leak_fut => return resp_res, }; info!("accept kble connection from {addr}"); let wss = tokio_tungstenite::accept_async(incoming).await?; let (mut sink, mut stream) = kble_socket::from_tungstenite(wss); let uplink = async { loop { - let cmd_bytes = self + let (cmd_bytes, resp_tx) = self .cmd_rx .recv() .await .ok_or_else(|| anyhow!("command sender has gone"))?; sink.send(cmd_bytes.into()).await?; + resp_tx.send(Ok(())).map_err(|_| anyhow!("response receiver has gone"))?; } }; let downlink = async { @@ -78,7 +83,7 @@ impl Socket { } pub struct Link { - cmd_tx: mpsc::Sender>, + cmd_tx: mpsc::Sender<(Vec, oneshot::Sender>)>, tlm_tx: broadcast::Sender>, } @@ -123,7 +128,7 @@ impl aos::SyncAndChannelCoding for Downlink { #[derive(Debug, Clone)] pub struct Uplink { - cmd_tx: mpsc::Sender>, + cmd_tx: mpsc::Sender<(Vec, oneshot::Sender>)>, } #[async_trait::async_trait] @@ -137,7 +142,9 @@ impl tc::SyncAndChannelCoding for Uplink { data: &[u8], ) -> Result<()> { let tf_bytes = build_tf(scid, vcid, frame_type, sequence_number, data)?; - self.cmd_tx.send(tf_bytes).await?; + let (resp_tx, resp_rx) = oneshot::channel(); + self.cmd_tx.send((tf_bytes,resp_tx)).await?; + resp_rx.await??; Ok(()) } } From 21712a3ae7f85e3c06cd2c94bcd0ab4592128cb1 Mon Sep 17 00:00:00 2001 From: shimomura-shunsuke Date: Sun, 16 Jun 2024 18:58:35 +0900 Subject: [PATCH 2/3] cargo fmt --- tmtc-c2a/src/kble_gs.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tmtc-c2a/src/kble_gs.rs b/tmtc-c2a/src/kble_gs.rs index 9bc09a5c..ccf25baa 100644 --- a/tmtc-c2a/src/kble_gs.rs +++ b/tmtc-c2a/src/kble_gs.rs @@ -37,7 +37,9 @@ impl Socket { let leak_fut = async { loop { if let Some((_, resp_tx)) = self.cmd_rx.recv().await { - if let Err(e) = resp_tx.send(Err(anyhow!("kble socket to satellite is not ready"))){ + if let Err(e) = + resp_tx.send(Err(anyhow!("kble socket to satellite is not ready"))) + { break e; } } @@ -58,7 +60,9 @@ impl Socket { .await .ok_or_else(|| anyhow!("command sender has gone"))?; sink.send(cmd_bytes.into()).await?; - resp_tx.send(Ok(())).map_err(|_| anyhow!("response receiver has gone"))?; + resp_tx + .send(Ok(())) + .map_err(|_| anyhow!("response receiver has gone"))?; } }; let downlink = async { @@ -143,7 +147,7 @@ impl tc::SyncAndChannelCoding for Uplink { ) -> Result<()> { let tf_bytes = build_tf(scid, vcid, frame_type, sequence_number, data)?; let (resp_tx, resp_rx) = oneshot::channel(); - self.cmd_tx.send((tf_bytes,resp_tx)).await?; + self.cmd_tx.send((tf_bytes, resp_tx)).await?; resp_rx.await??; Ok(()) } From b46984c16eb5c49c0b60a5c1bfcf23ceafbf4401 Mon Sep 17 00:00:00 2001 From: shimomura-shunsuke Date: Sun, 16 Jun 2024 19:21:04 +0900 Subject: [PATCH 3/3] modify to return sink.send error from kble_gs --- tmtc-c2a/src/kble_gs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tmtc-c2a/src/kble_gs.rs b/tmtc-c2a/src/kble_gs.rs index ccf25baa..5bd2df3f 100644 --- a/tmtc-c2a/src/kble_gs.rs +++ b/tmtc-c2a/src/kble_gs.rs @@ -59,9 +59,9 @@ impl Socket { .recv() .await .ok_or_else(|| anyhow!("command sender has gone"))?; - sink.send(cmd_bytes.into()).await?; + let res = sink.send(cmd_bytes.into()).await; resp_tx - .send(Ok(())) + .send(res) .map_err(|_| anyhow!("response receiver has gone"))?; } };