Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Schildt <sebastian.schildt@de.bosch.com>
  • Loading branch information
SebastianSchildt committed Oct 27, 2024
1 parent a4061e9 commit 0ed0451
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 146 deletions.
8 changes: 8 additions & 0 deletions kuksa-persistence-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,11 @@ regex = "1.11.0"
#Only this version is compatible with kuksa crate. 0.13 is not
prost-types = "0.11.9"
tokio = { version="1.40.0", features = ["full"] }

#Still broken due to closed repo
#djson = { git="https://github.com/dependix/platform.git" , optional = true }
djson = { path="platform/modules/json-al/djson_rust/" , optional = true }

[features]
# use djson library
djson = [ "dep:djson" ]
235 changes: 201 additions & 34 deletions kuksa-persistence-provider/src/kuksaconnector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use crate::storage;
use crate::storage::{self, StoreItem};

use std::time::SystemTime;
use std::collections::HashMap;
use std::fmt;
use std::time::SystemTime;

use kuksa::proto as proto;
use kuksa::proto;

//use kuksa::proto::v1::{datapoint::Value, DataType, Datapoint};
use std::sync::mpsc::Sender;

use std::sync::{Arc, Mutex};

Expand All @@ -24,52 +25,78 @@ pub struct ParseError {}

pub fn create_kuksa_client(uri: &str) -> Arc<Mutex<kuksa::Client>> {
log::info!("Creating Kuksa Databroker client for URI: {}", uri);
let uri = kuksa::Uri::try_from(uri)
.expect("Invalid URI for Kuksa Databroker connection.");
let uri = kuksa::Uri::try_from(uri).expect("Invalid URI for Kuksa Databroker connection.");
Arc::new(Mutex::new(kuksa::Client::new(uri)))
}


pub async fn get_from_storage_and_set_values(storage: &impl storage::Storage, kuksa_client: &Arc<Mutex<kuksa::Client>>, vsspaths: &Vec<String>) {
pub async fn get_from_storage_and_set_values(
storage: &impl storage::Storage,
kuksa_client: &Arc<Mutex<kuksa::Client>>,
vsspaths: &Vec<String>,
) {
for vsspath in vsspaths {
get_from_storage_and_set(storage, kuksa_client, vsspath, false).await;
}
}

pub async fn get_from_storage_and_set_actuations(storage: &impl storage::Storage, kuksa_client: &Arc<Mutex<kuksa::Client>>, vsspaths: &Vec<String>) {
pub async fn get_from_storage_and_set_actuations(
storage: &impl storage::Storage,
kuksa_client: &Arc<Mutex<kuksa::Client>>,
vsspaths: &Vec<String>,
) {
for vsspath in vsspaths {
get_from_storage_and_set(storage, kuksa_client, vsspath, true).await;
}
}

pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_client: &Arc<Mutex<kuksa::Client>>, vsspath: &str, is_actuator: bool) {
pub async fn get_from_storage_and_set(
storage: &impl storage::Storage,
kuksa_client: &Arc<Mutex<kuksa::Client>>,
vsspath: &str,
is_actuator: bool,
) {
log::debug!("Query storage for VSS signal: {}", vsspath);
let value = match storage.get(vsspath) {
Some(x) => x,
None => {
log::warn!("No value for VSS signal: {} stored", vsspath);
return;
}
};

};

//Figure out metadata:
let datapoint_entries = match kuksa_client.lock().unwrap().get_metadata(vec![vsspath]).await {
let datapoint_entries = match kuksa_client
.lock()
.unwrap()
.get_metadata(vec![vsspath])
.await
{
Ok(data_entries) => Some(data_entries),
Err(kuksa::Error::Status(status)) => {
log::warn!("Error: Could not get metadata for VSS signal: {}, Status: {}", vsspath, &status);
log::warn!(
"Error: Could not get metadata for VSS signal: {}, Status: {}",
vsspath,
&status
);
None
}
Err(kuksa::Error::Connection(msg)) => {
log::warn!("Connection Error: Could not get metadata for VSS signal: {}, Reason: {}", vsspath, &msg);
log::warn!(
"Connection Error: Could not get metadata for VSS signal: {}, Reason: {}",
vsspath,
&msg
);
None
}
Err(kuksa::Error::Function(msg)) => {
log::warn!("Error: Could not get metadata for VSS signal: {}, Errors: {msg:?}", vsspath);
log::warn!(
"Error: Could not get metadata for VSS signal: {}, Errors: {msg:?}",
vsspath
);
None
}
};

if datapoint_entries.is_none() {
return;
}
Expand All @@ -80,15 +107,13 @@ pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_cli
if let Some(metadata) = &entries.first().unwrap().metadata {
let data_value = try_into_data_value(
value,
proto::v1::DataType::from_i32(metadata.data_type)
.unwrap(),
proto::v1::DataType::from_i32(metadata.data_type).unwrap(),
);
if data_value.is_err() {
log::warn!(
"Could not parse \"{}\" as {:?}",
value,
proto::v1::DataType::from_i32(metadata.data_type)
.unwrap()
proto::v1::DataType::from_i32(metadata.data_type).unwrap()
);
return;
}
Expand All @@ -102,32 +127,135 @@ pub async fn get_from_storage_and_set(storage: &impl storage::Storage, kuksa_cli
},
)]);

let result = { if is_actuator {
kuksa_client.lock().unwrap().set_target_values(datapoints).await
}
else {
kuksa_client.lock().unwrap().set_current_values(datapoints).await
}
};
let result = {
if is_actuator {
kuksa_client
.lock()
.unwrap()
.set_target_values(datapoints)
.await
} else {
kuksa_client
.lock()
.unwrap()
.set_current_values(datapoints)
.await
}
};

match result {
Ok(_) => {
log::debug!("Succes setting {} to {}", vsspath, value);
}
Err(kuksa::Error::Status(status)) => {
log::warn!("Error: Could not set value for VSS signal: {}, Status: {}", vsspath, &status);
log::warn!(
"Error: Could not set value for VSS signal: {}, Status: {}",
vsspath,
&status
);
}
Err(kuksa::Error::Connection(msg)) => {
log::warn!("Connection Error: Could not set value for VSS signal: {}, Reason: {}", vsspath, &msg);
log::warn!(
"Connection Error: Could not set value for VSS signal: {}, Reason: {}",
vsspath,
&msg
);
}
Err(kuksa::Error::Function(msg)) => {
log::warn!("Error: Could not set value for VSS signal: {}, Errors: {msg:?}", vsspath);
log::warn!(
"Error: Could not set value for VSS signal: {}, Errors: {msg:?}",
vsspath
);
}
};
}
}
}

