Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove excess task_name/tid clones for ExecState #47

Merged
merged 3 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/engine/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,10 @@ impl Dag {
/// - Create a graph from task dependencies.
/// - Generate task heart sequence according to topological sorting of graph.
pub(crate) fn init(&mut self) -> Result<(), DagError> {
self.execute_states.reserve(self.tasks.len());
self.tasks.values().for_each(|task| {
self.execute_states.insert(
task.id(),
Arc::new(ExecState::new(task.id(), task.name().to_string())),
);
self.execute_states
.insert(task.id(), Arc::new(ExecState::new()));
});

self.create_graph()?;
Expand Down
15 changes: 5 additions & 10 deletions src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,18 @@ impl Engine {
/// Given a Dag name, execute this Dag.
/// Returns true if the given Dag executes successfully, otherwise false.
pub fn run_dag(&mut self, name: &str) -> bool {
if !self.dags.contains_key(name) {
if let Some(dag) = self.dags.get(name) {
self.runtime.block_on(dag.run())
} else {
error!("No job named '{}'", name);
false
} else {
let job = self.dags.get(name).unwrap();
self.runtime.block_on(job.run())
}
}

/// Execute all the Dags in the Engine in sequence according to the order numbers of the Dags in
/// the sequence from small to large. The return value is the execution status of all tasks.
pub fn run_sequential(&mut self) -> Vec<bool> {
let mut res = Vec::new();
let mut res = Vec::with_capacity(self.sequence.len());
for seq in 1..self.sequence.len() + 1 {
let name = self.sequence.get(&seq).unwrap().clone();
res.push(self.run_dag(name.as_str()));
Expand All @@ -92,11 +91,7 @@ impl Engine {

/// Given the name of the Dag, get the execution result of the specified Dag.
pub fn get_dag_result<T: Send + Sync + Clone + 'static>(&self, name: &str) -> Option<Arc<T>> {
if self.dags.contains_key(name) {
self.dags.get(name).unwrap().get_result()
} else {
None
}
self.dags.get(name).and_then(|dag| dag.get_result())
}
}

Expand Down
26 changes: 2 additions & 24 deletions src/task/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ pub(crate) struct ExecState {
success: AtomicBool,
/// Output produced by a task.
output: AtomicPtr<Output>,
/// Task output identified by id.
tid: usize,
task_name: String,
/// The semaphore is used to control the synchronous blocking of subsequent tasks to obtain the
/// execution results of this task.
/// When a task is successfully executed, the permits inside the semaphore will be increased to
Expand All @@ -105,15 +102,13 @@ pub enum Output {
#[derive(Debug)]
pub struct Input(Vec<Content>);

#[allow(dead_code)]
impl ExecState {
/// Construct a new [`ExeState`].
pub(crate) fn new(task_id: usize, task_name: String) -> Self {
pub(crate) fn new() -> Self {
// initialize the task to failure without output.
Self {
success: AtomicBool::new(false),
output: AtomicPtr::new(std::ptr::null_mut()),
tid: task_id,
task_name,
semaphore: Semaphore::new(0),
}
}
Expand Down Expand Up @@ -145,23 +140,6 @@ impl ExecState {
self.success.store(true, Ordering::Relaxed)
}

pub(crate) fn get_err(&self) -> Option<String> {
if let Some(out) = unsafe { self.output.load(Ordering::Relaxed).as_ref() } {
out.get_err()
} else {
None
}
}

/// Use id to indicate the output of which task.
pub(crate) fn tid(&self) -> usize {
self.tid
}

pub(crate) fn task_name(&self) -> &str {
&self.task_name
}

/// The semaphore is used to control the synchronous acquisition of task output results.
/// Under normal circumstances, first use the semaphore to obtain a permit, and then call
/// the `get_output` function to obtain the output. If the current task is not completed
Expand Down
14 changes: 6 additions & 8 deletions src/yaml/yaml_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,15 @@ impl Parser for YamlParser {
.as_hash()
.ok_or(YamlTaskError::StartWordError)?;

let mut tasks = Vec::new();
let mut map = HashMap::new();
let mut tasks = Vec::with_capacity(yaml_tasks.len());
let mut map = HashMap::with_capacity(yaml_tasks.len());
// Read tasks
for (v, w) in yaml_tasks {
let id = v.as_str().unwrap();
let task = if specific_actions.contains_key(id) {
let action = specific_actions.remove(id).unwrap();
self.parse_one(id, w, Some(action))?
} else {
self.parse_one(id, w, None)?
};
let task = specific_actions.remove(id).map_or_else(
|| self.parse_one(id, w, None),
|action| self.parse_one(id, w, Some(action)),
)?;
map.insert(id, task.id());
tasks.push(task);
}
Expand Down
Loading