Skip to content

Commit

Permalink
Initial example for testing slow subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
mikehaller committed Jan 28, 2025
1 parent 86ed421 commit da97f9b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 3 deletions.
4 changes: 2 additions & 2 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,10 +1670,10 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.await
.add_change_subscription(subscription);

let stream = BroadcastStream::new(receiver).filter_map(|result| match result {
let stream = BroadcastStream::new(receiver).filter_map(move |result| match result {
Ok(message) => Some(message),
Err(err) => {
debug!("Lagged entries: {}", err);
warn!("Slow subscriber with capacity {} lagged and missed signal updates: {}", channel_capacity, err);
None
}
});
Expand Down
76 changes: 76 additions & 0 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ resolver = "2"
members = [
"common",
"kuksa",
"sdv"
"sdv",
"databroker-examples"
]

[workspace.dependencies]
Expand Down
11 changes: 11 additions & 0 deletions lib/databroker-examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "databroker-examples"
version = "0.1.0"
edition = "2021"

[dependencies]
kuksa-common = { path = "../common"}
kuksa = { path = "../kuksa"}
kuksa-sdv = { path = "../sdv"}
tokio = {version = "1.17.0", features = ["full"]}
tokio-stream = "0.1.8"
49 changes: 49 additions & 0 deletions lib/databroker-examples/examples/slow_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License 2.0 which is available at
* http://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use kuksa::KuksaClient;
use tokio::time::{sleep, Duration};
use kuksa_common::to_uri;
use std::thread;

#[tokio::main]
async fn main() {

// Paths to subscribe
let paths = vec!["Vehicle.Speed"];

// Initialize the KuksaClient
let mut client: KuksaClient = KuksaClient::new(to_uri("127.0.0.1:55555").unwrap());

// Subscribe to paths
let mut stream = client.subscribe(paths.clone()).await.unwrap();

println!("Subscribed to {:?}", paths);

loop {
match stream.message().await {
Ok(msg) => {
println!("Got message, will wait 5 seconds: {:?}", msg);
// Simulate slow processing by sleeping
sleep(Duration::from_secs(1)).await;
thread::sleep(Duration::from_secs(5));
}
Err(e) => {
println!("Error while receiving message: {:?}", e);
break; // Exit loop on error
}
}
}

println!("Exiting subscriber...");
}

0 comments on commit da97f9b

Please sign in to comment.