diff --git a/satrs/src/cfdp/source.rs b/satrs/src/cfdp/source.rs index 7f4ddbb2..7a77704e 100644 --- a/satrs/src/cfdp/source.rs +++ b/satrs/src/cfdp/source.rs @@ -6,11 +6,12 @@ use spacepackets::{ pdu::{ eof::EofPdu, file_data::FileDataPduCreatorWithReservedDatafield, + finished::{DeliveryCode, FileStatus, FinishedPduReader}, metadata::{MetadataGenericParams, MetadataPduCreator}, CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket, }, ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl, - NULL_CHECKSUM_U32, + TransmissionMode, }, util::{UnsignedByteField, UnsignedEnum}, ByteConversionError, @@ -21,9 +22,9 @@ use crate::seq_count::SequenceCountProvider; use super::{ filestore::{FilestoreError, VirtualFilestore}, request::{ReadablePutRequest, StaticPutRequestCacher}, - user::CfdpUser, + user::{CfdpUser, TransactionFinishedParams}, LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig, - RemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, + RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, }; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -38,7 +39,7 @@ pub enum TransactionStep { SendingEof = 6, WaitingForEofAck = 7, WaitingForFinished = 8, - SendingAckOfFinished = 9, + // SendingAckOfFinished = 9, NoticeOfCompletion = 10, } @@ -46,36 +47,33 @@ pub enum TransactionStep { pub struct FileParams { pub progress: u64, pub segment_len: u64, - pub crc32: [u8; 4], + pub crc32: u32, pub metadata_only: bool, pub file_size: u64, pub empty_file: bool, } -impl FileParams { - pub fn reset(&mut self) { - self.progress = 0; - self.segment_len = 0; - self.crc32 = NULL_CHECKSUM_U32; - self.metadata_only = false; - self.file_size = 0; - self.empty_file = false; - } -} - pub struct StateHelper { state: super::State, step: TransactionStep, num_packets_ready: u32, } -#[derive(Debug, Copy, Clone, derive_new::new)] +#[derive(Debug)] +pub struct FinishedParams { + condition_code: ConditionCode, + delivery_code: DeliveryCode, + file_status: FileStatus, +} + +#[derive(Debug, derive_new::new)] pub struct TransferState { transaction_id: TransactionId, remote_cfg: RemoteEntityConfig, transmission_mode: super::TransmissionMode, closure_requested: bool, cond_code_eof: Option, + finished_params: Option, } impl Default for StateHelper { @@ -95,8 +93,11 @@ pub enum SourceError { pdu_type: PduType, directive_type: Option, }, - #[error("unexpected file data PDU")] - UnexpectedFileDataPdu, + #[error("unexpected PDU")] + UnexpectedPdu { + pdu_type: PduType, + directive_type: Option, + }, #[error("source handler is already busy with put request")] PutRequestAlreadyActive, #[error("error caching put request")] @@ -128,7 +129,7 @@ pub struct SourceHandler< > { local_cfg: LocalEntityConfig, pdu_sender: PduSender, - pdu_buffer: RefCell>, + pdu_and_cksum_buffer: RefCell>, put_request_cacher: StaticPutRequestCacher, remote_cfg_table: RemoteCfgTable, vfs: Vfs, @@ -155,7 +156,7 @@ impl< pdu_sender: PduSender, vfs: Vfs, put_request_cacher: StaticPutRequestCacher, - max_pdu_len: usize, + pdu_and_cksum_buf_size: usize, remote_cfg_table: RemoteCfgTable, seq_count_provider: SeqCountProvider, ) -> Self { @@ -163,7 +164,7 @@ impl< local_cfg: cfg, remote_cfg_table, pdu_sender, - pdu_buffer: RefCell::new(alloc::vec![0; max_pdu_len]), + pdu_and_cksum_buffer: RefCell::new(alloc::vec![0; pdu_and_cksum_buf_size]), vfs, put_request_cacher, state_helper: Default::default(), @@ -213,7 +214,10 @@ impl< if packet_info.pdu_type() == PduType::FileData { // The [PacketInfo] API should ensure that file data PDUs can not be passed // into a source entity, so this should never happen. - return Err(SourceError::UnexpectedFileDataPdu); + return Err(SourceError::UnexpectedPdu { + pdu_type: PduType::FileData, + directive_type: None, + }); } // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is // always a valid value. @@ -221,7 +225,7 @@ impl< .pdu_directive() .expect("PDU directive type unexpectedly not set") { - FileDirectiveType::FinishedPdu => self.handle_finished_pdu(), + FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_info)?, FileDirectiveType::NakPdu => self.handle_nak_pdu(), FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), FileDirectiveType::AckPdu => todo!("acknowledged mode not implemented yet"), @@ -287,13 +291,14 @@ impl< transmission_mode, closure_requested, None, + None, )); Ok(()) } #[inline] pub fn transmission_mode(&self) -> Option { - self.tstate.map(|v| v.transmission_mode) + self.tstate.as_ref().map(|v| v.transmission_mode) } fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { @@ -315,18 +320,114 @@ impl< } } if self.state_helper.step == TransactionStep::SendingEof { - // TODO: Checksum calculation using VFS. - self.prepare_and_send_eof_pdu(0)?; + self.eof_fsm(cfdp_user)?; + return Ok(1); + } + if self.state_helper.step == TransactionStep::WaitingForFinished { + /* + def _handle_wait_for_finish(self): + if ( + self.transmission_mode == TransmissionMode.ACKNOWLEDGED + and self.__handle_retransmission() + ): + return + if ( + self._inserted_pdu.pdu is None + or self._inserted_pdu.pdu_directive_type is None + or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU + ): + if self._params.check_timer is not None: + if self._params.check_timer.timed_out(): + self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED) + return + finished_pdu = self._inserted_pdu.to_finished_pdu() + self._inserted_pdu.pdu = None + self._params.finished_params = finished_pdu.finished_params + if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: + self._prepare_finished_ack_packet(finished_pdu.condition_code) + self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED + else: + self.states.step = TransactionStep.NOTICE_OF_COMPLETION + */ + } + if self.state_helper.step == TransactionStep::NoticeOfCompletion { + self.notice_of_completion(cfdp_user); + /* + def _notice_of_completion(self): + if self.cfg.indication_cfg.transaction_finished_indication_required: + assert self._params.transaction_id is not None + # This happens for unacknowledged file copy operation with no closure. + if self._params.finished_params is None: + self._params.finished_params = FinishedParams( + condition_code=ConditionCode.NO_ERROR, + delivery_code=DeliveryCode.DATA_COMPLETE, + file_status=FileStatus.FILE_STATUS_UNREPORTED, + ) + indication_params = TransactionFinishedParams( + transaction_id=self._params.transaction_id, + finished_params=self._params.finished_params, + ) + self.user.transaction_finished_indication(indication_params) + # Transaction finished + self.reset() + */ } Ok(0) } + fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> { + let tstate = self.tstate.as_ref().unwrap(); + let checksum = self.vfs.calculate_checksum( + self.put_request_cacher.source_file().unwrap(), + tstate.remote_cfg.default_crc_type, + self.pdu_and_cksum_buffer.get_mut(), + )?; + self.prepare_and_send_eof_pdu(checksum)?; + let tstate = self.tstate.as_ref().unwrap(); + if self.local_cfg.indication_cfg.eof_sent { + cfdp_user.eof_sent_indication(&tstate.transaction_id); + } + if tstate.transmission_mode == TransmissionMode::Unacknowledged { + if tstate.closure_requested { + // TODO: Check timer handling. + self.state_helper.step = TransactionStep::WaitingForFinished; + } else { + self.state_helper.step = TransactionStep::NoticeOfCompletion; + } + } else { + // TODO: Start positive ACK procedure. + } + /* + if self.cfg.indication_cfg.eof_sent_indication_required: + assert self._params.transaction_id is not None + self.user.eof_sent_indication(self._params.transaction_id) + if self.transmission_mode == TransmissionMode.UNACKNOWLEDGED: + if self._params.closure_requested: + assert self._params.remote_cfg is not None + self._params.check_timer = ( + self.check_timer_provider.provide_check_timer( + local_entity_id=self.cfg.local_entity_id, + remote_entity_id=self._params.remote_cfg.entity_id, + entity_type=EntityType.SENDING, + ) + ) + self.states.step = TransactionStep.WAITING_FOR_FINISHED + else: + self.states.step = TransactionStep.NOTICE_OF_COMPLETION + else: + self._start_positive_ack_procedure() + */ + Ok(()) + } + fn handle_transaction_start( &mut self, cfdp_user: &mut impl CfdpUser, ) -> Result<(), SourceError> { - self.fparams.reset(); - let tstate = &self.tstate.expect("transfer state unexpectedly empty"); + let tstate = self + .tstate + .as_ref() + .expect("transfer state unexpectedly empty"); if !self.put_request_cacher.has_source_file() { self.fparams.metadata_only = true; self.fparams.empty_file = true; @@ -435,6 +536,30 @@ impl< Ok(ControlFlow::Continue(())) } + fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) { + let tstate = self.tstate.as_ref().unwrap(); + if self.local_cfg.indication_cfg.transaction_finished { + // The first case happens for unacknowledged file copy operation with no closure. + let finished_params = if tstate.finished_params.is_none() { + TransactionFinishedParams { + id: tstate.transaction_id, + condition_code: ConditionCode::NoError, + delivery_code: DeliveryCode::Complete, + file_status: FileStatus::Unreported, + } + } else { + let finished_params = tstate.finished_params.as_ref().unwrap(); + TransactionFinishedParams { + id: tstate.transaction_id, + condition_code: finished_params.condition_code, + delivery_code: finished_params.delivery_code, + file_status: finished_params.file_status, + } + }; + cfdp_user.transaction_finished_indication(&finished_params); + } + } + fn send_progressing_file_data_pdu(&mut self) -> Result { // Should never be called, but use defensive programming here. if self.fparams.progress >= self.fparams.file_size { @@ -447,7 +572,6 @@ impl< } else { self.fparams.segment_len }; - // TODO: Send File Data PDU. let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata( PduHeader::new_for_file_data( self.pdu_conf, @@ -458,7 +582,8 @@ impl< self.fparams.progress, read_len, ); - let mut unwritten_pdu = pdu_creator.write_to_bytes_partially(self.pdu_buffer.get_mut())?; + let mut unwritten_pdu = + pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?; self.vfs.read_data( self.put_request_cacher.source_file().unwrap(), self.fparams.progress, @@ -469,7 +594,7 @@ impl< self.pdu_sender.send_pdu( PduType::FileData, None, - &self.pdu_buffer.borrow()[0..written_len], + &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; self.fparams.progress += read_len; /* @@ -521,7 +646,6 @@ impl< .tstate .as_ref() .expect("transfer state unexpectedly empty"); - //let checksum_u32 = u32::from_be_bytes(self.fparams.crc32); let eof_pdu = EofPdu::new( PduHeader::new_no_file_data(self.pdu_conf, 0), tstate.cond_code_eof.unwrap_or(ConditionCode::NoError), @@ -530,23 +654,11 @@ impl< None, ); self.pdu_send_helper(&eof_pdu)?; - /* - * - assert self._params.cond_code_eof is not None - self._add_packet_to_be_sent( - EofPdu( - file_checksum=checksum, - file_size=self._params.fp.progress, - pdu_conf=self._params.pdu_conf, - condition_code=self._params.cond_code_eof, - ) - ) - */ Ok(()) } fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), PduError> { - let mut pdu_buffer_mut = self.pdu_buffer.borrow_mut(); + let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut(); let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?; self.pdu_sender.send_pdu( PduType::FileDirective, @@ -556,11 +668,52 @@ impl< Ok(()) } - fn handle_finished_pdu(&mut self) {} + fn handle_finished_pdu(&mut self, packet_info: &PacketInfo) -> Result<(), SourceError> { + // Ignore this packet when we are idle. + if self.state_helper.state == State::Idle { + return Ok(()); + } + if self.state_helper.step != TransactionStep::WaitingForFinished { + return Err(SourceError::UnexpectedPdu { + pdu_type: PduType::FileDirective, + directive_type: Some(FileDirectiveType::FinishedPdu), + }); + } + let finished_pdu = FinishedPduReader::new(packet_info.raw_packet())?; + // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE + // mode. + self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams { + condition_code: finished_pdu.condition_code(), + delivery_code: finished_pdu.delivery_code(), + file_status: finished_pdu.file_status(), + }); + if self.tstate.as_ref().unwrap().transmission_mode == TransmissionMode::Acknowledged { + // TODO: Send ACK packet here immediately and continue. + //self.state_helper.step = TransactionStep::SendingAckOfFinished; + } + self.state_helper.step = TransactionStep::NoticeOfCompletion; + + /* + if self.transmission_mode == TransmissionMode.ACKNOWLEDGED: + self._prepare_finished_ack_packet(finished_pdu.condition_code) + self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED + else: + self.states.step = TransactionStep.NOTICE_OF_COMPLETION + */ + Ok(()) + } fn handle_nak_pdu(&mut self) {} fn handle_keep_alive_pdu(&mut self) {} + + /// This function is public to allow completely resetting the handler, but it is explicitely + /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself. + pub fn reset(&mut self) { + self.state_helper = Default::default(); + self.tstate = None; + self.fparams = Default::default(); + } } #[cfg(test)]