From 5c6295fcdca377db09497a5c07df21e8791c29c0 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Wed, 2 Oct 2024 17:13:57 -0400 Subject: [PATCH] Allow parent tasks to optionally wait for subtasks --- src/controlflow/orchestration/orchestrator.py | 4 ++++ src/controlflow/tasks/task.py | 10 +++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/controlflow/orchestration/orchestrator.py b/src/controlflow/orchestration/orchestrator.py index dcb658f8..35757aa1 100644 --- a/src/controlflow/orchestration/orchestrator.py +++ b/src/controlflow/orchestration/orchestrator.py @@ -474,6 +474,10 @@ def collect_tasks(task: Task): for dependency in task.depends_on: collect_tasks(dependency) + # Collect parent + if task.parent and not task.parent.wait_for_subtasks: + collect_tasks(task.parent) + # Check if the task is ready if task.is_ready(): ready_tasks.append(task) diff --git a/src/controlflow/tasks/task.py b/src/controlflow/tasks/task.py index 22f1bfc7..6cb0ab28 100644 --- a/src/controlflow/tasks/task.py +++ b/src/controlflow/tasks/task.py @@ -172,6 +172,10 @@ class Task(ControlFlowModel): "which this task is considered `assigned`.", ) created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + wait_for_subtasks: bool = Field( + default=True, + description="If True, the task will not be considered ready until all subtasks are complete.", + ) _subtasks: set["Task"] = set() _downstreams: set["Task"] = set() _cm_stack: list[contextmanager] = [] @@ -465,7 +469,11 @@ def is_ready(self) -> bool: Returns True if all dependencies are complete and this task is incomplete, meaning it is ready to be worked on. """ - return self.is_incomplete() and all(t.is_complete() for t in self.depends_on) + depends_on = self.depends_on + if not self.wait_for_subtasks: + depends_on = depends_on.difference(self._subtasks) + + return self.is_incomplete() and all(t.is_complete() for t in depends_on) def get_agents(self) -> list[Agent]: if self.agents is not None: