Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Concurrent steps support in workflow definitions
- Workflows can now specify nested arrays of step IDs to indicate steps that can run in parallel
- Example: `steps: [setup, [task_a, task_b, task_c], finalize]` runs task_a/b/c concurrently
- Single-item arrays indicate a step with multiple parallel instances (e.g., `[fetch_campaign_data]` runs for each campaign)
- New `WorkflowStepEntry` dataclass in parser for sequential/concurrent step groups
- Meta-skill template renders concurrent steps as "Background Task 1/2/3" with clear instructions
- Added `get_step_entry_position_in_workflow()` and `get_concurrent_step_info()` methods to JobDefinition
- Full backward compatibility: existing workflows with simple step arrays continue to work
- Agent delegation field for job.yml steps
- New `agent` field on steps allows specifying an agent type (e.g., `agent: general-purpose`)
- When `agent` is set, generated Claude Code skills automatically include `context: fork` and `agent:` in frontmatter
Expand Down
13 changes: 11 additions & 2 deletions doc/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,22 @@ changelog:

# Workflows define named sequences of steps that form complete processes.
# Steps not in any workflow are "standalone skills" that can be run anytime.
# Steps can be listed as simple strings (sequential) or arrays (concurrent execution).
#
# Concurrent step patterns:
# 1. Multiple different steps: [step_a, step_b] - run both in parallel
# 2. Single step with multiple instances: [fetch_campaign_data] - indicates this
# step should be run in parallel for each instance (e.g., each ad campaign)
#
# Use a single-item array when a step needs multiple parallel instances, like
# "fetch performance data" that runs once per campaign in an ad reporting job.
workflows:
- name: full_analysis
summary: "Complete competitive analysis from identification through positioning"
steps:
- identify_competitors
- primary_research
- secondary_research
# Steps in an array execute concurrently (as "Background Tasks")
- [primary_research, secondary_research]
- comparative_report
- positioning

Expand Down
28 changes: 26 additions & 2 deletions src/deepwork/core/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,38 @@ def _build_meta_skill_context(

steps_info.append(step_info)

# Build workflow info
# Build workflow info with concurrent step support
workflows_info = []
for workflow in job.workflows:
# Build step entries with concurrency info
step_entries_info = []
for entry in workflow.step_entries:
entry_info: dict[str, Any] = {
"is_concurrent": entry.is_concurrent,
"step_ids": entry.step_ids,
}
if entry.is_concurrent:
# Add detailed step info for each concurrent step
concurrent_steps = []
for i, step_id in enumerate(entry.step_ids):
step = job.get_step(step_id)
concurrent_steps.append(
{
"id": step_id,
"name": step.name if step else step_id,
"description": step.description if step else "",
"task_number": i + 1,
}
)
entry_info["concurrent_steps"] = concurrent_steps
step_entries_info.append(entry_info)

workflows_info.append(
{
"name": workflow.name,
"summary": workflow.summary,
"steps": workflow.steps,
"steps": workflow.steps, # Flattened for backward compat
"step_entries": step_entries_info, # New: with concurrency info
"first_step": workflow.steps[0] if workflow.steps else None,
}
)
Expand Down
108 changes: 106 additions & 2 deletions src/deepwork/core/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,74 @@ def from_dict(cls, data: dict[str, Any]) -> "Step":
)


@dataclass
class WorkflowStepEntry:
    """One entry in a workflow's ordered step list.

    An entry is either a single step (executed sequentially) or a group of
    steps (executed concurrently).
    """

    # A sequential entry holds exactly one ID; a concurrent group holds one or more.
    step_ids: list[str]
    is_concurrent: bool = False

    @property
    def first_step(self) -> str:
        """Return the first step ID, or an empty string for an empty entry."""
        if not self.step_ids:
            return ""
        return self.step_ids[0]

    def all_step_ids(self) -> list[str]:
        """Return every step ID contained in this entry."""
        return self.step_ids

    @classmethod
    def from_data(cls, data: "str | list[str]") -> "WorkflowStepEntry":
        """Build an entry from raw YAML data.

        A bare string denotes a sequential step; a list (even a one-element
        list, which marks a step with multiple parallel instances) denotes a
        concurrent group.
        """
        if isinstance(data, list):
            return cls(step_ids=list(data), is_concurrent=True)
        return cls(step_ids=[data], is_concurrent=False)


