Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade Zenoh to 1.1.0 #261

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,936 changes: 1,131 additions & 805 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 6 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,18 @@ tracing-subscriber = { version = "0.3" }
uhlc = "0.6"
url = { version = "2.2", features = ["serde"] }
uuid = { version = "1.1", features = ["serde", "v4"] }
zenoh = { version = "0.11.0-rc.3", features = ["unstable", "plugins"] }
zenoh-collections = { version = "0.11.0-rc.3" }
zenoh-core = { version = "0.11.0-rc.3" }
zenoh-ext = { version = "0.11.0-rc.3" }
# ⚠️ To update the version of Zenoh, first *manually* copy the `Cargo.lock` from
# the targeted version, then run `cargo build` and finally commit the updates.
zenoh = { version = "=1.1.0", features = ["unstable", "internal", "plugins"] }
zenoh-config = { version = "=1.1.0" }
zenoh-flow-commons = { path = "./zenoh-flow-commons" }
zenoh-flow-daemon = { path = "./zenoh-flow-daemon" }
zenoh-flow-descriptors = { path = "./zenoh-flow-descriptors" }
zenoh-flow-nodes = { path = "./zenoh-flow-nodes" }
zenoh-flow-records = { path = "./zenoh-flow-records" }
zenoh-flow-runtime = { path = "./zenoh-flow-runtime" }
zenoh-keyexpr = { version = "0.11.0-rc.3" }
zenoh-plugin-trait = { version = "0.11.0-rc.3" }
zenoh-protocol = { version = "0.11.0-rc.3" }
zenoh-result = "0.11.0-rc.3"
zenoh-sync = { version = "0.11.0-rc.3" }
zenoh-util = { version = "0.11.0-rc.3" }
zenoh-plugin-trait = { version = "=1.1.0" }
zenoh-keyexpr = { version = "=1.1.0" }

[profile.dev]
debug = true
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.72.1"
channel = "1.75.0"
2 changes: 1 addition & 1 deletion zenoh-flow-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ serde_yaml = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-config = { workspace = true }
7 changes: 3 additions & 4 deletions zenoh-flow-commons/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{fmt::Display, ops::Deref, str::FromStr, sync::Arc};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use zenoh_protocol::core::ZenohId;
use zenoh_config::ZenohId;

use crate::deserialize::deserialize_id;

Expand Down Expand Up @@ -116,10 +116,9 @@ pub struct RuntimeId(ZenohId);

impl RuntimeId {
/// Generate a new random identifier, guaranteed (with a high probability) to be unique.
///
/// This internally calls [ZenohId::rand].
pub fn rand() -> Self {
Self(ZenohId::rand())
// NOTE: The `Default` trait implementation internally calls `rand()`.
Self(ZenohId::default())
}
}

Expand Down
14 changes: 9 additions & 5 deletions zenoh-flow-daemon/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod queryables;
use std::sync::Arc;

