Skip to content

Commit

Permalink
refacto: move descriptors logic in its own crate
Browse files Browse the repository at this point in the history
This refactoring will allow us to diminish the compilation time of Zenoh-Flow
nodes: the end-goal is to split Zenoh-Flow in multiple crates.

The following changes were also implemented:
- the `vars` section is propagated downstream,
- full support for JSON descriptors (other schemas can easily be added),
- the mapping section is "flattened" with the nodes (i.e. a
  FlattenedDataFlowDescriptor has no `mapping` section)
- the configuration is "flattened" with the nodes

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet committed Oct 13, 2023
1 parent 380fe6a commit 597257c
Show file tree
Hide file tree
Showing 51 changed files with 3,739 additions and 2 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
members = [
"cargo-zenoh-flow",
"zenoh-flow",
"zenoh-flow-commons",
"zenoh-flow-daemon",
"zenoh-flow-derive",
"zenoh-flow-descriptors",
"zenoh-flow-plugin",
"zfctl",
]
Expand All @@ -33,6 +35,7 @@ repository = "https://github.com/eclipse-zenoh/zenoh-flow"
version = "0.6.0-dev"

[workspace.dependencies]
anyhow = "1"
async-std = "1.12"
async-trait = "0.1.50"
base64 = "0.21"
Expand All @@ -42,7 +45,7 @@ flume = "0.11"
futures = "0.3.15"
git-version = "0.3"
log = "0.4"
serde = "1.0"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_cbor = "0.11"
serde_derive = "1.0"
serde_json = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.70
1.70.0
30 changes: 30 additions & 0 deletions zenoh-flow-commons/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2021 - 2023 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#

[package]
authors = { workspace = true }
categories = { workspace = true }
description = "Internal crate for Zenoh-Flow."
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
name = "zenoh-flow-commons"
repository = { workspace = true }
version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
ramhorns = "0.14"
serde = { workspace = true }
serde_json = { workspace = true }
130 changes: 130 additions & 0 deletions zenoh-flow-commons/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::{ops::Deref, sync::Arc};

use serde::{Deserialize, Serialize};

use crate::merge::IMergeOverwrite;

/// A `Configuration` is a recursive key-value structure that allows modifying the behavior of a
/// node without altering its implementation.
///
/// It is effectively a re-export of [serde_json::Value].
///
/// # Examples
///
/// ## YAML
///
/// ```yaml
/// configuration:
/// name: "John Doe",
/// age: 43,
/// phones:
/// - "+44 1234567"
/// - "+44 2345678"
/// ```
///
/// ## JSON
///
/// ```json
/// "configuration": {
/// "name": "John Doe",
/// "age": 43,
/// "phones": [
/// "+44 1234567",
/// "+44 2345678"
/// ]
/// }
/// ```
//
// NOTE: we take the `serde_json` representation because:
// - JSON is the most supported representation when going online,
// - a `serde_json::Value` can be converted to a `serde_yaml::Value` whereas the opposite is not
// true (YAML introduces "tags" which are not supported by JSON).
#[derive(Default, Deserialize, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Configuration(Arc<serde_json::Value>);

