diff --git a/.github/workflows/base.yml b/.github/workflows/base.yml index edf9cfe..59ae8c7 100644 --- a/.github/workflows/base.yml +++ b/.github/workflows/base.yml @@ -26,7 +26,7 @@ jobs: with: node-version: 16 - test: + check_fmt: name: Fmt Check runs-on: ubuntu-latest steps: @@ -41,6 +41,21 @@ jobs: command: fmt args: --check + test: + name: Unit test & Doc test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - uses: actions-rs/cargo@v1 + with: + command: test + args: --all + # test: # name: Test Suite # runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 2aad589..336c341 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = ["."] tokio = { version = "1.28", features = ["rt", "sync", "rt-multi-thread"] } log = "0.4" env_logger = "0.10.1" +async-trait = "0.1.83" [dev-dependencies] simplelog = "0.12" diff --git a/src/connection/out_channel.rs b/src/connection/out_channel.rs index 7d7d5a6..2c67fac 100644 --- a/src/connection/out_channel.rs +++ b/src/connection/out_channel.rs @@ -45,13 +45,9 @@ impl OutChannels { } /// # Output Channel -/// Wrapper of senders of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will +/// Wrapper of senderrs of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will /// decide the inner type of channel when building the graph. -/// ## Implements -/// - `blocking_send`: sends the message, blocked if no capacity left in the channel. Returns `Ok()` -/// if message sent; returns `Err(SendErr)` if error occurs. -/// - `send`: sends the message, waiting until there is capacity asynchronously. Returns `Ok()` -/// if message sent; returns `Err(SendErr)` if error occurs. +/// Learn more about [Tokio Channels](https://tokio.rs/tokio/tutorial/channels). enum OutChannel { /// Sender of a `tokio::sync::mpsc` channel. Mpsc(mpsc::Sender), diff --git a/src/graph/mod.rs b/src/graph/mod.rs deleted file mode 100644 index 492bc84..0000000 --- a/src/graph/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod node; diff --git a/src/graph/node.rs b/src/graph/node.rs deleted file mode 100644 index c0b6763..0000000 --- a/src/graph/node.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[derive(Debug, Hash, PartialEq, Eq, Clone)] -pub struct NodeId(usize); diff --git a/src/lib.rs b/src/lib.rs index 5ca068a..4116d78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,15 @@ -mod connection; -pub mod graph; +pub mod connection; +pub mod node; +pub mod utils; + +pub use connection::{ + in_channel::{InChannels, RecvErr}, + information_packet::Content, + out_channel::{OutChannels, SendErr}, +}; +pub use node::{ + action::{Action, EmptyAction}, + default_node::DefaultNode, + node::*, +}; +pub use utils::{env::EnvVar, output::Output}; diff --git a/src/node/action.rs b/src/node/action.rs new file mode 100644 index 0000000..ad2535d --- /dev/null +++ b/src/node/action.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::{ + connection::{in_channel::InChannels, out_channel::OutChannels}, + utils::{env::EnvVar, output::Output}, +}; + +/// Node specific behavior +/// +/// [`Action`] stores the specific execution logic of a task. +/// +/// # Example +/// An implementation of [`Action`]: `HelloAction`, having private +/// fields `statement` and `repeat`. +/// +/// ```rust +/// use std::sync::Arc; +/// use dagrs::{Action, EnvVar, Output, InChannels, OutChannels}; +/// use async_trait::async_trait; +/// +/// struct HelloAction{ +/// statement: String, +/// repeat: usize, +/// } +/// +/// #[async_trait] +/// impl Action for HelloAction{ +/// async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc) -> Output{ +/// for i in 0..self.repeat { +/// println!("{}",self.statement); +/// } +/// Output::empty() +/// } +/// } +/// +/// let hello=HelloAction { +/// statement: "hello world!".to_string(), +/// repeat: 10 +/// }; +/// +/// ``` +#[async_trait] +pub trait Action: Send + Sync { + async fn run( + &self, + in_channels: &mut InChannels, + out_channels: &OutChannels, + env: Arc, + ) -> Output; +} + +/// An empty implementaion of [`Action`]. +/// +/// Used as a placeholder when creating a `Node` without `Action`. +pub struct EmptyAction; +#[async_trait] +impl Action for EmptyAction { + async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc) -> Output { + Output::Out(None) + } +} diff --git a/src/node/default_node.rs b/src/node/default_node.rs new file mode 100644 index 0000000..6e483fe --- /dev/null +++ b/src/node/default_node.rs @@ -0,0 +1,157 @@ +use std::sync::Arc; + +use crate::{ + connection::{in_channel::InChannels, out_channel::OutChannels}, + utils::{env::EnvVar, output::Output}, +}; + +use super::{ + action::{Action, EmptyAction}, + node::{Node, NodeId, NodeName, NodeTable}, +}; + +/// # Default node type +/// +/// [`DefaultNode`] is a default implementation of the [`Node`] trait. Users can use this node +/// type to build tasks to meet most needs. +/// +/// ## Create a `DefaultNode`: +/// - use the method `new`. Required attributes: node's name; [`NodeTable`](for id allocation). +/// +/// ```rust +/// use dagrs::{NodeName, NodeTable, DefaultNode}; +/// +/// let node_name = "Node X"; +/// let mut node_table = NodeTable::new(); +/// let mut node = DefaultNode::new( +/// NodeName::from(node_name), +/// &mut node_table, +/// ); +/// ``` +/// +/// - use the method `with_action`. Required attributes: node's name; [`NodeTable`](for id allocation); +/// execution logic [`Action`]. +/// +/// ```rust +/// use dagrs::{NodeName, NodeTable, DefaultNode, EmptyAction}; +/// +/// let node_name = "Node X"; +/// let mut node_table = NodeTable::new(); +/// let mut node = DefaultNode::with_action( +/// NodeName::from(node_name), +/// Box::new(EmptyAction), +/// &mut node_table, +/// ); +/// ``` +pub struct DefaultNode { + id: NodeId, + name: NodeName, + action: Box, + in_channels: InChannels, + out_channels: OutChannels, +} + +impl Node for DefaultNode { + fn id(&self) -> NodeId { + self.id.clone() + } + + fn name(&self) -> NodeName { + self.name.clone() + } + + fn input_channels(&mut self) -> &mut InChannels { + &mut self.in_channels + } + + fn output_channels(&mut self) -> &mut OutChannels { + &mut self.out_channels + } + + fn run(&mut self, env: Arc) -> Output { + tokio::runtime::Runtime::new().unwrap().block_on(async { + self.action + .run(&mut self.in_channels, &self.out_channels, env) + .await + }) + } +} + +impl DefaultNode { + pub fn new(name: NodeName, node_table: &mut NodeTable) -> Self { + Self { + id: node_table.alloc_id_for(&name), + name, + action: Box::new(EmptyAction), + in_channels: InChannels::default(), + out_channels: OutChannels::default(), + } + } + + pub fn with_action( + name: NodeName, + action: Box, + node_table: &mut NodeTable, + ) -> Self { + Self { + id: node_table.alloc_id_for(&name), + name, + action, + in_channels: InChannels::default(), + out_channels: OutChannels::default(), + } + } +} + +#[cfg(test)] +mod test_default_node { + + use std::sync::Arc; + + use crate::{Content, EnvVar, InChannels, Node, NodeName, NodeTable, OutChannels, Output}; + + use super::{Action, DefaultNode}; + + use async_trait::async_trait; + + /// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world". + #[derive(Default)] + pub struct HelloAction; + #[async_trait] + impl Action for HelloAction { + async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc) -> Output { + Output::Out(Some(Content::new("Hello world".to_string()))) + } + } + + impl HelloAction { + pub fn new() -> Box { + Box::new(Self::default()) + } + } + + /// Test for create a default node. + /// + /// Step 1: create a [`DefaultNode`] with [`HelloAction`]. + /// + /// Step 2: run the node and verify its output. + #[test] + fn create_default_node() { + let node_name = "Test Node"; + + let mut node_table = NodeTable::new(); + let mut node = DefaultNode::with_action( + NodeName::from(node_name), + HelloAction::new(), + &mut node_table, + ); + + // Check if node table has key-value pair (node.name, node.id) + assert_eq!(node_table.get(node_name).unwrap(), &node.id()); + + let env = Arc::new(EnvVar::new(node_table)); + let out = node.run(env).get_out().unwrap(); + let out: &String = out.get().unwrap(); + assert_eq!(out, "Hello world"); + } +} diff --git a/src/node/id_allocate.rs b/src/node/id_allocate.rs new file mode 100644 index 0000000..42f5f8f --- /dev/null +++ b/src/node/id_allocate.rs @@ -0,0 +1,29 @@ +use std::sync::atomic::AtomicUsize; + +use super::node::NodeId; + +/// IDAllocator for Node. +struct IDAllocator { + id: AtomicUsize, +} + +impl IDAllocator { + fn alloc(&self) -> NodeId { + let origin = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if origin > self.id.load(std::sync::atomic::Ordering::Relaxed) { + panic!("Too many tasks.") + } else { + NodeId(origin) + } + } +} + +/// The global task uniquely identifies an instance of the allocator. +static ID_ALLOCATOR: IDAllocator = IDAllocator { + id: AtomicUsize::new(1), +}; + +/// Assign node's id. +pub(crate) fn alloc_id() -> NodeId { + ID_ALLOCATOR.alloc() +} diff --git a/src/node/mod.rs b/src/node/mod.rs new file mode 100644 index 0000000..27aa930 --- /dev/null +++ b/src/node/mod.rs @@ -0,0 +1,4 @@ +pub mod action; +pub mod default_node; +pub mod id_allocate; +pub mod node; diff --git a/src/node/node.rs b/src/node/node.rs new file mode 100644 index 0000000..405fb11 --- /dev/null +++ b/src/node/node.rs @@ -0,0 +1,74 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::{ + connection::{in_channel::InChannels, out_channel::OutChannels}, + utils::{env::EnvVar, output::Output}, +}; + +use super::id_allocate::alloc_id; + +///# The [`Node`] trait +/// +/// Nodes are the basic scheduling units of Graph. They can be identified by +/// a globally assigned [`NodeId`] and a user-provided name. +/// +/// Nodes can communicate with others asynchronously through [`InChannels`] and [`OutChannels`]. +/// +/// In addition to the above properties, users can also customize some other attributes. +pub trait Node: Send + Sync { + /// id is the unique identifier of each node, it will be assigned by the [`NodeTable`] + /// when creating a new node, you can find this node through this identifier. + fn id(&self) -> NodeId; + /// The node's name. + fn name(&self) -> NodeName; + /// Input Channels of this node. + fn input_channels(&mut self) -> &mut InChannels; + /// Output Channels of this node. + fn output_channels(&mut self) -> &mut OutChannels; + /// Execute a run of this node. + fn run(&mut self, env: Arc) -> Output; +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone)] +pub struct NodeId(pub(crate) usize); + +pub type NodeName = String; + +/// [NodeTable]: a mapping from [Node]'s name to [NodeId]. +#[derive(Default)] +pub struct NodeTable(pub(crate) HashMap); + +/// [NodeTable]'s name in [`EnvVar`]. +pub const NODE_TABLE_STR: &str = "node_table"; + +impl NodeTable { + /// Alloc a new [NodeId] for a [Node]. + /// + /// If there is a Node requesting for an ID with a duplicate name, + /// the older one's info will be overwritten. + pub fn alloc_id_for(&mut self, name: &str) -> NodeId { + let id = alloc_id(); + log::debug!("alloc id {:?} for {:?}", id, name); + + if let Some(v) = self.0.insert(name.to_string(), id.clone()) { + log::warn!("Node {} is already allocated with id {:?}.", name, v); + }; + id + } + + pub fn get(&self, name: &str) -> Option<&NodeId> { + self.0.get(name) + } + + pub fn new() -> Self { + Self::default() + } +} + +impl EnvVar { + /// Get a [`Node`]'s [`NodeId`] by providing its name. + pub fn get_node_id(&self, node_name: &str) -> Option<&NodeId> { + let node_table: &NodeTable = self.get_ref(NODE_TABLE_STR).unwrap(); + node_table.get(node_name) + } +} diff --git a/src/utils/env.rs b/src/utils/env.rs new file mode 100644 index 0000000..8370aab --- /dev/null +++ b/src/utils/env.rs @@ -0,0 +1,67 @@ +use std::collections::HashMap; + +use crate::{ + connection::information_packet::Content, + node::node::{NodeTable, NODE_TABLE_STR}, +}; + +pub type Variable = Content; + +/// # Environment variable. +/// +/// When multiple nodes are running, they may need to share the same data or read +/// the same configuration information. Environment variables can meet this requirement. +/// Before all nodes run, the user builds a [`EnvVar`] and sets all the environment +/// variables. One [`EnvVar`] corresponds to one dag. All nodes in a job can +/// be shared and immutable at runtime. environment variables. +/// +/// Variables that [`EnvVar`] should have: +/// - [NodeTable] : a mapping from node's name to `NodeId`. +/// During the runtime of a `Graph`, [`NodeTable`] allows +/// each `Node` to look up the id of a specific node by its name. +#[derive(Debug)] +pub struct EnvVar { + variables: HashMap, +} + +impl EnvVar { + /// Allocate a new [`EnvVar`]. + pub fn new(node_table: NodeTable) -> Self { + let mut env = Self { + variables: HashMap::default(), + }; + env.set(NODE_TABLE_STR, node_table); + env + } + + #[allow(unused)] + /// Set a global variables. + /// + /// # Example + /// ```rust + /// use dagrs::{EnvVar, NodeTable}; + /// + /// # let mut env = EnvVar::new(NodeTable::default()); + /// env.set("Hello", "World".to_string()); + /// ``` + pub fn set(&mut self, name: &str, var: H) { + let mut v = Variable::new(var); + self.variables.insert(name.to_owned(), v); + } + + /// Get environment variables through keys of type &str. + /// + /// Note: This method will clone the value. To avoid cloning, use `get_ref`. + pub fn get(&self, name: &str) -> Option { + self.get_ref(name).cloned() + } + + /// Get environment variables through keys of type &str. + pub fn get_ref(&self, name: &str) -> Option<&H> { + if let Some(content) = self.variables.get(name) { + content.get() + } else { + None + } + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..09f6425 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1,2 @@ +pub mod env; +pub mod output; diff --git a/src/utils/output.rs b/src/utils/output.rs new file mode 100644 index 0000000..4ac4668 --- /dev/null +++ b/src/utils/output.rs @@ -0,0 +1,89 @@ +//! Node output +//! +//! [`Output`] represents the output of the Node respectively. +//! +//! Users should consider the output results of the Node when defining the specific +//! behavior of the Node. The input results may be: normal output, no output, or Node +//! execution error message. +//! It should be noted that the content stored in [`Output`] must implement the [`Clone`] trait. +//! +//! # Example +//! In general, a Node may produce output or no output: +//! ```rust +//! use dagrs::Output; +//! let out=Output::new(10); +//! let non_out=Output::empty(); +//! ``` +//! In some special cases, when a predictable error occurs in the execution of a Node's +//! specific behavior, the user can choose to return the error message as the output of +//! the Node. Of course, this will cause subsequent Nodes to abandon execution. +//! +//! ```rust +//! use dagrs::Output; +//! use dagrs::Content; +//! let err_out = Output::Err("some error messages!".to_string()); + +use crate::connection::information_packet::Content; + +/// [`Output`] represents the output of a node. Different from information packet (`Content`, +/// used to communicate with other Nodes), `Output` carries the information that `Node` +/// needs to pass to the `Graph`. +#[derive(Clone, Debug)] +pub enum Output { + Out(Option), + Err(String), + ErrWithExitCode(Option, Option), +} + +impl Output { + /// Construct a new [`Output`]. + /// + /// Since the return value may be transferred between threads, + /// [`Send`], [`Sync`] is needed. + pub fn new(val: H) -> Self { + Self::Out(Some(Content::new(val))) + } + + /// Construct an empty [`Output`]. + pub fn empty() -> Self { + Self::Out(None) + } + + /// Construct an [`Output`]` with an error message. + pub fn error(msg: String) -> Self { + Self::Err(msg) + } + + /// Construct an [`Output`]` with an exit code and an optional error message. + pub fn error_with_exit_code(code: Option, msg: Option) -> Self { + Self::ErrWithExitCode(code, msg) + } + + /// Determine whether [`Output`] stores error information. + pub(crate) fn is_err(&self) -> bool { + match self { + Self::Err(_) | Self::ErrWithExitCode(_, _) => true, + Self::Out(_) => false, + } + } + + /// Get the contents of [`Output`]. + pub(crate) fn get_out(&self) -> Option { + match self { + Self::Out(ref out) => out.clone(), + Self::Err(_) | Self::ErrWithExitCode(_, _) => None, + } + } + + /// Get error information stored in [`Output`]. + pub(crate) fn get_err(&self) -> Option { + match self { + Self::Out(_) => None, + Self::Err(err) => Some(err.to_string()), + Self::ErrWithExitCode(code, _) => { + let error_code = code.map_or("".to_string(), |v| v.to_string()); + Some(format!("code: {error_code}")) + } + } + } +}