use flume::{Receiver, Sender};
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::Result;
pub use zenoh_flow_runtime::{Extension, Extensions, Runtime};

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Daemon {
/// [runtime]: Runtime
/// [configuration]: ZenohFlowConfiguration
pub async fn spawn_from_config(
zenoh_session: Arc<Session>,
zenoh_session: Session,
configuration: ZenohFlowConfiguration,
) -> Result<Self> {
let extensions = configuration.extensions.unwrap_or_default();
Expand Down Expand Up @@ -116,9 +116,13 @@ impl Daemon {
// TODO: Clean everything up before aborting.
}

if let Err(e) =
queryables::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx)
.await
if let Err(e) = queryables::spawn_instances_queryable(
session.clone(),
runtime.clone(),
abort_rx,
abort_ack_tx,
)
.await
{
tracing::error!(
"The Zenoh-Flow daemon encountered a fatal error:\n{:?}\nAborting",
Expand Down
18 changes: 5 additions & 13 deletions zenoh-flow-daemon/src/daemon/queryables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use anyhow::bail;
use flume::{Receiver, Sender};
use futures::select;
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::Result;
use zenoh_flow_runtime::Runtime;

Expand All @@ -27,17 +27,13 @@ use crate::queries::{

/// Spawns an async task to answer queries received on `zenoh-flow/{runtime_id}/instances`.
pub(crate) async fn spawn_instances_queryable(
zenoh_session: Arc<Session>,
zenoh_session: Session,
runtime: Arc<Runtime>,
abort_rx: Receiver<()>,
abort_ack_tx: Sender<()>,
) -> Result<()> {
let ke_instances = selectors::selector_instances(runtime.id());
let queryable = match zenoh_session
.declare_queryable(ke_instances.clone())
.res()
.await
{
let queryable = match zenoh_session.declare_queryable(ke_instances.clone()).await {
Ok(queryable) => {
tracing::trace!("declared queryable: {}", ke_instances);
queryable
Expand Down Expand Up @@ -89,18 +85,14 @@ pub(crate) async fn spawn_instances_queryable(
}

pub(crate) async fn spawn_runtime_queryable(
zenoh_session: Arc<Session>,
zenoh_session: Session,
runtime: Arc<Runtime>,
abort_rx: Receiver<()>,
abort_ack_tx: Sender<()>,
) -> Result<()> {
let ke_runtime = selectors::selector_runtimes(runtime.id());

let queryable = match zenoh_session
.declare_queryable(ke_runtime.clone())
.res()
.await
{
let queryable = match zenoh_session.declare_queryable(ke_runtime.clone()).await {
Ok(queryable) => {
tracing::trace!("declared queryable < {} >", ke_runtime);
queryable
Expand Down
11 changes: 3 additions & 8 deletions zenoh-flow-daemon/src/queries/instances/abort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::{InstanceId, RuntimeId};
use zenoh_flow_runtime::Runtime;

Expand All @@ -27,7 +27,7 @@ pub(crate) fn abort(runtime: Arc<Runtime>, origin: Origin, instance_id: Instance
match runtime.try_get_record(&instance_id).await {
Ok(record) => {
query_abort(
&runtime.session(),
runtime.session(),
record
.mapping()
.keys()
Expand Down Expand Up @@ -74,12 +74,7 @@ pub(crate) async fn query_abort(
for runtime_id in runtimes {
let selector = selectors::selector_instances(runtime_id);

if let Err(e) = session
.get(selector)
.with_value(abort_query.clone())
.res()
.await
{
if let Err(e) = session.get(selector).payload(abort_query.clone()).await {
tracing::error!(
"Sending abort query to runtime < {} > failed with error: {:?}",
runtime_id,
Expand Down
6 changes: 2 additions & 4 deletions zenoh-flow-daemon/src/queries/instances/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use anyhow::Context;
use zenoh::prelude::r#async::*;
use zenoh_flow_commons::{InstanceId, Result};
use zenoh_flow_descriptors::FlattenedDataFlowDescriptor;
use zenoh_flow_records::DataFlowRecord;
Expand Down Expand Up @@ -134,8 +133,7 @@ Query:
runtime
.session()
.get(&selector)
.with_value(payload.clone())
.res()
.payload(payload.clone())
.await,
r#"Zenoh query on < {} > failed"#,
selector
Expand All @@ -148,7 +146,7 @@ Query:
);

rollback_if_err!(
reply.sample,
reply.result(),
"Runtime < {} > failed to load data flow instance < {} >",
&runtime_id,
&instance_id
Expand Down
11 changes: 3 additions & 8 deletions zenoh-flow-daemon/src/queries/instances/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use async_std::task::JoinHandle;
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::{InstanceId, RuntimeId};
use zenoh_flow_runtime::{DataFlowErr, Runtime};

Expand All @@ -39,12 +39,7 @@ pub(crate) async fn query_delete(

// NOTE: No need to process the request, as, even if the query failed, this is not something we want to recover
// from.
if let Err(e) = session
.get(selector)
.with_value(delete_query.clone())
.res()
.await
{
if let Err(e) = session.get(selector).payload(delete_query.clone()).await {
tracing::error!(
"Sending delete query to runtime < {} > failed with error: {:?}",
runtime_id,
Expand All @@ -69,7 +64,7 @@ pub(crate) fn delete_instance(
match runtime.try_get_record(&instance_id).await {
Ok(record) => {
query_delete(
&runtime.session(),
runtime.session(),
record
.mapping()
.keys()
Expand Down
22 changes: 8 additions & 14 deletions zenoh-flow-daemon/src/queries/instances/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{fmt::Debug, sync::Arc};

use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::query::Query;
use zenoh_flow_commons::{InstanceId, Result};
use zenoh_flow_descriptors::FlattenedDataFlowDescriptor;
use zenoh_flow_records::DataFlowRecord;
Expand All @@ -48,20 +48,14 @@ pub enum Origin {
}

async fn reply<T: Serialize + Debug>(query: Query, data: Result<T>) -> Result<()> {
let sample = match data {
Ok(data) => match serde_json::to_vec(&data) {
Ok(payload) => Ok(Sample::new(query.key_expr().clone(), payload)),
Err(e) => Err(Value::from(e.to_string())),
match data {
Ok(payload) => match serde_json::to_vec(&payload) {
Ok(payload) => query.reply(query.key_expr(), payload).await,
Err(e) => query.reply_err(e.to_string()).await,
},

Err(e) => Err(Value::from(e.to_string())),
};

query
.reply(sample)
.res()
.await
.map_err(|e| anyhow!("Failed to send reply: {:?}", e))
Err(e) => query.reply_err(e.to_string()).await,
}
.map_err(|e| anyhow!("Failed to send reply: {e:?}"))
}

/// The available interactions to manipulate a data flow instance.
Expand Down
17 changes: 5 additions & 12 deletions zenoh-flow-daemon/src/queries/instances/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use anyhow::bail;
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::{bytes::ZBytes, query::Query, Session};
use zenoh_flow_commons::{InstanceId, Result, RuntimeId};
use zenoh_flow_runtime::Runtime;

Expand Down Expand Up @@ -51,7 +51,7 @@ Caused by:
message,
e
);
if let Err(e) = query.reply(Err(Value::from(message))).res().await {
if let Err(e) = query.reply_err(message).await {
tracing::error!(
"Failed to reply (error) to query on < {} >: {:?}",
query.key_expr(),
Expand All @@ -74,7 +74,7 @@ Caused by:
if matches!(origin, Origin::Client) {
return_if_err!(
query_start(
&runtime.session(),
runtime.session(),
record
.mapping()
.keys()
Expand All @@ -97,10 +97,7 @@ Caused by:

tracing::trace!("Successfully started instance < {} >", instance_id);
return_if_err!(
query
.reply(Ok(Sample::new(query.key_expr().clone(), Value::empty())))
.res()
.await,
query.reply(query.key_expr(), ZBytes::default()).await,
"Failed to reply (success) to query on < {} >",
query.key_expr()
);
Expand Down Expand Up @@ -169,11 +166,7 @@ Caused by:
let selector = selectors::selector_instances(runtime_id);

rollback_if_err!(
session
.get(selector)
.with_value(start_query.clone())
.res()
.await,
session.get(selector).payload(start_query.clone()).await,
"Query `start` on runtime < {} > failed",
runtime_id
);
Expand Down
21 changes: 4 additions & 17 deletions zenoh-flow-daemon/src/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) mod selectors;

use anyhow::{anyhow, bail};
use serde::Deserialize;
use zenoh::{prelude::*, queryable::Query};
use zenoh::query::Query;
use zenoh_flow_commons::Result;
pub use zenoh_flow_runtime::InstanceStatus;

Expand All @@ -48,22 +48,9 @@ pub use self::{
///
/// After these checks, the method `process` is called on the variant of `InstancesQuery`.
pub(crate) async fn validate_query<T: for<'a> Deserialize<'a>>(query: &Query) -> Result<T> {
let value = match query.value() {
Some(value) => value,
None => {
bail!("Received empty payload");
}
let Some(payload) = query.payload() else {
bail!("Received Query with empty payload")
};

if ![
Encoding::APP_OCTET_STREAM,
Encoding::APP_JSON,
Encoding::TEXT_JSON,
]
.contains(&value.encoding)
{
bail!("Encoding < {} > is not supported", value.encoding);
}

serde_json::from_slice::<T>(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e))
serde_json::from_slice::<T>(&payload.to_bytes()).map_err(|e| anyhow!("{:?}", e))
}
12 changes: 5 additions & 7 deletions zenoh-flow-daemon/src/queries/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{collections::HashMap, sync::Arc};

use serde::{Deserialize, Serialize};
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind};
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::query::Query;
use zenoh_flow_commons::{InstanceId, RuntimeId};
use zenoh_flow_runtime::{InstanceState, Runtime};

Expand Down Expand Up @@ -103,12 +103,10 @@ impl RuntimesQuery {
}
};

let sample = match payload {
Ok(payload) => Ok(Sample::new(query.key_expr().clone(), payload)),
Err(e) => Err(Value::from(e.to_string())),
};

if let Err(e) = query.reply(sample).res().await {
if let Err(e) = match payload {
Ok(payload) => query.reply(query.key_expr(), payload).await,
Err(e) => query.reply_err(e.to_string()).await,
} {
tracing::error!(
r#"Failed to reply to query < {} >:
Caused by:
Expand Down
Loading
Loading