pub async fn watch_values(
storage_queue: Sender<storage::StoreItem>,
kuksa_client: &Arc<Mutex<kuksa::Client>>,
vsspaths: Vec<&str>,
is_actuator: bool,
) {
log::info!(
"Subscribing to {} for VSS signals: {:?}",
{
match is_actuator {
true => "actuators",
false => "current values",
}
},
&vsspaths
);

let res = match is_actuator {
true => {
kuksa_client
.lock()
.unwrap()
.subscribe_target_values(vsspaths)
.await
}
false => {
kuksa_client
.lock()
.unwrap()
.subscribe_current_values(vsspaths)
.await
}
};
match res {
Ok(mut subs) => {
tokio::spawn(async move {
loop {
match subs.message().await {
Ok(resp) => {
if let Some(r) = resp {
for update in r.updates {
if let Some(entry) = update.entry {
let newdp = match is_actuator {
true => entry.actuator_target,
false => entry.value,
};
if let Some(datapoint) = newdp {
let data = DisplayDatapoint(datapoint);
log::info!(
"Received value {} for VSS signal {}",
data.to_string(),
entry.path
);

match storage_queue.send(StoreItem {
path: entry.path.clone(),
value: data.to_string(),
}) {
Ok(_) => {}
Err(err) => {
log::warn!(
"Error sending data to storage {:?}",
err
);
}
}
}
}
}
}
}
Err(err) => {
log::warn!("Error: Could not receive message: {:?}", err);
break;
}
}
}
});
}
Err(err) => {
log::warn!("Error: Could not subscribe to VSS signals: {:?}", err);
}
}
}

/* Donation from databroker-cli */
fn try_into_data_value(
Expand Down Expand Up @@ -260,9 +388,6 @@ fn try_into_data_value(
}
}




pub fn get_array_from_input<T: std::str::FromStr>(values: String) -> Result<Vec<T>, ParseError> {
let raw_input = values
.strip_prefix('[')
Expand All @@ -289,3 +414,45 @@ pub fn get_array_from_input<T: std::str::FromStr>(values: String) -> Result<Vec<
Ok(array)
}

struct DisplayDatapoint(proto::v1::Datapoint);

fn display_array<T>(f: &mut fmt::Formatter<'_>, array: &[T]) -> fmt::Result
where
T: fmt::Display,
{
f.write_str("[")?;
let real_delimiter = ", ";
let mut delimiter = "";
for value in array {
write!(f, "{delimiter}")?;
delimiter = real_delimiter;
write!(f, "{value}")?;
}
f.write_str("]")
}

impl fmt::Display for DisplayDatapoint {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0.value {
Some(value) => match value {
proto::v1::datapoint::Value::Bool(value) => f.pad(&format!("{value}")),
proto::v1::datapoint::Value::Int32(value) => f.pad(&format!("{value}")),
proto::v1::datapoint::Value::Int64(value) => f.pad(&format!("{value}")),
proto::v1::datapoint::Value::Uint32(value) => f.pad(&format!("{value}")),
proto::v1::datapoint::Value::Uint64(value) => f.pad(&format!("{value}")),
proto::v1::datapoint::Value::Float(value) => f.pad(&format!("{value:.2}")),
proto::v1::datapoint::Value::Double(value) => f.pad(&format!("{value}")),
proto::v1::datapoint::Value::String(value) => f.pad(&value.to_owned()),
proto::v1::datapoint::Value::StringArray(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::BoolArray(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::Int32Array(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::Int64Array(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::Uint32Array(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::Uint64Array(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::FloatArray(array) => display_array(f, &array.values),
proto::v1::datapoint::Value::DoubleArray(array) => display_array(f, &array.values),
},
None => f.pad("None"),
}
}
}
Loading

0 comments on commit 0ed0451

Please sign in to comment.