diff --git a/tmtc-c2a/src/kble_gs.rs b/tmtc-c2a/src/kble_gs.rs index 376a3065..5bd2df3f 100644 --- a/tmtc-c2a/src/kble_gs.rs +++ b/tmtc-c2a/src/kble_gs.rs @@ -9,7 +9,7 @@ use gaia_ccsds_c2a::{ }; use tokio::{ net::{TcpListener, ToSocketAddrs}, - sync::{broadcast, mpsc}, + sync::{broadcast, mpsc, oneshot}, }; use tracing::{error, info}; @@ -25,7 +25,7 @@ pub fn new() -> (Link, Socket) { } pub struct Socket { - cmd_rx: mpsc::Receiver>, + cmd_rx: mpsc::Receiver<(Vec, oneshot::Sender>)>, tlm_tx: broadcast::Sender>, } @@ -36,24 +36,33 @@ impl Socket { let accept_fut = listener.accept(); let leak_fut = async { loop { - self.cmd_rx.recv().await; + if let Some((_, resp_tx)) = self.cmd_rx.recv().await { + if let Err(e) = + resp_tx.send(Err(anyhow!("kble socket to satellite is not ready"))) + { + break e; + } + } } }; let (incoming, addr) = tokio::select! { - accept = accept_fut => accept?, - _ = leak_fut => unreachable!(), + accept = accept_fut => accept.map_err(|_| anyhow!("response receiver has gone"))?, + resp_res = leak_fut => return resp_res, }; info!("accept kble connection from {addr}"); let wss = tokio_tungstenite::accept_async(incoming).await?; let (mut sink, mut stream) = kble_socket::from_tungstenite(wss); let uplink = async { loop { - let cmd_bytes = self + let (cmd_bytes, resp_tx) = self .cmd_rx .recv() .await .ok_or_else(|| anyhow!("command sender has gone"))?; - sink.send(cmd_bytes.into()).await?; + let res = sink.send(cmd_bytes.into()).await; + resp_tx + .send(res) + .map_err(|_| anyhow!("response receiver has gone"))?; } }; let downlink = async { @@ -78,7 +87,7 @@ impl Socket { } pub struct Link { - cmd_tx: mpsc::Sender>, + cmd_tx: mpsc::Sender<(Vec, oneshot::Sender>)>, tlm_tx: broadcast::Sender>, } @@ -123,7 +132,7 @@ impl aos::SyncAndChannelCoding for Downlink { #[derive(Debug, Clone)] pub struct Uplink { - cmd_tx: mpsc::Sender>, + cmd_tx: mpsc::Sender<(Vec, oneshot::Sender>)>, } #[async_trait::async_trait] @@ -137,7 +146,9 @@ impl tc::SyncAndChannelCoding for Uplink { data: &[u8], ) -> Result<()> { let tf_bytes = build_tf(scid, vcid, frame_type, sequence_number, data)?; - self.cmd_tx.send(tf_bytes).await?; + let (resp_tx, resp_rx) = oneshot::channel(); + self.cmd_tx.send((tf_bytes, resp_tx)).await?; + resp_rx.await??; Ok(()) } }