diff --git a/Cargo.lock b/Cargo.lock index 3cb76bb5..d9169041 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2157,6 +2157,7 @@ dependencies = [ "structpack", "tlmcmddb", "tokio", + "tokio-stream", "tokio-tungstenite", "tonic", "tonic-build", @@ -2222,9 +2223,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 09780be5..deac9c6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ license = "MPL-2.0" [workspace.dependencies] structpack = "1.0" -gaia-stub = "1.0" -gaia-ccsds-c2a = "1.0" -gaia-tmtc = "1.0" +gaia-stub = { path = "gaia-stub" } +gaia-ccsds-c2a = { path = "gaia-ccsds-c2a" } +gaia-tmtc = { path = "gaia-tmtc" } c2a-devtools-frontend = "1.0" diff --git a/gaia-stub/proto/broker.proto b/gaia-stub/proto/broker.proto index 6991bd04..298c4700 100644 --- a/gaia-stub/proto/broker.proto +++ b/gaia-stub/proto/broker.proto @@ -11,6 +11,10 @@ service Broker { rpc OpenCommandStream(stream CommandStreamRequest) returns (stream CommandStreamResponse); rpc PostTelemetry(PostTelemetryRequest) returns (PostTelemetryResponse); + + rpc PostSetVR(PostSetVRRequest) returns (PostSetVRResponse); + rpc PostADCommand(PostADCommandRequest) returns (PostADCommandResponse); + rpc SubscribeFopFrameEvents(SubscribeFopFrameEventsRequest) returns (stream FopFrameEvent); } message PostCommandRequest { @@ -52,3 +56,33 @@ message GetLastReceivedTelemetryRequest { message GetLastReceivedTelemetryResponse { tco_tmiv.Tmiv tmiv = 1; } + +message PostSetVRRequest { + uint32 vr = 1; +} + +message PostSetVRResponse { +} + +message PostADCommandRequest { + tco_tmiv.Tco tco = 1; +} + +message PostADCommandResponse { + bool success = 1; + uint64 frame_id = 2; +} + +message SubscribeFopFrameEventsRequest { +} + +message FopFrameEvent { + uint64 frame_id = 1; + enum EventType { + TRANSMIT = 0; + ACKNOWLEDGED = 1; + RETRANSMIT = 2; + CANCEL = 3; + }; + EventType event_type = 2; +} diff --git a/gaia-tmtc/src/broker.rs b/gaia-tmtc/src/broker.rs index d9f429bc..58c405db 100644 --- a/gaia-tmtc/src/broker.rs +++ b/gaia-tmtc/src/broker.rs @@ -3,6 +3,7 @@ use std::{fmt::Debug, sync::Arc}; use anyhow::Result; use futures::prelude::*; use gaia_stub::tco_tmiv::Tco; +use std::pin::Pin; use tokio::sync::Mutex; use tokio_stream::wrappers::BroadcastStream; use tonic::{Request, Response, Status, Streaming}; @@ -11,36 +12,61 @@ use super::telemetry::{self, LastTmivStore}; pub use gaia_stub::broker::*; -pub struct BrokerService { +pub struct BrokerService { cmd_handler: Mutex, + fop_command_service: Mutex, tlm_bus: telemetry::Bus, last_tmiv_store: Arc, } -impl BrokerService { +impl BrokerService { pub fn new( cmd_service: C, + fop_command_service: F, tlm_bus: telemetry::Bus, last_tmiv_store: Arc, ) -> Self { Self { cmd_handler: Mutex::new(cmd_service), + fop_command_service: Mutex::new(fop_command_service), tlm_bus, last_tmiv_store, } } } +use async_trait::async_trait; +pub enum FopFrameEvent { + Transmit(u64), + Acknowledged(u64), + Retransmit(u64), + Cancel(u64), +} + +#[async_trait] +pub trait FopCommandService { + async fn send_set_vr(&mut self, value: u8); + + async fn send_ad_command(&mut self, tco: Tco) -> Result; + + async fn subscribe_frame_events( + &mut self, + ) -> Result + Send>>>; +} + #[tonic::async_trait] -impl broker_server::Broker for BrokerService +impl broker_server::Broker for BrokerService where C: super::Handle> + Send + Sync + 'static, C::Response: Send + 'static, + F: FopCommandService + Send + Sync + 'static, { type OpenCommandStreamStream = stream::BoxStream<'static, Result>; type OpenTelemetryStreamStream = stream::BoxStream<'static, Result>; + type SubscribeFopFrameEventsStream = + stream::BoxStream<'static, Result>; #[tracing::instrument(skip(self))] async fn post_command( @@ -66,6 +92,50 @@ where Ok(Response::new(PostCommandResponse {})) } + #[tracing::instrument(skip(self))] + async fn post_set_vr( + &self, + request: Request, + ) -> Result, tonic::Status> { + let message = request.into_inner(); + let value = message.vr; + self.fop_command_service + .lock() + .await + .send_set_vr(value as _) + .await; + Ok(Response::new(PostSetVrResponse {})) + } + + #[tracing::instrument(skip(self))] + async fn post_ad_command( + &self, + request: Request, + ) -> Result, tonic::Status> { + let message = request.into_inner(); + + let tco = message + .tco + .ok_or_else(|| Status::invalid_argument("tco is required"))?; + + fn internal_error(e: E) -> Status { + Status::internal(format!("{:?}", e)) + } + let id = self + .fop_command_service + .lock() + .await + .send_ad_command(tco) + .await + .map_err(internal_error)?; + + tracing::info!("AD command sent"); + Ok(Response::new(PostAdCommandResponse { + success: true, + frame_id: id, + })) + } + #[tracing::instrument(skip(self))] async fn open_telemetry_stream( &self, @@ -115,4 +185,33 @@ where Err(Status::not_found("not received yet")) } } + + #[tracing::instrument(skip(self))] + async fn subscribe_fop_frame_events( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + use futures::StreamExt; + let stream = self + .fop_command_service + .lock() + .await + .subscribe_frame_events() + .await + .map_err(|_| Status::internal("failed to subscribe frame events"))?; + use gaia_stub::broker::fop_frame_event::EventType; + let stream = stream.map(|e| { + let (frame_id, event_type) = match e { + FopFrameEvent::Transmit(id) => (id, EventType::Transmit), + FopFrameEvent::Acknowledged(id) => (id, EventType::Acknowledged), + FopFrameEvent::Retransmit(id) => (id, EventType::Retransmit), + FopFrameEvent::Cancel(id) => (id, EventType::Cancel), + }; + Ok(gaia_stub::broker::FopFrameEvent { + frame_id, + event_type: event_type.into(), + }) + }); + Ok(Response::new(stream.boxed())) + } } diff --git a/tmtc-c2a/Cargo.toml b/tmtc-c2a/Cargo.toml index 61bea911..811a1187 100644 --- a/tmtc-c2a/Cargo.toml +++ b/tmtc-c2a/Cargo.toml @@ -43,6 +43,7 @@ tokio-tungstenite = "0.20.1" itertools = "0.12.1" notalawyer = "0.1.0" notalawyer-clap = "0.1.0" +tokio-stream = { version = "0.1.16", features = ["sync"] } [build-dependencies] tonic-build = "0.11" diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs new file mode 100644 index 00000000..c8354832 --- /dev/null +++ b/tmtc-c2a/src/fop1.rs @@ -0,0 +1,333 @@ +use anyhow::Result; +use gaia_ccsds_c2a::ccsds::tc::{self, clcw::CLCW}; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::broadcast; + +fn wrapping_le(a: u8, b: u8) -> bool { + let diff = b.wrapping_sub(a); + diff < 128 +} + +fn wrapping_lt(a: u8, b: u8) -> bool { + a != b && wrapping_le(a, b) +} + +fn remove_acknowledged_frames( + queue: &mut VecDeque, + acknowledged_fsn: u8, + on_acknowledge: impl Fn(u64), +) -> usize { + let mut ack_count = 0; + while !queue.is_empty() { + let front = queue.front().unwrap(); + if wrapping_lt(front.sequence_number, acknowledged_fsn) { + ack_count += 1; + let frame = queue.pop_front().unwrap().frame; + on_acknowledge(frame.id); + } else { + break; + } + } + ack_count +} + +#[derive(Clone, Copy)] +struct FarmState { + next_expected_fsn: u8, + _lockout: bool, + _wait: bool, + retransmit: bool, +} + +enum FopState { + Initial, + Active(ActiveState), + Retransmit(RetransmitState), + Initializing { expected_nr: u8 }, +} + +struct SentFrame { + frame: Arc, + sent_at: std::time::Instant, + sequence_number: u8, +} + +struct ActiveState { + next_fsn: u8, + sent_queue: VecDeque, +} + +struct RetransmitState { + next_fsn: u8, + retransmit_count: usize, + retransmit_sent_queue: VecDeque, + retransmit_wait_queue: VecDeque, +} + +impl ActiveState { + fn acknowledge(&mut self, acknowledged_fsn: u8, on_acknowledge: impl Fn(u64)) { + remove_acknowledged_frames(&mut self.sent_queue, acknowledged_fsn, on_acknowledge); + } + + fn send( + &mut self, + next_frame_id: &mut u64, + data_field: Vec, + on_transmit: impl Fn(u64), + ) -> Option> { + let fsn = self.next_fsn; + self.next_fsn = self.next_fsn.wrapping_add(1); + let frame = Frame { + id: *next_frame_id, + frame_type: tc::sync_and_channel_coding::FrameType::TypeAD, + sequence_number: fsn, + data_field, + }; + *next_frame_id += 1; + let frame = Arc::new(frame); + on_transmit(frame.id); + self.sent_queue.push_back(SentFrame { + frame: frame.clone(), + sent_at: std::time::Instant::now(), + sequence_number: fsn, + }); + + Some(frame) + } + + fn timeout(&self) -> bool { + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); + if let Some(head) = self.sent_queue.front() { + if head.sent_at.elapsed() > TIMEOUT { + return true; + } + } + false + } +} + +impl RetransmitState { + fn acknowledge( + &mut self, + acknowledged_fsn: u8, + retransmit: bool, + on_acknowledge: impl Fn(u64), + ) -> bool { + let ack_count = remove_acknowledged_frames( + &mut self.retransmit_wait_queue, + acknowledged_fsn, + &on_acknowledge, + ) + remove_acknowledged_frames( + &mut self.retransmit_sent_queue, + acknowledged_fsn, + &on_acknowledge, + ); + if ack_count > 0 { + self.retransmit_count = 0; + } + + if !retransmit { + return self.retransmit_wait_queue.is_empty() && self.retransmit_sent_queue.is_empty(); + } + + if ack_count > 0 { + self.redo_retransmit(); + } + false + } + + fn redo_retransmit(&mut self) { + self.retransmit_count += 1; + // prepend sent_queue to wait_queue + // but the library doesn't provide "prepend" method... + self.retransmit_sent_queue + .append(&mut self.retransmit_wait_queue); + std::mem::swap( + &mut self.retransmit_sent_queue, + &mut self.retransmit_wait_queue, + ); + } + + fn update(&mut self) -> Option> { + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); + if let Some(head) = self.retransmit_sent_queue.front() { + if head.sent_at.elapsed() > TIMEOUT { + self.redo_retransmit(); + } + } + + let mut next_retransmit = self.retransmit_wait_queue.pop_front()?; + let frame = next_retransmit.frame.clone(); + next_retransmit.sent_at = std::time::Instant::now(); + self.retransmit_sent_queue.push_back(next_retransmit); + Some(frame) + } +} + +pub(crate) struct Fop { + next_frame_id: u64, + state: FopState, + last_received_farm_state: Option, + event_sender: broadcast::Sender, +} + +impl Fop { + pub(crate) fn new() -> Self { + let (event_sender, _) = broadcast::channel(16); + Self { + next_frame_id: 0, + state: FopState::Initial, + last_received_farm_state: None, + event_sender, + } + } + + pub(crate) fn subscribe_frame_events(&self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + pub(crate) async fn handle_clcw(&mut self, clcw: CLCW) -> Result<()> { + tracing::debug!("Received CLCW: {:?}", clcw); + let farm_state = FarmState { + next_expected_fsn: clcw.report_value(), + _lockout: clcw.lockout() != 0, + _wait: clcw.wait() != 0, + retransmit: clcw.retransmit() != 0, + }; + self.last_received_farm_state = Some(farm_state); + + let on_acknowledge = |frame_id| { + self.event_sender + .send(FrameEvent::Acknowledged(frame_id)) + .ok(); + }; + + match &mut self.state { + FopState::Initial => { + // do nothing + } + FopState::Initializing { expected_nr } => { + if farm_state.next_expected_fsn == *expected_nr { + tracing::info!("FOP initialized"); + self.state = FopState::Active(ActiveState { + next_fsn: *expected_nr, + sent_queue: VecDeque::new(), + }); + } + } + FopState::Active(state) => { + state.acknowledge(farm_state.next_expected_fsn, on_acknowledge); + if farm_state.retransmit { + self.state = FopState::Retransmit(RetransmitState { + next_fsn: state.next_fsn, + retransmit_count: 1, + retransmit_sent_queue: VecDeque::new(), + retransmit_wait_queue: std::mem::take(&mut state.sent_queue), + }); + } + } + FopState::Retransmit(state) => { + let completed = state.acknowledge( + farm_state.next_expected_fsn, + farm_state.retransmit, + on_acknowledge, + ); + if completed { + self.state = FopState::Active(ActiveState { + next_fsn: state.next_fsn, + sent_queue: VecDeque::new(), + }); + } + } + } + Ok(()) + } + + pub(crate) fn set_vr(&mut self, vr: u8) -> Option { + tracing::info!("Setting VR to {}", vr); + let mut canceled_frames = VecDeque::new(); + match &mut self.state { + FopState::Initializing { .. } => { + return None; + } + FopState::Initial => { + // do nothing + } + FopState::Active(state) => { + canceled_frames.append(&mut state.sent_queue); + } + FopState::Retransmit(state) => { + canceled_frames.append(&mut state.retransmit_sent_queue); + canceled_frames.append(&mut state.retransmit_wait_queue); + } + } + + for frame in canceled_frames { + self.event_sender + .send(FrameEvent::Cancel(frame.frame.id)) + .ok(); + } + + self.state = FopState::Initializing { expected_nr: vr }; + let frame = Frame { + //TODO: manage BC retransmission and frame id for setvr command + //id: self.next_frame_id, + id: 0, + frame_type: tc::sync_and_channel_coding::FrameType::TypeBC, + // TODO: frame number of setvr command??? + sequence_number: 0, + data_field: vec![0x82, 0x00, vr], + }; + Some(frame) + } + + pub(crate) fn send_ad(&mut self, data_field: Vec) -> Option> { + let state = match &mut self.state { + FopState::Active(state) => state, + _ => return None, + }; + + state.send(&mut self.next_frame_id, data_field, |frame_id| { + self.event_sender.send(FrameEvent::Transmit(frame_id)).ok(); + }) + } + + pub(crate) fn update(&mut self) -> Option> { + if let FopState::Active(state) = &mut self.state { + if state.timeout() { + self.state = FopState::Retransmit(RetransmitState { + next_fsn: state.next_fsn, + retransmit_count: 1, + retransmit_sent_queue: VecDeque::new(), + retransmit_wait_queue: std::mem::take(&mut state.sent_queue), + }); + } + } + + let frame = match &mut self.state { + FopState::Retransmit(state) => state.update(), + _ => None, + }; + let frame = frame?; + self.event_sender + .send(FrameEvent::Retransmit(frame.id)) + .ok(); + Some(frame) + } +} + +pub struct Frame { + pub id: u64, + pub frame_type: tc::sync_and_channel_coding::FrameType, + pub sequence_number: u8, + pub data_field: Vec, +} + +#[derive(Debug, Clone)] +pub enum FrameEvent { + Transmit(u64), + Acknowledged(u64), + Retransmit(u64), + Cancel(u64), +} diff --git a/tmtc-c2a/src/lib.rs b/tmtc-c2a/src/lib.rs index 57aac898..ed1b3ce7 100644 --- a/tmtc-c2a/src/lib.rs +++ b/tmtc-c2a/src/lib.rs @@ -1,5 +1,6 @@ mod satconfig; pub use satconfig::Satconfig; +mod fop1; pub mod kble_gs; pub mod proto; pub mod registry; diff --git a/tmtc-c2a/src/main.rs b/tmtc-c2a/src/main.rs index f893413d..2c4d2703 100644 --- a/tmtc-c2a/src/main.rs +++ b/tmtc-c2a/src/main.rs @@ -126,7 +126,7 @@ async fn main() -> Result<()> { let (link, socket) = kble_gs::new(); let kble_socket_fut = socket.serve((args.kble_addr, args.kble_port)); - let (satellite_svc, sat_tlm_reporter) = satellite::new( + let (satellite_svc, fop_cmd_service, sat_tlm_reporter) = satellite::new( satconfig.aos_scid, satconfig.tc_scid, tlm_registry, @@ -142,7 +142,8 @@ async fn main() -> Result<()> { // Constructing gRPC services let server_task = { - let broker_service = BrokerService::new(cmd_handler, tlm_bus, last_tmiv_store); + let broker_service = + BrokerService::new(cmd_handler, fop_cmd_service, tlm_bus, last_tmiv_store); let broker_server = BrokerServer::new(broker_service); let tmtc_generic_c2a_server = TmtcGenericC2aServer::new(tmtc_generic_c2a_service); diff --git a/tmtc-c2a/src/satellite.rs b/tmtc-c2a/src/satellite.rs index 35cd17a9..c00a580e 100644 --- a/tmtc-c2a/src/satellite.rs +++ b/tmtc-c2a/src/satellite.rs @@ -1,4 +1,5 @@ -use std::{sync::Arc, time}; +use std::{pin::Pin, sync::Arc, time}; +use tokio::sync::Mutex; use crate::{ registry::{CommandRegistry, FatCommandSchema, TelemetryRegistry}, @@ -120,7 +121,7 @@ impl<'a> CommandContext<'a> { #[derive(Clone)] pub struct Service { - sync_and_channel_coding: T, + sync_and_channel_coding: Arc>, registry: Arc, tc_scid: u16, } @@ -138,7 +139,8 @@ where fat_schema, tco, }; - ctx.transmit_to(&mut self.sync_and_channel_coding).await?; + ctx.transmit_to(&mut *self.sync_and_channel_coding.lock().await) + .await?; Ok(true) } } @@ -151,21 +153,29 @@ pub fn new( cmd_registry: impl Into>, receiver: R, transmitter: T, -) -> (Service, TelemetryReporter) +) -> (Service, FopCommandService, TelemetryReporter) where - T: tc::SyncAndChannelCoding, + T: tc::SyncAndChannelCoding + Send + 'static, R: aos::SyncAndChannelCoding, { + let registry = cmd_registry.into(); + let transmitter = Arc::new(Mutex::new(transmitter)); + let fop = crate::fop1::Fop::new(); + let fop = Arc::new(Mutex::new(fop)); + let fop_command_service = + FopCommandService::start(fop.clone(), tc_scid, transmitter.clone(), registry.clone()); ( Service { tc_scid, sync_and_channel_coding: transmitter, - registry: cmd_registry.into(), + registry, }, + fop_command_service, TelemetryReporter { aos_scid, receiver, tmiv_builder: TmivBuilder { tlm_registry }, + fop, }, ) } @@ -187,6 +197,7 @@ pub struct TelemetryReporter { aos_scid: u8, tmiv_builder: TmivBuilder, receiver: R, + fop: Arc>, } impl TelemetryReporter @@ -211,6 +222,15 @@ where ); continue; }; + + { + let clcw = tf.trailer.into_ref().clone(); + let mut fop = self.fop.lock().await; + if let Err(e) = fop.handle_clcw(clcw).await { + error!("failed to handle CLCW: {:?}", e); + } + } + let incoming_scid = tf.primary_header.scid(); if incoming_scid != self.aos_scid { warn!("unknown SCID: {incoming_scid}"); @@ -265,3 +285,150 @@ where } } } + +pub struct FopCommandService { + transmitter: Arc>, + tc_scid: u16, + fop: Arc>, + registry: Arc, +} + +impl FopCommandService { + pub(crate) fn start( + fop: Arc>, + tc_scid: u16, + transmitter: Arc>, + registry: Arc, + ) -> Self { + let service = Self { + transmitter, + tc_scid, + fop, + registry, + }; + + tokio::spawn(Self::run_update( + tc_scid, + service.transmitter.clone(), + service.fop.clone(), + )); + service + } + + async fn run_update( + tc_scid: u16, + transmitter: Arc>, + fop: Arc>, + ) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + //tracing::debug!("FopCommandService: update"); + while let Some(frame) = fop.lock().await.update() { + tracing::debug!( + "FopCommandService: retransmitting {}", + frame.sequence_number + ); + let mut transmitter = transmitter.lock().await; + let vcid = 0; + let _ = transmitter + .transmit( + tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + } + } + } +} + +#[async_trait] +impl gaia_tmtc::broker::FopCommandService + for FopCommandService +{ + async fn send_set_vr(&mut self, vr: u8) { + let frame = { + let mut fop = self.fop.lock().await; + let frame = fop.set_vr(vr); + match frame { + Some(frame) => frame, + None => { + //TODO: return error? + return; + } + } + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + //transmitter. + } + + async fn send_ad_command(&mut self, tco: Tco) -> Result { + let Some(fat_schema) = self.registry.lookup(&tco.name) else { + return Err(anyhow!("unknown command: {}", tco.name)); + }; + let ctx = CommandContext { + tc_scid: 0, // dummy + fat_schema, + tco: &tco, + }; + let mut buf = vec![0u8; 1017]; // FIXME: hard-coded max size + let len = ctx.build_tc_segment(&mut buf)?; + buf.truncate(len); + + let mut fop = self.fop.lock().await; + let frame = match fop.send_ad(buf) { + None => { + tracing::warn!("FOP is not ready"); + return Err(anyhow!("FOP is not ready")); + } + Some(frame) => frame, + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + + Ok(frame.id) + } + + async fn subscribe_frame_events( + &mut self, + ) -> Result + Send>>> { + use futures::StreamExt; + let rx = self.fop.lock().await.subscribe_frame_events(); + let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|e| async { + use crate::fop1::FrameEvent; + use gaia_tmtc::broker::FopFrameEvent; + let e = e.ok()?; + let e = match e { + FrameEvent::Transmit(id) => FopFrameEvent::Transmit(id), + FrameEvent::Acknowledged(id) => FopFrameEvent::Acknowledged(id), + FrameEvent::Retransmit(id) => FopFrameEvent::Retransmit(id), + FrameEvent::Cancel(id) => FopFrameEvent::Cancel(id), + }; + Some(e) + }); + Ok(stream.boxed()) + } +}