Skip to content

Commit

Permalink
Merge branch 'main' into pre-commit-fmt-clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag authored Feb 6, 2025
2 parents 3d3a72a + ec91336 commit b976797
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 8 deletions.
21 changes: 21 additions & 0 deletions databroker-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
&["proto"],
)
.unwrap();
tonic_build::configure()
.file_descriptor_set_path(out_dir.join("kuksa.val.v1_descriptor.bin"))
.compile(
&[
"proto/kuksa/val/v1/val.proto",
"proto/kuksa/val/v1/types.proto",
],
&["proto"],
)
.unwrap();
tonic_build::configure()
.file_descriptor_set_path(out_dir.join("sdv.databroker.v1_descriptor.bin"))
.compile(
&[
"proto/sdv/databroker/v1/broker.proto",
"proto/sdv/databroker/v1/types.proto",
"proto/sdv/databroker/v1/collector.proto",
],
&["proto"],
)
.unwrap();

Ok(())
}
4 changes: 4 additions & 0 deletions databroker-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
pub mod sdv {
pub mod databroker {
pub mod v1 {
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("sdv.databroker.v1_descriptor");
tonic::include_proto!("sdv.databroker.v1");
}
}
Expand All @@ -24,6 +26,8 @@ pub mod sdv {
pub mod kuksa {
pub mod val {
pub mod v1 {
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("kuksa.val.v1_descriptor");
tonic::include_proto!("kuksa.val.v1");

use datapoint::Value;
Expand Down
7 changes: 5 additions & 2 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,10 +1670,13 @@ impl AuthorizedAccess<'_, '_> {
.await
.add_change_subscription(subscription);

let stream = BroadcastStream::new(receiver).filter_map(|result| match result {
let stream = BroadcastStream::new(receiver).filter_map(move |result| match result {
Ok(message) => Some(message),
Err(err) => {
debug!("Lagged entries: {}", err);
warn!(
"Slow subscriber with capacity {} lagged and missed signal updates: {}",
channel_capacity, err
);
None
}
});
Expand Down
16 changes: 11 additions & 5 deletions databroker/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ where
}
};

let mut reflection_builder = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(kuksa::val::v1::FILE_DESCRIPTOR_SET);
let mut router = server.add_optional_service(kuksa_val_v1);

if apis.contains(&Api::KuksaValV2) {
let service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(kuksa::val::v2::FILE_DESCRIPTOR_SET)
.build()
.unwrap();
reflection_builder = reflection_builder
.register_encoded_file_descriptor_set(kuksa::val::v2::FILE_DESCRIPTOR_SET);

router = router.add_service(service).add_optional_service(Some(
router = router.add_optional_service(Some(
kuksa::val::v2::val_server::ValServer::with_interceptor(
broker.clone(),
authorization.clone(),
Expand All @@ -229,6 +229,9 @@ where
}

if apis.contains(&Api::SdvDatabrokerV1) {
reflection_builder = reflection_builder
.register_encoded_file_descriptor_set(sdv::databroker::v1::FILE_DESCRIPTOR_SET);

router = router.add_optional_service(Some(
sdv::databroker::v1::broker_server::BrokerServer::with_interceptor(
broker.clone(),
Expand All @@ -243,6 +246,9 @@ where
));
}

let reflection_service = reflection_builder.build().unwrap();
router = router.add_service(reflection_service);

router
.serve_with_incoming_shutdown(incoming, shutdown(broker, signal))
.await?;
Expand Down
76 changes: 76 additions & 0 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ resolver = "2"
members = [
"common",
"kuksa",
"sdv"
"sdv",
"databroker-examples"
]

[workspace.dependencies]
Expand Down
11 changes: 11 additions & 0 deletions lib/databroker-examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "databroker-examples"
version = "0.1.0"
edition = "2021"

[dependencies]
kuksa-common = { path = "../common"}
kuksa = { path = "../kuksa"}
kuksa-sdv = { path = "../sdv"}
tokio = {version = "1.17.0", features = ["full"]}
tokio-stream = "0.1.8"
49 changes: 49 additions & 0 deletions lib/databroker-examples/examples/slow_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License 2.0 which is available at
* http://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use kuksa::KuksaClient;
use tokio::time::{sleep, Duration};
use kuksa_common::to_uri;
use std::thread;

#[tokio::main]
async fn main() {

// Paths to subscribe
let paths = vec!["Vehicle.Speed"];

// Initialize the KuksaClient
let mut client: KuksaClient = KuksaClient::new(to_uri("127.0.0.1:55555").unwrap());

// Subscribe to paths
let mut stream = client.subscribe(paths.clone()).await.unwrap();

println!("Subscribed to {:?}", paths);

loop {
match stream.message().await {
Ok(msg) => {
println!("Got message, will wait 5 seconds: {:?}", msg);
// Simulate slow processing by sleeping
sleep(Duration::from_secs(1)).await;
thread::sleep(Duration::from_secs(5));
}
Err(e) => {
println!("Error while receiving message: {:?}", e);
break; // Exit loop on error
}
}
}

println!("Exiting subscriber...");
}

0 comments on commit b976797

Please sign in to comment.