Skip to content

Commit

Permalink
Merge pull request #346 from PrefectHQ/wait-for-subtasks
Browse files Browse the repository at this point in the history
Allow parent tasks to optionally wait for subtasks
  • Loading branch information
jlowin authored Oct 2, 2024
2 parents 13f6ee3 + 5c6295f commit eace95a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/controlflow/orchestration/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ def collect_tasks(task: Task):
for dependency in task.depends_on:
collect_tasks(dependency)

# Collect parent
if task.parent and not task.parent.wait_for_subtasks:
collect_tasks(task.parent)

# Check if the task is ready
if task.is_ready():
ready_tasks.append(task)
Expand Down
10 changes: 9 additions & 1 deletion src/controlflow/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ class Task(ControlFlowModel):
"which this task is considered `assigned`.",
)
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
wait_for_subtasks: bool = Field(
default=True,
description="If True, the task will not be considered ready until all subtasks are complete.",
)
_subtasks: set["Task"] = set()
_downstreams: set["Task"] = set()
_cm_stack: list[contextmanager] = []
Expand Down Expand Up @@ -467,7 +471,11 @@ def is_ready(self) -> bool:
Returns True if all dependencies are complete and this task is
incomplete, meaning it is ready to be worked on.
"""
return self.is_incomplete() and all(t.is_complete() for t in self.depends_on)
depends_on = self.depends_on
if not self.wait_for_subtasks:
depends_on = depends_on.difference(self._subtasks)

return self.is_incomplete() and all(t.is_complete() for t in depends_on)

def get_agents(self) -> list[Agent]:
if self.agents is not None:
Expand Down

0 comments on commit eace95a

Please sign in to comment.