Skip to content

Commit

Permalink
Merge pull request #75 from atolab/feat/loops
Browse files Browse the repository at this point in the history
feat: add Loop support
  • Loading branch information
gabrik authored Jan 17, 2022
2 parents 414d5c6 + be8094d commit 8ee90bc
Show file tree
Hide file tree
Showing 28 changed files with 2,377 additions and 243 deletions.
4 changes: 2 additions & 2 deletions cargo-zenoh-flow/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ async fn main() {
let target = if release {
format!(
"{}/release/{}{}{}",
target_dir.as_path().display().to_string(),
target_dir.as_path().display(),
std::env::consts::DLL_PREFIX,
node_info.id,
std::env::consts::DLL_SUFFIX
)
} else {
format!(
"{}/debug/{}{}{}",
target_dir.as_path().display().to_string(),
target_dir.as_path().display(),
std::env::consts::DLL_PREFIX,
node_info.id,
std::env::consts::DLL_SUFFIX
Expand Down
6 changes: 3 additions & 3 deletions zenoh-flow-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,19 @@ async fn main() {
instance.flow,
instance
.operators
.iter()
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
instance
.sinks
.iter()
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
instance
.sources
.iter()
.values()
.map(|o| format!("{}", o))
.collect::<Vec<String>>()
.join("\n"),
Expand Down
2 changes: 1 addition & 1 deletion zenoh-flow/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub enum ZFError {
NodeNotFound(NodeId),
PortNotFound((NodeId, PortId)),
PortNotConnected((NodeId, PortId)),
NotRecoding,
NotRecording,
AlreadyRecording,
NoPathBetweenNodes(((NodeId, PortId), (NodeId, PortId))),
}
Expand Down
1 change: 1 addition & 0 deletions zenoh-flow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use ::typetag;
pub mod model;
pub mod runtime;
pub use runtime::deadline::LocalDeadlineMiss;
pub use runtime::loops::*;
pub use runtime::message::*;
pub use runtime::token::*;
pub mod types;
Expand Down
19 changes: 17 additions & 2 deletions zenoh-flow/src/model/dataflow/descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::model::dataflow::validator::DataflowValidator;
use crate::model::deadline::E2EDeadlineDescriptor;
use crate::model::link::LinkDescriptor;
use crate::model::loops::LoopDescriptor;
use crate::model::node::{OperatorDescriptor, SinkDescriptor, SourceDescriptor};
use crate::serde::{Deserialize, Serialize};
use crate::types::{NodeId, RuntimeId, ZFError, ZFResult};
Expand All @@ -37,6 +38,7 @@ pub struct DataFlowDescriptor {
pub links: Vec<LinkDescriptor>,
pub mapping: Option<Vec<Mapping>>,
pub deadlines: Option<Vec<E2EDeadlineDescriptor>>,
pub loops: Option<Vec<LoopDescriptor>>,
}

impl DataFlowDescriptor {
Expand Down Expand Up @@ -98,16 +100,29 @@ impl DataFlowDescriptor {
// - each node has a unique id,
// - each port (input and output) is connected,
// - an input port is connected only once (i.e. it receives data from a single output port),
// - connected ports are declared with the same type.
// - connected ports are declared with the same type,
// - the dataflow, without the loops, is a DAG,
// - the end-to-end deadlines are correct,
// - the loops are valid.
fn validate(&self) -> ZFResult<()> {
let validator = DataflowValidator::try_from(self)?;
let mut validator = DataflowValidator::try_from(self)?;

validator.validate_ports()?;

validator.validate_dag()?;

if let Some(deadlines) = &self.deadlines {
deadlines.iter().try_for_each(|deadline| {
validator.validate_deadline(&deadline.from, &deadline.to)
})?
}

if let Some(loops) = &self.loops {
loops.iter().try_for_each(|ciclo| {
validator.validate_loop(&ciclo.ingress, &ciclo.egress, &ciclo.feedback_port)
})?
}

Ok(())
}
}
Expand Down
119 changes: 78 additions & 41 deletions zenoh-flow/src/model/dataflow/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::model::node::{OperatorRecord, SinkRecord, SourceRecord};
use crate::model::{InputDescriptor, OutputDescriptor};
use crate::serde::{Deserialize, Serialize};
use crate::types::{RuntimeId, ZFError, ZFResult};
use crate::PortType;
use crate::{NodeId, PortType};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::hash::{Hash, Hasher};
use uuid::Uuid;
Expand All @@ -29,9 +30,9 @@ use uuid::Uuid;
pub struct DataFlowRecord {
pub uuid: Uuid,
pub flow: String,
pub operators: Vec<OperatorRecord>,
pub sinks: Vec<SinkRecord>,
pub sources: Vec<SourceRecord>,
pub operators: HashMap<NodeId, OperatorRecord>,
pub sinks: HashMap<NodeId, SinkRecord>,
pub sources: HashMap<NodeId, SourceRecord>,
pub connectors: Vec<ZFConnectorRecord>,
pub links: Vec<LinkDescriptor>,
pub end_to_end_deadlines: Option<Vec<E2EDeadlineRecord>>,
Expand All @@ -57,23 +58,20 @@ impl DataFlowRecord {
}

pub fn find_node_runtime(&self, id: &str) -> Option<RuntimeId> {
match self.get_operator(id) {
Some(o) => Some(o.runtime),
None => match self.get_source(id) {
Some(s) => Some(s.runtime),
None => match self.get_sink(id) {
Some(s) => Some(s.runtime),
None => None,
},
match self.operators.get(id) {
Some(o) => Some(o.runtime.clone()),
None => match self.sources.get(id) {
Some(s) => Some(s.runtime.clone()),
None => self.sinks.get(id).map(|s| s.runtime.clone()),
},
}
}

pub fn find_node_output_type(&self, id: &str, output: &str) -> Option<PortType> {
log::trace!("find_node_output_type({:?},{:?})", id, output);
match self.get_operator(id) {
match self.operators.get(id) {
Some(o) => o.get_output_type(output),
None => match self.get_source(id) {
None => match self.sources.get(id) {
Some(s) => s.get_output_type(output),
None => None,
},
Expand All @@ -82,49 +80,37 @@ impl DataFlowRecord {

pub fn find_node_input_type(&self, id: &str, input: &str) -> Option<PortType> {
log::trace!("find_node_input_type({:?},{:?})", id, input);
match self.get_operator(id) {
match self.operators.get(id) {
Some(o) => o.get_input_type(input),
None => match self.get_sink(id) {
None => match self.sinks.get(id) {
Some(s) => s.get_input_type(input),
None => None,
},
}
}

fn get_operator(&self, id: &str) -> Option<OperatorRecord> {
self.operators
.iter()
.find(|&o| o.id.as_ref() == id)
.cloned()
}

fn get_source(&self, id: &str) -> Option<SourceRecord> {
self.sources.iter().find(|&o| o.id.as_ref() == id).cloned()
}

fn get_sink(&self, id: &str) -> Option<SinkRecord> {
self.sinks.iter().find(|&o| o.id.as_ref() == id).cloned()
}

fn add_links(&mut self, links: &[LinkDescriptor]) -> ZFResult<()> {
for l in links {
log::debug!("Adding link: {:?}…", l);
let from_runtime = match self.find_node_runtime(&l.from.node) {
Some(rt) => rt,
None => {
log::error!("Could not find runtime for: {:?}", &l.from.node);
return Err(ZFError::Uncompleted(format!(
"Unable to find runtime for {}",
&l.from.node
)))
)));
}
};

let to_runtime = match self.find_node_runtime(&l.to.node) {
Some(rt) => rt,
None => {
log::error!("Could not find runtime for: {:?}", &l.to.node);
return Err(ZFError::Uncompleted(format!(
"Unable to find runtime for {}",
&l.to.node
)))
)));
}
};

Expand Down Expand Up @@ -153,6 +139,7 @@ impl DataFlowRecord {
}

if from_runtime == to_runtime {
log::debug!("Adding link: {:?}… OK", l);
// link between nodes on the same runtime
self.links.push(l.clone())
} else {
Expand Down Expand Up @@ -251,7 +238,7 @@ impl TryFrom<(DataFlowDescriptor, Uuid)> for DataFlowRecord {
type Error = ZFError;

fn try_from(d: (DataFlowDescriptor, Uuid)) -> Result<Self, Self::Error> {
let (d, id) = d;
let (mut d, id) = d;

let deadlines = d
.deadlines
Expand All @@ -261,9 +248,9 @@ impl TryFrom<(DataFlowDescriptor, Uuid)> for DataFlowRecord {
let mut dfr = DataFlowRecord {
uuid: id,
flow: d.flow.clone(),
operators: Vec::new(),
sinks: Vec::new(),
sources: Vec::new(),
operators: HashMap::new(),
sinks: HashMap::new(),
sources: HashMap::new(),
connectors: Vec::new(),
links: Vec::new(),
end_to_end_deadlines: deadlines,
Expand All @@ -280,8 +267,9 @@ impl TryFrom<(DataFlowDescriptor, Uuid)> for DataFlowRecord {
configuration: o.configuration.clone(),
runtime: m,
deadline: o.deadline.as_ref().map(|period| period.to_duration()),
ciclo: None,
};
dfr.operators.push(or)
dfr.operators.insert(o.id.clone(), or);
}
None => {
return Err(ZFError::Uncompleted(format!(
Expand All @@ -303,7 +291,7 @@ impl TryFrom<(DataFlowDescriptor, Uuid)> for DataFlowRecord {
configuration: s.configuration.clone(),
runtime: m,
};
dfr.sources.push(sr)
dfr.sources.insert(s.id.clone(), sr);
}
None => {
return Err(ZFError::Uncompleted(format!(
Expand All @@ -319,13 +307,12 @@ impl TryFrom<(DataFlowDescriptor, Uuid)> for DataFlowRecord {
Some(m) => {
let sr = SinkRecord {
id: s.id.clone(),
// name: s.name.clone(),
input: s.input.clone(),
uri: s.uri.clone(),
configuration: s.configuration.clone(),
runtime: m,
};
dfr.sinks.push(sr)
dfr.sinks.insert(s.id.clone(), sr);
}
None => {
return Err(ZFError::Uncompleted(format!(
Expand All @@ -336,7 +323,57 @@ impl TryFrom<(DataFlowDescriptor, Uuid)> for DataFlowRecord {
}
}

// A Loop between two nodes is, in fact, a backward link.
//
// To add a link we first need to add an input to the Ingress and an output to the Egress
// (as these ports were not declared by the user). We also must perform these additions to
// the `validator` as it will check that everything is alright on the link we are about to
// create.
//
// We finally add the loop information to the Ingress and Egress.
if let Some(loops) = d.loops {
// Ciclo is the italian word for "loop" — we cannot use "loop" as it’s a reserved
// keyword.
for ciclo in loops {
// Update the ingress / egress.
let ingress = dfr
.operators
.get_mut(&ciclo.ingress)
.ok_or(ZFError::GenericError)?;
ingress.inputs.push(PortDescriptor {
port_id: ciclo.feedback_port.clone(),
port_type: ciclo.port_type.clone(),
});
ingress.ciclo = Some(ciclo.clone());

let egress = dfr
.operators
.get_mut(&ciclo.egress)
.ok_or(ZFError::GenericError)?;
egress.outputs.push(PortDescriptor {
port_id: ciclo.feedback_port.clone(),
port_type: ciclo.port_type.clone(),
});
egress.ciclo = Some(ciclo.clone());

d.links.push(LinkDescriptor {
from: OutputDescriptor {
node: ciclo.egress,
output: ciclo.feedback_port.clone(),
},
to: InputDescriptor {
node: ciclo.ingress,
input: ciclo.feedback_port,
},
size: None,
queueing_policy: None,
priority: None,
});
}
}

dfr.add_links(&d.links)?;

Ok(dfr)
}
}
Expand Down
Loading

0 comments on commit 8ee90bc

Please sign in to comment.