@dataclass
class Workflow:
    """Represents a named workflow grouping steps into a multi-step sequence.

    Steps are stored as `WorkflowStepEntry` objects so each entry can be
    either a single sequential step or a concurrent group; the `steps`
    property exposes the flattened ID list for backward compatibility.
    """

    name: str
    summary: str
    # Ordered entries; each entry is sequential (one ID) or concurrent (many IDs).
    # NOTE: the stale `steps: list[str]` field from the pre-merge version is
    # removed here — it collided with the `steps` property below.
    step_entries: list[WorkflowStepEntry]

    @property
    def steps(self) -> list[str]:
        """Get flattened list of all step IDs for backward compatibility."""
        result: list[str] = []
        for entry in self.step_entries:
            result.extend(entry.step_ids)
        return result

    def get_step_entry_for_step(self, step_id: str) -> WorkflowStepEntry | None:
        """Get the workflow step entry containing the given step ID."""
        for entry in self.step_entries:
            if step_id in entry.step_ids:
                return entry
        return None

    def get_entry_index_for_step(self, step_id: str) -> int | None:
        """Get the index of the entry containing the given step ID."""
        for i, entry in enumerate(self.step_entries):
            if step_id in entry.step_ids:
                return i
        return None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Workflow":
        """Create Workflow from a parsed YAML dictionary.

        Each item of data["steps"] may be a string (sequential step) or a
        list of strings (concurrent group).
        """
        step_entries = [WorkflowStepEntry.from_data(s) for s in data["steps"]]
        return cls(
            name=data["name"],
            summary=data["summary"],
            step_entries=step_entries,
        )


Expand Down Expand Up @@ -407,6 +460,57 @@ def get_step_position_in_workflow(self, step_id: str) -> tuple[int, int] | None:
except ValueError:
return None

def get_step_entry_position_in_workflow(
    self, step_id: str
) -> tuple[int, int, WorkflowStepEntry] | None:
    """
    Get the entry-based position of a step within its workflow.

    All steps of a concurrent group share one entry position, so the
    returned position counts entries, not individual steps.

    Args:
        step_id: Step ID to look up

    Returns:
        Tuple of (1-based entry position, total entries, WorkflowStepEntry),
        or None if standalone
    """
    workflow = self.get_workflow_for_step(step_id)
    if workflow is None:
        return None

    index = workflow.get_entry_index_for_step(step_id)
    if index is None:
        return None

    total_entries = len(workflow.step_entries)
    return (index + 1, total_entries, workflow.step_entries[index])

def get_concurrent_step_info(self, step_id: str) -> tuple[int, int] | None:
    """
    Get information about a step's position within a concurrent group.

    Args:
        step_id: Step ID to look up

    Returns:
        Tuple of (1-based position in group, total in group) if step is in
        a concurrent group, None if step is not in a concurrent group
    """
    workflow = self.get_workflow_for_step(step_id)
    if workflow is None:
        return None

    entry = workflow.get_step_entry_for_step(step_id)
    if entry is None or not entry.is_concurrent:
        return None

    # Guard mirrors the original try/except around list.index; in practice the
    # entry was located via membership, so the ID is always present.
    if step_id not in entry.step_ids:
        return None
    position = entry.step_ids.index(step_id) + 1
    return (position, len(entry.step_ids))

