Skip to content

Commit

Permalink
Experiment implementation of FOP-1
Browse files Browse the repository at this point in the history
  • Loading branch information
kobkaz committed Oct 17, 2024
1 parent 9901ce9 commit d4927a0
Show file tree
Hide file tree
Showing 9 changed files with 653 additions and 16 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ license = "MPL-2.0"

[workspace.dependencies]
structpack = "1.0"
gaia-stub = "1.0"
gaia-ccsds-c2a = "1.0"
gaia-tmtc = "1.0"
gaia-stub = { path = "gaia-stub" }
gaia-ccsds-c2a = { path = "gaia-ccsds-c2a" }
gaia-tmtc = { path = "gaia-tmtc" }
c2a-devtools-frontend = "1.0"
34 changes: 34 additions & 0 deletions gaia-stub/proto/broker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ service Broker {

rpc OpenCommandStream(stream CommandStreamRequest) returns (stream CommandStreamResponse);
rpc PostTelemetry(PostTelemetryRequest) returns (PostTelemetryResponse);

rpc PostSetVR(PostSetVRRequest) returns (PostSetVRResponse);
rpc PostADCommand(PostADCommandRequest) returns (PostADCommandResponse);
rpc SubscribeFopFrameEvents(SubscribeFopFrameEventsRequest) returns (stream FopFrameEvent);
}

message PostCommandRequest {
Expand Down Expand Up @@ -52,3 +56,33 @@ message GetLastReceivedTelemetryRequest {
message GetLastReceivedTelemetryResponse {
tco_tmiv.Tmiv tmiv = 1;
}

message PostSetVRRequest {
uint32 vr = 1;
}

message PostSetVRResponse {
}

message PostADCommandRequest {
tco_tmiv.Tco tco = 1;
}

message PostADCommandResponse {
bool success = 1;
uint64 frame_id = 2;
}

message SubscribeFopFrameEventsRequest {
}

message FopFrameEvent {
uint64 frame_id = 1;
enum EventType {
TRANSMIT = 0;
ACKNOWLEDGED = 1;
RETRANSMIT = 2;
CANCEL = 3;
};
EventType event_type = 2;
}
105 changes: 102 additions & 3 deletions gaia-tmtc/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fmt::Debug, sync::Arc};
use anyhow::Result;
use futures::prelude::*;
use gaia_stub::tco_tmiv::Tco;
use std::pin::Pin;
use tokio::sync::Mutex;
use tokio_stream::wrappers::BroadcastStream;
use tonic::{Request, Response, Status, Streaming};
Expand All @@ -11,36 +12,61 @@ use super::telemetry::{self, LastTmivStore};

pub use gaia_stub::broker::*;

pub struct BrokerService<C> {
pub struct BrokerService<C, F> {
cmd_handler: Mutex<C>,
fop_command_service: Mutex<F>,
tlm_bus: telemetry::Bus,
last_tmiv_store: Arc<LastTmivStore>,
}

impl<C> BrokerService<C> {
impl<C, F> BrokerService<C, F> {
pub fn new(
cmd_service: C,
fop_command_service: F,
tlm_bus: telemetry::Bus,
last_tmiv_store: Arc<LastTmivStore>,
) -> Self {
Self {
cmd_handler: Mutex::new(cmd_service),
fop_command_service: Mutex::new(fop_command_service),
tlm_bus,
last_tmiv_store,
}
}
}

use async_trait::async_trait;
pub enum FopFrameEvent {
Transmit(u64),
Acknowledged(u64),
Retransmit(u64),
Cancel(u64),
}

#[async_trait]
pub trait FopCommandService {
async fn send_set_vr(&mut self, value: u8);

async fn send_ad_command(&mut self, tco: Tco) -> Result<u64>;

async fn subscribe_frame_events(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = FopFrameEvent> + Send>>>;
}

#[tonic::async_trait]
impl<C> broker_server::Broker for BrokerService<C>
impl<C, F> broker_server::Broker for BrokerService<C, F>
where
C: super::Handle<Arc<Tco>> + Send + Sync + 'static,
C::Response: Send + 'static,
F: FopCommandService + Send + Sync + 'static,
{
type OpenCommandStreamStream =
stream::BoxStream<'static, Result<CommandStreamResponse, Status>>;
type OpenTelemetryStreamStream =
stream::BoxStream<'static, Result<TelemetryStreamResponse, Status>>;
type SubscribeFopFrameEventsStream =
stream::BoxStream<'static, Result<gaia_stub::broker::FopFrameEvent, Status>>;

#[tracing::instrument(skip(self))]
async fn post_command(
Expand All @@ -66,6 +92,50 @@ where
Ok(Response::new(PostCommandResponse {}))
}

#[tracing::instrument(skip(self))]
async fn post_set_vr(
&self,
request: Request<PostSetVrRequest>,
) -> Result<Response<PostSetVrResponse>, tonic::Status> {
let message = request.into_inner();
let value = message.vr;
self.fop_command_service
.lock()
.await
.send_set_vr(value as _)
.await;
Ok(Response::new(PostSetVrResponse {}))
}

#[tracing::instrument(skip(self))]
async fn post_ad_command(
&self,
request: Request<PostAdCommandRequest>,
) -> Result<Response<PostAdCommandResponse>, tonic::Status> {
let message = request.into_inner();

let tco = message
.tco
.ok_or_else(|| Status::invalid_argument("tco is required"))?;

fn internal_error<E: Debug>(e: E) -> Status {
Status::internal(format!("{:?}", e))
}
let id = self
.fop_command_service
.lock()
.await
.send_ad_command(tco)
.await
.map_err(internal_error)?;

tracing::info!("AD command sent");
Ok(Response::new(PostAdCommandResponse {
success: true,
frame_id: id,
}))
}

#[tracing::instrument(skip(self))]
async fn open_telemetry_stream(
&self,
Expand Down Expand Up @@ -115,4 +185,33 @@ where
Err(Status::not_found("not received yet"))
}
}

#[tracing::instrument(skip(self))]
async fn subscribe_fop_frame_events(
&self,
_request: tonic::Request<SubscribeFopFrameEventsRequest>,
) -> Result<tonic::Response<Self::SubscribeFopFrameEventsStream>, tonic::Status> {
use futures::StreamExt;
let stream = self
.fop_command_service
.lock()
.await
.subscribe_frame_events()
.await
.map_err(|_| Status::internal("failed to subscribe frame events"))?;
use gaia_stub::broker::fop_frame_event::EventType;
let stream = stream.map(|e| {
let (frame_id, event_type) = match e {
FopFrameEvent::Transmit(id) => (id, EventType::Transmit),
FopFrameEvent::Acknowledged(id) => (id, EventType::Acknowledged),
FopFrameEvent::Retransmit(id) => (id, EventType::Retransmit),
FopFrameEvent::Cancel(id) => (id, EventType::Cancel),
};
Ok(gaia_stub::broker::FopFrameEvent {
frame_id,
event_type: event_type.into(),
})
});
Ok(Response::new(stream.boxed()))
}
}
1 change: 1 addition & 0 deletions tmtc-c2a/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio-tungstenite = "0.20.1"
itertools = "0.12.1"
notalawyer = "0.1.0"
notalawyer-clap = "0.1.0"
tokio-stream = { version = "0.1.16", features = ["sync"] }

[build-dependencies]
tonic-build = "0.11"
Expand Down
Loading

0 comments on commit d4927a0

Please sign in to comment.