diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index 37b9b23d..c932859d 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -28,7 +28,6 @@ use std::sync::Arc; use std::time::SystemTime; use tracing::{debug, info, warn}; - use crate::glob; #[derive(Debug)] @@ -39,6 +38,7 @@ pub enum UpdateError { UnsupportedType, PermissionDenied, PermissionExpired, + TimestampTooOld, } #[derive(Debug, Clone)] @@ -201,10 +201,12 @@ impl Entry { if let Some(datapoint) = &update.datapoint { self.validate_value(&datapoint.value)?; self.validate_allowed(&datapoint.value)?; + self.validate_timestamp(datapoint.ts)?; } if let Some(Some(actuatortarget)) = &update.actuator_target { self.validate_value(&actuatortarget.value)?; self.validate_allowed(&actuatortarget.value)?; + self.validate_timestamp(actuatortarget.ts)?; } if let Some(Some(updated_allowed)) = update.allowed.clone() { if Some(updated_allowed.clone()) != self.metadata.allowed { @@ -556,6 +558,18 @@ impl Entry { } } + fn validate_timestamp(&self, timestamp: SystemTime) -> Result<(), UpdateError> { + if self.datapoint.ts > timestamp { + return Err(UpdateError::TimestampTooOld); + } + if let Some(target) = &self.actuator_target { + if target.ts > timestamp { + return Err(UpdateError::TimestampTooOld); + } + } + Ok(()) + } + pub fn apply(&mut self, update: EntryUpdate) -> HashSet { let mut changed = HashSet::new(); if let Some(datapoint) = update.datapoint { @@ -1718,6 +1732,146 @@ mod tests { } } + #[tokio::test] + async fn test_set_values_timestamp() { + let broker = DataBroker::default(); + let broker = broker.authorized_access(&permissions::ALLOW_ALL); + let time1 = SystemTime::now(); + + // if None is provided for timestamp it assumes SystemTime::now() + let id1 = broker + .add_entry( + "test.datapoint1".to_owned(), + DataType::Int32, + ChangeType::OnChange, + EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let id2 = broker + .add_entry( + "test.datapoint2".to_owned(), + DataType::Bool, + ChangeType::OnChange, + EntryType::Actuator, + "Test datapoint 2".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + // wait for 2 sec to get an older timestamp + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; + + let time1_2 = SystemTime::now(); + broker + .update_entries([( + id1, + EntryUpdate { + path: None, + datapoint: Some(Datapoint { + ts: time1_2, + value: DataValue::Int32(100), + }), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + )]) + .await + .expect("setting datapoint #1"); + + broker + .update_entries([( + id2, + EntryUpdate { + path: None, + datapoint: None, + actuator_target: Some(Some(Datapoint { + ts: time1_2, + value: DataValue::Bool(true), + })), + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + )]) + .await + .expect("setting datapoint #2"); + + match broker + .update_entries([( + id1, + EntryUpdate { + path: None, + datapoint: Some(Datapoint { + ts: time1, + value: DataValue::Int32(200), + }), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + )]) + .await + { + Ok(_) => { + panic!("should not have been able to set with older timestamp") + } + Err(e) => match e[0] { + (id, UpdateError::TimestampTooOld) => assert_eq!(id, id1), + _ => panic!( + "should have reported too olt timestamp but got error of type {:?}", + e + ), + }, + } + + match broker + .update_entries([( + id2, + EntryUpdate { + path: None, + datapoint: None, + actuator_target: Some(Some(Datapoint { + ts: time1, + value: DataValue::Bool(true), + })), + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + )]) + .await + { + Ok(_) => { + panic!("should not have been able to set with older timestamp") + } + Err(e) => match e[0] { + (id, UpdateError::TimestampTooOld) => assert_eq!(id, id2), + _ => panic!( + "should have reported too olt timestamp but got error of type {:?}", + e + ), + }, + } + } + #[tokio::test] async fn test_get_set_allowed_values() { let broker = DataBroker::default(); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 79781803..a8c35b0b 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -293,9 +293,7 @@ impl proto::val_server::Val for broker::DataBroker { error: Some(proto::Error { code: 400, reason: String::from("type mismatch"), - message: - "cannot set existing datapoint to value of different type" - .to_string(), + message: String::from("cannot set existing datapoint to value of different type"), }), }, broker::UpdateError::UnsupportedType => DataEntryError { @@ -303,8 +301,7 @@ impl proto::val_server::Val for broker::DataBroker { error: Some(proto::Error { code: 400, reason: String::from("unsupported type"), - message: "cannot set datapoint to value of unsupported type" - .to_string(), + message: String::from("cannot set datapoint to value of unsupported type"), }), }, broker::UpdateError::OutOfBounds => DataEntryError { @@ -331,6 +328,14 @@ impl proto::val_server::Val for broker::DataBroker { message: String::from("Unauthorized"), }), }, + broker::UpdateError::TimestampTooOld => DataEntryError { + path, + error: Some(proto::Error { + code: 400, + reason: String::from("timestamp too old"), + message: String::from("given timestamp is older than the saved timestamp"), + }), + }, }; errors.push(data_entry_error); } diff --git a/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs b/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs index c85888ae..36111ee5 100644 --- a/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs +++ b/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs @@ -312,6 +312,7 @@ impl From<&broker::UpdateError> for proto::DatapointError { broker::UpdateError::OutOfBounds => proto::DatapointError::OutOfBounds, broker::UpdateError::PermissionDenied => proto::DatapointError::AccessDenied, broker::UpdateError::PermissionExpired => proto::DatapointError::AccessDenied, + broker::UpdateError::TimestampTooOld => proto::DatapointError::InternalError, } } }