From 191835f94c58aecfabc7716800f61c99e9857ab4 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Mon, 23 Oct 2023 15:38:10 +0200 Subject: [PATCH] refacto(BREAKING): move nodes traits/declaration in their own crate **BREAKING**: the macros defined in the crate `zenoh-flow-derive` were updated to take into account this refactoring. Hence, current code that uses `zenoh-flow` as a dependency will have their macro `export_xxx` fail at compilation time. The docstrings within the `zenoh-flow` crate have been ignored as a result. ----- All the logic that is related to creating nodes will sit in this crate. This separate crate will allow users to not have to compile Zenoh-Flow (and even Zenoh) when compiling their nodes. This should lead to greatly decreased compilation times. Several aspects are still work-in-progress: - the `Context` structure probably needs to be refined, - documentation is still sparse. Signed-off-by: Julien Loudet --- Cargo.toml | 2 + zenoh-flow-derive/Cargo.toml | 11 - zenoh-flow-derive/src/lib.rs | 62 +-- zenoh-flow-nodes/Cargo.toml | 47 ++ zenoh-flow-nodes/build.rs | 18 + zenoh-flow-nodes/src/declaration.rs | 61 +++ zenoh-flow-nodes/src/io/inputs.rs | 359 +++++++++++++ zenoh-flow-nodes/src/io/mod.rs | 19 + zenoh-flow-nodes/src/io/outputs.rs | 498 +++++++++++++++++ zenoh-flow-nodes/src/io/tests/input-tests.rs | 145 +++++ zenoh-flow-nodes/src/io/tests/output-tests.rs | 150 ++++++ .../src/io/tests/test_types.proto | 23 + zenoh-flow-nodes/src/lib.rs | 40 ++ zenoh-flow-nodes/src/macros.rs | 216 ++++++++ zenoh-flow-nodes/src/messages.rs | 501 ++++++++++++++++++ zenoh-flow-nodes/src/traits.rs | 276 ++++++++++ zenoh-flow/src/traits.rs | 8 +- 17 files changed, 2390 insertions(+), 46 deletions(-) create mode 100644 zenoh-flow-nodes/Cargo.toml create mode 100644 zenoh-flow-nodes/build.rs create mode 100644 zenoh-flow-nodes/src/declaration.rs create mode 100644 zenoh-flow-nodes/src/io/inputs.rs create mode 100644 zenoh-flow-nodes/src/io/mod.rs create mode 
100644 zenoh-flow-nodes/src/io/outputs.rs create mode 100644 zenoh-flow-nodes/src/io/tests/input-tests.rs create mode 100644 zenoh-flow-nodes/src/io/tests/output-tests.rs create mode 100644 zenoh-flow-nodes/src/io/tests/test_types.proto create mode 100644 zenoh-flow-nodes/src/lib.rs create mode 100644 zenoh-flow-nodes/src/macros.rs create mode 100644 zenoh-flow-nodes/src/messages.rs create mode 100644 zenoh-flow-nodes/src/traits.rs diff --git a/Cargo.toml b/Cargo.toml index 2ed93eeb..2c7effad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "zenoh-flow-daemon", "zenoh-flow-derive", "zenoh-flow-descriptors", + "zenoh-flow-nodes", "zenoh-flow-plugin", "zenoh-flow-records", "zfctl", @@ -51,6 +52,7 @@ serde_cbor = "0.11" serde_derive = "1.0" serde_json = "1.0" serde_yaml = "0.9" +tracing = { version = "0.1", features = ["log"] } uhlc = "0.6" uuid = { version = "1.1", features = ["serde", "v4"] } zenoh = { version = "0.7.2-rc" } diff --git a/zenoh-flow-derive/Cargo.toml b/zenoh-flow-derive/Cargo.toml index 0230ecb2..85281a66 100644 --- a/zenoh-flow-derive/Cargo.toml +++ b/zenoh-flow-derive/Cargo.toml @@ -27,20 +27,9 @@ version.workspace = true # To build with debug on macros: RUSTFLAGS="-Z macro-backtrace" [dependencies] -Inflector = "0.11.4" -async-std = { workspace = true, features = ["attributes"] } -darling = "0.20" -futures = { workspace = true } -proc-macro-error = "1.0.4" proc-macro2 = "1.0" quote = "1.0" -serde = { workspace = true, features = ["derive"] } -serde_derive = { workspace = true } syn = { version = "2", features = ["full"] } -syn-serde = { version = "0.3", features = ["json"] } - -[dev-dependencies] -env_logger = { workspace = true } [lib] proc-macro = true diff --git a/zenoh-flow-derive/src/lib.rs b/zenoh-flow-derive/src/lib.rs index 18eed9ef..37e1066b 100644 --- a/zenoh-flow-derive/src/lib.rs +++ b/zenoh-flow-derive/src/lib.rs @@ -59,19 +59,19 @@ pub fn export_source(_: TokenStream, input: TokenStream) -> TokenStream { 
#[doc(hidden)] #[no_mangle] - pub static _zf_export_source: zenoh_flow::runtime::dataflow::loader::NodeDeclaration< - zenoh_flow::runtime::dataflow::node::SourceFn, - > = zenoh_flow::runtime::dataflow::loader::NodeDeclaration::< - zenoh_flow::runtime::dataflow::node::SourceFn, + pub static _zf_export_source: zenoh_flow_nodes::NodeDeclaration< + zenoh_flow_nodes::SourceFn, + > = zenoh_flow_nodes::NodeDeclaration::< + zenoh_flow_nodes::SourceFn, > { - rustc_version: zenoh_flow::runtime::dataflow::loader::RUSTC_VERSION, - core_version: zenoh_flow::runtime::dataflow::loader::CORE_VERSION, - constructor: |context: zenoh_flow::types::Context, - configuration: Option, - outputs: zenoh_flow::io::Outputs| { + rustc_version: zenoh_flow_nodes::RUSTC_VERSION, + core_version: zenoh_flow_nodes::CORE_VERSION, + constructor: |context: zenoh_flow_nodes::prelude::Context, + configuration: Option, + outputs: zenoh_flow_nodes::prelude::Outputs| { std::boxed::Box::pin(async { let node = <#ident>::new(context, configuration, outputs).await?; - Ok(std::sync::Arc::new(node) as std::sync::Arc) + Ok(std::sync::Arc::new(node) as std::sync::Arc) }) }, }; @@ -122,19 +122,19 @@ pub fn export_sink(_: TokenStream, input: TokenStream) -> TokenStream { #[doc(hidden)] #[no_mangle] - pub static _zf_export_sink: zenoh_flow::runtime::dataflow::loader::NodeDeclaration< - zenoh_flow::runtime::dataflow::node::SinkFn, - > = zenoh_flow::runtime::dataflow::loader::NodeDeclaration::< - zenoh_flow::runtime::dataflow::node::SinkFn, + pub static _zf_export_sink: zenoh_flow_nodes::NodeDeclaration< + zenoh_flow_nodes::SinkFn, + > = zenoh_flow_nodes::NodeDeclaration::< + zenoh_flow_nodes::SinkFn, > { - rustc_version: zenoh_flow::runtime::dataflow::loader::RUSTC_VERSION, - core_version: zenoh_flow::runtime::dataflow::loader::CORE_VERSION, - constructor: |context: zenoh_flow::types::Context, - configuration: Option, - mut inputs: zenoh_flow::io::Inputs| { + rustc_version: zenoh_flow_nodes::RUSTC_VERSION, + 
core_version: zenoh_flow_nodes::CORE_VERSION, + constructor: |context: zenoh_flow_nodes::prelude::Context, + configuration: Option, + mut inputs: zenoh_flow_nodes::prelude::Inputs| { std::boxed::Box::pin(async { let node = <#ident>::new(context, configuration, inputs).await?; - Ok(std::sync::Arc::new(node) as std::sync::Arc) + Ok(std::sync::Arc::new(node) as std::sync::Arc) }) }, }; @@ -194,20 +194,20 @@ pub fn export_operator(_: TokenStream, input: TokenStream) -> TokenStream { #[doc(hidden)] #[no_mangle] - pub static _zf_export_operator: zenoh_flow::runtime::dataflow::loader::NodeDeclaration< - zenoh_flow::runtime::dataflow::node::OperatorFn, - > = zenoh_flow::runtime::dataflow::loader::NodeDeclaration::< - zenoh_flow::runtime::dataflow::node::OperatorFn, + pub static _zf_export_operator: zenoh_flow_nodes::NodeDeclaration< + zenoh_flow_nodes::OperatorFn, + > = zenoh_flow_nodes::NodeDeclaration::< + zenoh_flow_nodes::OperatorFn, > { - rustc_version: zenoh_flow::runtime::dataflow::loader::RUSTC_VERSION, - core_version: zenoh_flow::runtime::dataflow::loader::CORE_VERSION, - constructor: |context: zenoh_flow::types::Context, - configuration: Option, - mut inputs: zenoh_flow::io::Inputs, - mut outputs: zenoh_flow::io::Outputs| { + rustc_version: zenoh_flow_nodes::RUSTC_VERSION, + core_version: zenoh_flow_nodes::CORE_VERSION, + constructor: |context: zenoh_flow_nodes::prelude::Context, + configuration: Option, + mut inputs: zenoh_flow_nodes::prelude::Inputs, + mut outputs: zenoh_flow_nodes::prelude::Outputs| { std::boxed::Box::pin(async { let node = <#ident>::new(context, configuration, inputs, outputs).await?; - Ok(std::sync::Arc::new(node) as std::sync::Arc) + Ok(std::sync::Arc::new(node) as std::sync::Arc) }) }, }; diff --git a/zenoh-flow-nodes/Cargo.toml b/zenoh-flow-nodes/Cargo.toml new file mode 100644 index 00000000..ef48636b --- /dev/null +++ b/zenoh-flow-nodes/Cargo.toml @@ -0,0 +1,47 @@ +# +# Copyright (c) 2021 - 2023 ZettaScale Technology +# +# This program 
and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +[package] +authors = { workspace = true } +categories = { workspace = true } +description = "Internal crate for Zenoh-Flow." +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +name = "zenoh-flow-nodes" +repository = { workspace = true } +version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +bincode = { version = "1.3" } +flume = { workspace = true } +futures = { workspace = true } +proc-macro-error = "1.0.4" +proc-macro2 = "1.0" +quote = "1.0" +serde = { workspace = true } +tracing = { workspace = true } +uhlc = { workspace = true } +uuid = { workspace = true } +zenoh-flow-commons = { workspace = true } +zenoh-flow-derive = { path = "../zenoh-flow-derive" } + +[dev-dependencies] +prost = "0.12" +serde_json = { workspace = true } + +[build-dependencies] +rustc_version = "0.4.0" diff --git a/zenoh-flow-nodes/build.rs b/zenoh-flow-nodes/build.rs new file mode 100644 index 00000000..4527ffdf --- /dev/null +++ b/zenoh-flow-nodes/build.rs @@ -0,0 +1,18 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +fn main() { + let version = rustc_version::version().unwrap(); + println!("cargo:rustc-env=RUSTC_VERSION={version}"); +} diff --git a/zenoh-flow-nodes/src/declaration.rs b/zenoh-flow-nodes/src/declaration.rs new file mode 100644 index 00000000..9d253d38 --- /dev/null +++ b/zenoh-flow-nodes/src/declaration.rs @@ -0,0 +1,61 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{pin::Pin, sync::Arc}; + +use futures::Future; +use zenoh_flow_commons::{Configuration, Result}; + +use crate::{ + prelude::{Inputs, Node, Outputs}, + Context, +}; + +/// Constant used to check if a node is compatible with the currently running Zenoh Flow daemon. +/// As nodes are dynamically loaded, this is to prevent (possibly cryptic) runtime error due to +/// incompatible API. +pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION"); +/// Constant used to check if a node was compiled with the same version of the Rust compiler than +/// the currently running Zenoh Flow daemon. +/// As Rust is not ABI stable, this is to prevent (possibly cryptic) runtime errors. +pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION"); + +/// Declaration expected in the library that will be loaded. +pub struct NodeDeclaration { + pub rustc_version: &'static str, + pub core_version: &'static str, + pub constructor: C, +} + +/// `SourceFn` is the only signature we accept to construct a [`Source`](`crate::prelude::Source`). 
+pub type SourceFn = fn( + Context, + Option, + Outputs, +) -> Pin>> + Send>>; + +/// `OperatorFn` is the only signature we accept to construct an [`Operator`](`crate::prelude::Operator`). +pub type OperatorFn = fn( + Context, + Option, + Inputs, + Outputs, +) -> Pin>> + Send>>; + +/// `SinkFn` is the only signature we accept to construct a [`Sink`](`crate::prelude::Sink`). +pub type SinkFn = fn( + Context, + Option, + Inputs, +) -> Pin>> + Send>>; diff --git a/zenoh-flow-nodes/src/io/inputs.rs b/zenoh-flow-nodes/src/io/inputs.rs new file mode 100644 index 00000000..5a1c4911 --- /dev/null +++ b/zenoh-flow-nodes/src/io/inputs.rs @@ -0,0 +1,359 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::messages::{Data, DataMessage, DeserializerFn, LinkMessage, Message}; +use anyhow::bail; +use flume::TryRecvError; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::Arc; +use uhlc::Timestamp; +use zenoh_flow_commons::{PortId, Result}; + +/// The `Inputs` structure contains all the inputs created for a [Sink](crate::prelude::Sink) or an +/// [Operator](crate::prelude::Operator). +/// +/// Each input is indexed by its **port identifier**: the name that was indicated in the descriptor +/// of the node. These names are _case sensitive_ and should be an exact match to what was written +/// in the descriptor. +/// +/// Zenoh-Flow provides two flavors of input: [InputRaw] and [`Input`]. An [`Input`] +/// conveniently exposes instances of `T` while an [InputRaw] exposes messages, allowing to +/// disregard the contained data. 
+/// +/// The main way to interact with `Inputs` is through the `take` method. +/// +/// # Example +/// +/// ```ignore +/// let input_builder = inputs.take("test raw").expect("No input name 'test raw' found"); +/// let input_raw = input_builder.raw(); +/// +/// let input_builder = inputs.take("test typed").expect("No input name 'test typed' found"); +/// let input: Input = input_build.typed( +/// |bytes| serde_json::from_slice(bytes) +/// .map_err(|e| anyhow::anyhow!(e)) +/// )?; +/// ``` +#[derive(Default)] +pub struct Inputs { + pub(crate) hmap: HashMap>>, +} + +// Dereferencing on the internal `Hashmap` allows users to call all the methods implemented on it: +// `keys()` for one. +impl Deref for Inputs { + type Target = HashMap>>; + + fn deref(&self) -> &Self::Target { + &self.hmap + } +} + +impl Inputs { + /// Insert the `flume::Receiver` in the [Inputs], creating the entry if needed in the internal + /// `HashMap`. + pub fn insert(&mut self, port_id: PortId, rx: flume::Receiver) { + self.hmap + .entry(port_id) + .or_insert_with(Vec::default) + .push(rx) + } + + /// Returns an [InputBuilder] for the provided `port_id`, if an input was declared with this + /// exact name in the descriptor of the node, otherwise returns `None`. + /// + /// # Usage + /// + /// This builder can either produce a, typed, [`Input`] or an [InputRaw]. The main difference + /// between both is the type of data they expose: an [`Input`] automatically tries to downcast + /// or deserialize the data contained in the message to expose `&T`, while an [InputRaw] simply + /// exposes a [LinkMessage]. + /// + /// As long as data need to be manipulated, a typed [`Input`] should be favored. + /// + /// ## Typed + /// + /// To obtain an [`Input`] one must call `typed` and provide a deserializer function. In + /// the example below we rely on the `serde_json` crate to do the deserialization. 
+ /// + /// ```ignore + /// let input_typed: Input = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .typed( + /// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e)) + /// ); + /// ``` + /// + /// ## Raw + /// + /// To obtain an [InputRaw] one must call `raw`. + /// + /// ```ignore + /// let input_raw: InputRaw = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .raw(); + /// ``` + pub fn take(&mut self, port_id: impl AsRef) -> Option { + self.hmap + .remove(&port_id.as_ref().into()) + .map(|receivers| InputBuilder { + port_id: port_id.as_ref().into(), + receivers, + }) + } +} + +/// An `InputBuilder` is the intermediate structure to obtain either an [`Input`] or an +/// [InputRaw]. +/// +/// The main difference between both is the type of data they expose: an [`Input`] automatically +/// tries to downcast or deserialize the data contained in the message to expose `&T`, while an +/// [InputRaw] simply exposes a [LinkMessage]. +/// +/// # Planned evolution +/// +/// Zenoh-Flow will allow tweaking the behaviour of the underlying channels. For now, the +/// `receivers` channels are _unbounded_ and do not implement a dropping policy, which could lead to +/// issues. +pub struct InputBuilder { + pub(crate) port_id: PortId, + pub(crate) receivers: Vec>, +} + +impl InputBuilder { + /// Consume the `InputBuilder` to produce an [InputRaw]. + /// + /// An [InputRaw] exposes the [LinkMessage] it receives, without trying to perform any + /// conversion on the data. + /// + /// The [InputRaw] was designed for use cases such as load-balancing or rate-limiting. In these + /// scenarios, the node does not need to access the underlying data. + /// + /// # `InputRaw` vs `Input` + /// + /// If the node needs access to the data to perform computations, an [`Input`] should be + /// favored as it performs the conversion automatically. 
+ /// + /// # Example + /// + /// ```ignore + /// let input_raw: InputRaw = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .raw(); + /// ``` + pub fn raw(self) -> InputRaw { + InputRaw { + port_id: self.port_id, + receivers: self.receivers, + } + } + + /// Consume the `InputBuilder` to produce an [`Input`]. + /// + /// An [`Input`] tries to automatically convert the data contained in the [LinkMessage] in + /// order to expose `&T`. Depending on if the data is received serialized or not, to perform + /// this conversion either the `deserializer` is called or a downcast is attempted. + /// + /// # `Input` vs `InputRaw` + /// + /// If the node does need to access the data contained in the [LinkMessage], an [InputRaw] + /// should be favored as it does not try to perform the extra conversion steps. + /// + /// # Example + /// + /// ```ignore + /// let input_typed: Input = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .typed( + /// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e)) + /// ); + /// ``` + pub fn typed( + self, + deserializer: impl Fn(&[u8]) -> anyhow::Result + Send + Sync + 'static, + ) -> Input { + Input { + input_raw: self.raw(), + deserializer: Arc::new(deserializer), + } + } +} + +/// An [`InputRaw`](`InputRaw`) exposes the [`LinkMessage`](`LinkMessage`) it receives. +/// +/// It's primary purpose is to ensure "optimal" performance. This can be useful to implement +/// behaviour where actual access to the underlying data is irrelevant. +#[derive(Clone, Debug)] +pub struct InputRaw { + pub(crate) port_id: PortId, + pub(crate) receivers: Vec>, +} + +impl InputRaw { + pub fn port_id(&self) -> &PortId { + &self.port_id + } + + /// Returns the number of channels associated with this Input. 
+ pub fn channels_count(&self) -> usize { + self.receivers.len() + } + + /// Returns the first [LinkMessage] that was received on any of the channels associated with + /// this Input, or an `Empty` error if there were no messages. + /// + /// # Asynchronous alternative: `recv` + /// + /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. + /// Although synchronous, but given it is "fail-fast", this method will not block the thread on + /// which it is executed. + /// + /// # Error + /// + /// If no message was received, an `Empty` error is returned. Note that if some channels are + /// disconnected, for each of such channel an error is logged. + pub fn try_recv(&self) -> Result { + for receiver in &self.receivers { + match receiver.try_recv() { + Ok(message) => return Ok(message), + Err(e) => { + if matches!(e, TryRecvError::Disconnected) { + tracing::error!("[Input: {}] Channel disconnected", self.port_id); + } + } + } + } + + // We went through all channels, no message, Empty error. + bail!("[Input: {}] No message", self.port_id) + } + + /// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels + /// associated with this Input. + /// + /// If several [LinkMessage] are received at the same time, one is *randomly* selected. + /// + /// # Error + /// + /// An error is returned if *all* channels are disconnected. For each disconnected channel, an + /// error is separately logged. 
+ pub async fn recv(&self) -> Result { + let mut recv_futures = self + .receivers + .iter() + .map(|link| link.recv_async()) + .collect::>(); + + loop { + let (res, _, remaining) = futures::future::select_all(recv_futures).await; + match res { + Ok(message) => return Ok(message), + Err(_disconnected) => { + tracing::error!("[Input: {}] Channel disconnected", self.port_id); + if remaining.is_empty() { + bail!("[Input: {}] All channels are disconnected", self.port_id); + } + + recv_futures = remaining; + } + } + } + } +} + +/// A typed `Input` that tries to automatically downcast or deserialize the data received in order +/// to expose `&T`. +/// +/// # Performance +/// +/// If the data is received serialized from the upstream node, an allocation is performed to host +/// the deserialized `T`. +pub struct Input { + pub(crate) input_raw: InputRaw, + pub(crate) deserializer: Arc>, +} + +// Dereferencing to the [InputRaw] allows to directly call methods on it with a typed [Input]. +impl Deref for Input { + type Target = InputRaw; + + fn deref(&self) -> &Self::Target { + &self.input_raw + } +} + +impl Input { + /// Returns the first [`Message`] that was received, *asynchronously*, on any of the channels + /// associated with this Input. + /// + /// If several [`Message`] are received at the same time, one is *randomly* selected. + /// + /// This method interprets the data to the type associated with this [`Input`]. + /// + /// # Performance + /// + /// As this method interprets the data received additional operations are performed: + /// - data received serialized is deserialized (an allocation is performed to store an instance + /// of `T`), + /// - data received "typed" are checked against the type associated to this [`Input`]. + /// + /// # Error + /// + /// Several errors can occur: + /// - all the channels are disconnected, + /// - Zenoh-Flow failed at interpreting the received data as an instance of `T`. 
+ pub async fn recv(&self) -> Result<(Message, Timestamp)> { + match self.input_raw.recv().await? { + LinkMessage::Data(DataMessage { data, timestamp }) => Ok(( + Message::Data(Data::try_from_payload(data, self.deserializer.clone())?), + timestamp, + )), + LinkMessage::Watermark(timestamp) => Ok((Message::Watermark, timestamp)), + } + } + + /// Returns the first [`Message`] that was received on any of the channels associated with this + /// Input, or `None` if all the channels are empty. + /// + /// # Asynchronous alternative: `recv` + /// + /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. + /// Although synchronous, this method will not block the thread on which it is executed. + /// + /// # Error + /// + /// Several errors can occur: + /// - no message was received (i.e. Empty error), + /// - Zenoh-Flow failed at interpreting the received data as an instance of `T`. + /// + /// Note that if some channels are disconnected, for each of such channel an error is logged. + pub fn try_recv(&self) -> Result<(Message, Timestamp)> { + match self.input_raw.try_recv()? { + LinkMessage::Data(DataMessage { data, timestamp }) => Ok(( + Message::Data(Data::try_from_payload(data, self.deserializer.clone())?), + timestamp, + )), + LinkMessage::Watermark(ts) => Ok((Message::Watermark, ts)), + } + } +} + +#[cfg(test)] +#[path = "./tests/input-tests.rs"] +mod tests; diff --git a/zenoh-flow-nodes/src/io/mod.rs b/zenoh-flow-nodes/src/io/mod.rs new file mode 100644 index 00000000..37c12aa9 --- /dev/null +++ b/zenoh-flow-nodes/src/io/mod.rs @@ -0,0 +1,19 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod inputs; +pub use inputs::{Input, Inputs}; + +mod outputs; +pub use outputs::{Output, Outputs}; diff --git a/zenoh-flow-nodes/src/io/outputs.rs b/zenoh-flow-nodes/src/io/outputs.rs new file mode 100644 index 00000000..c46df652 --- /dev/null +++ b/zenoh-flow-nodes/src/io/outputs.rs @@ -0,0 +1,498 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::messages::{Data, LinkMessage, Payload, SerializerFn}; +use anyhow::bail; +use flume::Sender; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::ops::Deref; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; +use uhlc::{Timestamp, HLC}; +use zenoh_flow_commons::{PortId, Result}; + +/// The [Outputs] structure contains all the outputs created for a [Source](crate::prelude::Source) +/// or an [Operator](crate::prelude::Operator). +/// +/// Each output is indexed by its **port identifier**: the name that was indicated in the descriptor +/// of the node. These names are _case sensitive_ and should be an exact match to what was written +/// in the descriptor. +/// +/// Zenoh-Flow provides two flavors of output: [OutputRaw] and [`Output`]. An [`Output`] +/// conveniently accepts instances of `T` while an [OutputRaw] operates at the message level, +/// potentially disregarding the data it contains. 
+pub struct Outputs { + pub(crate) hmap: HashMap>>, + pub(crate) hlc: Arc, +} + +// Dereferencing on the internal [`HashMap`](`Hashmap`) allows users to call all the methods +// implemented on it: `keys()` for one. +impl Deref for Outputs { + type Target = HashMap>>; + + fn deref(&self) -> &Self::Target { + &self.hmap + } +} + +impl Outputs { + pub fn new(hlc: Arc) -> Self { + Self { + hmap: HashMap::default(), + hlc, + } + } + + /// Insert the `flume::Sender` in the [Outputs], creating the entry if needed in the internal + /// `HashMap`. + pub fn insert(&mut self, port_id: PortId, tx: Sender) { + self.hmap.entry(port_id).or_insert_with(Vec::new).push(tx) + } + + /// Returns an [OutputBuilder] for the provided `port_id`, if an output was declared with this + /// exact name in the descriptor of the node, otherwise returns `None`. + /// + /// # Usage + /// + /// This builder can either produce a, typed, [`Output`] or an [OutputRaw]. The main difference + /// between both is the type of data they accept: an [`Output`] accepts anything that is + /// `Into` while an [OutputRaw] accepts a [LinkMessage] or anything that is + /// `Into<`[Payload]`>`. + /// + /// As long as data are produced or manipulated, a typed [`Output`] should be favored. + /// + /// ## Typed + /// + /// To obtain an [`Output`] one must call `typed` and provide a serializer function. In + /// the example below we rely on the `serde_json` crate to do the serialization. + /// + /// ```ignore + /// let output_typed: Output = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e))); + /// ``` + /// + /// ## Raw + /// + /// To obtain an [OutputRaw] one must call `raw`. 
+ /// + /// ```ignore + /// let output_raw = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .raw(); + /// ``` + pub fn take(&mut self, port_id: impl AsRef) -> Option { + self.hmap + .remove(&port_id.as_ref().into()) + .map(|senders| OutputBuilder { + port_id: port_id.as_ref().into(), + senders, + hlc: Arc::clone(&self.hlc), + last_watermark: Arc::new(AtomicU64::new( + self.hlc.new_timestamp().get_time().as_u64(), + )), + }) + } +} + +/// An [OutputBuilder] is the intermediate structure to obtain either an [`Output`] or an +/// [OutputRaw]. +/// +/// The main difference between both is the type of data they accept: an [`Output`] accepts +/// anything that is `Into` while an [OutputRaw] accepts a [LinkMessage] or anything that is +/// `Into<`[Payload]`>`. +/// +/// # Planned evolution +/// +/// Zenoh-Flow will allow tweaking the behaviour of the underlying channels. For now, the `senders` +/// channels are _unbounded_ and do not implement a dropping policy, which could lead to issues. +pub struct OutputBuilder { + pub(crate) port_id: PortId, + pub(crate) senders: Vec>, + pub(crate) hlc: Arc, + pub(crate) last_watermark: Arc, +} + +impl OutputBuilder { + /// Consume this `OutputBuilder` to produce an [OutputRaw]. + /// + /// An [OutputRaw] sends [LinkMessage]s (through `forward`) or anything that is + /// `Into<`[Payload]`>` (through `send` and `try_send`) to downstream nodes. + /// + /// The [OutputRaw] was designed for use cases such as load-balancing or rate-limiting. In this + /// scenarios, the node does not need to access the underlying data and the message can simply + /// be forwarded downstream. + /// + /// # `OutputRaw` vs `Output` + /// + /// If the node produces instances of `T` as a result of computations, an [`Output`] should be + /// favored as it sends anything that is `Into`. Thus, contrary to an [OutputRaw], there is + /// no need to encapsulate `T` inside a [Payload]. 
+ /// + /// # Example + /// + /// ```ignore + /// let output_raw = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .raw(); + /// ``` + pub fn raw(self) -> OutputRaw { + OutputRaw { + port_id: self.port_id, + senders: self.senders, + hlc: self.hlc, + last_watermark: self.last_watermark, + } + } + + /// Consume this `OutputBuilder` to produce an [`Output`]. + /// + /// An [`Output`] sends anything that is `Into` (through `send` and `try_send`) to + /// downstream nodes. + /// + /// An [`Output`] requires knowing how to serialize `T`. Data is only serialized when it is (a) + /// transmitted to a node located on another process or (b) transmitted to a node written in a + /// programming language other than Rust. + /// + /// The serialization will automatically be performed by Zenoh-Flow and only when needed. + /// + /// # `Output` vs `OutputRaw` + /// + /// If the node does not process any data and only has access to a [LinkMessage], an [OutputRaw] + /// would be better suited as it does not require to downcast it into an object that + /// implements `Into`. + /// + /// # Example + /// + /// ```ignore + /// let output_typed: Output = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e))); + /// ``` + pub fn typed( + self, + serializer: impl Fn(&mut Vec, &T) -> anyhow::Result<()> + Send + Sync + 'static, + ) -> Output { + Output { + _phantom: PhantomData, + output_raw: self.raw(), + serializer: Arc::new(move |buffer, data| { + if let Some(typed) = (*data).as_any().downcast_ref::() { + match (serializer)(buffer, typed) { + Ok(serialized_data) => Ok(serialized_data), + Err(e) => bail!(e), + } + } else { + bail!("Failed to downcast provided value") + } + }), + } + } +} + +/// An [OutputRaw] sends [LinkMessage] or `Into<`[Payload]`>` to downstream Nodes. 
+/// +/// Its primary purpose is to ensure optimal performance: any message received on an input can +/// transparently be sent downstream, without requiring (a potentially expensive) access to the data +/// it contained. +#[derive(Clone)] +pub struct OutputRaw { + pub(crate) port_id: PortId, + pub(crate) senders: Vec>, + pub(crate) hlc: Arc, + pub(crate) last_watermark: Arc, +} + +impl OutputRaw { + /// Returns the port id associated with this Output. + pub fn port_id(&self) -> &PortId { + &self.port_id + } + + /// Returns the number of channels associated with this Output. + pub fn channels_count(&self) -> usize { + self.senders.len() + } + + /// If a timestamp is provided, check that it is not inferior to the latest watermark. + /// + /// If no timestamp is provided, a new one is generated from the [HLC](uhlc::HLC). + pub(crate) fn check_timestamp(&self, timestamp: Option) -> Result { + let ts = match timestamp { + Some(ts_u64) => Timestamp::new(uhlc::NTP64(ts_u64), *self.hlc.get_id()), + None => self.hlc.new_timestamp(), + }; + + if ts.get_time().0 < self.last_watermark.load(Ordering::Relaxed) { + tracing::error!( + r#" +Provided timestamp is older (lower) than the latest watermark: +- watermark = {} +- timestamp = {} +"#, + self.last_watermark.load(Ordering::Relaxed), + ts.get_time().0 + ); + bail!("") + } + + Ok(ts) + } + + /// Attempt to forward, *synchronously*, the message to the downstream Nodes. + /// + /// # Asynchronous alternative: `forward` + /// + /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: + /// `forward`. Hence, although synchronous, this method will not block the thread on which it is + /// executed. + /// + /// # Errors + /// + /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it + /// on the remaining channels. For each failing channel, an error is logged. 
+ pub(crate) fn try_forward(&self, message: LinkMessage) -> Result<()> { + let mut err_count = 0; + self.senders.iter().for_each(|sender| { + if let Err(e) = sender.try_send(message.clone()) { + err_count += 1; + match e { + flume::TrySendError::Full(_) => { + tracing::error!("[Output: {}] Channel is full", self.port_id) + } + flume::TrySendError::Disconnected(_) => { + tracing::error!("[Output: {}] Channel disconnected", self.port_id) + } + } + } + }); + + if err_count > 0 { + bail!( + "[Output: {}] Encountered {} errors while sending (async) data", + self.port_id, + err_count + ) + } + + Ok(()) + } + + /// Attempt to send, *synchronously*, the `data` on all channels to the downstream Nodes. + /// + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. + /// + /// # Asynchronous alternative: `send` + /// + /// This method is a synchronous fail-fast alternative to its asynchronous counterpart: `send`. + /// Hence, although synchronous, this method will not block the thread on which it is executed. + /// + /// # Errors + /// + /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send + /// it on the remaining channels. For each failing channel, an error is logged and counted for. + pub fn try_send(&self, data: impl Into, timestamp: Option) -> Result<()> { + let ts = self.check_timestamp(timestamp)?; + let message = LinkMessage::from_payload(data.into(), ts); + + self.try_forward(message) + } + + /// Attempt to send, *synchronously*, the watermark on all channels to the downstream Nodes. + /// + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. + /// + /// # Asynchronous alternative: `send_watermark` + /// + /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `send`. 
+ /// Although synchronous, this method will not block the thread on which it is executed. + /// + /// # Errors + /// + /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send + /// it on the remaining channels. For each failing channel, an error is logged and counted for. + pub fn try_send_watermark(&self, timestamp: Option) -> Result<()> { + let ts = self.check_timestamp(timestamp)?; + self.last_watermark + .store(ts.get_time().0, Ordering::Relaxed); + let message = LinkMessage::Watermark(ts); + self.try_forward(message) + } + + /// Forward, *asynchronously*, the [LinkMessage] on all channels to the downstream Nodes. + /// + /// # Errors + /// + /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send + /// it on the remaining channels. For each failing channel, an error is logged and counted for. + pub async fn forward(&self, message: LinkMessage) -> Result<()> { + // FIXME Feels like a cheap hack counting the number of errors. To improve. + let mut err = 0; + let fut_senders = self + .senders + .iter() + .map(|sender| sender.send_async(message.clone())); + // [`join_all`](`futures::future::join_all`) executes all futures in parallel. + let res = futures::future::join_all(fut_senders).await; + + res.iter().for_each(|res| { + if let Err(e) = res { + tracing::error!( + "[Output: {}] Error occured while sending to downstream node(s): {:?}", + self.port_id(), + e + ); + err += 1; + } + }); + + if err > 0 { + bail!( + "[Output: {}] Encountered {} errors while sending (async) data", + self.port_id, + err + ) + } + + Ok(()) + } + + /// Send, *asynchronously*, the `data` on all channels to the downstream Nodes. + /// + /// If no `timestamp` is provided, the current timestamp — as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node — is taken. 
+ /// + /// # Errors + /// + /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send + /// it on the remaining channels. For each failing channel, an error is logged and counted for. + pub async fn send(&self, data: impl Into, timestamp: Option) -> Result<()> { + let ts = self.check_timestamp(timestamp)?; + let message = LinkMessage::from_payload(data.into(), ts); + + self.forward(message).await + } + + /// Send, *asynchronously*, a [Watermark](LinkMessage::Watermark) on all channels. + /// + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. + /// + /// # Watermarks + /// + /// A [Watermark](LinkMessage::Watermark) is a special kind of message whose purpose is to + /// signal and guarantee the fact that no message with a lower [Timestamp] will be send + /// afterwards. + /// + /// # Errors + /// + /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send + /// it on the remaining channels. For each failing channel, an error is logged and counted for. + pub async fn send_watermark(&self, timestamp: Option) -> Result<()> { + let ts = self.check_timestamp(timestamp)?; + self.last_watermark + .store(ts.get_time().0, Ordering::Relaxed); + let message = LinkMessage::Watermark(ts); + self.forward(message).await + } +} + +/// An [`Output`] sends instances of `T` to downstream Nodes. +/// +/// It's primary purpose is to ensure type guarantees: only types that implement `Into` can be +/// sent to downstream Nodes. +#[derive(Clone)] +pub struct Output { + _phantom: PhantomData, + pub(crate) output_raw: OutputRaw, + pub(crate) serializer: Arc, +} + +// Dereferencing to the [`OutputRaw`](`OutputRaw`) allows to directly call methods on it with a +// typed [`Output`](`Output`). 
+impl Deref for Output { + type Target = OutputRaw; + + fn deref(&self) -> &Self::Target { + &self.output_raw + } +} + +impl Output { + // Construct the `LinkMessage` to send. + fn construct_message( + &self, + data: impl Into>, + timestamp: Option, + ) -> Result { + let ts = self.check_timestamp(timestamp)?; + let payload = Payload::from_data(data.into(), Arc::clone(&self.serializer)); + Ok(LinkMessage::from_payload(payload, ts)) + } + + /// Send, *asynchronously*, the provided `data` to all downstream Nodes. + /// + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. + /// + /// # Constraint `Into>` + /// + /// Both `T` and `Data` implement this constraint. Hence, in practice, any type that + /// implements `Into` can be sent (provided that `Into::::into(u)` is called first). + /// + /// # Errors + /// + /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it + /// on the remaining channels. For each failing channel, an error is logged and counted for. The + /// total number of encountered errors is returned. + pub async fn send(&self, data: impl Into>, timestamp: Option) -> Result<()> { + self.output_raw + .forward(self.construct_message(data, timestamp)?) + .await + } + + /// Tries to send the provided `data` to all downstream Nodes. + /// + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. + /// + /// # Constraint `Into>` + /// + /// Both `T` and `Data` implement this constraint. Hence, in practice, any type that + /// implements `Into` can be sent (provided that `Into::::into(u)` is called first). + /// + /// # Errors + /// + /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it + /// on the remaining channels. For each failing channel, an error is logged and counted for. 
The + /// total number of encountered errors is returned. + pub fn try_send(&self, data: impl Into>, timestamp: Option) -> Result<()> { + self.output_raw + .try_forward(self.construct_message(data, timestamp)?) + } +} + +#[cfg(test)] +#[path = "./tests/output-tests.rs"] +mod tests; diff --git a/zenoh-flow-nodes/src/io/tests/input-tests.rs b/zenoh-flow-nodes/src/io/tests/input-tests.rs new file mode 100644 index 00000000..04a901eb --- /dev/null +++ b/zenoh-flow-nodes/src/io/tests/input-tests.rs @@ -0,0 +1,145 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use prost::Message as pMessage; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use super::{Input, InputRaw}; +use crate::{ + messages::{LinkMessage, Message, Payload}, + traits::SendSyncAny, +}; + +/// Test that the Input behaves as expected for the provided data and deserializer: +/// 1. when a Payload::Bytes is received the deserializer is called and produces the correct output, +/// 2. when a Payload::Typed is received the data can correctly be downcasted. +/// +/// ## Scenario tested +/// +/// A typed input is created. +/// +/// We send on the associated channel: +/// 1. a Payload::Bytes (the `expected_serialized`), +/// 2. a Payload::Typed (the `expected_data` upcasted to `dyn SendSyncAny`). +/// +/// ## Traits bound on T +/// +/// The bounds on `T` are more restrictive than what they are in the code. In particular, `Clone` +/// and `std::fmt::Debug` are not required. This has no impact on the test and mostly help us debug. 
+fn test_typed_input( + expected_data: T, + expected_serialized: Vec, + deserializer: impl Fn(&[u8]) -> anyhow::Result + Send + Sync + 'static, +) { + let hlc = uhlc::HLC::default(); + let (tx, rx) = flume::unbounded::(); + + let input_raw = InputRaw { + port_id: "test-id".into(), + receivers: vec![rx], + }; + + let input = Input { + input_raw, + deserializer: Arc::new(deserializer), + }; + + let message = LinkMessage::from_payload( + Payload::Bytes(Arc::new(expected_serialized)), + hlc.new_timestamp(), + ); + tx.send(message).expect("Failed to send message"); + + let (data, _) = input.try_recv().expect("Message (serialized) was not sent"); + if let Message::Data(data) = data { + assert_eq!(expected_data, *data); + } + + let message = LinkMessage::from_payload( + Payload::Typed(( + Arc::new(expected_data.clone()) as Arc, + // The serializer should never be called, hence the panic. + Arc::new(|_buffer, _data| panic!("Unexpected call to serialize the data")), + )), + hlc.new_timestamp(), + ); + tx.send(message).expect("Failed to send message"); + + let (data, _) = input + .try_recv() + .expect("Message (dyn SendSyncAny) was not sent"); + if let Message::Data(data) = data { + assert_eq!(expected_data, *data); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// SERDE JSON + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct TestData { + pub field1: u8, + pub field2: String, + pub field3: f64, +} + +#[test] +fn test_serde_json() { + let expected_data = TestData { + field1: 1u8, + field2: "test".to_string(), + field3: 0.2f64, + }; + + let expected_serialized = + serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize"); + + test_typed_input(expected_data, expected_serialized, |bytes| { + serde_json::de::from_slice::(bytes).map_err(|e| anyhow::anyhow!(e)) + }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// 
PROTOBUF PROST + +// This structure was generated using the `prost-build` crate. We copied & pasted it here such that +// we do not have to include `prost-build` as a build dependency to Zenoh-Flow. Our only purpose is +// to ensure that at least one implementation of ProtoBuf works, not to suggest to use Prost. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TestProto { + #[prost(int64, tag = "1")] + pub field1: i64, + #[prost(string, tag = "2")] + pub field2: ::prost::alloc::string::String, + #[prost(double, tag = "3")] + pub field3: f64, +} + +#[test] +fn test_protobuf_prost() { + let expected_data = TestProto { + field1: 1i64, + field2: "test".to_string(), + field3: 0.2f64, + }; + + // First test, send data serialized. + let expected_serialized = expected_data.encode_to_vec(); + + test_typed_input(expected_data, expected_serialized, |bytes| { + ::decode(bytes).map_err(|e| anyhow::anyhow!(e)) + }) +} diff --git a/zenoh-flow-nodes/src/io/tests/output-tests.rs b/zenoh-flow-nodes/src/io/tests/output-tests.rs new file mode 100644 index 00000000..5752e0ec --- /dev/null +++ b/zenoh-flow-nodes/src/io/tests/output-tests.rs @@ -0,0 +1,150 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use prost::Message; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Arc}; +use zenoh_flow_commons::PortId; + +use super::Outputs; +use crate::messages::{LinkMessage, Payload}; + +/// Test that the Output behaves as expected for the provided data and serializer: +/// 1. 
the `serializer` is correctly type-erased yet still produces the correct output, +/// 2. the `expected_data` is not eagerly serialized and can correctly be downcasted. +/// +/// ## Scenario tested +/// +/// A bogus output is generated — see the call to `outputs.take`. We go through the `Outputs` +/// structure such that the transformation on the serializer is performed (i.e. the type is erased). +/// +/// The provided `expected_data` is sent on the output. +/// +/// A receiver channel ensures that: +/// 1. it is a `Payload::Typed`, +/// 2. we can still downcast it to `T`, +/// 3. the result of the serialization is correct. +/// +/// ## Traits on T +/// +/// The bounds on `T` are more restrictive than what they are in the code. In particular, `Clone` +/// and `std::fmt::Debug` are not required. This has no impact on the test and mostly help us debug. +fn test_typed_output( + expected_data: T, + expected_serialized: Vec, + serializer: impl for<'b, 'a> Fn(&'b mut Vec, &'a T) -> anyhow::Result<()> + + Send + + Sync + + 'static, +) { + let hlc = uhlc::HLC::default(); + let key: PortId = "test".into(); + + let (tx, rx) = flume::unbounded::(); + + let mut outputs = Outputs { + hmap: HashMap::from([(key.clone(), vec![tx])]), + hlc: Arc::new(hlc), + }; + + let output = outputs + .take(key.as_ref()) + .expect("Wrong key provided") + .typed(serializer); + + output + .try_send(expected_data.clone(), None) + .expect("Failed to send the message"); + + let message = rx.recv().expect("Received no message"); + match message { + LinkMessage::Data(data) => match &*data { + Payload::Bytes(_) => panic!("Unexpected bytes payload"), + Payload::Typed((dyn_data, serializer)) => { + let mut dyn_serialized = Vec::new(); + (serializer)(&mut dyn_serialized, dyn_data.clone()).expect("Failed to serialize"); + assert_eq!(expected_serialized, dyn_serialized); + + let data = (**dyn_data) + .as_any() + .downcast_ref::() + .expect("Failed to downcast"); + assert_eq!(expected_data, *data); + } + }, + 
LinkMessage::Watermark(_) => panic!("Unexpected watermark message"), + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// SERDE JSON + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct TestData { + pub field1: u8, + pub field2: String, + pub field3: f64, +} + +#[test] +fn test_serde_json() { + let expected_data = TestData { + field1: 1u8, + field2: "two".into(), + field3: 0.3f64, + }; + + let expected_serialized = + serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize"); + + let serializer = |buffer: &mut Vec, data: &TestData| { + serde_json::ser::to_writer(buffer, data).map_err(|e| anyhow::anyhow!(e)) + }; + + test_typed_output(expected_data, expected_serialized, serializer) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// PROTOBUF PROST + +// This structure was generated using the `prost-build` crate. We copied & pasted it here such that +// we do not have to include `prost-build` as a build dependency to Zenoh-Flow. Our only purpose is +// to ensure that at least one implementation of ProtoBuf works, not to suggest to use Prost. 
+#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TestProto { + #[prost(int64, tag = "1")] + pub field1: i64, + #[prost(string, tag = "2")] + pub field2: ::prost::alloc::string::String, + #[prost(double, tag = "3")] + pub field3: f64, +} + +#[test] +fn test_protobuf_prost() { + let expected_data = TestProto { + field1: 1i64, + field2: "two".into(), + field3: 0.3f64, + }; + + let expected_serialized = expected_data.encode_to_vec(); + + let serializer = |buffer: &mut Vec, data: &TestProto| { + data.encode(buffer).map_err(|e| anyhow::anyhow!(e)) + }; + + test_typed_output(expected_data, expected_serialized, serializer) +} diff --git a/zenoh-flow-nodes/src/io/tests/test_types.proto b/zenoh-flow-nodes/src/io/tests/test_types.proto new file mode 100644 index 00000000..0c70e1fe --- /dev/null +++ b/zenoh-flow-nodes/src/io/tests/test_types.proto @@ -0,0 +1,23 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +syntax = "proto3"; + +package testtypes.test_types; + +message TestProto { + int64 field1 = 1; + string field2 = 2; + double field3 = 3; +} diff --git a/zenoh-flow-nodes/src/lib.rs b/zenoh-flow-nodes/src/lib.rs new file mode 100644 index 00000000..fb5ed291 --- /dev/null +++ b/zenoh-flow-nodes/src/lib.rs @@ -0,0 +1,40 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) mod declaration; +pub use declaration::{NodeDeclaration, OperatorFn, SinkFn, SourceFn, CORE_VERSION, RUSTC_VERSION}; + +pub(crate) mod io; +pub(crate) mod messages; +pub(crate) mod traits; + +pub mod prelude { + pub use crate::io::{Input, Inputs, Output, Outputs}; + pub use crate::messages::{Data, DataMessage, LinkMessage, Message}; + pub use crate::traits::{Node, Operator, SendSyncAny, Sink, Source}; + pub use crate::Context; + pub use zenoh_flow_commons::{Configuration, Result}; + pub use zenoh_flow_derive::{export_operator, export_sink, export_source}; +} + +use std::sync::Arc; +use uuid::Uuid; +use zenoh_flow_commons::RuntimeId; + +/// TODO@J-Loudet +pub struct Context { + pub flow_name: Arc, + pub flow_uuid: Uuid, + pub runtime_id: RuntimeId, +} diff --git a/zenoh-flow-nodes/src/macros.rs b/zenoh-flow-nodes/src/macros.rs new file mode 100644 index 00000000..18eed9ef --- /dev/null +++ b/zenoh-flow-nodes/src/macros.rs @@ -0,0 +1,216 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// 
terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, DeriveInput}; + +/// The `export_source` attribute macro is provided to allow the users +/// in exporting their source. +/// +/// ## Example +/// +/// ```no_compile +/// use async_trait::async_trait; +/// use std::sync::Arc; +/// use zenoh_flow::prelude::*; +/// +/// #[export_source] +/// pub struct MySource; +/// +/// #[async_trait] +/// impl Source for MySource{ +/// async fn new( +/// context: Context, +/// configuration: Option, +/// outputs: Outputs, +/// ) -> Result { +/// todo!() +/// } +/// } +/// +/// #[async_trait] +/// impl Node for MySource { +/// async fn iteration(&self) -> Result<()> { +/// todo!() +/// } +/// } +/// +/// ``` +#[proc_macro_attribute] +pub fn export_source(_: TokenStream, input: TokenStream) -> TokenStream { + let ast = parse_macro_input!(input as DeriveInput); + let ident = &ast.ident; + + let gen = quote! 
{ + + #ast + + #[doc(hidden)] + #[no_mangle] + pub static _zf_export_source: zenoh_flow::runtime::dataflow::loader::NodeDeclaration< + zenoh_flow::runtime::dataflow::node::SourceFn, + > = zenoh_flow::runtime::dataflow::loader::NodeDeclaration::< + zenoh_flow::runtime::dataflow::node::SourceFn, + > { + rustc_version: zenoh_flow::runtime::dataflow::loader::RUSTC_VERSION, + core_version: zenoh_flow::runtime::dataflow::loader::CORE_VERSION, + constructor: |context: zenoh_flow::types::Context, + configuration: Option, + outputs: zenoh_flow::io::Outputs| { + std::boxed::Box::pin(async { + let node = <#ident>::new(context, configuration, outputs).await?; + Ok(std::sync::Arc::new(node) as std::sync::Arc) + }) + }, + }; + }; + gen.into() +} + +/// The `export_sink` attribute macro is provided to allow the users +/// in exporting their sink. +/// +/// ## Example +/// +/// ```no_compile +/// use async_trait::async_trait; +/// use std::sync::Arc; +/// use zenoh_flow::prelude::*; +/// +/// #[export_sink] +/// pub struct MySink; +/// +/// #[async_trait] +/// impl Sink for MySink { +/// async fn new( +/// context: Context, +/// configuration: Option, +/// inputs: Inputs, +/// ) -> Result { +/// todo!() +/// } +/// } +/// +/// #[async_trait] +/// impl Node for MySink { +/// async fn iteration(&self) -> Result<()> { +/// todo!() +/// } +/// } +/// +/// ``` +#[proc_macro_attribute] +pub fn export_sink(_: TokenStream, input: TokenStream) -> TokenStream { + let ast = parse_macro_input!(input as DeriveInput); + let ident = &ast.ident; + + let sink = quote! {#ast}; + + let constructor = quote! 
{ + + #[doc(hidden)] + #[no_mangle] + pub static _zf_export_sink: zenoh_flow::runtime::dataflow::loader::NodeDeclaration< + zenoh_flow::runtime::dataflow::node::SinkFn, + > = zenoh_flow::runtime::dataflow::loader::NodeDeclaration::< + zenoh_flow::runtime::dataflow::node::SinkFn, + > { + rustc_version: zenoh_flow::runtime::dataflow::loader::RUSTC_VERSION, + core_version: zenoh_flow::runtime::dataflow::loader::CORE_VERSION, + constructor: |context: zenoh_flow::types::Context, + configuration: Option, + mut inputs: zenoh_flow::io::Inputs| { + std::boxed::Box::pin(async { + let node = <#ident>::new(context, configuration, inputs).await?; + Ok(std::sync::Arc::new(node) as std::sync::Arc) + }) + }, + }; + }; + + let gen = quote! { + #sink + #constructor + + }; + gen.into() +} + +/// The `export_operator` attribute macro is provided to allow the users +/// in exporting their operator. +/// +/// ## Example +/// +/// ```no_compile +/// use async_trait::async_trait; +/// use std::sync::Arc; +/// use zenoh_flow::prelude::*; +/// +/// #[export_operator] +/// struct MyOperator; +/// +/// #[async_trait] +/// impl Operator for MyOperator { +/// fn new( +/// context: Context, +/// configuration: Option, +/// inputs: Inputs, +/// outputs: Outputs, +/// ) -> Result +/// where +/// Self: Sized { +/// todo!() +/// } +/// } +/// +/// #[async_trait] +/// impl Node for MyOperator { +/// async fn iteration(&self) -> Result<()> { +/// todo!() +/// } +/// } +/// +/// ``` +#[proc_macro_attribute] +pub fn export_operator(_: TokenStream, input: TokenStream) -> TokenStream { + let ast = parse_macro_input!(input as DeriveInput); + let ident = &ast.ident; + + let gen = quote! 
{ + + #ast + + #[doc(hidden)] + #[no_mangle] + pub static _zf_export_operator: zenoh_flow::runtime::dataflow::loader::NodeDeclaration< + zenoh_flow::runtime::dataflow::node::OperatorFn, + > = zenoh_flow::runtime::dataflow::loader::NodeDeclaration::< + zenoh_flow::runtime::dataflow::node::OperatorFn, + > { + rustc_version: zenoh_flow::runtime::dataflow::loader::RUSTC_VERSION, + core_version: zenoh_flow::runtime::dataflow::loader::CORE_VERSION, + constructor: |context: zenoh_flow::types::Context, + configuration: Option, + mut inputs: zenoh_flow::io::Inputs, + mut outputs: zenoh_flow::io::Outputs| { + std::boxed::Box::pin(async { + let node = <#ident>::new(context, configuration, inputs, outputs).await?; + Ok(std::sync::Arc::new(node) as std::sync::Arc) + }) + }, + }; + }; + gen.into() +} diff --git a/zenoh-flow-nodes/src/messages.rs b/zenoh-flow-nodes/src/messages.rs new file mode 100644 index 00000000..b9d21552 --- /dev/null +++ b/zenoh-flow-nodes/src/messages.rs @@ -0,0 +1,501 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::traits::SendSyncAny; +use anyhow::{bail, Context}; +use serde::{Deserialize, Serialize}; +use std::ops::Deref; +use std::sync::Arc; +use std::{cmp::Ordering, fmt::Debug}; +use uhlc::Timestamp; +use uuid::Uuid; +use zenoh_flow_commons::{NodeId, PortId, Result}; + +/// `SerializerFn` is a type-erased version of the serializer function provided by node developer. +/// +/// It is passed to downstream nodes (residing on the same process) in case they need to serialize +/// the data they receive typed. 
+/// Passing around the function allows us to serialize only when needed and without requiring prior +/// knowledge. +pub(crate) type SerializerFn = + dyn Fn(&mut Vec, Arc) -> Result<()> + Send + Sync; + +/// This function is what Zenoh-Flow will use to deserialize the data received on the `Input`. +/// +/// It will be called for instance when data is received serialized (i.e. from an upstream node that +/// is either not implemented in Rust or on a different process) before it is given to the user's +/// code. +pub(crate) type DeserializerFn = dyn Fn(&[u8]) -> Result + Send + Sync; + +/// A `Payload` is Zenoh-Flow's lowest message container. +/// +/// It either contains serialized data, i.e. `Bytes` (if received from the network, or from nodes +/// not written in Rust), or `Typed` data as a tuple `(`[Any](`std::any::Any`)`, SerializerFn)`. +#[derive(Clone, Serialize, Deserialize)] +pub enum Payload { + /// Serialized data, coming either from Zenoh of from non-Rust node. + Bytes(Arc>), + #[serde(skip_serializing, skip_deserializing)] + /// Data coming from another Rust node located on the same process that can either be downcasted + /// (provided that its actual type is known) or serialized. + Typed((Arc, Arc)), +} + +impl Debug for Payload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Payload::Bytes(_) => write!(f, "Payload::Bytes"), + Payload::Typed(_) => write!(f, "Payload::Typed"), + } + } +} + +impl Payload { + pub fn from_data( + data: Data, + serializer: Arc, + ) -> Self { + match data.inner { + DataInner::Payload { payload, data: _ } => payload, + DataInner::Data(data) => { + Self::Typed((Arc::new(data) as Arc, serializer)) + } + } + } + + /// Populate `buffer` with the bytes representation of the [Payload]. + /// + /// # Performance + /// + /// This method will serialize the [Payload] if it is `Typed`. Otherwise, the bytes + /// representation is simply cloned. 
+ /// + /// The provided `buffer` is reused and cleared between calls, so once its capacity stabilizes + /// no more allocation is performed. + pub(crate) fn try_as_bytes_into(&self, buffer: &mut Vec) -> Result<()> { + buffer.clear(); // remove previous data but keep the allocated capacity + + match self { + Payload::Bytes(bytes) => { + (**bytes).clone_into(buffer); + Ok(()) + } + Payload::Typed((typed_data, serializer)) => { + (serializer)(buffer, Arc::clone(typed_data)) + } + } + } + + /// Return an [Arc] containing the bytes representation of the [Payload]. + /// + /// # Performance + /// + /// This method will only serialize (and thus allocate) the [Payload] if it is typed. Otherwise + /// the [Arc] is cloned. + // + // NOTE: This method is used by, at least, our Python API. + pub fn try_as_bytes(&self) -> Result>> { + match self { + Payload::Bytes(bytes) => Ok(bytes.clone()), + Payload::Typed((typed_data, serializer)) => { + let mut buffer = Vec::default(); + (serializer)(&mut buffer, Arc::clone(typed_data))?; + Ok(Arc::new(buffer)) + } + } + } +} + +/// Creates a new `Data` from a `Vec`. +/// +/// In order to avoid copies it puts the data inside an `Arc`. +impl From> for Payload { + fn from(bytes: Vec) -> Self { + Self::Bytes(Arc::new(bytes)) + } +} + +/// Creates a new `Data` from a `&[u8]`. +impl From<&[u8]> for Payload { + fn from(bytes: &[u8]) -> Self { + Self::Bytes(Arc::new(bytes.to_vec())) + } +} + +impl From for Payload { + fn from(data_message: DataMessage) -> Self { + data_message.data + } +} + +/// Zenoh-Flow data message. +/// +/// It contains the actual data, the timestamp associated, the end to end deadline, the end to end +/// deadline misses and loop contexts. 
+#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DataMessage { + pub(crate) data: Payload, + pub(crate) timestamp: Timestamp, +} + +impl Deref for DataMessage { + type Target = Payload; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +impl DataMessage { + /// Creates a new message from serialized data. + /// + /// This is used when the message is coming from Zenoh or from a non-rust node. + pub fn new_serialized(data: Vec, timestamp: Timestamp) -> Self { + Self { + data: Payload::Bytes(Arc::new(data)), + timestamp, + } + } + + /// Return the [Timestamp] associated with this [DataMessage]. + // + // NOTE: This method is used by, at least, our Python API. + pub fn get_timestamp(&self) -> &Timestamp { + &self.timestamp + } +} + +/// Metadata stored in Zenoh's time series storages. +/// It contains information about the recording. +/// Multiple [`RecordingMetadata`](`RecordingMetadata`) can be used +/// to synchronize the recording from different Ports. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RecordingMetadata { + pub(crate) timestamp: Timestamp, + pub(crate) port_id: PortId, + pub(crate) node_id: NodeId, + pub(crate) flow_id: Uuid, + pub(crate) instance_id: Uuid, +} + +/// Zenoh Flow control messages. +/// It contains the control messages used within Zenoh Flow. +/// For the time being only the `RecordingStart` and `RecordingStop` messages +/// have been defined, +/// *Note*: Most of messages are not yet defined. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ControlMessage { + // These messages are not yet defined, those are some ideas + // ReadyToMigrate, + // ChangeMode(u8, u128), + RecordingStart(RecordingMetadata), + RecordingStop(Timestamp), +} + +/// The Zenoh-Flow message that is sent across `Link` and across Zenoh. +/// +/// It contains either a [`DataMessage`](`DataMessage`) or a [`Timestamp`](`uhlc::Timestamp`), +/// in such case the `LinkMessage` variant is `Watermark`. 
+#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum LinkMessage { + Data(DataMessage), + Watermark(Timestamp), +} + +impl LinkMessage { + /// Creates a `LinkMessage::Data` from a [`Payload`](`Payload`). + pub fn from_payload(output: Payload, timestamp: Timestamp) -> Self { + Self::Data(DataMessage { + data: output, + timestamp, + }) + } + + /// Serializes the [LinkMessage] using [bincode] into the given `buffer`. + /// + /// The `inner_buffer` is used to serialize (if need be) the [Payload] contained inside the + /// [LinkMessage]. + /// + /// # Performance + /// + /// The provided `buffer` and `inner_buffer` are reused and cleared between calls, so once their + /// capacity stabilizes no (re)allocation is performed. + /// + /// # Errors + /// + /// An error variant is returned in case of: + /// - fails to serialize + pub fn serialize_bincode_into( + &self, + message_buffer: &mut Vec, + payload_buffer: &mut Vec, + ) -> Result<()> { + payload_buffer.clear(); // empty the buffers but keep their allocated capacity + message_buffer.clear(); + + match &self { + LinkMessage::Data(data_message) => match &data_message.data { + Payload::Bytes(_) => bincode::serialize_into(message_buffer, &self) + .context("Failed to serialize `Payload::Bytes``"), + Payload::Typed((data, serializer)) => { + (serializer)(payload_buffer, Arc::clone(data))?; + let serialized_message = LinkMessage::Data(DataMessage { + data: Payload::Bytes(Arc::new(payload_buffer.clone())), + timestamp: data_message.timestamp, + }); + + bincode::serialize_into(message_buffer, &serialized_message) + .context("Failed to serialize `Payload::Typed`") + } + }, + _ => bincode::serialize_into(message_buffer, &self) + .context("Failed to serialize `LinkMessage::Watermark`"), + } + } + + /// Serializes the [LinkMessage] using [bincode] into the given `shm_buffer` shared memory + /// buffer. + /// + /// The `inner_buffer` is used to serialize (if need be) the [Payload] contained inside the + /// [LinkMessage]. 
+ /// + /// # Performance + /// + /// The provided `inner_buffer` is reused and cleared between calls, so once its capacity + /// stabilizes no (re)allocation is performed. + /// + /// # Errors + /// + /// An error variant is returned in case of: + /// - fails to serialize + /// - there is not enough space in the slice + pub fn serialize_bincode_into_shm( + &self, + shm_buffer: &mut [u8], + payload_buffer: &mut Vec, + ) -> Result<()> { + payload_buffer.clear(); // empty the buffer but keep the allocated capacity + + match &self { + LinkMessage::Data(data_message) => match &data_message.data { + Payload::Bytes(_) => bincode::serialize_into(shm_buffer, &self) + .context("Failed to serialize `Payload::Bytes`"), + Payload::Typed(_) => { + data_message.try_as_bytes_into(payload_buffer)?; + let serialized_message = LinkMessage::Data(DataMessage::new_serialized( + payload_buffer.clone(), + data_message.timestamp, + )); + bincode::serialize_into(shm_buffer, &serialized_message) + .context("Failed to serialize `Payload::Typed`") + } + }, + _ => bincode::serialize_into(shm_buffer, &self) + .context("Failed to serialize `LinkMessage::Watermark`"), + } + } + + /// Returns the `Timestamp` associated with the message. 
+ pub fn get_timestamp(&self) -> Timestamp { + match self { + Self::Data(data) => data.timestamp, + Self::Watermark(ref ts) => *ts, + // Self::Control(ref ctrl) => match ctrl { + // ControlMessage::RecordingStart(ref rs) => rs.timestamp, + // ControlMessage::RecordingStop(ref ts) => *ts, + // }, + // _ => Err(ErrorKind::Unsupported), + } + } +} + +// Manual Ord implementation for message ordering when replay +impl Ord for LinkMessage { + fn cmp(&self, other: &Self) -> Ordering { + self.get_timestamp().cmp(&other.get_timestamp()) + } +} + +impl PartialOrd for LinkMessage { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for LinkMessage { + fn eq(&self, other: &Self) -> bool { + self.get_timestamp() == other.get_timestamp() + } +} + +impl Eq for LinkMessage {} + +/// A `Message` is what is received on an `Input`, typically after a call to `try_recv` or +/// `recv`. +/// +/// A `Message` can either contain [`Data`](`Data`), or signal a _Watermark_. +#[derive(Debug)] +pub enum Message { + Data(Data), + Watermark, +} + +/// A `Data` is a convenience wrapper around `T`. +/// +/// Upon reception, it transparently deserializes to `T` when the message is received serialized. It +/// downcasts it to a `&T` when the data is passed "typed" through a channel. +/// +/// ## Performance +/// +/// When deserializing, an allocation is performed. +#[derive(Debug)] +pub struct Data { + inner: DataInner, +} + +/// The `DataInner` enum represents the two ways to send data in an [`Output`](`Output`). +/// +/// The `Payload` variant corresponds to a previously generated `Data` being sent. +/// The `Data` variant corresponds to a new instance of `T` being sent. 
+pub(crate) enum DataInner { + Payload { payload: Payload, data: Option }, + Data(T), +} + +impl Debug for DataInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DataInner::Payload { payload, data } => { + let data = if data.is_some() { "Some" } else { "None" }; + write!(f, "DataInner::Payload: {:?} - data: {}", payload, data) + } + DataInner::Data(_) => write!(f, "DataInner::Data(T)"), + } + } +} + +// Implementing `From` allows us to accept instances of `T` in the signature of `send` and +// `try_send` methods as `T` will implement `impl Into>`. +impl From for Data { + fn from(value: T) -> Self { + Self { + inner: DataInner::Data(value), + } + } +} + +// The implementation of `Deref` is what allows users to transparently manipulate the type `T`. +// +// ## SAFETY +// +// Despite the presence of `expect` and `panic!`, we should never end up in these situations in +// normal circumstances. +// +// Let us reason here as to why this is "safe". +// +// The call to `expect` happens when the inner data is a [`Typed`](`Payload::Typed`) payload and the +// downcasts to `T` fails. This should not happen because of the way a [`Data`](`Data`) is created: +// upon creation we first perform a check that the provided typed payload can actually be downcasted +// to `T` — see the method `Data::try_from_payload`. +// +// The call to `panic!` happens when the inner data is a [`Bytes`](`Payload::Bytes`) payload and the +// `data` field is `None`. Again, this should not happen because of the way a [`Data`](`Data`) is +// created: upon creation, if the data is received as bytes, we first deserialize it and set the +// `data` field to `Some(T)` — see the method `Data::try_from_payload`. 
+impl Deref for Data { + type Target = T; + + fn deref(&self) -> &Self::Target { + match &self.inner { + DataInner::Payload { payload, data } => { + if let Some(data) = data { + data + } else if let Payload::Typed((typed, _)) = payload { + (**typed).as_any().downcast_ref::().expect( + r#"You probably managed to find a very nasty flaw in Zenoh-Flow’s code as we +believed this situation would never happen (unless explicitely triggered — "explicitely" being an +understatement here, we feel it’s more like you really, really, wanted to see that message — in +which case, congratulations!). + +Our guess as to what happened is that: +- the data in `Payload::Typed` was, at first, correct (where we internally do the + `as_any().is::()` check), +- in between this check and the call to `deref` the underlying data somehow changed. + +If we did not do a mistake — fortunately the most likely scenario — then we do not know what +happened and we would be eager to investigate. + +Feel free to contact us at < zenoh@zettascale.tech >. +"#, + ) + } else { + panic!( + r#"You probably managed to find a very nasty flaw in Zenoh-Flow's code as we +believed this situation would never happen (unless explicitely triggered — "explicitely" being an +understatement here, we feel it's more like you really, really, wanted to see that message — in +which case, congratulations!). + +Our guess as to what happened is that: +- the `data` field is a `Payload::Bytes`, +- the `typed` field is set to `None`. + +If we did not do a mistake — fortunately the most likely scenario — then we do not know what +happened and we would be eager to investigate. + +Feel free to contact us at < zenoh@zettascale.tech >. +"# + ) + } + } + DataInner::Data(data) => data, + } + } +} + +impl Data { + /// Try to create a new [`Data`](`Data`) based on a [`Payload`](`Payload`). 
+ /// + /// Depending on the variant of [`Payload`](`Payload`) different steps are performed: + /// - if `Payload::Bytes` then Zenoh-Flow tries to deserialize to an instance of `T` (performing + /// an allocation), + /// - if `Payload::Typed` then Zenoh-Flow checks that the underlying type matches `T` (relying + /// on [`Any`](`Any`)). + /// + /// ## Errors + /// + /// An error will be returned if the Payload does not match `T`, i.e. if the deserialization or + /// the downcast failed. + pub(crate) fn try_from_payload( + payload: Payload, + deserializer: Arc>, + ) -> Result { + let mut typed = None; + + match payload { + Payload::Bytes(ref bytes) => typed = Some((deserializer)(bytes.as_slice())?), + Payload::Typed((ref typed, _)) => { + if !(**typed).as_any().is::() { + bail!("Failed to downcast provided value") + } + } + } + + Ok(Self { + inner: DataInner::Payload { + payload, + data: typed, + }, + }) + } +} diff --git a/zenoh-flow-nodes/src/traits.rs b/zenoh-flow-nodes/src/traits.rs new file mode 100644 index 00000000..cb2dab2b --- /dev/null +++ b/zenoh-flow-nodes/src/traits.rs @@ -0,0 +1,276 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::io::{Inputs, Outputs}; +use crate::Context; +use async_trait::async_trait; +use std::any::Any; +use zenoh_flow_commons::{Configuration, Result}; + +/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process +/// without serializing. +/// +/// This trait is implemented for any type that has the `static` lifetime and implements `Send` and +/// `Sync`. 
These constraints are the same than for the typed `Input` and `Output` which means that +/// there is absolutely no need to manually implement it. +pub trait SendSyncAny: Send + Sync { + fn as_any(&self) -> &dyn Any; + + fn as_mut_any(&mut self) -> &mut dyn Any; +} + +impl SendSyncAny for T { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } +} + +/// A `Node` is defined by its `iteration` that is repeatedly called by Zenoh-Flow. +/// +/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a +/// state and to mutate it, the interior mutability pattern is necessary. +/// +/// A struct implementing the Node trait typically needs to keep a reference to the `Input` and +/// `Output` it needs. +/// +/// For usage examples see: [`Operator`](`Operator`), [`Source`](`Source`) or [`Sink`](`Sink`) +/// traits. +#[async_trait] +pub trait Node: Send + Sync { + async fn iteration(&self) -> Result<()>; +} + +/// The `Source` trait represents a Source of data in Zenoh Flow. Sources only possess `Outputs` and +/// their purpose is to fetch data from the external world. +/// +/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a +/// state and to mutate it, the interior mutability pattern is necessary. +/// +/// A struct implementing the Source trait typically needs to keep a reference to the `Output` it +/// needs. +/// +/// ## Example +/// +/// ```no_run +/// use async_trait::async_trait; +/// use zenoh_flow_nodes::prelude::*; +/// +/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load +/// // the shared library. +/// #[export_source] +/// pub struct MySource { +/// output: Output, +/// // The state could go in such structure. 
+/// // state: Arc>, +/// } +/// +/// #[async_trait::async_trait] +/// impl Source for MySource { +/// async fn new( +/// _context: Context, +/// _configuration: Option, +/// mut outputs: Outputs, +/// ) -> Result { +/// let output = outputs +/// .take("out") +/// .expect("No output called 'out' found") +/// .typed(|buffer, data| todo!("Provide your serializer here")); +/// +/// Ok(Self { output }) +/// } +/// } +/// +/// #[async_trait::async_trait] +/// impl Node for MySource { +/// async fn iteration(&self) -> Result<()> { +/// // To mutate the state, first lock it. +/// // +/// // let state = self.state.lock().await; +/// // +/// // The state is a way for the Source to read information from the external world, i.e., +/// // interacting with I/O devices. We mimick an asynchronous iteraction with a sleep. +/// +/// self.output.send(10usize, None).await +/// } +/// } +/// ``` +#[async_trait] +pub trait Source: Node + Send + Sync { + /// For a `Context`, a `Configuration` and a set of `Outputs`, produce a new *Source*. + /// + /// Sources only possess `Outputs` and their purpose is to fetch data from the external world. + /// + /// Sources are **started last** when initiating a data flow. This is to prevent data loss: if a + /// Source is started before its downstream nodes then the data it would send before said + /// downstream nodes are up would be lost. + async fn new( + context: Context, + configuration: Option, + outputs: Outputs, + ) -> Result + where + Self: Sized; +} + +/// The `Operator` trait represents an Operator inside Zenoh-Flow. +/// +/// Operators are at the heart of a data flow, they carry out computations on the data they receive +/// before sending them out to the next downstream node. +/// +/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a +/// state and to mutate it, the interior mutability pattern is necessary. 
+/// +/// A struct implementing the Operator trait typically needs to keep a reference to the `Input` and +/// `Output` it needs. +/// +/// ## Example +/// +/// ```no_run +/// use async_trait::async_trait; +/// use zenoh_flow_nodes::prelude::*; +/// +/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load +/// // the shared library. +/// #[export_operator] +/// struct NoOp { +/// input: Input, +/// output: Output, +/// } +/// +/// #[async_trait] +/// impl Operator for NoOp { +/// async fn new( +/// _context: Context, +/// _configuration: Option, +/// mut inputs: Inputs, +/// mut outputs: Outputs, +/// ) -> Result { +/// Ok(NoOp { +/// input: inputs +/// .take("in") +/// .expect("No input called 'in' found") +/// .typed(|bytes| todo!("Provide your deserializer here")), +/// output: outputs +/// .take("out") +/// .expect("No output called 'out' found") +/// .typed(|buffer, data| todo!("Provide your serializer here")), +/// }) +/// } +/// } +/// #[async_trait] +/// impl Node for NoOp { +/// async fn iteration(&self) -> Result<()> { +/// let (message, _timestamp) = self.input.recv().await?; +/// match message { +/// Message::Data(t) => self.output.send(*t, None).await?, +/// Message::Watermark => println!("Watermark"), +/// } +/// Ok(()) +/// } +/// } +/// ``` +#[async_trait] +pub trait Operator: Node + Send + Sync { + /// For a `Context`, a `Configuration`, a set of `Inputs` and `Outputs`, produce a new + /// **Operator**. + /// + /// Operators are at the heart of a data flow, they carry out computations on the data they + /// receive before sending them out to the next downstream node. + /// + /// The Operators are started *before the Sources* such that they are active before the first + /// data are produced. + async fn new( + context: Context, + configuration: Option, + inputs: Inputs, + outputs: Outputs, + ) -> Result + where + Self: Sized; +} + +/// The `Sink` trait represents a Sink of data in Zenoh Flow. 
+/// +/// Sinks only possess `Inputs`, their objective is to send the result of the computations to the +/// external world. +/// +/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a +/// state and to mutate it, the interior mutability pattern is necessary. +/// +/// A struct implementing the Sink trait typically needs to keep a reference to the `Input` it +/// needs. +/// +/// ## Example +/// +/// ```no_run +/// use async_trait::async_trait; +/// use zenoh_flow_nodes::prelude::*; +/// +/// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load +/// // the shared library. +/// #[export_sink] +/// struct GenericSink { +/// input: Input, +/// } +/// +/// #[async_trait] +/// impl Sink for GenericSink { +/// async fn new( +/// _context: Context, +/// _configuration: Option, +/// mut inputs: Inputs, +/// ) -> Result { +/// let input = inputs +/// .take("in") +/// .expect("No input called 'in' found") +/// .typed(|bytes| todo!("Provide your deserializer here")); +/// +/// Ok(GenericSink { input }) +/// } +/// } +/// +/// #[async_trait] +/// impl Node for GenericSink { +/// async fn iteration(&self) -> Result<()> { +/// let (message, _timestamp) = self.input.recv().await?; +/// match message { +/// Message::Data(t) => println!("{}", *t), +/// Message::Watermark => println!("Watermark"), +/// } +/// +/// Ok(()) +/// } +/// } +/// ``` +#[async_trait] +pub trait Sink: Node + Send + Sync { + /// For a `Context`, a `Configuration` and a set of `Inputs`, produce a new **Sink**. + /// + /// Sinks only possess `Inputs`, their objective is to send the result of the computations to the + /// external world. + /// + /// Sinks are **started first** when initiating a data flow. As they are at the end of the chain of + /// computations, by starting them first we ensure that no data is lost. 
+ async fn new( + context: Context, + configuration: Option, + inputs: Inputs, + ) -> Result + where + Self: Sized; +} diff --git a/zenoh-flow/src/traits.rs b/zenoh-flow/src/traits.rs index 007907a7..947e9ebc 100644 --- a/zenoh-flow/src/traits.rs +++ b/zenoh-flow/src/traits.rs @@ -52,7 +52,7 @@ impl SendSyncAny for T { /// /// ## Example /// -/// ```no_run +/// ```ignore /// use zenoh_flow::prelude::*; /// /// // Use our provided macro to expose the symbol that Zenoh-Flow will look for when it will load @@ -75,7 +75,7 @@ impl SendSyncAny for T { /// .take("out") /// .expect("No output called 'out' found") /// .typed(|buffer, data| todo!("Provide your serializer here")); -/// +/// /// Ok(Self { output }) /// } /// } @@ -127,7 +127,7 @@ pub trait Source: Node + Send + Sync { /// /// ## Example /// -/// ```no_run +/// ```ignore /// use async_trait::async_trait; /// use zenoh_flow::prelude::*; /// @@ -198,7 +198,7 @@ pub trait Sink: Node + Send + Sync { /// /// ## Example /// -/// ```no_run +/// ```ignore /// use async_trait::async_trait; /// use zenoh_flow::prelude::*; ///