def validate_workflows(self) -> None:
"""
Validate workflow definitions.
Expand Down
31 changes: 26 additions & 5 deletions src/deepwork/schemas/job_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@
],
}

# Schema fragment matching one step reference: a lowercase snake_case ID.
STEP_ID_SCHEMA: dict[str, Any] = {
    "type": "string",
    "pattern": "^[a-z][a-z0-9_]*$",
}

# Schema fragment matching a concurrent group: an array of step IDs to run in
# parallel. minItems is 1 so a one-element array can mark a step that fans out
# into multiple parallel instances (e.g. [fetch_campaign_data] runs once per
# campaign).
CONCURRENT_STEPS_SCHEMA: dict[str, Any] = {
    "type": "array",
    "minItems": 1,
    "description": "Array of step IDs that can be executed concurrently, or single step with multiple instances",
    "items": STEP_ID_SCHEMA,
}

# A workflow step entry is either a bare step ID (sequential) or a concurrent
# group (array).
WORKFLOW_STEP_ENTRY_SCHEMA: dict[str, Any] = {
    "oneOf": [
        STEP_ID_SCHEMA,
        CONCURRENT_STEPS_SCHEMA,
    ],
}

# Schema for a workflow definition
WORKFLOW_SCHEMA: dict[str, Any] = {
"type": "object",
Expand All @@ -65,11 +89,8 @@
"steps": {
"type": "array",
"minItems": 1,
"description": "Ordered list of step IDs that comprise this workflow",
"items": {
"type": "string",
"pattern": "^[a-z][a-z0-9_]*$",
},
"description": "Ordered list of step entries. Each entry is either a step ID (string) or an array of step IDs for concurrent execution.",
"items": WORKFLOW_STEP_ENTRY_SCHEMA,
},
},
"additionalProperties": False,
Expand Down
10 changes: 9 additions & 1 deletion src/deepwork/templates/claude/skill-job-meta.md.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,17 @@ description: "{{ job_summary }}"
{{ workflow.summary }}

**Steps in order**:
{% for step_id in workflow.steps %}
{% for entry in workflow.step_entries %}
{% if entry.is_concurrent %}
{{ loop.index }}. **Concurrent Steps** - Execute the following tasks in parallel:
{% for task in entry.concurrent_steps %}
- **Background Task {{ task.task_number }}**: {{ task.id }} - {{ task.description }}
{% endfor %}
{% else %}
{% set step_id = entry.step_ids[0] %}
{% set step = steps | selectattr("id", "equalto", step_id) | first %}
{{ loop.index }}. **{{ step_id }}** - {{ step.description if step else "Unknown step" }}
{% endif %}
{% endfor %}

**Start workflow**: `/{{ job_name }}.{{ workflow.first_step }}`
Expand Down
89 changes: 89 additions & 0 deletions tests/fixtures/jobs/concurrent_steps_job/job.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
name: concurrent_workflow
version: "1.0.0"
summary: "Workflow with concurrent steps for testing"
description: |
A test workflow that demonstrates concurrent step execution.
Some steps run sequentially while others run in parallel.

workflows:
- name: full_analysis
summary: "Complete analysis workflow with parallel research phase"
steps:
- setup
- [research_web, research_docs, research_interviews]
- compile_results
- final_review

steps:
- id: setup
name: "Setup"
description: "Initialize the analysis environment"
instructions_file: steps/setup.md
outputs:
- setup_complete.md

- id: research_web
name: "Web Research"
description: "Research information from web sources"
instructions_file: steps/research_web.md
inputs:
- file: setup_complete.md
from_step: setup
outputs:
- web_research.md
dependencies:
- setup

- id: research_docs
name: "Document Research"
description: "Research information from internal documents"
instructions_file: steps/research_docs.md
inputs:
- file: setup_complete.md
from_step: setup
outputs:
- docs_research.md
dependencies:
- setup

- id: research_interviews
name: "Interview Research"
description: "Research information from stakeholder interviews"
instructions_file: steps/research_interviews.md
inputs:
- file: setup_complete.md
from_step: setup
outputs:
- interviews_research.md
dependencies:
- setup

- id: compile_results
name: "Compile Results"
description: "Compile all research into a unified report"
instructions_file: steps/compile_results.md
inputs:
- file: web_research.md
from_step: research_web
- file: docs_research.md
from_step: research_docs
- file: interviews_research.md
from_step: research_interviews
outputs:
- compiled_results.md
dependencies:
- research_web
- research_docs
- research_interviews

- id: final_review
name: "Final Review"
description: "Review and finalize the analysis"
instructions_file: steps/final_review.md
inputs:
- file: compiled_results.md
from_step: compile_results
outputs:
- final_report.md
dependencies:
- compile_results
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Compile Results Instructions

Compile all research into a unified report:

1. Merge findings from all research sources
2. Identify patterns and insights
3. Create unified document
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Final Review Instructions

Review and finalize the analysis:

1. Proofread and edit
2. Verify accuracy
3. Format final report
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Document Research Instructions

Research information from internal documents:

1. Review internal documentation
2. Extract relevant data
3. Summarize findings
Loading