From 0ed0451f6401bbf3da6171f1a8b1232828f11b17 Mon Sep 17 00:00:00 2001 From: Sebastian Schildt Date: Sun, 27 Oct 2024 12:13:50 +0100 Subject: [PATCH] Format Signed-off-by: Sebastian Schildt --- kuksa-persistence-provider/Cargo.toml | 8 + .../src/kuksaconnector.rs | 235 +++++++++++++++--- kuksa-persistence-provider/src/main.rs | 154 +++++++----- kuksa-persistence-provider/src/storage.rs | 3 +- .../src/storage/filestorage.rs | 73 +++--- kuksa-persistence-provider/statestore.json | 16 +- 6 files changed, 343 insertions(+), 146 deletions(-) diff --git a/kuksa-persistence-provider/Cargo.toml b/kuksa-persistence-provider/Cargo.toml index d05d90e..9b6d62b 100644 --- a/kuksa-persistence-provider/Cargo.toml +++ b/kuksa-persistence-provider/Cargo.toml @@ -15,3 +15,11 @@ regex = "1.11.0" #Only this version is compatible with kuksa crate. 0.13 is not prost-types = "0.11.9" tokio = { version="1.40.0", features = ["full"] } + +#Still broken due to closed repo +#djson = { git="https://github.com/dependix/platform.git" , optional = true } +djson = { path="platform/modules/json-al/djson_rust/" , optional = true } + +[features] +# use djson library +djson = [ "dep:djson" ] diff --git a/kuksa-persistence-provider/src/kuksaconnector.rs b/kuksa-persistence-provider/src/kuksaconnector.rs index de3e746..cf8ca88 100644 --- a/kuksa-persistence-provider/src/kuksaconnector.rs +++ b/kuksa-persistence-provider/src/kuksaconnector.rs @@ -8,14 +8,15 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use crate::storage; +use crate::storage::{self, StoreItem}; -use std::time::SystemTime; use std::collections::HashMap; +use std::fmt; +use std::time::SystemTime; -use kuksa::proto as proto; +use kuksa::proto; -//use kuksa::proto::v1::{datapoint::Value, DataType, Datapoint}; +use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; @@ -24,25 +25,36 @@ pub struct ParseError {} pub fn create_kuksa_client(uri: &str) -> Arc> { log::info!("Creating Kuksa Databroker client for URI: {}", uri); - let uri = kuksa::Uri::try_from(uri) - .expect("Invalid URI for Kuksa Databroker connection."); + let uri = kuksa::Uri::try_from(uri).expect("Invalid URI for Kuksa Databroker connection."); Arc::new(Mutex::new(kuksa::Client::new(uri))) } - -pub async fn get_from_storage_and_set_values(storage: &impl storage::Storage, kuksa_client: &Arc>, vsspaths: &Vec) { +pub async fn get_from_storage_and_set_values( + storage: &impl storage::Storage, + kuksa_client: &Arc>, + vsspaths: &Vec, +) { for vsspath in vsspaths { get_from_storage_and_set(storage, kuksa_client, vsspath, false).await; } } -pub async fn get_from_storage_and_set_actuations(storage: &impl storage::Storage, kuksa_client: &Arc>, vsspaths: &Vec) { +pub async fn get_from_storage_and_set_actuations( + storage: &impl storage::Storage, + kuksa_client: &Arc>, + vsspaths: &Vec, +) { for vsspath in vsspaths { get_from_storage_and_set(storage, kuksa_client, vsspath, true).await; } } -pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_client: &Arc>, vsspath: &str, is_actuator: bool) { +pub async fn get_from_storage_and_set( + storage: &impl storage::Storage, + kuksa_client: &Arc>, + vsspath: &str, + is_actuator: bool, +) { log::debug!("Query storage for VSS signal: {}", vsspath); let value = match storage.get(vsspath) { Some(x) => x, @@ -50,26 +62,41 @@ pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_cli log::warn!("No value for VSS signal: {} stored", vsspath); return; } - }; - + }; //Figure out metadata: - let datapoint_entries = match kuksa_client.lock().unwrap().get_metadata(vec![vsspath]).await { + let datapoint_entries = match kuksa_client + .lock() + .unwrap() + .get_metadata(vec![vsspath]) + .await + { Ok(data_entries) => Some(data_entries), Err(kuksa::Error::Status(status)) => { - log::warn!("Error: Could not get metadata for VSS signal: {}, Status: {}", vsspath, &status); + log::warn!( + "Error: Could not get metadata for VSS signal: {}, Status: {}", + vsspath, + &status + ); None } Err(kuksa::Error::Connection(msg)) => { - log::warn!("Connection Error: Could not get metadata for VSS signal: {}, Reason: {}", vsspath, &msg); + log::warn!( + "Connection Error: Could not get metadata for VSS signal: {}, Reason: {}", + vsspath, + &msg + ); None } Err(kuksa::Error::Function(msg)) => { - log::warn!("Error: Could not get metadata for VSS signal: {}, Errors: {msg:?}", vsspath); + log::warn!( + "Error: Could not get metadata for VSS signal: {}, Errors: {msg:?}", + vsspath + ); None } }; - + if datapoint_entries.is_none() { return; } @@ -80,15 +107,13 @@ pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_cli if let Some(metadata) = &entries.first().unwrap().metadata { let data_value = try_into_data_value( value, - proto::v1::DataType::from_i32(metadata.data_type) - .unwrap(), + proto::v1::DataType::from_i32(metadata.data_type).unwrap(), ); if data_value.is_err() { log::warn!( "Could not parse \"{}\" as {:?}", value, - proto::v1::DataType::from_i32(metadata.data_type) - .unwrap() + proto::v1::DataType::from_i32(metadata.data_type).unwrap() ); return; } @@ -102,32 +127,135 @@ pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_cli }, )]); - let result = { if is_actuator { - kuksa_client.lock().unwrap().set_target_values(datapoints).await - } - else { - kuksa_client.lock().unwrap().set_current_values(datapoints).await - } - }; + let result = { + if is_actuator { + kuksa_client + .lock() + .unwrap() + .set_target_values(datapoints) + .await + } else { + kuksa_client + .lock() + .unwrap() + .set_current_values(datapoints) + .await + } + }; match result { Ok(_) => { log::debug!("Succes setting {} to {}", vsspath, value); } Err(kuksa::Error::Status(status)) => { - log::warn!("Error: Could not set value for VSS signal: {}, Status: {}", vsspath, &status); + log::warn!( + "Error: Could not set value for VSS signal: {}, Status: {}", + vsspath, + &status + ); } Err(kuksa::Error::Connection(msg)) => { - log::warn!("Connection Error: Could not set value for VSS signal: {}, Reason: {}", vsspath, &msg); + log::warn!( + "Connection Error: Could not set value for VSS signal: {}, Reason: {}", + vsspath, + &msg + ); } Err(kuksa::Error::Function(msg)) => { - log::warn!("Error: Could not set value for VSS signal: {}, Errors: {msg:?}", vsspath); + log::warn!( + "Error: Could not set value for VSS signal: {}, Errors: {msg:?}", + vsspath + ); } }; } } } +pub async fn watch_values( + storage_queue: Sender, + kuksa_client: &Arc>, + vsspaths: Vec<&str>, + is_actuator: bool, +) { + log::info!( + "Subscribing to {} for VSS signals: {:?}", + { + match is_actuator { + true => "actuators", + false => "current values", + } + }, + &vsspaths + ); + + let res = match is_actuator { + true => { + kuksa_client + .lock() + .unwrap() + .subscribe_target_values(vsspaths) + .await + } + false => { + kuksa_client + .lock() + .unwrap() + .subscribe_current_values(vsspaths) + .await + } + }; + match res { + Ok(mut subs) => { + tokio::spawn(async move { + loop { + match subs.message().await { + Ok(resp) => { + if let Some(r) = resp { + for update in r.updates { + if let Some(entry) = update.entry { + let newdp = match is_actuator { + true => entry.actuator_target, + false => entry.value, + }; + if let Some(datapoint) = newdp { + let data = DisplayDatapoint(datapoint); + log::info!( + "Received value {} for VSS signal {}", + data.to_string(), + entry.path + ); + + match storage_queue.send(StoreItem { + path: entry.path.clone(), + value: data.to_string(), + }) { + Ok(_) => {} + Err(err) => { + log::warn!( + "Error sending data to storage {:?}", + err + ); + } + } + } + } + } + } + } + Err(err) => { + log::warn!("Error: Could not receive message: {:?}", err); + break; + } + } + } + }); + } + Err(err) => { + log::warn!("Error: Could not subscribe to VSS signals: {:?}", err); + } + } +} /* Donation from databroker-cli */ fn try_into_data_value( @@ -260,9 +388,6 @@ fn try_into_data_value( } } - - - pub fn get_array_from_input(values: String) -> Result, ParseError> { let raw_input = values .strip_prefix('[') @@ -289,3 +414,45 @@ pub fn get_array_from_input(values: String) -> Result(f: &mut fmt::Formatter<'_>, array: &[T]) -> fmt::Result +where + T: fmt::Display, +{ + f.write_str("[")?; + let real_delimiter = ", "; + let mut delimiter = ""; + for value in array { + write!(f, "{delimiter}")?; + delimiter = real_delimiter; + write!(f, "{value}")?; + } + f.write_str("]") +} + +impl fmt::Display for DisplayDatapoint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0.value { + Some(value) => match value { + proto::v1::datapoint::Value::Bool(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Int32(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Int64(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Uint32(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Uint64(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Float(value) => f.pad(&format!("{value:.2}")), + proto::v1::datapoint::Value::Double(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::String(value) => f.pad(&value.to_owned()), + proto::v1::datapoint::Value::StringArray(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::BoolArray(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Int32Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Int64Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Uint32Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Uint64Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::FloatArray(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::DoubleArray(array) => display_array(f, &array.values), + }, + None => f.pad("None"), + } + } +} diff --git a/kuksa-persistence-provider/src/main.rs b/kuksa-persistence-provider/src/main.rs index d039784..cc29f9a 100644 --- a/kuksa-persistence-provider/src/main.rs +++ b/kuksa-persistence-provider/src/main.rs @@ -8,19 +8,18 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -mod storage; mod kuksaconnector; +mod storage; use storage::Storage; -use std::collections::HashMap; -use std::{path::PathBuf,env}; use clap::Parser; +use std::collections::HashMap; +use std::{env, path::PathBuf}; use tinyjson::JsonValue; use tokio::signal::ctrl_c; - #[derive(Parser)] #[command(version, about, long_about = None)] struct CmdLine { @@ -39,12 +38,11 @@ async fn main() { env::set_var("RUST_LOG", "info") } env_logger::init(); - let args = CmdLine::parse(); let config = args.config.unwrap_or_else(|| PathBuf::from("config.json")); - + //Check config exists if !config.exists() { log::error!("Error: Can not find configuration at {}", config.display()); @@ -60,81 +58,99 @@ async fn main() { let parsed: JsonValue = config_str.parse().unwrap(); log::debug!("Parsed JSON data structure: {:?}", parsed); + let storage = match parsed["state-storage"]["type"] + .get::() + .unwrap() + .as_str() + { + "file" => storage::FileStorage::new(&parsed["state-storage"]), + _ => { + log::error!("Error: state storage type is invalid"); + std::process::exit(1); + } + }; - - let storage = match parsed["state-storage"]["type"].get::().unwrap().as_str() { - "file" => { - storage::FileStorage::new(&parsed["state-storage"]) - }, - _ => { - log::error!("Error: state storage type is invalid"); - std::process::exit(1); - } - }; - //let storage = Arc::new(Mutex::new(_storage)); - let mut restore_current_values: Vec = vec![]; - let mut restore_actuation_values: Vec = vec![]; - let mut watch_current_values: Vec = vec![]; - let mut watch_actuation_values: Vec = vec![]; + let mut restore_current_values: Vec = vec![]; + let mut restore_actuation_values: Vec = vec![]; + let mut watch_current_values: Vec = vec![]; + let mut watch_actuation_values: Vec = vec![]; + let section: Option<&HashMap> = parsed["restore-only"].get(); - - let section: Option<&HashMap> = parsed["restore-only"].get(); - - if section.is_some() { - let elements: Option<&Vec> = section.unwrap()["values"].get(); - if elements.is_some() { - for path in elements.unwrap() { - restore_current_values.push(path.get::().unwrap().to_string()); - } + if section.is_some() { + let elements: Option<&Vec> = section.unwrap()["values"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_current_values.push(path.get::().unwrap().to_string()); } - let elements: Option<&Vec> = section.unwrap()["actuators"].get(); - if elements.is_some() { - for path in elements.unwrap() { - restore_actuation_values.push(path.get::().unwrap().to_string()); - } + } + let elements: Option<&Vec> = section.unwrap()["actuators"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_actuation_values.push(path.get::().unwrap().to_string()); } } + } - let section: Option<&HashMap> = parsed["restore-and-watch"].get(); - if section.is_some() { - let elements: Option<&Vec> = section.unwrap()["values"].get(); - if elements.is_some() { - for path in elements.unwrap() { - restore_current_values.push(path.get::().unwrap().to_string()); - watch_current_values.push(path.get::().unwrap().to_string()); - } - } - let elements: Option<&Vec> = section.unwrap()["actuators"].get(); - if elements.is_some() { - for path in elements.unwrap() { - restore_actuation_values.push(path.get::().unwrap().to_string()); - watch_actuation_values.push(path.get::().unwrap().to_string()); - } + let section: Option<&HashMap> = parsed["restore-and-watch"].get(); + if section.is_some() { + let elements: Option<&Vec> = section.unwrap()["values"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_current_values.push(path.get::().unwrap().to_string()); + watch_current_values.push(path.get::().unwrap().to_string()); } } - - - let kuksa_client = kuksaconnector::create_kuksa_client("grpc://127.0.01:55556"); - //Each subscription needs a separate client - let kuksa_client2 = kuksaconnector::create_kuksa_client("grpc://127.0.01:55556"); - - kuksaconnector::get_from_storage_and_set_values(&storage, &kuksa_client, &restore_current_values).await; - kuksaconnector::get_from_storage_and_set_actuations(&storage, &kuksa_client, &restore_actuation_values).await; - - drop(restore_actuation_values); - drop(restore_current_values); - - kuksaconnector::watch_values(storage.get_queue(), &kuksa_client, watch_current_values.iter().map(|s| &**s).collect(), false).await; - kuksaconnector::watch_values(storage.get_queue(), &kuksa_client2, watch_actuation_values.iter().map(|s| &**s).collect(), true).await; - - tokio::select! { - _ = ctrl_c() => { - println!("Received Ctrl+C, exiting."); - return; + let elements: Option<&Vec> = section.unwrap()["actuators"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_actuation_values.push(path.get::().unwrap().to_string()); + watch_actuation_values.push(path.get::().unwrap().to_string()); } } + } + let kuksa_client = kuksaconnector::create_kuksa_client("grpc://127.0.01:55556"); + //Each subscription needs a separate client + let kuksa_client2 = kuksaconnector::create_kuksa_client("grpc://127.0.01:55556"); + + kuksaconnector::get_from_storage_and_set_values( + &storage, + &kuksa_client, + &restore_current_values, + ) + .await; + kuksaconnector::get_from_storage_and_set_actuations( + &storage, + &kuksa_client, + &restore_actuation_values, + ) + .await; + + drop(restore_actuation_values); + drop(restore_current_values); + + kuksaconnector::watch_values( + storage.get_queue(), + &kuksa_client, + watch_current_values.iter().map(|s| &**s).collect(), + false, + ) + .await; + kuksaconnector::watch_values( + storage.get_queue(), + &kuksa_client2, + watch_actuation_values.iter().map(|s| &**s).collect(), + true, + ) + .await; + + tokio::select! { + _ = ctrl_c() => { + println!("Received Ctrl+C, exiting."); + return; + } + } } diff --git a/kuksa-persistence-provider/src/storage.rs b/kuksa-persistence-provider/src/storage.rs index 77972b9..6a7a7ef 100644 --- a/kuksa-persistence-provider/src/storage.rs +++ b/kuksa-persistence-provider/src/storage.rs @@ -8,7 +8,6 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ - pub mod filestorage; use std::sync::mpsc::Sender; @@ -25,7 +24,7 @@ pub trait Storage { fn new(config: &JsonValue) -> Self; fn get(&self, vsspath: &str) -> Option<&str>; - + fn set(&self, vsspath: &str, vssvalue: &str) -> Result<(), ()>; fn get_queue(&self) -> Sender; diff --git a/kuksa-persistence-provider/src/storage/filestorage.rs b/kuksa-persistence-provider/src/storage/filestorage.rs index 282be61..a216212 100644 --- a/kuksa-persistence-provider/src/storage/filestorage.rs +++ b/kuksa-persistence-provider/src/storage/filestorage.rs @@ -14,8 +14,8 @@ use super::{Storage, StoreItem}; use std::collections::HashMap; use std::io::Write; -use std::sync::mpsc::{Sender, Receiver}; use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; use std::fs::File; @@ -28,8 +28,7 @@ pub struct FileStorage { impl Storage for FileStorage { fn new(config: &JsonValue) -> Self { - match config["path"].get::() - { + match config["path"].get::() { Some(x) => { log::info!("Initializing file storage on {}", x); let path = x.clone(); @@ -39,36 +38,34 @@ impl Storage for FileStorage { let mut state_copy = state.get::>().unwrap().clone(); let (tx, rx): (Sender, Receiver) = mpsc::channel(); let fs = FileStorage { state, queue: tx }; - std::thread::spawn(move || { - loop { - match rx.recv() { - Ok(msg) => { - log::info!("Store value: {} for path {}", msg.value, msg.path); - let mut val: HashMap:: = HashMap::new(); - val.insert("value".to_string(), JsonValue::String(msg.value.clone())); - state_copy.insert(msg.path.clone(), JsonValue::from(val)); - let out_json: JsonValue = JsonValue::from(state_copy.to_owned()); - let mut file = File::create(&path).unwrap(); - match file.write_all(out_json.format().unwrap().as_bytes()) { - Ok(_) => {} - Err(e) => { - log::error!("Error writing to state storage file: {:?}",e); - break; - } + std::thread::spawn(move || loop { + match rx.recv() { + Ok(msg) => { + log::info!("Store value: {} for path {}", msg.value, msg.path); + let mut val: HashMap = HashMap::new(); + val.insert("value".to_string(), JsonValue::String(msg.value.clone())); + state_copy.insert(msg.path.clone(), JsonValue::from(val)); + let out_json: JsonValue = JsonValue::from(state_copy.to_owned()); + let mut file = File::create(&path).unwrap(); + match file.write_all(out_json.format().unwrap().as_bytes()) { + Ok(_) => {} + Err(e) => { + log::error!("Error writing to state storage file: {:?}", e); + break; } - let _ = file.flush(); - drop(file); - } - Err(_) => { - log::error!("Error receiving message"); - break; } + let _ = file.flush(); + drop(file); + } + Err(_) => { + log::error!("Error receiving message"); + break; } } }); fs } - _ => { + _ => { log::error!("Error: file storage path is invalid"); std::process::exit(1); } @@ -77,7 +74,12 @@ impl Storage for FileStorage { fn get(&self, vsspath: &str) -> Option<&str> { log::debug!("Try getting VSS signal {}", vsspath); - if !self.state.get::>().unwrap().contains_key(vsspath) { + if !self + .state + .get::>() + .unwrap() + .contains_key(vsspath) + { return None; } @@ -85,18 +87,25 @@ impl Storage for FileStorage { if entry.is_some() && entry.unwrap().contains_key("value") { let value = entry.unwrap()["value"].get::(); - + if let Some(v) = value { return Some(v); } - log::warn!("Error reading {vsspath}, make sure all values are quoted and stored as string") + log::warn!( + "Error reading {vsspath}, make sure all values are quoted and stored as string" + ) } None } fn set(&self, vsspath: &str, vssvalue: &str) -> Result<(), ()> { log::debug!("Setting VSS signal {} to {}", vsspath, vssvalue); - self.queue.send(StoreItem { path: vsspath.to_string(), value: vssvalue.to_string()}).map_err(|_| ()) + self.queue + .send(StoreItem { + path: vsspath.to_string(), + value: vssvalue.to_string(), + }) + .map_err(|_| ()) } fn get_queue(&self) -> Sender { @@ -104,6 +113,4 @@ impl Storage for FileStorage { } } -impl FileStorage { - -} \ No newline at end of file +impl FileStorage {} diff --git a/kuksa-persistence-provider/statestore.json b/kuksa-persistence-provider/statestore.json index e3cf2b1..68c59e8 100644 --- a/kuksa-persistence-provider/statestore.json +++ b/kuksa-persistence-provider/statestore.json @@ -1,20 +1,20 @@ { - "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit": { - "value": "C" + "Vehicle.VehicleIdentification.VIN": { + "value": "DEADBEEF" }, "Vehicle.Cabin.HVAC.Station.Row4.Passenger.FanSpeed": { "value": "41" }, + "Vehicle.Cabin.Infotainment.Media.Volume": { + "value": "22" + }, "Vehicle.VehicleIdentification.VehicleInteriorColor": { "value": "Black" }, + "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit": { + "value": "C" + }, "LOL": { "value": "LOL" - }, - "Vehicle.VehicleIdentification.VIN": { - "value": "DEADBEEF" - }, - "Vehicle.Cabin.Infotainment.Media.Volume": { - "value": "22" } } \ No newline at end of file