impl Deref for Configuration {
type Target = serde_json::Value;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl IMergeOverwrite for Configuration {
fn merge_overwrite(self, other: Self) -> Self {
if self == Configuration::default() {
return other;
}

if other == Configuration::default() {
return self;
}

match (self.as_object(), other.as_object()) {
(Some(this), Some(other)) => {
let mut other = other.clone();
let mut this = this.clone();

other.append(&mut this);
Configuration(Arc::new(other.into()))
}
(_, _) => unreachable!(
"We are checking, when deserializing, that a Configuration is a JSON object."
),
}
}
}

impl From<serde_json::Value> for Configuration {
fn from(value: serde_json::Value) -> Self {
Self(Arc::new(value))
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

#[test]
fn test_merge_configurations() {
let global = Configuration(Arc::new(
json!({ "a": { "nested": true }, "b": ["an", "array"] }),
));
let local = Configuration(Arc::new(json!({ "a": { "not-nested": false }, "c": 1 })));

assert_eq!(
global.clone().merge_overwrite(local),
Configuration(Arc::new(
json!({ "a": { "nested": true }, "b": ["an", "array"], "c": 1 })
))
);

assert_eq!(
global,
global.clone().merge_overwrite(Configuration::default())
);
assert_eq!(
global,
Configuration::default().merge_overwrite(global.clone())
);
assert_eq!(
Configuration::default(),
Configuration::default().merge_overwrite(Configuration::default())
)
}
}
59 changes: 59 additions & 0 deletions zenoh-flow-commons/src/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

//! This module exposes the functions [deserialize_size] and [deserialize_time] that are used
//! throughout Zenoh-Flow to "parse" values used to express time or size.
//!
//! The external crates [bytesize] and [humantime] are leveraged for these purposes.

use std::str::FromStr;

use serde::Deserializer;

pub fn deserialize_size<'de, D>(deserializer: D) -> std::result::Result<Option<usize>, D::Error>
where
D: Deserializer<'de>,
{
match serde::de::Deserialize::deserialize(deserializer) {
Ok(buf) => Ok(Some(
bytesize::ByteSize::from_str(buf)
.map_err(|_| {
serde::de::Error::custom(format!("Unable to parse value as bytes {buf}"))
})?
.as_u64() as usize,
)),
Err(_) => {
// log::warn!("failed to deserialize size: {:?}", e);
Ok(None)
}
}
}

pub fn deserialize_time<'de, D>(deserializer: D) -> std::result::Result<Option<u64>, D::Error>
where
D: Deserializer<'de>,
{
match serde::de::Deserialize::deserialize(deserializer) {
Ok::<&str, _>(buf) => {
let ht = (buf)
.parse::<humantime::Duration>()
.map_err(serde::de::Error::custom)?;
Ok(Some(ht.as_nanos() as u64))
}
Err(_) => {
// log::warn!("failed to deserialize time: {:?}", e);
Ok(None)
}
}
}
109 changes: 109 additions & 0 deletions zenoh-flow-commons/src/identifiers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;

use serde::{Deserialize, Serialize};

/// A `NodeId` identifies a Node in a data flow.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Hash)]
pub struct NodeId(Arc<str>);

impl Deref for NodeId {
type Target = Arc<str>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Display for NodeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl From<String> for NodeId {
fn from(value: String) -> Self {
Self(value.into())
}
}

impl From<&str> for NodeId {
fn from(value: &str) -> Self {
Self(value.into())
}
}

/// A `PortId` identifies an `Input` or an `Output` of a Node.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)]
pub struct PortId(Arc<str>);

impl Deref for PortId {
type Target = Arc<str>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Display for PortId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl From<String> for PortId {
fn from(value: String) -> Self {
Self(value.into())
}
}

impl From<&str> for PortId {
fn from(value: &str) -> Self {
Self(value.into())
}
}

/// A `PortId` identifies an `Input` or an `Output` of a Node.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)]
pub struct RuntimeId(Arc<str>);

impl Deref for RuntimeId {
type Target = Arc<str>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Display for RuntimeId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", &self.0)
}
}

impl From<String> for RuntimeId {
fn from(value: String) -> Self {
Self(value.into())
}
}

impl From<&str> for RuntimeId {
fn from(value: &str) -> Self {
Self(value.into())
}
}
28 changes: 28 additions & 0 deletions zenoh-flow-commons/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

mod vars;
pub use vars::Vars;

mod identifiers;
pub use identifiers::{NodeId, PortId, RuntimeId};

mod configuration;
pub use configuration::Configuration;

mod merge;
pub use merge::IMergeOverwrite;

/// Zenoh-Flow's result type.
pub type Result<T> = std::result::Result<T, anyhow::Error>;
Loading

0 comments on commit 597257c

Please sign in to comment.