diff --git a/src/cli/submit.rs b/src/cli/submit.rs index a332172..f9a8278 100644 --- a/src/cli/submit.rs +++ b/src/cli/submit.rs @@ -86,7 +86,7 @@ pub fn submit( let status = project.separate_by_status(action, matching_directories)?; let groups = project.separate_into_groups(action, status.eligible)?; - if action.group.submit_whole { + if action.group.submit_whole() { let whole_groups = project.separate_into_groups(action, project.state().list_directories())?; for group in &groups { diff --git a/src/lib.rs b/src/lib.rs index b57ffcf..599e461 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,12 +126,27 @@ pub enum Error { #[error("Cannot compare {0} and {1} while checking directory '{2}'.")] CannotCompareInclude(Value, Value, PathBuf), - #[error("Action at index {0} is missing name.")] + #[error("Action at index {0} is missing `name`.")] ActionMissingName(usize), - #[error("Action '{0}' is missing command.")] + #[error("Action '{0}' is missing `command`.")] ActionMissingCommand(String), + #[error("Default action must not set `from`.")] + DefaultActionSetsFrom(), + + #[error("Action '{0}' set in `from` not found.")] + FromActionNotFound(String), + + #[error("Cannot resolve recursive `from={0}`.")] + RecursiveFrom(String), + + #[error("Duplicate actions '{0}' must have the same `products`.")] + DuplicateActionsDifferentProducts(String), + + #[error("Duplicate actions '{0}' must have the same `previous_actions`.")] + DuplicateActionsDifferentPreviousActions(String), + // submission errors #[error("Error encountered while executing action '{0}': {1}.")] ExecuteAction(String, String), diff --git a/src/project.rs b/src/project.rs index 9b2d513..1f8ebfe 100644 --- a/src/project.rs +++ b/src/project.rs @@ -184,7 +184,7 @@ impl Project { 'outer: for name in directories { if let Some(value) = self.state.values().get(&name) { - for (include, comparison, expected) in &action.group.include { + for (include, comparison, expected) in action.group.include() { let actual = value .pointer(include) .ok_or_else(|| Error::JSONPointerNotFound(name.clone(), include.clone()))?; @@ -302,7 +302,7 @@ impl Project { .ok_or_else(|| Error::DirectoryNotFound(directory_name.clone()))?; let mut sort_key = Vec::new(); - for pointer in &action.group.sort_by { + for pointer in action.group.sort_by() { let element = value.pointer(pointer).ok_or_else(|| { Error::JSONPointerNotFound(directory_name.clone(), pointer.clone()) })?; @@ -313,8 +313,8 @@ impl Project { // Sort by key when there are keys to sort by. let mut result = Vec::new(); - if action.group.sort_by.is_empty() { - if action.group.reverse_sort { + if action.group.sort_by().is_empty() { + if action.group.reverse_sort() { directories.reverse(); } result.push(directories); @@ -324,13 +324,13 @@ impl Project { .expect("Valid JSON comparison") }); - if action.group.reverse_sort { + if action.group.reverse_sort() { directories.reverse(); } // Split by the sort key when requested. #[allow(clippy::redundant_closure_for_method_calls)] - if action.group.split_by_sort_key { + if action.group.split_by_sort_key() { result.extend( directories .chunk_by(|a, b| { @@ -465,10 +465,8 @@ previous_actions = ["two"] ); let mut action = project.workflow.action[1].clone(); - action - .group - .include - .push(("/i".into(), Comparison::GreaterThan, Value::from(4))); + let include = &mut action.group.include.as_mut().unwrap(); + include.push(("/i".into(), Comparison::GreaterThan, Value::from(4))); assert_eq!( project .find_matching_directories(&action, all_directories.clone()) @@ -539,7 +537,7 @@ previous_actions = ["two"] reversed.reverse(); let mut action = project.workflow.action[0].clone(); - action.group.reverse_sort = true; + action.group.reverse_sort = Some(true); let groups = project .separate_into_groups(&action, all_directories.clone()) .unwrap(); @@ -578,7 +576,7 @@ previous_actions = ["two"] all_directories.sort_unstable(); let mut action = project.workflow.action[0].clone(); - action.group.sort_by = vec!["/j".to_string()]; + action.group.sort_by = Some(vec!["/j".to_string()]); let groups = project .separate_into_groups(&action, all_directories.clone()) .unwrap(); @@ -606,8 +604,8 @@ previous_actions = ["two"] all_directories.sort_unstable(); let mut action = project.workflow.action[0].clone(); - action.group.sort_by = vec!["/j".to_string()]; - action.group.split_by_sort_key = true; + action.group.sort_by = Some(vec!["/j".to_string()]); + action.group.split_by_sort_key = Some(true); let groups = project .separate_into_groups(&action, all_directories.clone()) .unwrap(); diff --git a/src/workflow.rs b/src/workflow.rs index 90d0c22..fc8b1bd 100644 --- a/src/workflow.rs +++ b/src/workflow.rs @@ -181,26 +181,26 @@ pub enum Comparison { pub struct Group { /// Include members of the group where all JSON elements match the given values. #[serde(default)] - pub include: Vec<(String, Comparison, serde_json::Value)>, + pub include: Option>, /// Sort by the given set of JSON elements. #[serde(default)] - pub sort_by: Vec, + pub sort_by: Option>, /// Split into groups by the sort keys. #[serde(default)] - pub split_by_sort_key: bool, + pub split_by_sort_key: Option, /// Reverse the sort. #[serde(default)] - pub reverse_sort: bool, + pub reverse_sort: Option, /// Maximum size of the submitted group. pub maximum_size: Option, /// Submit only whole groups when true. #[serde(default)] - pub submit_whole: bool, + pub submit_whole: Option, } /// Resource cost to execute an action. @@ -463,9 +463,7 @@ impl Action { } self.resources.resolve(&template.resources); - - // TODO: Make all group members options - // self.group.resolve(template.group); + self.group.resolve(&template.group); // Populate each action's submit_options with the global ones. for (name, template_options) in &template.submit_options { @@ -495,6 +493,76 @@ impl Action { } } +impl Group { + /// Get the group's include. + pub fn include(&self) -> &[(String, Comparison, serde_json::Value)] { + if let Some(include) = self.include.as_ref() { + include + } else { + &[] + } + } + + /// Get the group's sort_by. + pub fn sort_by(&self) -> &[String] { + if let Some(sort_by) = self.sort_by.as_ref() { + sort_by + } else { + &[] + } + } + + /// Get the group's split_by_sort_key. + pub fn split_by_sort_key(&self) -> bool { + if let Some(split_by_sort_key) = self.split_by_sort_key { + split_by_sort_key + } else { + false + } + } + + /// Get the group's reverse_sort. + pub fn reverse_sort(&self) -> bool { + if let Some(reverse_sort) = self.reverse_sort { + reverse_sort + } else { + false + } + } + + /// Get the group's submit_whole. + pub fn submit_whole(&self) -> bool { + if let Some(submit_whole) = self.submit_whole { + submit_whole + } else { + false + } + } + + /// Resolve omitted keys from the given template. + fn resolve(&mut self, template: &Group) { + if self.include.is_none() { + self.include = template.include.clone(); + } + if self.sort_by.is_none() { + self.sort_by = template.sort_by.clone(); + } + if self.split_by_sort_key.is_none() { + self.split_by_sort_key = template.split_by_sort_key; + } + if self.reverse_sort.is_none() { + self.reverse_sort = template.reverse_sort; + } + if self.maximum_size.is_none() { + self.maximum_size = template.maximum_size; + } + if self.submit_whole.is_none() { + self.submit_whole = template.submit_whole; + } + } + +} + impl Workflow { /// Open the workflow /// @@ -550,8 +618,26 @@ impl Workflow { fn validate_and_set_defaults(mut self) -> Result { let mut action_names = HashSet::with_capacity(self.action.len()); + if self.default.action.from.is_some() { + return Err(Error::DefaultActionSetsFrom()); + } + + let source_actions = self.action.clone(); + for (action_idx, action) in self.action.iter_mut().enumerate() { - //TODO: implement from, validate no recursive from's + + if let Some(from) = &action.from { + if let Some(action_index) = source_actions.iter().position(|a| a.name() == from) { + if let Some(recursive_from) = &source_actions[action_index].from { + return Err(Error::RecursiveFrom(recursive_from.clone())); + } + + action.resolve(&source_actions[action_index]); + } else { + return Err(Error::FromActionNotFound(from.clone())); + } + } + action.resolve(&self.default.action); action_names.insert(action.name().to_string()); @@ -565,7 +651,7 @@ impl Workflow { } // Warn for apparently invalid sort_by. - for pointer in &action.group.sort_by { + for pointer in action.group.sort_by() { if !pointer.is_empty() && !pointer.starts_with('/') { warn!("The JSON pointer '{pointer}' does not appear valid. Did you mean '/{pointer}'?"); } @@ -581,6 +667,15 @@ impl Workflow { )); } } + + if let Some(first_action) = self.action_by_name(&action.name()) { + if action.previous_actions != first_action.previous_actions { + return Err(Error::DuplicateActionsDifferentPreviousActions(action.name().to_string())); + } + if action.products != first_action.products { + return Err(Error::DuplicateActionsDifferentProducts(action.name().to_string())); + } + } } Ok(self) @@ -814,12 +909,17 @@ command = "c" ); assert!(action.submit_options.is_empty()); - assert!(action.group.include.is_empty()); - assert!(action.group.sort_by.is_empty()); - assert!(!action.group.split_by_sort_key); + assert_eq!(action.group.include, None); + assert!(action.group.include().is_empty()); + assert_eq!(action.group.sort_by, None); + assert!(action.group.sort_by().is_empty()); + assert_eq!(action.group.split_by_sort_key, None); + assert!(!action.group.split_by_sort_key()); assert_eq!(action.group.maximum_size, None); - assert!(!action.group.submit_whole); - assert!(!action.group.reverse_sort); + assert_eq!(action.group.submit_whole, None); + assert!(!action.group.submit_whole()); + assert_eq!(action.group.reverse_sort, None); + assert!(!action.group.reverse_sort()); } #[test] @@ -833,7 +933,7 @@ command = "c" let result = Workflow::open_str(temp.path(), workflow); assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("missing name")); + assert!(result.unwrap_err().to_string().contains("missing `name`")); } #[test] @@ -847,7 +947,7 @@ name = "a" let result = Workflow::open_str(temp.path(), workflow); assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("missing command")); + assert!(result.unwrap_err().to_string().contains("missing `command`")); } #[test] @@ -867,12 +967,12 @@ command = "c" let action = workflow.action.first().unwrap(); assert!(action.submit_options.is_empty()); - assert!(action.group.include.is_empty()); - assert!(action.group.sort_by.is_empty()); - assert!(!action.group.split_by_sort_key); + assert!(action.group.include().is_empty()); + assert!(action.group.sort_by().is_empty()); + assert!(!action.group.split_by_sort_key()); assert_eq!(action.group.maximum_size, None); - assert!(!action.group.submit_whole); - assert!(!action.group.reverse_sort); + assert!(!action.group.submit_whole()); + assert!(!action.group.reverse_sort()); } #[test] @@ -890,9 +990,51 @@ command = "d" "#; let result = Workflow::open_str(temp.path(), workflow); assert!(result.is_ok()); + } - // TODO: Test that duplicates with different products fail - // TODO: Test that duplicates with different previous actions fail + #[test] + #[parallel] + fn action_duplicate_different_products() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "b" +command = "c" +products = ["e"] + +[[action]] +name = "b" +command = "d" +products = ["b"] +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(matches!(result, Err(Error::DuplicateActionsDifferentProducts(_)))); + + assert!(result.unwrap_err().to_string().contains("must have the same `products`")); + } + + #[test] + #[parallel] + fn action_duplicate_different_previous_actions() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "b" +command = "c" + +[[action]] +name = "b" +command = "d" +previous_actions = ["a"] + +[[action]] +name = "a" +command = "e" +"#; + let result = Workflow::open_str(temp.path(), workflow); + assert!(matches!(result, Err(Error::DuplicateActionsDifferentPreviousActions(_)))); + + assert!(result.unwrap_err().to_string().contains("must have the same `previous_actions`")); } #[test] @@ -1118,7 +1260,7 @@ reverse_sort = true let action = workflow.action.first().unwrap(); assert_eq!( - action.group.include, + action.group.include(), vec![ ( "/d".to_string(), @@ -1147,11 +1289,11 @@ reverse_sort = true ) ] ); - assert_eq!(action.group.sort_by, vec![String::from("/sort")]); - assert!(action.group.split_by_sort_key); + assert_eq!(action.group.sort_by(), vec![String::from("/sort")]); + assert!(action.group.split_by_sort_key()); assert_eq!(action.group.maximum_size, Some(10)); - assert!(action.group.submit_whole); - assert!(action.group.reverse_sort); + assert!(action.group.submit_whole()); + assert!(action.group.reverse_sort()); } #[test] @@ -1342,7 +1484,7 @@ from = "a" let result = Workflow::open_str(temp.path(), workflow); assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("asdf")); + assert!(result.unwrap_err().to_string().contains("must not set `from`")); } #[test] @@ -1368,7 +1510,12 @@ from = "a" assert_eq!(action.resources.gpus_per_process, None); assert_eq!(action.resources.walltime, None); assert!(action.submit_options.is_empty()); - // TODO: Group + assert_eq!(action.group.include, None); + assert_eq!(action.group.sort_by, None); + assert_eq!(action.group.split_by_sort_key, None); + assert_eq!(action.group.reverse_sort, None); + assert_eq!(action.group.maximum_size, None); + assert_eq!(action.group.submit_whole, None); assert_eq!(action.from, None); } @@ -1421,19 +1568,268 @@ name = "d" assert_eq!(action.resources.gpus_per_process, Some(4)); assert_eq!(action.resources.walltime(), Walltime::PerSubmission(Duration::new(true, 0, 1, 0).unwrap())); assert!(action.submit_options.is_empty()); - assert_eq!(action.group.include, vec![("/f".into(), Comparison::EqualTo, serde_json::Value::from(5))]); - assert_eq!(action.group.sort_by, vec!["/g"]); - assert!(action.group.split_by_sort_key); - assert!(action.group.reverse_sort); + assert_eq!(action.group.include(), vec![("/f".into(), Comparison::EqualTo, serde_json::Value::from(5))]); + assert_eq!(action.group.sort_by(), vec!["/g"]); + assert!(action.group.split_by_sort_key()); + assert!(action.group.reverse_sort()); assert_eq!(action.group.maximum_size, Some(6)); - assert!(action.group.submit_whole); + assert!(action.group.submit_whole()); assert_eq!(action.from, None); } -// TODO: Test action from all keys -// TODO: Test action override from -// TODO: Test action override default -// TODO: Test action override mixed + #[test] + #[parallel] + fn action_override_default() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[default.action] +name = "a" +command = "b" +launchers = ["c"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "equal_to", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] +name = "aa" +command = "bb" +launchers = ["cc"] +previous_actions = ["dd"] +products = ["ee"] + +[action.resources] +processes.per_directory = 4 +threads_per_process = 6 +gpus_per_process = 8 +walltime.per_submission = "00:00:02" + +# submit_options is tested above + +[action.group] +include = [["/ff", "equal_to", 10]] +sort_by = ["/gg"] +split_by_sort_key = false +reverse_sort = false +maximum_size = 12 +submit_whole = false + +[[action]] +name = "dd" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 2); + + let action = workflow.action.first().unwrap(); + assert_eq!(action.name(), "aa"); + assert_eq!(action.command(), "bb"); + assert_eq!(action.launchers(), vec!["cc"]); + assert_eq!(action.previous_actions(), vec!["dd"]); + assert_eq!(action.products(), vec!["ee"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(4)); + assert_eq!(action.resources.threads_per_process, Some(6)); + assert_eq!(action.resources.gpus_per_process, Some(8)); + assert_eq!(action.resources.walltime(), Walltime::PerSubmission(Duration::new(true, 0, 2, 0).unwrap())); + assert!(action.submit_options.is_empty()); + assert_eq!(action.group.include(), vec![("/ff".into(), Comparison::EqualTo, serde_json::Value::from(10))]); + assert_eq!(action.group.sort_by(), vec!["/gg"]); + assert!(!action.group.split_by_sort_key()); + assert!(!action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(12)); + assert!(!action.group.submit_whole()); + assert_eq!(action.from, None); + } + + #[test] + #[parallel] + fn action_from() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "a" +command = "b" +launchers = ["c"] +previous_actions = ["d"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "equal_to", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] +from = "a" + +[[action]] +name = "d" +command = "e" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 3); + + let action = &workflow.action[1]; + assert_eq!(action.name(), "a"); + assert_eq!(action.command(), "b"); + assert_eq!(action.launchers(), vec!["c"]); + assert_eq!(action.previous_actions(), vec!["d"]); + assert_eq!(action.products(), vec!["e"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(2)); + assert_eq!(action.resources.threads_per_process, Some(3)); + assert_eq!(action.resources.gpus_per_process, Some(4)); + assert_eq!(action.resources.walltime(), Walltime::PerSubmission(Duration::new(true, 0, 1, 0).unwrap())); + assert!(action.submit_options.is_empty()); + assert_eq!(action.group.include(), vec![("/f".into(), Comparison::EqualTo, serde_json::Value::from(5))]); + assert_eq!(action.group.sort_by(), vec!["/g"]); + assert!(action.group.split_by_sort_key()); + assert!(action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(6)); + assert!(action.group.submit_whole()); + assert_eq!(action.from, Some("a".into())); + } + + #[test] + #[parallel] + fn action_override_from() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[[action]] +name = "a" +command = "b" +launchers = ["c"] +previous_actions = ["d"] +products = ["e"] + +[default.action.resources] +processes.per_directory = 2 +threads_per_process = 3 +gpus_per_process = 4 +walltime.per_submission = "00:00:01" + +# submit_options is tested above + +[default.action.group] +include = [["/f", "equal_to", 5]] +sort_by = ["/g"] +split_by_sort_key = true +reverse_sort = true +maximum_size = 6 +submit_whole = true + +[[action]] +from = "a" + +name = "aa" +command = "bb" +launchers = ["cc"] +previous_actions = ["dd"] +products = ["ee"] + +[action.resources] +processes.per_directory = 4 +threads_per_process = 6 +gpus_per_process = 8 +walltime.per_submission = "00:00:02" + +# submit_options is tested above + +[action.group] +include = [["/ff", "equal_to", 10]] +sort_by = ["/gg"] +split_by_sort_key = false +reverse_sort = false +maximum_size = 12 +submit_whole = false + +[[action]] +name = "dd" +command = "ee" + +[[action]] +name = "d" +command = "e" +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 4); + + let action = &workflow.action[1]; + assert_eq!(action.name(), "aa"); + assert_eq!(action.command(), "bb"); + assert_eq!(action.launchers(), vec!["cc"]); + assert_eq!(action.previous_actions(), vec!["dd"]); + assert_eq!(action.products(), vec!["ee"]); + assert_eq!(action.resources.processes(), Processes::PerDirectory(4)); + assert_eq!(action.resources.threads_per_process, Some(6)); + assert_eq!(action.resources.gpus_per_process, Some(8)); + assert_eq!(action.resources.walltime(), Walltime::PerSubmission(Duration::new(true, 0, 2, 0).unwrap())); + assert!(action.submit_options.is_empty()); + assert_eq!(action.group.include(), vec![("/ff".into(), Comparison::EqualTo, serde_json::Value::from(10))]); + assert_eq!(action.group.sort_by(), vec!["/gg"]); + assert!(!action.group.split_by_sort_key()); + assert!(!action.group.reverse_sort()); + assert_eq!(action.group.maximum_size, Some(12)); + assert!(!action.group.submit_whole()); + assert_eq!(action.from, Some("a".into())); + } + + #[test] + #[parallel] + fn action_override_mixed() { + let temp = TempDir::new().unwrap(); + let workflow = r#" +[default.action] +resources.threads_per_process = 2 + +[[action]] +name = "a" +command = "b" +resources.gpus_per_process = 4 + +[[action]] +from = "a" +resources.processes.per_directory = 8 +"#; + + let workflow = Workflow::open_str(temp.path(), workflow).unwrap(); + + assert_eq!(workflow.action.len(), 2); + + let action = &workflow.action[1]; + assert_eq!(action.name(), "a"); + assert_eq!(action.command(), "b"); + assert_eq!(action.resources.processes(), Processes::PerDirectory(8)); + assert_eq!(action.resources.threads_per_process, Some(2)); + assert_eq!(action.resources.gpus_per_process, Some(4)); + } #[test] #[parallel]