From c093ad02c919b0ecbc4814548ac2e6fd618bdbec Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Wed, 18 Oct 2023 21:49:16 +0200 Subject: [PATCH] refacto: move records into their own crate This commit creates the package `zenoh-flow-records`. The central structure is the `DataFlowRecord`: a data flow that is ready to be instantiated. All nodes are mapped to a runtime and the required connectors are added. The preferred way to create a `DataFlowRecord` is to call the method `complete_mapping_and_connect` on a `FlattenedDataFlowDescriptor` instance (a usage sketch is appended after the diff). This commit was also the opportunity to: - remove the `runtime` field from all the descriptors / records, - transform the remaining `String` into `Arc<str>`. Signed-off-by: Julien Loudet --- Cargo.toml | 6 +- zenoh-flow-commons/Cargo.toml | 2 + zenoh-flow-commons/src/deserialize.rs | 15 ++ zenoh-flow-commons/src/lib.rs | 19 +- zenoh-flow-commons/src/runtime.rs | 22 ++ zenoh-flow-commons/src/shared_memory.rs | 62 +++++ zenoh-flow-descriptors/Cargo.toml | 6 +- zenoh-flow-descriptors/src/composite/io.rs | 3 +- .../src/composite/operator.rs | 2 - zenoh-flow-descriptors/src/composite/tests.rs | 9 - zenoh-flow-descriptors/src/dataflow.rs | 44 ++-- zenoh-flow-descriptors/src/deserialize.rs | 73 ------ .../src/flattened/dataflow.rs | 164 +++++++++++- zenoh-flow-descriptors/src/flattened/mod.rs | 10 +- .../src/flattened/nodes/operator.rs | 57 ++--- .../src/flattened/nodes/sink.rs | 39 +-- .../src/flattened/nodes/source.rs | 40 +-- zenoh-flow-descriptors/src/flattened/tests.rs | 235 ++++++++++++++++++ zenoh-flow-descriptors/src/io.rs | 22 +- zenoh-flow-descriptors/src/lib.rs | 2 - zenoh-flow-descriptors/src/link.rs | 26 +- zenoh-flow-descriptors/src/nodes/mod.rs | 17 +- zenoh-flow-descriptors/src/nodes/operator.rs | 27 +- zenoh-flow-descriptors/src/nodes/sink.rs | 23 +- zenoh-flow-descriptors/src/nodes/source.rs | 16 +- zenoh-flow-descriptors/src/tests.rs | 39 ++- zenoh-flow-records/Cargo.toml | 34 +++ zenoh-flow-records/src/connectors.rs | 45 ++++ zenoh-flow-records/src/dataflow.rs | 87 +++++++ zenoh-flow-records/src/io.rs | 30 +++ zenoh-flow-records/src/lib.rs | 28 +++ zenoh-flow-records/src/links.rs | 25 ++ zenoh-flow-records/src/nodes.rs | 48 ++++ 33 files changed, 989 insertions(+), 288 deletions(-) create mode 100644 zenoh-flow-commons/src/runtime.rs create mode 100644 zenoh-flow-commons/src/shared_memory.rs delete mode 100644 zenoh-flow-descriptors/src/deserialize.rs create mode 100644 zenoh-flow-records/Cargo.toml create mode 100644 zenoh-flow-records/src/connectors.rs create mode 100644 zenoh-flow-records/src/dataflow.rs create mode 100644 zenoh-flow-records/src/io.rs create mode 100644 zenoh-flow-records/src/lib.rs create mode 100644 zenoh-flow-records/src/links.rs create mode 100644 zenoh-flow-records/src/nodes.rs diff --git a/Cargo.toml b/Cargo.toml index cff3ce95..2ed93eeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "zenoh-flow-derive", "zenoh-flow-descriptors", "zenoh-flow-plugin", + "zenoh-flow-records", "zfctl", ] @@ -51,11 +52,14 @@ serde_derive = "1.0" serde_json = "1.0" serde_yaml = "0.9" uhlc = "0.6" -uuid = "1.1" +uuid = { version = "1.1", features = ["serde", "v4"] } zenoh = { version = "0.7.2-rc" } zenoh-collections = { version = "0.7.2-rc" } zenoh-core = { version = "0.7.2-rc" } zenoh-ext = { version = "0.7.2-rc" } +zenoh-flow-commons = { path = "./zenoh-flow-commons" } +zenoh-flow-descriptors = { path = "./zenoh-flow-descriptors" } +zenoh-flow-records = { path = "./zenoh-flow-records" }
zenoh-plugin-trait = { version = "0.7.2-rc", default-features = false } zenoh-sync = { version = "0.7.2-rc" } zenoh-util = { version = "0.7.2-rc" } diff --git a/zenoh-flow-commons/Cargo.toml b/zenoh-flow-commons/Cargo.toml index f8b36e26..87e7bae4 100644 --- a/zenoh-flow-commons/Cargo.toml +++ b/zenoh-flow-commons/Cargo.toml @@ -25,6 +25,8 @@ version = { workspace = true } [dependencies] anyhow = { workspace = true } +bytesize = "1.2.0" +humantime = "2.1" ramhorns = "0.14" serde = { workspace = true } serde_json = { workspace = true } diff --git a/zenoh-flow-commons/src/deserialize.rs b/zenoh-flow-commons/src/deserialize.rs index dd8267f3..78454395 100644 --- a/zenoh-flow-commons/src/deserialize.rs +++ b/zenoh-flow-commons/src/deserialize.rs @@ -17,10 +17,25 @@ //! //! The external crates [bytesize] and [humantime] are leveraged for these purposes. +use crate::NodeId; use std::str::FromStr; use serde::Deserializer; +pub fn deserialize_id<'de, D>(deserializer: D) -> std::result::Result +where + D: Deserializer<'de>, +{ + let id: String = serde::de::Deserialize::deserialize(deserializer)?; + if id.contains('/') { + return Err(serde::de::Error::custom(format!( + "A NodeId cannot contain any '/': {id}" + ))); + } + + Ok(id.into()) +} + pub fn deserialize_size<'de, D>(deserializer: D) -> std::result::Result, D::Error> where D: Deserializer<'de>, diff --git a/zenoh-flow-commons/src/lib.rs b/zenoh-flow-commons/src/lib.rs index 737556f8..a5b4a666 100644 --- a/zenoh-flow-commons/src/lib.rs +++ b/zenoh-flow-commons/src/lib.rs @@ -12,17 +12,26 @@ // ZettaScale Zenoh Team, // -mod vars; -pub use vars::Vars; +mod configuration; +pub use configuration::Configuration; + +mod deserialize; +pub use deserialize::deserialize_id; mod identifiers; pub use identifiers::{NodeId, PortId, RuntimeId}; -mod configuration; -pub use configuration::Configuration; - mod merge; pub use merge::IMergeOverwrite; +mod runtime; +pub use runtime::RuntimeContext; + +mod shared_memory; +pub use shared_memory::{SharedMemoryConfiguration, SharedMemoryParameters}; + +mod vars; +pub use vars::Vars; + /// Zenoh-Flow's result type. pub type Result = std::result::Result; diff --git a/zenoh-flow-commons/src/runtime.rs b/zenoh-flow-commons/src/runtime.rs new file mode 100644 index 00000000..b0d749ac --- /dev/null +++ b/zenoh-flow-commons/src/runtime.rs @@ -0,0 +1,22 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{RuntimeId, SharedMemoryParameters}; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)] +pub struct RuntimeContext { + pub id: RuntimeId, + pub shared_memory: SharedMemoryParameters, +} diff --git a/zenoh-flow-commons/src/shared_memory.rs b/zenoh-flow-commons/src/shared_memory.rs new file mode 100644 index 00000000..298f0b6d --- /dev/null +++ b/zenoh-flow-commons/src/shared_memory.rs @@ -0,0 +1,62 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt::Display; + +use crate::deserialize::{deserialize_size, deserialize_time}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct SharedMemoryConfiguration { + pub(crate) number_elements: Option, + #[serde(deserialize_with = "deserialize_size")] + pub(crate) element_size: Option, + #[serde(deserialize_with = "deserialize_time")] + pub(crate) backoff: Option, +} + +// TODO@J-Loudet +impl Display for SharedMemoryConfiguration { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct SharedMemoryParameters { + pub number_elements: usize, + // Size, in bytes, of a single element. + pub element_size: usize, + // Duration, in nanoseconds, to wait before retrying the last operation. 
+ pub backoff: u64, +} + +// TODO@J-Loudet +impl Display for SharedMemoryParameters { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl SharedMemoryParameters { + pub fn from_configuration(configuration: &SharedMemoryConfiguration, default: &Self) -> Self { + Self { + number_elements: configuration + .number_elements + .unwrap_or(default.number_elements), + element_size: configuration.element_size.unwrap_or(default.element_size), + backoff: configuration.backoff.unwrap_or(default.backoff), + } + } +} diff --git a/zenoh-flow-descriptors/Cargo.toml b/zenoh-flow-descriptors/Cargo.toml index 1446861e..a9f26f55 100644 --- a/zenoh-flow-descriptors/Cargo.toml +++ b/zenoh-flow-descriptors/Cargo.toml @@ -25,10 +25,10 @@ version = { workspace = true } [dependencies] anyhow = { workspace = true } -bytesize = "1.2.0" -humantime = "2.1" serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } url = "2.2" -zenoh-flow-commons = { path = "../zenoh-flow-commons" } +uuid = { workspace = true } +zenoh-flow-commons = { workspace = true } +zenoh-flow-records = { workspace = true } diff --git a/zenoh-flow-descriptors/src/composite/io.rs b/zenoh-flow-descriptors/src/composite/io.rs index de2a1f72..ab2cf22c 100644 --- a/zenoh-flow-descriptors/src/composite/io.rs +++ b/zenoh-flow-descriptors/src/composite/io.rs @@ -13,11 +13,10 @@ // use super::ISubstituable; -use crate::deserialize::deserialize_id; use crate::{InputDescriptor, OutputDescriptor}; use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{NodeId, PortId}; +use zenoh_flow_commons::{deserialize_id, NodeId, PortId}; /// TODO@J-Loudet example? /// TODO@J-Loudet documentation? diff --git a/zenoh-flow-descriptors/src/composite/operator.rs b/zenoh-flow-descriptors/src/composite/operator.rs index 7c66d8e6..d382ec46 100644 --- a/zenoh-flow-descriptors/src/composite/operator.rs +++ b/zenoh-flow-descriptors/src/composite/operator.rs @@ -152,7 +152,6 @@ impl IFlattenableComposite for CompositeOperatorDescriptor { mut self, composite_id: NodeId, overwriting_configuration: Configuration, - runtime: Option, vars: Vars, ancestors: &mut HashSet>, ) -> Result<(Vec, Vec, Patch)> { @@ -172,7 +171,6 @@ impl IFlattenableComposite for CompositeOperatorDescriptor { let (mut operators, mut links, patch) = operator_desc .flatten_maybe_composite::( node_overwriting_configuration, - runtime.clone(), vars.clone(), // If we don't clone the ancestors between successive calls, consecutive composite operators // referring to the same descriptor would be falsely flagged as "infinite recursions". 
diff --git a/zenoh-flow-descriptors/src/composite/tests.rs b/zenoh-flow-descriptors/src/composite/tests.rs index 48c913e7..ed266c3f 100644 --- a/zenoh-flow-descriptors/src/composite/tests.rs +++ b/zenoh-flow-descriptors/src/composite/tests.rs @@ -57,12 +57,10 @@ fn test_flatten_composite_descriptor_non_nested() { configuration: Configuration::default(), }; - let runtime = Some("zf-plugin-1".into()); let (flattened_operators, flattened_links, patch) = composite_descriptor .flatten_composite( "composite".into(), Configuration::default(), - runtime.clone(), Vars::default(), &mut HashSet::default(), ) @@ -82,7 +80,6 @@ fn test_flatten_composite_descriptor_non_nested() { outputs: vec!["operator-1-out".into()], uri: Some("file://operator-1.so".into()), configuration: Configuration::default(), - runtime: runtime.clone(), }, FlattenedOperatorDescriptor { id: "composite/my-operator-2".into(), @@ -91,7 +88,6 @@ fn test_flatten_composite_descriptor_non_nested() { outputs: vec!["operator-2-out".into()], uri: Some("file://operator-2.so".into()), configuration: Configuration::default(), - runtime, }, ]; @@ -180,7 +176,6 @@ fn test_flatten_composite_descriptor_nested() { .flatten_composite( "composite".into(), Configuration::default(), - None, Vars::from([("SCHEME", "file://"), ("BASE_DIR", BASE_DIR)]), &mut HashSet::default(), ) @@ -197,7 +192,6 @@ fn test_flatten_composite_descriptor_nested() { outputs: vec!["composite-outer-out".into()], uri: Some("file://composite-outer.so".into()), configuration: Configuration::default(), - runtime: None, }, FlattenedOperatorDescriptor { id: "composite/composite-nested/operator-1".into(), @@ -206,7 +200,6 @@ fn test_flatten_composite_descriptor_nested() { outputs: vec!["operator-1-out".into()], uri: Some("file://operator-1.so".into()), configuration: Configuration::default(), - runtime: None, }, FlattenedOperatorDescriptor { id: "composite/composite-nested/operator-2".into(), @@ -215,7 +208,6 @@ fn test_flatten_composite_descriptor_nested() { outputs: vec!["operator-2-out".into()], uri: Some("file://operator-2.so".into()), configuration: Configuration::default(), - runtime: None, }, FlattenedOperatorDescriptor { id: "composite/composite-outer-i".into(), @@ -224,7 +216,6 @@ fn test_flatten_composite_descriptor_nested() { outputs: vec!["composite-outer-out".into()], uri: Some("file://composite-outer.so".into()), configuration: Configuration::default(), - runtime: None, }, ]; diff --git a/zenoh-flow-descriptors/src/dataflow.rs b/zenoh-flow-descriptors/src/dataflow.rs index eff4dfd1..484731d0 100644 --- a/zenoh-flow-descriptors/src/dataflow.rs +++ b/zenoh-flow-descriptors/src/dataflow.rs @@ -17,7 +17,10 @@ use crate::{ SinkDescriptor, SourceDescriptor, }; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use serde::{Deserialize, Serialize}; use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, Result, RuntimeId, Vars}; @@ -146,7 +149,7 @@ use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, Result, Runtime /// ``` #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct DataFlowDescriptor { - pub flow: String, + pub flow: Arc, #[serde(default)] pub configuration: Configuration, pub operators: Vec, @@ -168,7 +171,7 @@ impl DataFlowDescriptor { mapping, } = self; - let mapping = mapping.unwrap_or_default(); + let mut mapping = mapping.unwrap_or_default(); let mut flattened_sources = Vec::with_capacity(sources.len()); for source_desc in sources { @@ -177,13 +180,10 @@ impl 
DataFlowDescriptor { .configuration .clone() .merge_overwrite(flow_configuration.clone()); - let runtime = mapping.get(&source_desc.id).cloned(); - flattened_sources.push(source_desc.flatten::( - overwriting_configuration, - runtime, - vars.clone(), - )?); + flattened_sources.push( + source_desc.flatten::(overwriting_configuration, vars.clone())?, + ); } let mut flattened_sinks = Vec::with_capacity(sinks.len()); @@ -194,13 +194,9 @@ impl DataFlowDescriptor { .clone() .merge_overwrite(flow_configuration.clone()); - let runtime = mapping.get(&sink_desc.id).cloned(); - - flattened_sinks.push(sink_desc.flatten::( - overwriting_configuration, - runtime, - vars.clone(), - )?); + flattened_sinks.push( + sink_desc.flatten::(overwriting_configuration, vars.clone())?, + ); } let mut flattened_operators = Vec::with_capacity(operators.len()); @@ -210,16 +206,27 @@ impl DataFlowDescriptor { .configuration .clone() .merge_overwrite(flow_configuration.clone()); - let runtime = mapping.get(&operator_desc.id).cloned(); + let operator_id = operator_desc.id.clone(); let (mut flat_operators, mut flat_links, patch) = operator_desc .flatten_maybe_composite::( overwriting_configuration, - runtime, vars.clone(), &mut HashSet::default(), )?; + // If the composite operator (i.e. before flattening) appears in the mapping, we need to: + // 1. remove it from the list (it is not, per se, a real operator), + // 2. propagate that mapping to all the "flattened" operators. + // + // NOTE: it does not matter if the operator is not composite, we perform a (useless) removal / re-insert on + // the same `NodeId`. It could potentially be avoided but this code is not on the critical path. + if let Some(runtime) = mapping.remove(&operator_id) { + flat_operators.iter().for_each(|operator| { + mapping.insert(operator.id.clone(), runtime.clone()); + }); + } + flattened_operators.append(&mut flat_operators); patch.apply(&mut links); links.append(&mut flat_links); @@ -231,6 +238,7 @@ impl DataFlowDescriptor { operators: flattened_operators, sinks: flattened_sinks, links, + mapping, }) } } diff --git a/zenoh-flow-descriptors/src/deserialize.rs b/zenoh-flow-descriptors/src/deserialize.rs deleted file mode 100644 index d502f464..00000000 --- a/zenoh-flow-descriptors/src/deserialize.rs +++ /dev/null @@ -1,73 +0,0 @@ -// -// Copyright (c) 2021 - 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! This module exposes the functions [deserialize_size] and [deserialize_time] that are used -//! throughout Zenoh-Flow to "parse" values used to express time or size. -//! -//! The external crates [bytesize] and [humantime] are leveraged for these purposes. - -use serde::Deserializer; -use std::str::FromStr; -use zenoh_flow_commons::NodeId; - -pub fn deserialize_size<'de, D>(deserializer: D) -> std::result::Result, D::Error> -where - D: Deserializer<'de>, -{ - match serde::de::Deserialize::deserialize(deserializer) { - Ok(buf) => Ok(Some( - bytesize::ByteSize::from_str(buf) - .map_err(|_| { - serde::de::Error::custom(format!("Unable to parse value as bytes {buf}")) - })? 
- .as_u64() as usize, - )), - Err(_) => { - // log::warn!("failed to deserialize size: {:?}", e); - Ok(None) - } - } -} - -pub fn deserialize_time<'de, D>(deserializer: D) -> std::result::Result, D::Error> -where - D: Deserializer<'de>, -{ - match serde::de::Deserialize::deserialize(deserializer) { - Ok::<&str, _>(buf) => { - let ht = (buf) - .parse::() - .map_err(serde::de::Error::custom)?; - Ok(Some(ht.as_nanos() as u64)) - } - Err(_) => { - // log::warn!("failed to deserialize time: {:?}", e); - Ok(None) - } - } -} - -pub fn deserialize_id<'de, D>(deserializer: D) -> std::result::Result -where - D: Deserializer<'de>, -{ - let id: String = serde::de::Deserialize::deserialize(deserializer)?; - if id.contains('/') { - return Err(serde::de::Error::custom(format!( - "A NodeId cannot contain any '/': {id}" - ))); - } - - Ok(id.into()) -} diff --git a/zenoh-flow-descriptors/src/flattened/dataflow.rs b/zenoh-flow-descriptors/src/flattened/dataflow.rs index d7353133..c7af37c1 100644 --- a/zenoh-flow-descriptors/src/flattened/dataflow.rs +++ b/zenoh-flow-descriptors/src/flattened/dataflow.rs @@ -12,11 +12,16 @@ // ZettaScale Zenoh Team, // +use std::{collections::HashMap, fmt::Display, mem, sync::Arc}; + use crate::{ - FlattenedOperatorDescriptor, FlattenedSinkDescriptor, FlattenedSourceDescriptor, LinkDescriptor, + FlattenedOperatorDescriptor, FlattenedSinkDescriptor, FlattenedSourceDescriptor, + InputDescriptor, LinkDescriptor, OutputDescriptor, }; - use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use zenoh_flow_commons::{NodeId, PortId, RuntimeContext, RuntimeId, SharedMemoryParameters}; +use zenoh_flow_records::{DataFlowRecord, ZenohReceiver, ZenohSender}; /// TODO@J-Loudet Documentation? /// @@ -37,8 +42,7 @@ use serde::{Deserialize, Serialize}; /// uri: file:///home/zenoh-flow/node/libsource.so /// outputs: /// - out-operator -/// mapping: zenoh-flow-plugin-0 -/// +/// /// operators: /// - id: Operator-1 /// name: Operator @@ -50,7 +54,7 @@ use serde::{Deserialize, Serialize}; /// - in-source /// outputs: /// - out-sink -/// +/// /// sinks: /// - id: Sink-2 /// name: Sink @@ -68,7 +72,7 @@ use serde::{Deserialize, Serialize}; /// to: /// node : Operator-1 /// input : in-source -/// +/// /// - from: /// node : Operator-1 /// output : out-sink @@ -102,7 +106,7 @@ use serde::{Deserialize, Serialize}; /// \"mapping\": \"zenoh-flow-plugin-0\" /// } /// ], -/// +/// /// \"operators\": [ /// { /// \"id\": \"Operator-1\", @@ -120,7 +124,7 @@ use serde::{Deserialize, Serialize}; /// ] /// } /// ], -/// +/// /// \"sinks\": [ /// { /// \"id\": \"Sink-2\", @@ -166,9 +170,151 @@ use serde::{Deserialize, Serialize}; /// ``` #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct FlattenedDataFlowDescriptor { - pub flow: String, + pub flow: Arc, pub sources: Vec, pub operators: Vec, pub sinks: Vec, pub links: Vec, + #[serde(default)] + pub mapping: HashMap, +} + +// TODO@J-Loudet +impl Display for FlattenedDataFlowDescriptor { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl FlattenedDataFlowDescriptor { + /// Assign the nodes without an explicit mapping to the provided runtime and generate the connections between the + /// nodes running on different runtimes. + /// + /// This method will consume the `FlattenedDataFlowDescriptor` and produce a `DataFlowRecord`. 
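+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of the intended call (not compiled as a doc-test); it assumes a
+    /// `default_runtime: RuntimeContext` was built beforehand, e.g. from the runtime's configuration:
+    ///
+    /// ```ignore
+    /// let record: DataFlowRecord = flattened_flow.complete_mapping_and_connect(default_runtime);
+    /// ```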
+ pub fn complete_mapping_and_connect(self, default_runtime: RuntimeContext) -> DataFlowRecord { + let mut senders = HashMap::default(); + let mut receivers = HashMap::default(); + let mut connector_links = Vec::default(); + + let flow_id = Uuid::new_v4(); + + let FlattenedDataFlowDescriptor { + flow, + sources, + operators, + sinks, + mut links, + mut mapping, + } = self; + + for link in links.iter_mut() { + let runtime_from = mapping + .entry(link.from.node.clone()) + .or_insert(default_runtime.id.clone()) + .clone(); + let runtime_to = mapping + .entry(link.to.node.clone()) + .or_insert(default_runtime.id.clone()) + .clone(); + + // The nodes will run on the same runtime, no need for connectors. + if runtime_from == runtime_to { + continue; + } + + // Nodes are on different runtimes: we generate a special key expression for communications through Zenoh + // (either Shared Memory or standard pub/sub). + // + // We also need to: + // - update the link and replace the "to" part with the sender, + // - add another link that goes from the receiver to the "to". + let resource: Arc = format!( + "{}/{}/{}/{}", + &flow, &flow_id, link.from.node, link.from.output + ) + .into(); + + let shared_memory_parameters = SharedMemoryParameters::from_configuration( + &link.shared_memory, + &default_runtime.shared_memory, + ); + + let sender_id: NodeId = + format!("z-sender/{}/{}", link.from.node, link.from.output).into(); + let sender_input: PortId = format!("{}-input", sender_id).into(); + + let receiver_id: NodeId = + format!("z-receiver/{}/{}", link.to.node, link.to.input).into(); + let receiver_output: PortId = format!("{}-output", receiver_id).into(); + + // Update the link, replacing "to" with the sender. + let to = mem::replace( + &mut link.to, + InputDescriptor { + node: sender_id.clone(), + input: sender_input.clone(), + }, + ); + + // Create a new link, connecting the receiver to the "to". + connector_links.push(LinkDescriptor { + from: OutputDescriptor { + node: receiver_id.clone(), + output: receiver_output.clone(), + }, + to, + shared_memory: link.shared_memory.clone(), + }); + + // Create the connector nodes — without forgetting their mapping. + let sender = ZenohSender { + id: sender_id.clone(), + resource: resource.clone(), + input: sender_input.clone(), + shared_memory: shared_memory_parameters.clone(), + }; + senders.insert(sender_id.clone(), sender); + mapping.insert(sender_id, runtime_from); + + let receiver = ZenohReceiver { + id: receiver_id.clone(), + resource, + output: receiver_output, + shared_memory: shared_memory_parameters, + }; + receivers.insert(receiver_id.clone(), receiver); + mapping.insert(receiver_id, runtime_to); + } + + // Add the new links. 
+ links.append(&mut connector_links); + + DataFlowRecord { + uuid: flow_id, + flow, + sources: sources + .into_iter() + .map(|source| (source.id.clone(), source.into())) + .collect::>(), + operators: operators + .into_iter() + .map(|operator| (operator.id.clone(), operator.into())) + .collect::>(), + sinks: sinks + .into_iter() + .map(|sink| (sink.id.clone(), sink.into())) + .collect::>(), + receivers, + senders, + links: links + .into_iter() + .map(|link| link.into_record(&default_runtime.shared_memory)) + .collect::>(), + mapping, + } + } } + +#[cfg(test)] +#[path = "./tests.rs"] +mod tests; diff --git a/zenoh-flow-descriptors/src/flattened/mod.rs b/zenoh-flow-descriptors/src/flattened/mod.rs index 53ffc1ef..5cbf7d43 100644 --- a/zenoh-flow-descriptors/src/flattened/mod.rs +++ b/zenoh-flow-descriptors/src/flattened/mod.rs @@ -26,7 +26,7 @@ pub use dataflow::FlattenedDataFlowDescriptor; use crate::{composite::Substitutions, InputDescriptor, LinkDescriptor, OutputDescriptor}; use serde::de::DeserializeOwned; -use zenoh_flow_commons::{Configuration, NodeId, Result, RuntimeId, Vars}; +use zenoh_flow_commons::{Configuration, NodeId, Result, Vars}; pub(crate) trait IFlattenableComposite: DeserializeOwned + Display + Debug + Clone { type Flattened: Debug + Display; @@ -36,7 +36,6 @@ pub(crate) trait IFlattenableComposite: DeserializeOwned + Display + Debug + Clo self, id: NodeId, overwritting_configuration: Configuration, - runtime: Option, vars: Vars, ancestors: &mut HashSet>, ) -> Result<(Vec, Vec, Patch)>; @@ -45,12 +44,7 @@ pub(crate) trait IFlattenableComposite: DeserializeOwned + Display + Debug + Clo pub trait IFlattenable: DeserializeOwned + Display + Debug { type Flattened: Debug + Display; - fn flatten( - self, - id: NodeId, - overwritting_configuration: Configuration, - runtime: Option, - ) -> Self::Flattened; + fn flatten(self, id: NodeId, overwritting_configuration: Configuration) -> Self::Flattened; } #[derive(Default, Debug, PartialEq, Eq)] diff --git a/zenoh-flow-descriptors/src/flattened/nodes/operator.rs b/zenoh-flow-descriptors/src/flattened/nodes/operator.rs index 4f375546..0740e2b9 100644 --- a/zenoh-flow-descriptors/src/flattened/nodes/operator.rs +++ b/zenoh-flow-descriptors/src/flattened/nodes/operator.rs @@ -12,12 +12,11 @@ // ZettaScale Zenoh Team, // -use std::fmt::Display; - -use crate::OperatorDescriptor; - use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; +use std::fmt::Display; +use std::sync::Arc; +use zenoh_flow_commons::{Configuration, NodeId, PortId}; +use zenoh_flow_records::OperatorRecord; /// TODO@J-Loudet Documentation? /// @@ -66,13 +65,12 @@ use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, Runtime #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct FlattenedOperatorDescriptor { pub id: NodeId, - pub name: String, + pub name: Arc, pub inputs: Vec, pub outputs: Vec, - pub uri: Option, + pub uri: Option>, #[serde(default)] pub configuration: Configuration, - pub runtime: Option, } impl Display for FlattenedOperatorDescriptor { @@ -82,23 +80,28 @@ impl Display for FlattenedOperatorDescriptor { } impl FlattenedOperatorDescriptor { - /// TODO@J-Loudet: documentation + /// Update the identifier of the [FlattenedOperatorDescriptor] prepending the id of the + /// [CompositeOperatorDescriptor] it belongs to. + /// + /// # TODO /// - /// In case there are identical keys, *the provided configuration will override the configuration of the Operator*. 
- /// The rationale is that the configuration of the Operator **always** has the lowest priority. - pub fn new( - operator: OperatorDescriptor, - id: NodeId, - overwritting_configuration: Configuration, - runtime: Option, - ) -> Self { - let OperatorDescriptor { + /// - Prevent the usage of "/" in the id of nodes. + pub fn composite_id(&mut self, composite_id: &NodeId) -> NodeId { + self.id = format!("{composite_id}/{}", self.id).into(); + self.id.clone() + } +} + +impl From for OperatorRecord { + fn from(value: FlattenedOperatorDescriptor) -> Self { + let FlattenedOperatorDescriptor { + id, name, - uri, inputs, outputs, + uri, configuration, - } = operator; + } = value; Self { id, @@ -106,19 +109,7 @@ impl FlattenedOperatorDescriptor { inputs, outputs, uri, - configuration: overwritting_configuration.merge_overwrite(configuration), - runtime, + configuration, } } - - /// Update the identifier of the [FlattenedOperatorDescriptor] prepending the id of the - /// [CompositeOperatorDescriptor] it belongs to. - /// - /// # TODO - /// - /// - Prevent the usage of "/" in the id of nodes. - pub fn composite_id(&mut self, composite_id: &NodeId) -> NodeId { - self.id = format!("{composite_id}/{}", self.id).into(); - self.id.clone() - } } diff --git a/zenoh-flow-descriptors/src/flattened/nodes/sink.rs b/zenoh-flow-descriptors/src/flattened/nodes/sink.rs index 7f94cf73..9dc41a9f 100644 --- a/zenoh-flow-descriptors/src/flattened/nodes/sink.rs +++ b/zenoh-flow-descriptors/src/flattened/nodes/sink.rs @@ -12,12 +12,11 @@ // ZettaScale Zenoh Team, // -use std::fmt::Display; +use std::{fmt::Display, sync::Arc}; use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; - -use crate::SinkDescriptor; +use zenoh_flow_commons::{Configuration, NodeId, PortId}; +use zenoh_flow_records::SinkRecord; /// Textual representation of a Zenoh-Flow Sink node. 
/// @@ -60,12 +59,11 @@ use crate::SinkDescriptor; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct FlattenedSinkDescriptor { pub id: NodeId, - pub name: String, - pub uri: Option, + pub name: Arc, + pub uri: Option>, pub inputs: Vec, #[serde(default)] pub configuration: Configuration, - pub runtime: Option, } impl Display for FlattenedSinkDescriptor { @@ -74,27 +72,14 @@ impl Display for FlattenedSinkDescriptor { } } -impl FlattenedSinkDescriptor { - pub fn new( - sink: SinkDescriptor, - id: NodeId, - overwritting_configuration: Configuration, - runtime: Option, - ) -> Self { - let SinkDescriptor { - name, - configuration, - uri, - inputs, - } = sink; - +impl From for SinkRecord { + fn from(value: FlattenedSinkDescriptor) -> Self { Self { - id, - name, - uri, - inputs, - configuration: overwritting_configuration.merge_overwrite(configuration), - runtime, + id: value.id, + name: value.name, + inputs: value.inputs, + uri: value.uri, + configuration: value.configuration, } } } diff --git a/zenoh-flow-descriptors/src/flattened/nodes/source.rs b/zenoh-flow-descriptors/src/flattened/nodes/source.rs index de0dbc27..c625de79 100644 --- a/zenoh-flow-descriptors/src/flattened/nodes/source.rs +++ b/zenoh-flow-descriptors/src/flattened/nodes/source.rs @@ -12,12 +12,10 @@ // ZettaScale Zenoh Team, // -use std::fmt::Display; - use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; - -use crate::SourceDescriptor; +use std::{fmt::Display, sync::Arc}; +use zenoh_flow_commons::{Configuration, NodeId, PortId}; +use zenoh_flow_records::SourceRecord; /// Textual representation of a Zenoh-Flow Source node. /// @@ -62,12 +60,11 @@ use crate::SourceDescriptor; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct FlattenedSourceDescriptor { pub id: NodeId, - pub name: String, - pub uri: Option, + pub name: Arc, + pub uri: Option>, pub outputs: Vec, #[serde(default)] pub configuration: Configuration, - pub runtime: Option, } impl Display for FlattenedSourceDescriptor { @@ -76,27 +73,14 @@ impl Display for FlattenedSourceDescriptor { } } -impl FlattenedSourceDescriptor { - pub fn new( - source: SourceDescriptor, - id: NodeId, - overwritting_configuration: Configuration, - runtime: Option, - ) -> Self { - let SourceDescriptor { - name, - uri, - outputs, - configuration, - } = source; - +impl From for SourceRecord { + fn from(value: FlattenedSourceDescriptor) -> Self { Self { - id, - name, - uri, - outputs, - configuration: overwritting_configuration.merge_overwrite(configuration), - runtime, + id: value.id, + name: value.name, + outputs: value.outputs, + uri: value.uri, + configuration: value.configuration, } } } diff --git a/zenoh-flow-descriptors/src/flattened/tests.rs b/zenoh-flow-descriptors/src/flattened/tests.rs index 8b137891..e8d79895 100644 --- a/zenoh-flow-descriptors/src/flattened/tests.rs +++ b/zenoh-flow-descriptors/src/flattened/tests.rs @@ -1 +1,236 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::{ + FlattenedDataFlowDescriptor, FlattenedOperatorDescriptor, FlattenedSinkDescriptor, + FlattenedSourceDescriptor, InputDescriptor, LinkDescriptor, OutputDescriptor, +}; +use std::collections::HashMap; +use zenoh_flow_commons::{ + Configuration, NodeId, PortId, RuntimeContext, RuntimeId, SharedMemoryConfiguration, + SharedMemoryParameters, +}; +use zenoh_flow_records::{InputRecord, LinkRecord, OutputRecord}; + +const SOURCE_ID: &str = "source-0"; +const SOURCE_OUTPUT: &str = "out-operator-1"; + +const OPERATOR_ID: &str = "operator-1"; +const OPERATOR_INPUT: &str = "in-source-0"; +const OPERATOR_OUTPUT: &str = "out-sink-2"; + +const SINK_ID: &str = "sink-2"; +const SINK_INPUT: &str = "in-operator-1"; + +const RUNTIME_ID: &str = "default-runtime"; + +fn generate_default_runtime() -> RuntimeContext { + RuntimeContext { + id: RUNTIME_ID.into(), + shared_memory: SharedMemoryParameters { + number_elements: 10, + element_size: 10_000, + backoff: 500, + }, + } +} + +fn generate_flattened_data_flow_descriptor() -> FlattenedDataFlowDescriptor { + let source = FlattenedSourceDescriptor { + id: SOURCE_ID.into(), + name: "Source".into(), + uri: Some("file:///home/zenoh-flow/source.so".into()), + outputs: vec![SOURCE_OUTPUT.into()], + configuration: Configuration::default(), + }; + + let operator = FlattenedOperatorDescriptor { + id: OPERATOR_ID.into(), + name: "Operator".into(), + inputs: vec![OPERATOR_INPUT.into()], + outputs: vec![OPERATOR_OUTPUT.into()], + uri: Some("file:///home/zenoh-flow/operator.so".into()), + configuration: Configuration::default(), + }; + + let sink = FlattenedSinkDescriptor { + id: SINK_ID.into(), + name: "Sink".into(), + inputs: vec![SINK_INPUT.into()], + uri: Some("file:///home/zenoh-flow/sink.so".into()), + configuration: Configuration::default(), + }; + + let links = vec![ + LinkDescriptor { + from: OutputDescriptor { + node: SOURCE_ID.into(), + output: SOURCE_OUTPUT.into(), + }, + to: InputDescriptor { + node: OPERATOR_ID.into(), + input: OPERATOR_INPUT.into(), + }, + shared_memory: SharedMemoryConfiguration::default(), + }, + LinkDescriptor { + from: OutputDescriptor { + node: OPERATOR_ID.into(), + output: OPERATOR_OUTPUT.into(), + }, + to: InputDescriptor { + node: SINK_ID.into(), + input: SINK_INPUT.into(), + }, + shared_memory: SharedMemoryConfiguration::default(), + }, + ]; + + FlattenedDataFlowDescriptor { + flow: "test-flow".into(), + sources: vec![source], + operators: vec![operator], + sinks: vec![sink], + links, + mapping: HashMap::default(), + } +} + +#[test] +fn test_complete_mapping_and_connect_no_runtime() { + let default_runtime = generate_default_runtime(); + let flattened_descriptor = generate_flattened_data_flow_descriptor(); + + let data_flow_record = flattened_descriptor.complete_mapping_and_connect(default_runtime); + assert!(data_flow_record.receivers.is_empty()); + assert!(data_flow_record.senders.is_empty()); + + let expected_mapping = HashMap::from([ + (SOURCE_ID.into(), RUNTIME_ID.into()), + (OPERATOR_ID.into(), RUNTIME_ID.into()), + (SINK_ID.into(), RUNTIME_ID.into()), + ]); + assert_eq!(expected_mapping, data_flow_record.mapping); +} + +#[test] +fn test_complete_mapping_and_connect_all_different_runtime() { + let default_runtime = generate_default_runtime(); + let mut flattened_descriptor = generate_flattened_data_flow_descriptor(); + + let source_runtime: RuntimeId = "source-runtime".into(); + let sink_runtime: RuntimeId = 
"sink-runtime".into(); + + // We only indicate a runtime for the Source and the Sink. The Operator will be on the "default-runtime". + flattened_descriptor.mapping = HashMap::from([ + (SOURCE_ID.into(), source_runtime.clone()), + (SINK_ID.into(), sink_runtime.clone()), + ]); + + let data_flow_record = + flattened_descriptor.complete_mapping_and_connect(default_runtime.clone()); + // 2 = sender(source-runtime) + sender(default-runtime) + assert_eq!(data_flow_record.senders.len(), 2); + // 2 = receiver(default-runtime) + receiver(sink-runtime) + assert_eq!(data_flow_record.receivers.len(), 2); + + // 4 = source(source-runtime) -> sender(source-runtime) + // + receiver(default-runtime) -> operator(default-runtime) + // + operator(default-runtime) -> sender(default-runtime) + // + receiver(sink-runtime) -> sink(sink-runtime) + let source_operator_sender_id: NodeId = + format!("z-sender/{}/{}", SOURCE_ID, SOURCE_OUTPUT).into(); + let source_operator_sender_input: PortId = + format!("z-sender/{}/{}-input", SOURCE_ID, SOURCE_OUTPUT).into(); + let source_operator_receiver_id: NodeId = + format!("z-receiver/{}/{}", OPERATOR_ID, OPERATOR_INPUT).into(); + let source_operator_receiver_output: PortId = + format!("z-receiver/{}/{}-output", OPERATOR_ID, OPERATOR_INPUT).into(); + + let operator_sink_sender_id: NodeId = + format!("z-sender/{}/{}", OPERATOR_ID, OPERATOR_OUTPUT).into(); + let operator_sink_sender_input: PortId = + format!("z-sender/{}/{}-input", OPERATOR_ID, OPERATOR_OUTPUT).into(); + let operator_sink_receiver_id: NodeId = format!("z-receiver/{}/{}", SINK_ID, SINK_INPUT).into(); + let operator_sink_receiver_output: PortId = + format!("z-receiver/{}/{}-output", SINK_ID, SINK_INPUT).into(); + + let expected_links = vec![ + LinkRecord { + from: OutputRecord { + node: SOURCE_ID.into(), + output: SOURCE_OUTPUT.into(), + }, + to: InputRecord { + node: source_operator_sender_id.clone(), + input: source_operator_sender_input, + }, + shared_memory: default_runtime.shared_memory.clone(), + }, + LinkRecord { + from: OutputRecord { + node: source_operator_receiver_id.clone(), + output: source_operator_receiver_output, + }, + to: InputRecord { + node: OPERATOR_ID.into(), + input: OPERATOR_INPUT.into(), + }, + shared_memory: default_runtime.shared_memory.clone(), + }, + LinkRecord { + from: OutputRecord { + node: OPERATOR_ID.into(), + output: OPERATOR_OUTPUT.into(), + }, + to: InputRecord { + node: operator_sink_sender_id, + input: operator_sink_sender_input, + }, + shared_memory: default_runtime.shared_memory.clone(), + }, + LinkRecord { + from: OutputRecord { + node: operator_sink_receiver_id, + output: operator_sink_receiver_output, + }, + to: InputRecord { + node: SINK_ID.into(), + input: SINK_INPUT.into(), + }, + shared_memory: default_runtime.shared_memory, + }, + ]; + assert_eq!(expected_links.len(), data_flow_record.links.len()); + expected_links + .iter() + .for_each(|expected_link| assert!(data_flow_record.links.contains(expected_link))); + + let expected_mapping = HashMap::from([ + (SOURCE_ID.into(), source_runtime.clone()), + (source_operator_sender_id, source_runtime), + (source_operator_receiver_id, default_runtime.id.clone()), + (OPERATOR_ID.into(), default_runtime.id.clone()), + ( + format!("z-sender/{}/{}", OPERATOR_ID, OPERATOR_OUTPUT).into(), + default_runtime.id, + ), + ( + format!("z-receiver/{}/{}", SINK_ID, SINK_INPUT).into(), + sink_runtime.clone(), + ), + (SINK_ID.into(), sink_runtime), + ]); + assert_eq!(expected_mapping, data_flow_record.mapping); +} diff --git 
a/zenoh-flow-descriptors/src/io.rs b/zenoh-flow-descriptors/src/io.rs index 2127d8b1..a4a3c61e 100644 --- a/zenoh-flow-descriptors/src/io.rs +++ b/zenoh-flow-descriptors/src/io.rs @@ -12,10 +12,10 @@ // ZettaScale Zenoh Team, // -use std::fmt; - use serde::{Deserialize, Serialize}; +use std::fmt; use zenoh_flow_commons::{NodeId, PortId}; +use zenoh_flow_records::{InputRecord, OutputRecord}; /// An `InputDescriptor` describes an Input port of a Zenoh-Flow node. /// @@ -41,6 +41,15 @@ impl InputDescriptor { } } +impl From for InputRecord { + fn from(this: InputDescriptor) -> Self { + InputRecord { + node: this.node, + input: this.input, + } + } +} + /// An `OutputDescriptor` describes an Output port of a Zenoh-Flow node. /// /// See [LinkDescriptor][crate::link::LinkDescriptor] for their usage. @@ -64,3 +73,12 @@ impl OutputDescriptor { } } } + +impl From for OutputRecord { + fn from(this: OutputDescriptor) -> Self { + OutputRecord { + node: this.node, + output: this.output, + } + } +} diff --git a/zenoh-flow-descriptors/src/lib.rs b/zenoh-flow-descriptors/src/lib.rs index 0d88ca2c..43cff8af 100644 --- a/zenoh-flow-descriptors/src/lib.rs +++ b/zenoh-flow-descriptors/src/lib.rs @@ -36,5 +36,3 @@ mod nodes; pub use nodes::{NodeDescriptor, OperatorDescriptor, SinkDescriptor, SourceDescriptor}; mod uri; - -mod deserialize; diff --git a/zenoh-flow-descriptors/src/link.rs b/zenoh-flow-descriptors/src/link.rs index 6f08570f..c7268443 100644 --- a/zenoh-flow-descriptors/src/link.rs +++ b/zenoh-flow-descriptors/src/link.rs @@ -13,11 +13,11 @@ // use crate::composite::{ISubstituable, Substitutions}; -use crate::deserialize::{deserialize_size, deserialize_time}; use crate::{InputDescriptor, OutputDescriptor}; -use zenoh_flow_commons::NodeId; +use zenoh_flow_commons::{NodeId, SharedMemoryConfiguration, SharedMemoryParameters}; use serde::{Deserialize, Serialize}; +use zenoh_flow_records::LinkRecord; /// A `LinkDescriptor` describes a link in Zenoh-Flow: a connection from an Output to an Input. 
/// @@ -42,12 +42,7 @@ pub struct LinkDescriptor { pub from: OutputDescriptor, pub to: InputDescriptor, #[serde(default)] - #[serde(deserialize_with = "deserialize_size")] - pub shared_memory_element_size: Option, - pub shared_memory_elements: Option, - #[serde(default)] - #[serde(deserialize_with = "deserialize_time")] - pub shared_memory_backoff: Option, + pub shared_memory: SharedMemoryConfiguration, } impl std::fmt::Display for LinkDescriptor { @@ -89,9 +84,18 @@ impl LinkDescriptor { Self { from, to, - shared_memory_element_size: None, - shared_memory_elements: None, - shared_memory_backoff: None, + shared_memory: SharedMemoryConfiguration::default(), + } + } + + pub fn into_record(self, default_shared_memory: &SharedMemoryParameters) -> LinkRecord { + LinkRecord { + from: self.from.into(), + to: self.to.into(), + shared_memory: SharedMemoryParameters::from_configuration( + &self.shared_memory, + default_shared_memory, + ), } } } diff --git a/zenoh-flow-descriptors/src/nodes/mod.rs b/zenoh-flow-descriptors/src/nodes/mod.rs index 8ffd3f97..c6ef35aa 100644 --- a/zenoh-flow-descriptors/src/nodes/mod.rs +++ b/zenoh-flow-descriptors/src/nodes/mod.rs @@ -20,14 +20,14 @@ pub(crate) mod source; pub use source::SourceDescriptor; use crate::{ - deserialize::deserialize_id, flattened::{IFlattenable, IFlattenableComposite, Patch}, uri, LinkDescriptor, }; use anyhow::bail; use serde::{Deserialize, Serialize}; use std::{collections::HashSet, sync::Arc}; -use zenoh_flow_commons::{Configuration, NodeId, Result, RuntimeId, Vars}; +use zenoh_flow_commons::deserialize_id; +use zenoh_flow_commons::{Configuration, NodeId, Result, Vars}; /// A generic Zenoh-Flow node. /// @@ -85,27 +85,23 @@ impl NodeDescriptor { pub(crate) fn flatten( self, overwriting_configuration: Configuration, - runtime: Option, vars: Vars, ) -> Result { let (node_desc, _) = uri::try_load_descriptor::(&self.descriptor, vars)?; - Ok(node_desc.flatten(self.id, overwriting_configuration, runtime)) + Ok(node_desc.flatten(self.id, overwriting_configuration)) } /// TODO@J-Loudet Documentation? pub(crate) fn flatten_maybe_composite( self, overwriting_configuration: Configuration, - runtime: Option, vars: Vars, ancestors: &mut HashSet>, ) -> Result<(Vec, Vec, Patch)> { // 1st attempt: try to flatten as a regular node. - let res_node = self.clone().flatten::( - overwriting_configuration.clone(), - runtime.clone(), - vars.clone(), - ); + let res_node = self + .clone() + .flatten::(overwriting_configuration.clone(), vars.clone()); if let Ok(node) = res_node { return Ok((vec![node], Vec::default(), Patch::default())); @@ -126,7 +122,6 @@ Possible infinite recursion detected, the following descriptor appears to includ let res_composite = composite_desc.clone().flatten_composite( self.id, overwriting_configuration, - runtime, merged_vars, ancestors, ); diff --git a/zenoh-flow-descriptors/src/nodes/operator.rs b/zenoh-flow-descriptors/src/nodes/operator.rs index 251bc1ec..95725ca5 100644 --- a/zenoh-flow-descriptors/src/nodes/operator.rs +++ b/zenoh-flow-descriptors/src/nodes/operator.rs @@ -12,9 +12,11 @@ // ZettaScale Zenoh Team, // +use std::sync::Arc; + use crate::{flattened::IFlattenable, FlattenedOperatorDescriptor}; use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{Configuration, NodeId, PortId, RuntimeId}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId}; /// Textual representation of a Zenoh-Flow Operator node. 
/// @@ -58,8 +60,8 @@ use zenoh_flow_commons::{Configuration, NodeId, PortId, RuntimeId}; /// ``` #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct OperatorDescriptor { - pub name: String, - pub uri: Option, + pub name: Arc, + pub uri: Option>, pub inputs: Vec, pub outputs: Vec, #[serde(default)] @@ -75,13 +77,15 @@ impl std::fmt::Display for OperatorDescriptor { impl IFlattenable for OperatorDescriptor { type Flattened = FlattenedOperatorDescriptor; - fn flatten( - self, - id: NodeId, - overwritting_configuration: Configuration, - runtime: Option, - ) -> Self::Flattened { - FlattenedOperatorDescriptor::new(self, id, overwritting_configuration, runtime) + fn flatten(self, id: NodeId, overwritting_configuration: Configuration) -> Self::Flattened { + FlattenedOperatorDescriptor { + id, + name: self.name, + inputs: self.inputs, + outputs: self.outputs, + uri: self.uri, + configuration: overwritting_configuration.merge_overwrite(self.configuration), + } } } @@ -101,7 +105,6 @@ mod tests { outputs: vec!["operator-1-out".into()], uri: Some("file://operator-1.so".into()), configuration: Configuration::default(), - runtime: None, }; let (operator_descriptor, _) = uri::try_load_descriptor::( @@ -112,7 +115,7 @@ mod tests { assert_eq!( expected_operator, - operator_descriptor.flatten(id, Configuration::default(), None) + operator_descriptor.flatten(id, Configuration::default()) ) } } diff --git a/zenoh-flow-descriptors/src/nodes/sink.rs b/zenoh-flow-descriptors/src/nodes/sink.rs index ce0dc8b6..935b2abc 100644 --- a/zenoh-flow-descriptors/src/nodes/sink.rs +++ b/zenoh-flow-descriptors/src/nodes/sink.rs @@ -12,9 +12,11 @@ // ZettaScale Zenoh Team, // +use std::sync::Arc; + use crate::{flattened::IFlattenable, FlattenedSinkDescriptor}; use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{Configuration, NodeId, PortId, RuntimeId}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId}; /// Textual representation of a Zenoh-Flow Sink node. 
/// @@ -51,10 +53,10 @@ use zenoh_flow_commons::{Configuration, NodeId, PortId, RuntimeId}; /// ``` #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct SinkDescriptor { - pub name: String, + pub name: Arc, #[serde(default)] pub configuration: Configuration, - pub uri: Option, + pub uri: Option>, pub inputs: Vec, } @@ -68,12 +70,13 @@ impl std::fmt::Display for SinkDescriptor { impl IFlattenable for SinkDescriptor { type Flattened = FlattenedSinkDescriptor; - fn flatten( - self, - id: NodeId, - updated_configuration: Configuration, - runtime: Option, - ) -> Self::Flattened { - FlattenedSinkDescriptor::new(self, id, updated_configuration, runtime) + fn flatten(self, id: NodeId, overwritting_configuration: Configuration) -> Self::Flattened { + FlattenedSinkDescriptor { + id, + name: self.name, + uri: self.uri, + inputs: self.inputs, + configuration: overwritting_configuration.merge_overwrite(self.configuration), + } } } diff --git a/zenoh-flow-descriptors/src/nodes/source.rs b/zenoh-flow-descriptors/src/nodes/source.rs index 5218d9a5..2d5a34ea 100644 --- a/zenoh-flow-descriptors/src/nodes/source.rs +++ b/zenoh-flow-descriptors/src/nodes/source.rs @@ -12,9 +12,11 @@ // ZettaScale Zenoh Team, // +use std::sync::Arc; + use crate::{flattened::IFlattenable, FlattenedSourceDescriptor}; use serde::{Deserialize, Serialize}; -use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId}; /// Textual representation of a Zenoh-Flow Source node. /// @@ -51,8 +53,8 @@ use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, Runtime /// ``` #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct SourceDescriptor { - pub name: String, - pub uri: Option, + pub name: Arc, + pub uri: Option>, pub outputs: Vec, #[serde(default)] pub configuration: Configuration, @@ -68,12 +70,7 @@ impl std::fmt::Display for SourceDescriptor { impl IFlattenable for SourceDescriptor { type Flattened = FlattenedSourceDescriptor; - fn flatten( - self, - id: NodeId, - overwriting_configuration: Configuration, - runtime: Option, - ) -> Self::Flattened { + fn flatten(self, id: NodeId, overwriting_configuration: Configuration) -> Self::Flattened { let SourceDescriptor { name, uri, @@ -87,7 +84,6 @@ impl IFlattenable for SourceDescriptor { uri, outputs, configuration: overwriting_configuration.merge_overwrite(configuration), - runtime, } } } diff --git a/zenoh-flow-descriptors/src/tests.rs b/zenoh-flow-descriptors/src/tests.rs index be3c2efe..61b97c1d 100644 --- a/zenoh-flow-descriptors/src/tests.rs +++ b/zenoh-flow-descriptors/src/tests.rs @@ -12,6 +12,8 @@ // ZettaScale Zenoh Team, // +use std::collections::HashMap; + use crate::{ uri::try_load_descriptor, DataFlowDescriptor, FlattenedOperatorDescriptor, FlattenedSinkDescriptor, FlattenedSourceDescriptor, InputDescriptor, LinkDescriptor, @@ -51,7 +53,6 @@ fn test_flatten_descriptor() { outputs: vec!["source-out".into()], uri: Some("file://source.so".into()), configuration: json!({ "foo": "global-outer" }).into(), - runtime: Some("runtime-1".into()), }, FlattenedSourceDescriptor { id: "source-2".into(), @@ -59,7 +60,6 @@ fn test_flatten_descriptor() { outputs: vec!["source-out".into()], uri: Some("file://source.so".into()), configuration: json!({ "foo": "global-outer" }).into(), - runtime: None, }, FlattenedSourceDescriptor { id: "source-composite".into(), @@ -70,7 +70,6 @@ fn test_flatten_descriptor() { ], uri: 
Some("file://source-composite.so".into()), configuration: json!({ "foo": "global-outer", "bar": "re-reverse" }).into(), - runtime: Some("runtime-source".into()), }, ]; @@ -95,7 +94,6 @@ fn test_flatten_descriptor() { outputs: vec!["operator-out".into()], uri: Some("file://operator.so".into()), configuration: json!({ "foo": "global-outer" }).into(), - runtime: None, }, FlattenedOperatorDescriptor { id: "operator-2".into(), @@ -104,7 +102,6 @@ fn test_flatten_descriptor() { outputs: vec!["operator-out".into()], uri: Some("file://operator.so".into()), configuration: json!({ "foo": "global-outer" }).into(), - runtime: None, }, /* * `sub-operator-1` is declared in the file "operator-composite.yml". @@ -124,7 +121,6 @@ fn test_flatten_descriptor() { configuration: json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer" }) .into(), - runtime: Some("runtime-composite".into()), }, /* * Same spirit but this time it’s a composite operator within a composite operator. The @@ -140,7 +136,6 @@ fn test_flatten_descriptor() { uri: Some("file://sub-sub-operator-1.so".into()), configuration: json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer", "buzz": "composite-inner", "baz": "leaf" }).into(), - runtime: Some("runtime-composite".into()), }, /* * Idem as above: operator-composite/sub-operator-composite/sub-sub-operator-2. @@ -153,7 +148,6 @@ fn test_flatten_descriptor() { uri: Some("file://sub-sub-operator-2.so".into()), configuration: json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer", "buzz": "composite-inner" }).into(), - runtime: Some("runtime-composite".into()), }, /* * Similarly, we check that the name is the composition: operator-composite/sub-operator-2. @@ -166,7 +160,6 @@ fn test_flatten_descriptor() { uri: Some("file://sub-operator-2.so".into()), configuration: json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer" }).into(), - runtime: Some("runtime-composite".into()), }, ]; @@ -191,7 +184,6 @@ fn test_flatten_descriptor() { inputs: vec!["sink-in".into()], uri: Some("file://sink.so".into()), configuration: json!({ "foo": "global-outer" }).into(), - runtime: None, }, FlattenedSinkDescriptor { id: "sink-2".into(), @@ -199,7 +191,6 @@ fn test_flatten_descriptor() { inputs: vec!["sink-in".into()], uri: Some("file://sink.so".into()), configuration: json!({ "foo": "global-outer" }).into(), - runtime: Some("runtime-2".into()), }, FlattenedSinkDescriptor { id: "sink-composite".into(), @@ -207,7 +198,6 @@ fn test_flatten_descriptor() { inputs: vec!["sink-composite-in-1".into(), "sink-composite-in-2".into()], uri: Some("file://sink-composite.so".into()), configuration: json!({ "foo": "global-outer", "bar": "reverse" }).into(), - runtime: Some("runtime-sink".into()), }, ]; @@ -306,6 +296,31 @@ fn test_flatten_descriptor() { ) }); assert_eq!(expected_links.len(), flatten.links.len()); + + let expected_mapping = HashMap::from([ + ("source-1".into(), "runtime-1".into()), + ("sink-2".into(), "runtime-2".into()), + ("source-composite".into(), "runtime-source".into()), + ( + "operator-composite/sub-operator-1".into(), + "runtime-composite".into(), + ), + ( + "operator-composite/sub-operator-composite/sub-sub-operator-1".into(), + "runtime-composite".into(), + ), + ( + "operator-composite/sub-operator-composite/sub-sub-operator-2".into(), + "runtime-composite".into(), + ), + ( + "operator-composite/sub-operator-2".into(), + "runtime-composite".into(), + ), + ("sink-composite".into(), "runtime-sink".into()), + 
]); + + assert_eq!(expected_mapping, flatten.mapping); } #[test] diff --git a/zenoh-flow-records/Cargo.toml b/zenoh-flow-records/Cargo.toml new file mode 100644 index 00000000..56afcc85 --- /dev/null +++ b/zenoh-flow-records/Cargo.toml @@ -0,0 +1,34 @@ +# +# Copyright (c) 2021 - 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +[package] +authors = { workspace = true } +categories = { workspace = true } +description = "Internal crate for Zenoh-Flow." +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +name = "zenoh-flow-records" +repository = { workspace = true } +version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +serde = { workspace = true } +uuid = { workspace = true } +zenoh-flow-commons = { workspace = true } + +[dev-dependencies] +serde_json = { workspace = true } +serde_yaml = { workspace = true } diff --git a/zenoh-flow-records/src/connectors.rs b/zenoh-flow-records/src/connectors.rs new file mode 100644 index 00000000..311402e6 --- /dev/null +++ b/zenoh-flow-records/src/connectors.rs @@ -0,0 +1,45 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use serde::{Deserialize, Serialize}; +use std::{fmt::Display, sync::Arc}; +use zenoh_flow_commons::{NodeId, PortId, SharedMemoryParameters}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct ZenohSender { + pub id: NodeId, + pub resource: Arc, + pub input: PortId, + pub shared_memory: SharedMemoryParameters, +} + +impl Display for ZenohSender { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct ZenohReceiver { + pub id: NodeId, + pub resource: Arc, + pub output: PortId, + pub shared_memory: SharedMemoryParameters, +} + +impl Display for ZenohReceiver { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} diff --git a/zenoh-flow-records/src/dataflow.rs b/zenoh-flow-records/src/dataflow.rs new file mode 100644 index 00000000..dc64a5d9 --- /dev/null +++ b/zenoh-flow-records/src/dataflow.rs @@ -0,0 +1,87 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{ + connectors::{ZenohReceiver, ZenohSender}, + LinkRecord, OperatorRecord, SinkRecord, SourceRecord, +}; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Arc}; +use uuid::Uuid; +use zenoh_flow_commons::{NodeId, RuntimeId}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct DataFlowRecord { + pub uuid: Uuid, + pub flow: Arc, + pub sources: HashMap, + pub operators: HashMap, + pub sinks: HashMap, + pub receivers: HashMap, + pub senders: HashMap, + pub links: Vec, + pub mapping: HashMap, +} + +// impl DataFlowRecord { +// pub fn from_flattened(data_flow: FlattenedDataFlowDescriptor, runtime: RuntimeContext) -> Self { +// let uuid = Uuid::new_v4(); + +// let sources = data_flow +// .sources +// .into_iter() +// .map(|source| { +// ( +// source.id.clone(), +// SourceRecord::from_flattened(source, runtime.id.clone()), +// ) +// }) +// .collect::>(); + +// let operators = data_flow +// .operators +// .into_iter() +// .map(|operator| { +// ( +// operator.id.clone(), +// OperatorRecord::from_flattened(operator, runtime.id.clone()), +// ) +// }) +// .collect::>(); + +// let sinks = data_flow +// .sinks +// .into_iter() +// .map(|sink| { +// ( +// sink.id.clone(), +// SinkRecord::from_flattened(sink, runtime.id.clone()), +// ) +// }) +// .collect::>(); + +// let (links, receivers, senders) = connect_runtimes(data_flow.links, runtime); + +// Self { +// uuid, +// flow: data_flow.flow, +// sources, +// operators, +// sinks, +// receivers, +// senders, +// links, +// } +// } +// } diff --git a/zenoh-flow-records/src/io.rs b/zenoh-flow-records/src/io.rs new file mode 100644 index 00000000..526001b2 --- /dev/null +++ b/zenoh-flow-records/src/io.rs @@ -0,0 +1,30 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{NodeId, PortId}; + +/// TODO@J-Loudet +#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] +pub struct InputRecord { + pub node: NodeId, + pub input: PortId, +} + +/// TODO@J-Loudet +#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)] +pub struct OutputRecord { + pub node: NodeId, + pub output: PortId, +} diff --git a/zenoh-flow-records/src/lib.rs b/zenoh-flow-records/src/lib.rs new file mode 100644 index 00000000..94d99ca4 --- /dev/null +++ b/zenoh-flow-records/src/lib.rs @@ -0,0 +1,28 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod dataflow; +pub use dataflow::DataFlowRecord; + +mod connectors; +pub use connectors::{ZenohReceiver, ZenohSender}; + +mod io; +pub use io::{InputRecord, OutputRecord}; + +mod links; +pub use links::LinkRecord; + +mod nodes; +pub use nodes::{OperatorRecord, SinkRecord, SourceRecord}; diff --git a/zenoh-flow-records/src/links.rs b/zenoh-flow-records/src/links.rs new file mode 100644 index 00000000..50eb0dbd --- /dev/null +++ b/zenoh-flow-records/src/links.rs @@ -0,0 +1,25 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::io::{InputRecord, OutputRecord}; +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::SharedMemoryParameters; + +/// TODO@J-Loudet +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct LinkRecord { + pub from: OutputRecord, + pub to: InputRecord, + pub shared_memory: SharedMemoryParameters, +} diff --git a/zenoh-flow-records/src/nodes.rs b/zenoh-flow-records/src/nodes.rs new file mode 100644 index 00000000..192ab53e --- /dev/null +++ b/zenoh-flow-records/src/nodes.rs @@ -0,0 +1,48 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, NodeId, PortId}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct SourceRecord { + pub id: NodeId, + pub name: Arc, + pub outputs: Vec, + pub uri: Option>, + pub configuration: Configuration, +} + +/// TODO@J-Loudet +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct OperatorRecord { + pub id: NodeId, + pub name: Arc, + pub inputs: Vec, + pub outputs: Vec, + pub uri: Option>, + pub configuration: Configuration, +} + +/// TODO@J-Loudet +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct SinkRecord { + pub id: NodeId, + pub name: Arc, + pub inputs: Vec, + pub uri: Option>, + pub configuration: Configuration, +}
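
The snippet below is not part of the patch; it is a minimal sketch of how the pieces introduced above fit together: build (or flatten) a `FlattenedDataFlowDescriptor`, then call `complete_mapping_and_connect` with a default `RuntimeContext` to obtain a `DataFlowRecord`. All identifiers, file paths and shared-memory values are illustrative.

    use std::collections::HashMap;
    use zenoh_flow_commons::{
        Configuration, RuntimeContext, SharedMemoryConfiguration, SharedMemoryParameters,
    };
    use zenoh_flow_descriptors::{
        FlattenedDataFlowDescriptor, FlattenedSinkDescriptor, FlattenedSourceDescriptor,
        InputDescriptor, LinkDescriptor, OutputDescriptor,
    };

    fn example() {
        // A flow with a single Source feeding a single Sink; no node is explicitly mapped.
        let flow = FlattenedDataFlowDescriptor {
            flow: "example-flow".into(),
            sources: vec![FlattenedSourceDescriptor {
                id: "source".into(),
                name: "Source".into(),
                uri: Some("file:///tmp/libsource.so".into()),
                outputs: vec!["out".into()],
                configuration: Configuration::default(),
            }],
            operators: vec![],
            sinks: vec![FlattenedSinkDescriptor {
                id: "sink".into(),
                name: "Sink".into(),
                uri: Some("file:///tmp/libsink.so".into()),
                inputs: vec!["in".into()],
                configuration: Configuration::default(),
            }],
            links: vec![LinkDescriptor {
                from: OutputDescriptor { node: "source".into(), output: "out".into() },
                to: InputDescriptor { node: "sink".into(), input: "in".into() },
                shared_memory: SharedMemoryConfiguration::default(),
            }],
            mapping: HashMap::default(),
        };

        // Every node without an explicit mapping is assigned to this runtime; its
        // shared-memory parameters are the defaults applied to links that set none.
        let default_runtime = RuntimeContext {
            id: "my-runtime".into(),
            shared_memory: SharedMemoryParameters {
                number_elements: 10,
                element_size: 10_000, // bytes
                backoff: 500,         // nanoseconds
            },
        };

        let record = flow.complete_mapping_and_connect(default_runtime);

        // Both nodes run on "my-runtime": no Zenoh connectors are generated.
        assert!(record.senders.is_empty() && record.receivers.is_empty());
        assert_eq!(record.mapping.len(), 2);
    }

Assigning either node to a different runtime via `mapping` would instead split the link and insert a `ZenohSender` / `ZenohReceiver` pair, as exercised in `test_complete_mapping_and_connect_all_different_runtime` above.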