diff --git a/CHANGELOG.md b/CHANGELOG.md index 14c28e77..4f9c4dc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Concurrent steps support in workflow definitions + - Workflows can now specify nested arrays of step IDs to indicate steps that can run in parallel + - Example: `steps: [setup, [task_a, task_b, task_c], finalize]` runs task_a/b/c concurrently + - Single-item arrays indicate a step with multiple parallel instances (e.g., `[fetch_campaign_data]` runs for each campaign) + - New `WorkflowStepEntry` dataclass in parser for sequential/concurrent step groups + - Meta-skill template renders concurrent steps as "Background Task 1/2/3" with clear instructions + - Added `get_step_entry_position_in_workflow()` and `get_concurrent_step_info()` methods to JobDefinition + - Full backward compatibility: existing workflows with simple step arrays continue to work - Agent delegation field for job.yml steps - New `agent` field on steps allows specifying an agent type (e.g., `agent: general-purpose`) - When `agent` is set, generated Claude Code skills automatically include `context: fork` and `agent:` in frontmatter diff --git a/doc/architecture.md b/doc/architecture.md index 879481e3..f4a2e094 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -365,13 +365,22 @@ changelog: # Workflows define named sequences of steps that form complete processes. # Steps not in any workflow are "standalone skills" that can be run anytime. +# Steps can be listed as simple strings (sequential) or arrays (concurrent execution). +# +# Concurrent step patterns: +# 1. Multiple different steps: [step_a, step_b] - run both in parallel +# 2. Single step with multiple instances: [fetch_campaign_data] - indicates this +# step should be run in parallel for each instance (e.g., each ad campaign) +# +# Use a single-item array when a step needs multiple parallel instances, like +# "fetch performance data" that runs once per campaign in an ad reporting job. workflows: - name: full_analysis summary: "Complete competitive analysis from identification through positioning" steps: - identify_competitors - - primary_research - - secondary_research + # Steps in an array execute concurrently (as "Background Tasks") + - [primary_research, secondary_research] - comparative_report - positioning diff --git a/src/deepwork/core/generator.py b/src/deepwork/core/generator.py index 7b575ffc..05ba975c 100644 --- a/src/deepwork/core/generator.py +++ b/src/deepwork/core/generator.py @@ -364,14 +364,38 @@ def _build_meta_skill_context( steps_info.append(step_info) - # Build workflow info + # Build workflow info with concurrent step support workflows_info = [] for workflow in job.workflows: + # Build step entries with concurrency info + step_entries_info = [] + for entry in workflow.step_entries: + entry_info: dict[str, Any] = { + "is_concurrent": entry.is_concurrent, + "step_ids": entry.step_ids, + } + if entry.is_concurrent: + # Add detailed step info for each concurrent step + concurrent_steps = [] + for i, step_id in enumerate(entry.step_ids): + step = job.get_step(step_id) + concurrent_steps.append( + { + "id": step_id, + "name": step.name if step else step_id, + "description": step.description if step else "", + "task_number": i + 1, + } + ) + entry_info["concurrent_steps"] = concurrent_steps + step_entries_info.append(entry_info) + workflows_info.append( { "name": workflow.name, "summary": workflow.summary, - "steps": workflow.steps, + "steps": workflow.steps, # Flattened for backward compat + "step_entries": step_entries_info, # New: with concurrency info "first_step": workflow.steps[0] if workflow.steps else None, } ) diff --git a/src/deepwork/core/parser.py b/src/deepwork/core/parser.py index a3942242..480ab6d4 100644 --- a/src/deepwork/core/parser.py +++ b/src/deepwork/core/parser.py @@ -187,21 +187,74 @@ def from_dict(cls, data: dict[str, Any]) -> "Step": ) +@dataclass +class WorkflowStepEntry: + """Represents a single entry in a workflow's step list. + + Each entry can be either: + - A single step (sequential execution) + - A list of steps (concurrent execution) + """ + + step_ids: list[str] # Single step has one ID, concurrent group has multiple + is_concurrent: bool = False + + @property + def first_step(self) -> str: + """Get the first step ID in this entry.""" + return self.step_ids[0] if self.step_ids else "" + + def all_step_ids(self) -> list[str]: + """Get all step IDs in this entry.""" + return self.step_ids + + @classmethod + def from_data(cls, data: str | list[str]) -> "WorkflowStepEntry": + """Create WorkflowStepEntry from YAML data (string or list).""" + if isinstance(data, str): + return cls(step_ids=[data], is_concurrent=False) + else: + return cls(step_ids=list(data), is_concurrent=True) + + @dataclass class Workflow: """Represents a named workflow grouping steps into a multi-step sequence.""" name: str summary: str - steps: list[str] # List of step IDs in order + step_entries: list[WorkflowStepEntry] # List of step entries (sequential or concurrent) + + @property + def steps(self) -> list[str]: + """Get flattened list of all step IDs for backward compatibility.""" + result: list[str] = [] + for entry in self.step_entries: + result.extend(entry.step_ids) + return result + + def get_step_entry_for_step(self, step_id: str) -> WorkflowStepEntry | None: + """Get the workflow step entry containing the given step ID.""" + for entry in self.step_entries: + if step_id in entry.step_ids: + return entry + return None + + def get_entry_index_for_step(self, step_id: str) -> int | None: + """Get the index of the entry containing the given step ID.""" + for i, entry in enumerate(self.step_entries): + if step_id in entry.step_ids: + return i + return None @classmethod def from_dict(cls, data: dict[str, Any]) -> "Workflow": """Create Workflow from dictionary.""" + step_entries = [WorkflowStepEntry.from_data(s) for s in data["steps"]] return cls( name=data["name"], summary=data["summary"], - steps=data["steps"], + step_entries=step_entries, ) @@ -407,6 +460,57 @@ def get_step_position_in_workflow(self, step_id: str) -> tuple[int, int] | None: except ValueError: return None + def get_step_entry_position_in_workflow( + self, step_id: str + ) -> tuple[int, int, WorkflowStepEntry] | None: + """ + Get the entry-based position of a step within its workflow. + + For concurrent step groups, multiple steps share the same entry position. + + Args: + step_id: Step ID to look up + + Returns: + Tuple of (1-based entry position, total entries, WorkflowStepEntry), + or None if standalone + """ + workflow = self.get_workflow_for_step(step_id) + if not workflow: + return None + + entry_index = workflow.get_entry_index_for_step(step_id) + if entry_index is None: + return None + + entry = workflow.step_entries[entry_index] + return (entry_index + 1, len(workflow.step_entries), entry) + + def get_concurrent_step_info(self, step_id: str) -> tuple[int, int] | None: + """ + Get information about a step's position within a concurrent group. + + Args: + step_id: Step ID to look up + + Returns: + Tuple of (1-based position in group, total in group) if step is in + a concurrent group, None if step is not in a concurrent group + """ + workflow = self.get_workflow_for_step(step_id) + if not workflow: + return None + + entry = workflow.get_step_entry_for_step(step_id) + if entry is None or not entry.is_concurrent: + return None + + try: + index = entry.step_ids.index(step_id) + return (index + 1, len(entry.step_ids)) + except ValueError: + return None + def validate_workflows(self) -> None: """ Validate workflow definitions. diff --git a/src/deepwork/schemas/job_schema.py b/src/deepwork/schemas/job_schema.py index 3acb11fe..e29b852c 100644 --- a/src/deepwork/schemas/job_schema.py +++ b/src/deepwork/schemas/job_schema.py @@ -46,6 +46,30 @@ ], } +# Schema for a single step reference (step ID) +STEP_ID_SCHEMA: dict[str, Any] = { + "type": "string", + "pattern": "^[a-z][a-z0-9_]*$", +} + +# Schema for a concurrent step group (array of step IDs that can run in parallel) +# minItems=1 allows single-item arrays to indicate a step with multiple parallel instances +# (e.g., [fetch_campaign_data] means run this step for each campaign in parallel) +CONCURRENT_STEPS_SCHEMA: dict[str, Any] = { + "type": "array", + "minItems": 1, + "description": "Array of step IDs that can be executed concurrently, or single step with multiple instances", + "items": STEP_ID_SCHEMA, +} + +# Schema for a workflow step entry (either single step or concurrent group) +WORKFLOW_STEP_ENTRY_SCHEMA: dict[str, Any] = { + "oneOf": [ + STEP_ID_SCHEMA, + CONCURRENT_STEPS_SCHEMA, + ], +} + # Schema for a workflow definition WORKFLOW_SCHEMA: dict[str, Any] = { "type": "object", @@ -65,11 +89,8 @@ "steps": { "type": "array", "minItems": 1, - "description": "Ordered list of step IDs that comprise this workflow", - "items": { - "type": "string", - "pattern": "^[a-z][a-z0-9_]*$", - }, + "description": "Ordered list of step entries. Each entry is either a step ID (string) or an array of step IDs for concurrent execution.", + "items": WORKFLOW_STEP_ENTRY_SCHEMA, }, }, "additionalProperties": False, diff --git a/src/deepwork/templates/claude/skill-job-meta.md.jinja b/src/deepwork/templates/claude/skill-job-meta.md.jinja index 1bedb8a2..ea258a87 100644 --- a/src/deepwork/templates/claude/skill-job-meta.md.jinja +++ b/src/deepwork/templates/claude/skill-job-meta.md.jinja @@ -47,9 +47,17 @@ description: "{{ job_summary }}" {{ workflow.summary }} **Steps in order**: -{% for step_id in workflow.steps %} +{% for entry in workflow.step_entries %} +{% if entry.is_concurrent %} +{{ loop.index }}. **Concurrent Steps** - Execute the following tasks in parallel: +{% for task in entry.concurrent_steps %} + - **Background Task {{ task.task_number }}**: {{ task.id }} - {{ task.description }} +{% endfor %} +{% else %} +{% set step_id = entry.step_ids[0] %} {% set step = steps | selectattr("id", "equalto", step_id) | first %} {{ loop.index }}. **{{ step_id }}** - {{ step.description if step else "Unknown step" }} +{% endif %} {% endfor %} **Start workflow**: `/{{ job_name }}.{{ workflow.first_step }}` diff --git a/tests/fixtures/jobs/concurrent_steps_job/job.yml b/tests/fixtures/jobs/concurrent_steps_job/job.yml new file mode 100644 index 00000000..8609c512 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/job.yml @@ -0,0 +1,89 @@ +name: concurrent_workflow +version: "1.0.0" +summary: "Workflow with concurrent steps for testing" +description: | + A test workflow that demonstrates concurrent step execution. + Some steps run sequentially while others run in parallel. + +workflows: + - name: full_analysis + summary: "Complete analysis workflow with parallel research phase" + steps: + - setup + - [research_web, research_docs, research_interviews] + - compile_results + - final_review + +steps: + - id: setup + name: "Setup" + description: "Initialize the analysis environment" + instructions_file: steps/setup.md + outputs: + - setup_complete.md + + - id: research_web + name: "Web Research" + description: "Research information from web sources" + instructions_file: steps/research_web.md + inputs: + - file: setup_complete.md + from_step: setup + outputs: + - web_research.md + dependencies: + - setup + + - id: research_docs + name: "Document Research" + description: "Research information from internal documents" + instructions_file: steps/research_docs.md + inputs: + - file: setup_complete.md + from_step: setup + outputs: + - docs_research.md + dependencies: + - setup + + - id: research_interviews + name: "Interview Research" + description: "Research information from stakeholder interviews" + instructions_file: steps/research_interviews.md + inputs: + - file: setup_complete.md + from_step: setup + outputs: + - interviews_research.md + dependencies: + - setup + + - id: compile_results + name: "Compile Results" + description: "Compile all research into a unified report" + instructions_file: steps/compile_results.md + inputs: + - file: web_research.md + from_step: research_web + - file: docs_research.md + from_step: research_docs + - file: interviews_research.md + from_step: research_interviews + outputs: + - compiled_results.md + dependencies: + - research_web + - research_docs + - research_interviews + + - id: final_review + name: "Final Review" + description: "Review and finalize the analysis" + instructions_file: steps/final_review.md + inputs: + - file: compiled_results.md + from_step: compile_results + outputs: + - final_report.md + dependencies: + - compile_results diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md b/tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md new file mode 100644 index 00000000..3a8f9550 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md @@ -0,0 +1,7 @@ +# Compile Results Instructions + +Compile all research into a unified report: + +1. Merge findings from all research sources +2. Identify patterns and insights +3. Create unified document diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md b/tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md new file mode 100644 index 00000000..085acc9c --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md @@ -0,0 +1,7 @@ +# Final Review Instructions + +Review and finalize the analysis: + +1. Proofread and edit +2. Verify accuracy +3. Format final report diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md b/tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md new file mode 100644 index 00000000..3d51c3fc --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md @@ -0,0 +1,7 @@ +# Document Research Instructions + +Research information from internal documents: + +1. Review internal documentation +2. Extract relevant data +3. Summarize findings diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md b/tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md new file mode 100644 index 00000000..25f07db4 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md @@ -0,0 +1,7 @@ +# Interview Research Instructions + +Research information from stakeholder interviews: + +1. Conduct interviews +2. Document responses +3. Identify key themes diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md b/tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md new file mode 100644 index 00000000..d50ef19a --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md @@ -0,0 +1,7 @@ +# Web Research Instructions + +Research information from web sources: + +1. Search relevant websites +2. Compile findings +3. Document sources diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/setup.md b/tests/fixtures/jobs/concurrent_steps_job/steps/setup.md new file mode 100644 index 00000000..496deb36 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/setup.md @@ -0,0 +1,7 @@ +# Setup Instructions + +Initialize the analysis environment by: + +1. Creating necessary directories +2. Setting up configuration +3. Verifying access to required resources diff --git a/tests/unit/test_generator.py b/tests/unit/test_generator.py index bce2a5f1..dd90ba30 100644 --- a/tests/unit/test_generator.py +++ b/tests/unit/test_generator.py @@ -268,6 +268,98 @@ def test_generate_all_skills_with_exposed_steps( assert all(p.name == "SKILL.md" for p in skill_paths) +class TestConcurrentStepsGeneration: + """Tests for concurrent steps in skill generation.""" + + def test_generate_meta_skill_with_concurrent_steps( + self, fixtures_dir: Path, temp_dir: Path + ) -> None: + """Test generating meta-skill for job with concurrent steps.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + generator = SkillGenerator() + adapter = ClaudeAdapter() + + meta_skill_path = generator.generate_meta_skill(job, adapter, temp_dir) + + assert meta_skill_path.exists() + content = meta_skill_path.read_text() + + # Check meta-skill content has workflow section + assert "# concurrent_workflow" in content + assert "full_analysis" in content + + # Check concurrent steps are rendered correctly + assert "Concurrent Steps" in content + assert "Background Task 1" in content + assert "Background Task 2" in content + assert "Background Task 3" in content + assert "research_web" in content + assert "research_docs" in content + assert "research_interviews" in content + + def test_meta_skill_context_has_step_entries(self, fixtures_dir: Path, temp_dir: Path) -> None: + """Test that meta-skill context includes step_entries with concurrency info.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + generator = SkillGenerator() + adapter = ClaudeAdapter() + + context = generator._build_meta_skill_context(job, adapter) + + assert "workflows" in context + assert len(context["workflows"]) == 1 + + workflow = context["workflows"][0] + assert "step_entries" in workflow + assert len(workflow["step_entries"]) == 4 + + # Check first entry (sequential) + entry1 = workflow["step_entries"][0] + assert entry1["is_concurrent"] is False + assert entry1["step_ids"] == ["setup"] + + # Check second entry (concurrent) + entry2 = workflow["step_entries"][1] + assert entry2["is_concurrent"] is True + assert entry2["step_ids"] == ["research_web", "research_docs", "research_interviews"] + assert "concurrent_steps" in entry2 + assert len(entry2["concurrent_steps"]) == 3 + assert entry2["concurrent_steps"][0]["task_number"] == 1 + assert entry2["concurrent_steps"][0]["id"] == "research_web" + + def test_generate_all_skills_with_concurrent_steps( + self, fixtures_dir: Path, temp_dir: Path + ) -> None: + """Test generating all skills for job with concurrent steps.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + generator = SkillGenerator() + adapter = ClaudeAdapter() + + skill_paths = generator.generate_all_skills(job, adapter, temp_dir) + + # 1 meta-skill + 6 step skills + assert len(skill_paths) == 7 + assert all(p.exists() for p in skill_paths) + + # Check all step skills are generated + expected_dirs = [ + "concurrent_workflow", # Meta-skill + "concurrent_workflow.setup", + "concurrent_workflow.research_web", + "concurrent_workflow.research_docs", + "concurrent_workflow.research_interviews", + "concurrent_workflow.compile_results", + "concurrent_workflow.final_review", + ] + actual_dirs = [p.parent.name for p in skill_paths] + assert actual_dirs == expected_dirs + + class TestDocSpecIntegration: """Tests for doc spec integration in skill generation.""" diff --git a/tests/unit/test_parser.py b/tests/unit/test_parser.py index aa1eae7e..0c968242 100644 --- a/tests/unit/test_parser.py +++ b/tests/unit/test_parser.py @@ -447,3 +447,119 @@ def test_raises_for_invalid_schema(self, fixtures_dir: Path) -> None: with pytest.raises(ParseError, match="validation failed"): parse_job_definition(job_dir) + + +class TestConcurrentSteps: + """Tests for concurrent step parsing in workflows.""" + + def test_parses_concurrent_steps_workflow(self, fixtures_dir: Path) -> None: + """Test parsing job with concurrent steps in workflow.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + assert job.name == "concurrent_workflow" + assert len(job.workflows) == 1 + assert job.workflows[0].name == "full_analysis" + + def test_workflow_step_entries(self, fixtures_dir: Path) -> None: + """Test workflow step_entries structure with concurrent steps.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + workflow = job.workflows[0] + assert len(workflow.step_entries) == 4 + + # First entry: sequential step + assert not workflow.step_entries[0].is_concurrent + assert workflow.step_entries[0].step_ids == ["setup"] + + # Second entry: concurrent steps + assert workflow.step_entries[1].is_concurrent + assert workflow.step_entries[1].step_ids == [ + "research_web", + "research_docs", + "research_interviews", + ] + + # Third entry: sequential step + assert not workflow.step_entries[2].is_concurrent + assert workflow.step_entries[2].step_ids == ["compile_results"] + + # Fourth entry: sequential step + assert not workflow.step_entries[3].is_concurrent + assert workflow.step_entries[3].step_ids == ["final_review"] + + def test_workflow_flattened_steps(self, fixtures_dir: Path) -> None: + """Test backward-compatible flattened steps list.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + workflow = job.workflows[0] + # Flattened list should include all step IDs + assert workflow.steps == [ + "setup", + "research_web", + "research_docs", + "research_interviews", + "compile_results", + "final_review", + ] + + def test_get_step_entry_for_step(self, fixtures_dir: Path) -> None: + """Test getting the step entry containing a step.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + workflow = job.workflows[0] + + # Sequential step + entry = workflow.get_step_entry_for_step("setup") + assert entry is not None + assert not entry.is_concurrent + assert entry.step_ids == ["setup"] + + # Concurrent step + entry = workflow.get_step_entry_for_step("research_web") + assert entry is not None + assert entry.is_concurrent + assert "research_web" in entry.step_ids + + def test_get_step_entry_position_in_workflow(self, fixtures_dir: Path) -> None: + """Test getting entry-based position in workflow.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + # Sequential step + result = job.get_step_entry_position_in_workflow("setup") + assert result is not None + entry_pos, total_entries, entry = result + assert entry_pos == 1 + assert total_entries == 4 + assert not entry.is_concurrent + + # Concurrent step - all share same entry position + for step_id in ["research_web", "research_docs", "research_interviews"]: + result = job.get_step_entry_position_in_workflow(step_id) + assert result is not None + entry_pos, total_entries, entry = result + assert entry_pos == 2 # All in second position + assert total_entries == 4 + assert entry.is_concurrent + + def test_get_concurrent_step_info(self, fixtures_dir: Path) -> None: + """Test getting info about position within concurrent group.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + # Sequential step returns None + assert job.get_concurrent_step_info("setup") is None + + # Concurrent steps return their position in group + result = job.get_concurrent_step_info("research_web") + assert result == (1, 3) + + result = job.get_concurrent_step_info("research_docs") + assert result == (2, 3) + + result = job.get_concurrent_step_info("research_interviews") + assert result == (3, 3)