From e7a757f6dd4c51af40f2032662296c629f10c1f6 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 30 Jan 2026 19:47:10 +0000 Subject: [PATCH] feat: Add concurrent steps support in workflow definitions Enable workflows to specify nested arrays of step IDs to indicate steps that can be executed in parallel. The meta template renders concurrent step groups as "Concurrent Steps" with numbered "Background Task" items. Changes: - Schema: Added STEP_ID_SCHEMA, CONCURRENT_STEPS_SCHEMA, WORKFLOW_STEP_ENTRY_SCHEMA - Workflow steps accept either string (sequential) or array (concurrent) - Single-item arrays indicate steps with multiple parallel instances - Parser: Added WorkflowStepEntry dataclass for sequential/concurrent groups - Updated Workflow to use step_entries with backward-compatible steps property - Added get_step_entry_position_in_workflow() and get_concurrent_step_info() - Generator: Build step_entries with concurrency info in workflow context - Template: Render concurrent steps with "Background Task N" formatting - Tests: Added test fixture and 9 new unit tests - Docs: Updated architecture.md and CHANGELOG.md https://claude.ai/code/session_011qMp5U4AFQLMbJQvCE8Ck4 --- CHANGELOG.md | 8 ++ doc/architecture.md | 13 +- src/deepwork/core/generator.py | 28 ++++- src/deepwork/core/parser.py | 108 +++++++++++++++- src/deepwork/schemas/job_schema.py | 31 ++++- .../templates/claude/skill-job-meta.md.jinja | 10 +- .../jobs/concurrent_steps_job/job.yml | 89 ++++++++++++++ .../steps/compile_results.md | 7 ++ .../steps/final_review.md | 7 ++ .../steps/research_docs.md | 7 ++ .../steps/research_interviews.md | 7 ++ .../steps/research_web.md | 7 ++ .../jobs/concurrent_steps_job/steps/setup.md | 7 ++ tests/unit/test_generator.py | 92 ++++++++++++++ tests/unit/test_parser.py | 116 ++++++++++++++++++ 15 files changed, 525 insertions(+), 12 deletions(-) create mode 100644 tests/fixtures/jobs/concurrent_steps_job/job.yml create mode 100644 tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md create mode 100644 tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md create mode 100644 tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md create mode 100644 tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md create mode 100644 tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md create mode 100644 tests/fixtures/jobs/concurrent_steps_job/steps/setup.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 14c28e77..4f9c4dc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Concurrent steps support in workflow definitions + - Workflows can now specify nested arrays of step IDs to indicate steps that can run in parallel + - Example: `steps: [setup, [task_a, task_b, task_c], finalize]` runs task_a/b/c concurrently + - Single-item arrays indicate a step with multiple parallel instances (e.g., `[fetch_campaign_data]` runs for each campaign) + - New `WorkflowStepEntry` dataclass in parser for sequential/concurrent step groups + - Meta-skill template renders concurrent steps as "Background Task 1/2/3" with clear instructions + - Added `get_step_entry_position_in_workflow()` and `get_concurrent_step_info()` methods to JobDefinition + - Full backward compatibility: existing workflows with simple step arrays continue to work - Agent delegation field for job.yml steps - New `agent` field on steps allows specifying an agent type (e.g., `agent: general-purpose`) - When `agent` is set, generated Claude Code skills automatically include `context: fork` and `agent:` in frontmatter diff --git a/doc/architecture.md b/doc/architecture.md index 879481e3..f4a2e094 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -365,13 +365,22 @@ changelog: # Workflows define named sequences of steps that form complete processes. # Steps not in any workflow are "standalone skills" that can be run anytime. +# Steps can be listed as simple strings (sequential) or arrays (concurrent execution). +# +# Concurrent step patterns: +# 1. Multiple different steps: [step_a, step_b] - run both in parallel +# 2. Single step with multiple instances: [fetch_campaign_data] - indicates this +# step should be run in parallel for each instance (e.g., each ad campaign) +# +# Use a single-item array when a step needs multiple parallel instances, like +# "fetch performance data" that runs once per campaign in an ad reporting job. workflows: - name: full_analysis summary: "Complete competitive analysis from identification through positioning" steps: - identify_competitors - - primary_research - - secondary_research + # Steps in an array execute concurrently (as "Background Tasks") + - [primary_research, secondary_research] - comparative_report - positioning diff --git a/src/deepwork/core/generator.py b/src/deepwork/core/generator.py index 7b575ffc..05ba975c 100644 --- a/src/deepwork/core/generator.py +++ b/src/deepwork/core/generator.py @@ -364,14 +364,38 @@ def _build_meta_skill_context( steps_info.append(step_info) - # Build workflow info + # Build workflow info with concurrent step support workflows_info = [] for workflow in job.workflows: + # Build step entries with concurrency info + step_entries_info = [] + for entry in workflow.step_entries: + entry_info: dict[str, Any] = { + "is_concurrent": entry.is_concurrent, + "step_ids": entry.step_ids, + } + if entry.is_concurrent: + # Add detailed step info for each concurrent step + concurrent_steps = [] + for i, step_id in enumerate(entry.step_ids): + step = job.get_step(step_id) + concurrent_steps.append( + { + "id": step_id, + "name": step.name if step else step_id, + "description": step.description if step else "", + "task_number": i + 1, + } + ) + entry_info["concurrent_steps"] = concurrent_steps + step_entries_info.append(entry_info) + workflows_info.append( { "name": workflow.name, "summary": workflow.summary, - "steps": workflow.steps, + "steps": workflow.steps, # Flattened for backward compat + "step_entries": step_entries_info, # New: with concurrency info "first_step": workflow.steps[0] if workflow.steps else None, } ) diff --git a/src/deepwork/core/parser.py b/src/deepwork/core/parser.py index a3942242..480ab6d4 100644 --- a/src/deepwork/core/parser.py +++ b/src/deepwork/core/parser.py @@ -187,21 +187,74 @@ def from_dict(cls, data: dict[str, Any]) -> "Step": ) +@dataclass +class WorkflowStepEntry: + """Represents a single entry in a workflow's step list. + + Each entry can be either: + - A single step (sequential execution) + - A list of steps (concurrent execution) + """ + + step_ids: list[str] # Single step has one ID, concurrent group has multiple + is_concurrent: bool = False + + @property + def first_step(self) -> str: + """Get the first step ID in this entry.""" + return self.step_ids[0] if self.step_ids else "" + + def all_step_ids(self) -> list[str]: + """Get all step IDs in this entry.""" + return self.step_ids + + @classmethod + def from_data(cls, data: str | list[str]) -> "WorkflowStepEntry": + """Create WorkflowStepEntry from YAML data (string or list).""" + if isinstance(data, str): + return cls(step_ids=[data], is_concurrent=False) + else: + return cls(step_ids=list(data), is_concurrent=True) + + @dataclass class Workflow: """Represents a named workflow grouping steps into a multi-step sequence.""" name: str summary: str - steps: list[str] # List of step IDs in order + step_entries: list[WorkflowStepEntry] # List of step entries (sequential or concurrent) + + @property + def steps(self) -> list[str]: + """Get flattened list of all step IDs for backward compatibility.""" + result: list[str] = [] + for entry in self.step_entries: + result.extend(entry.step_ids) + return result + + def get_step_entry_for_step(self, step_id: str) -> WorkflowStepEntry | None: + """Get the workflow step entry containing the given step ID.""" + for entry in self.step_entries: + if step_id in entry.step_ids: + return entry + return None + + def get_entry_index_for_step(self, step_id: str) -> int | None: + """Get the index of the entry containing the given step ID.""" + for i, entry in enumerate(self.step_entries): + if step_id in entry.step_ids: + return i + return None @classmethod def from_dict(cls, data: dict[str, Any]) -> "Workflow": """Create Workflow from dictionary.""" + step_entries = [WorkflowStepEntry.from_data(s) for s in data["steps"]] return cls( name=data["name"], summary=data["summary"], - steps=data["steps"], + step_entries=step_entries, ) @@ -407,6 +460,57 @@ def get_step_position_in_workflow(self, step_id: str) -> tuple[int, int] | None: except ValueError: return None + def get_step_entry_position_in_workflow( + self, step_id: str + ) -> tuple[int, int, WorkflowStepEntry] | None: + """ + Get the entry-based position of a step within its workflow. + + For concurrent step groups, multiple steps share the same entry position. + + Args: + step_id: Step ID to look up + + Returns: + Tuple of (1-based entry position, total entries, WorkflowStepEntry), + or None if standalone + """ + workflow = self.get_workflow_for_step(step_id) + if not workflow: + return None + + entry_index = workflow.get_entry_index_for_step(step_id) + if entry_index is None: + return None + + entry = workflow.step_entries[entry_index] + return (entry_index + 1, len(workflow.step_entries), entry) + + def get_concurrent_step_info(self, step_id: str) -> tuple[int, int] | None: + """ + Get information about a step's position within a concurrent group. + + Args: + step_id: Step ID to look up + + Returns: + Tuple of (1-based position in group, total in group) if step is in + a concurrent group, None if step is not in a concurrent group + """ + workflow = self.get_workflow_for_step(step_id) + if not workflow: + return None + + entry = workflow.get_step_entry_for_step(step_id) + if entry is None or not entry.is_concurrent: + return None + + try: + index = entry.step_ids.index(step_id) + return (index + 1, len(entry.step_ids)) + except ValueError: + return None + def validate_workflows(self) -> None: """ Validate workflow definitions. diff --git a/src/deepwork/schemas/job_schema.py b/src/deepwork/schemas/job_schema.py index 3acb11fe..e29b852c 100644 --- a/src/deepwork/schemas/job_schema.py +++ b/src/deepwork/schemas/job_schema.py @@ -46,6 +46,30 @@ ], } +# Schema for a single step reference (step ID) +STEP_ID_SCHEMA: dict[str, Any] = { + "type": "string", + "pattern": "^[a-z][a-z0-9_]*$", +} + +# Schema for a concurrent step group (array of step IDs that can run in parallel) +# minItems=1 allows single-item arrays to indicate a step with multiple parallel instances +# (e.g., [fetch_campaign_data] means run this step for each campaign in parallel) +CONCURRENT_STEPS_SCHEMA: dict[str, Any] = { + "type": "array", + "minItems": 1, + "description": "Array of step IDs that can be executed concurrently, or single step with multiple instances", + "items": STEP_ID_SCHEMA, +} + +# Schema for a workflow step entry (either single step or concurrent group) +WORKFLOW_STEP_ENTRY_SCHEMA: dict[str, Any] = { + "oneOf": [ + STEP_ID_SCHEMA, + CONCURRENT_STEPS_SCHEMA, + ], +} + # Schema for a workflow definition WORKFLOW_SCHEMA: dict[str, Any] = { "type": "object", @@ -65,11 +89,8 @@ "steps": { "type": "array", "minItems": 1, - "description": "Ordered list of step IDs that comprise this workflow", - "items": { - "type": "string", - "pattern": "^[a-z][a-z0-9_]*$", - }, + "description": "Ordered list of step entries. Each entry is either a step ID (string) or an array of step IDs for concurrent execution.", + "items": WORKFLOW_STEP_ENTRY_SCHEMA, }, }, "additionalProperties": False, diff --git a/src/deepwork/templates/claude/skill-job-meta.md.jinja b/src/deepwork/templates/claude/skill-job-meta.md.jinja index 1bedb8a2..ea258a87 100644 --- a/src/deepwork/templates/claude/skill-job-meta.md.jinja +++ b/src/deepwork/templates/claude/skill-job-meta.md.jinja @@ -47,9 +47,17 @@ description: "{{ job_summary }}" {{ workflow.summary }} **Steps in order**: -{% for step_id in workflow.steps %} +{% for entry in workflow.step_entries %} +{% if entry.is_concurrent %} +{{ loop.index }}. **Concurrent Steps** - Execute the following tasks in parallel: +{% for task in entry.concurrent_steps %} + - **Background Task {{ task.task_number }}**: {{ task.id }} - {{ task.description }} +{% endfor %} +{% else %} +{% set step_id = entry.step_ids[0] %} {% set step = steps | selectattr("id", "equalto", step_id) | first %} {{ loop.index }}. **{{ step_id }}** - {{ step.description if step else "Unknown step" }} +{% endif %} {% endfor %} **Start workflow**: `/{{ job_name }}.{{ workflow.first_step }}` diff --git a/tests/fixtures/jobs/concurrent_steps_job/job.yml b/tests/fixtures/jobs/concurrent_steps_job/job.yml new file mode 100644 index 00000000..8609c512 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/job.yml @@ -0,0 +1,89 @@ +name: concurrent_workflow +version: "1.0.0" +summary: "Workflow with concurrent steps for testing" +description: | + A test workflow that demonstrates concurrent step execution. + Some steps run sequentially while others run in parallel. + +workflows: + - name: full_analysis + summary: "Complete analysis workflow with parallel research phase" + steps: + - setup + - [research_web, research_docs, research_interviews] + - compile_results + - final_review + +steps: + - id: setup + name: "Setup" + description: "Initialize the analysis environment" + instructions_file: steps/setup.md + outputs: + - setup_complete.md + + - id: research_web + name: "Web Research" + description: "Research information from web sources" + instructions_file: steps/research_web.md + inputs: + - file: setup_complete.md + from_step: setup + outputs: + - web_research.md + dependencies: + - setup + + - id: research_docs + name: "Document Research" + description: "Research information from internal documents" + instructions_file: steps/research_docs.md + inputs: + - file: setup_complete.md + from_step: setup + outputs: + - docs_research.md + dependencies: + - setup + + - id: research_interviews + name: "Interview Research" + description: "Research information from stakeholder interviews" + instructions_file: steps/research_interviews.md + inputs: + - file: setup_complete.md + from_step: setup + outputs: + - interviews_research.md + dependencies: + - setup + + - id: compile_results + name: "Compile Results" + description: "Compile all research into a unified report" + instructions_file: steps/compile_results.md + inputs: + - file: web_research.md + from_step: research_web + - file: docs_research.md + from_step: research_docs + - file: interviews_research.md + from_step: research_interviews + outputs: + - compiled_results.md + dependencies: + - research_web + - research_docs + - research_interviews + + - id: final_review + name: "Final Review" + description: "Review and finalize the analysis" + instructions_file: steps/final_review.md + inputs: + - file: compiled_results.md + from_step: compile_results + outputs: + - final_report.md + dependencies: + - compile_results diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md b/tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md new file mode 100644 index 00000000..3a8f9550 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/compile_results.md @@ -0,0 +1,7 @@ +# Compile Results Instructions + +Compile all research into a unified report: + +1. Merge findings from all research sources +2. Identify patterns and insights +3. Create unified document diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md b/tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md new file mode 100644 index 00000000..085acc9c --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/final_review.md @@ -0,0 +1,7 @@ +# Final Review Instructions + +Review and finalize the analysis: + +1. Proofread and edit +2. Verify accuracy +3. Format final report diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md b/tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md new file mode 100644 index 00000000..3d51c3fc --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/research_docs.md @@ -0,0 +1,7 @@ +# Document Research Instructions + +Research information from internal documents: + +1. Review internal documentation +2. Extract relevant data +3. Summarize findings diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md b/tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md new file mode 100644 index 00000000..25f07db4 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/research_interviews.md @@ -0,0 +1,7 @@ +# Interview Research Instructions + +Research information from stakeholder interviews: + +1. Conduct interviews +2. Document responses +3. Identify key themes diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md b/tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md new file mode 100644 index 00000000..d50ef19a --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/research_web.md @@ -0,0 +1,7 @@ +# Web Research Instructions + +Research information from web sources: + +1. Search relevant websites +2. Compile findings +3. Document sources diff --git a/tests/fixtures/jobs/concurrent_steps_job/steps/setup.md b/tests/fixtures/jobs/concurrent_steps_job/steps/setup.md new file mode 100644 index 00000000..496deb36 --- /dev/null +++ b/tests/fixtures/jobs/concurrent_steps_job/steps/setup.md @@ -0,0 +1,7 @@ +# Setup Instructions + +Initialize the analysis environment by: + +1. Creating necessary directories +2. Setting up configuration +3. Verifying access to required resources diff --git a/tests/unit/test_generator.py b/tests/unit/test_generator.py index bce2a5f1..dd90ba30 100644 --- a/tests/unit/test_generator.py +++ b/tests/unit/test_generator.py @@ -268,6 +268,98 @@ def test_generate_all_skills_with_exposed_steps( assert all(p.name == "SKILL.md" for p in skill_paths) +class TestConcurrentStepsGeneration: + """Tests for concurrent steps in skill generation.""" + + def test_generate_meta_skill_with_concurrent_steps( + self, fixtures_dir: Path, temp_dir: Path + ) -> None: + """Test generating meta-skill for job with concurrent steps.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + generator = SkillGenerator() + adapter = ClaudeAdapter() + + meta_skill_path = generator.generate_meta_skill(job, adapter, temp_dir) + + assert meta_skill_path.exists() + content = meta_skill_path.read_text() + + # Check meta-skill content has workflow section + assert "# concurrent_workflow" in content + assert "full_analysis" in content + + # Check concurrent steps are rendered correctly + assert "Concurrent Steps" in content + assert "Background Task 1" in content + assert "Background Task 2" in content + assert "Background Task 3" in content + assert "research_web" in content + assert "research_docs" in content + assert "research_interviews" in content + + def test_meta_skill_context_has_step_entries(self, fixtures_dir: Path, temp_dir: Path) -> None: + """Test that meta-skill context includes step_entries with concurrency info.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + generator = SkillGenerator() + adapter = ClaudeAdapter() + + context = generator._build_meta_skill_context(job, adapter) + + assert "workflows" in context + assert len(context["workflows"]) == 1 + + workflow = context["workflows"][0] + assert "step_entries" in workflow + assert len(workflow["step_entries"]) == 4 + + # Check first entry (sequential) + entry1 = workflow["step_entries"][0] + assert entry1["is_concurrent"] is False + assert entry1["step_ids"] == ["setup"] + + # Check second entry (concurrent) + entry2 = workflow["step_entries"][1] + assert entry2["is_concurrent"] is True + assert entry2["step_ids"] == ["research_web", "research_docs", "research_interviews"] + assert "concurrent_steps" in entry2 + assert len(entry2["concurrent_steps"]) == 3 + assert entry2["concurrent_steps"][0]["task_number"] == 1 + assert entry2["concurrent_steps"][0]["id"] == "research_web" + + def test_generate_all_skills_with_concurrent_steps( + self, fixtures_dir: Path, temp_dir: Path + ) -> None: + """Test generating all skills for job with concurrent steps.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + generator = SkillGenerator() + adapter = ClaudeAdapter() + + skill_paths = generator.generate_all_skills(job, adapter, temp_dir) + + # 1 meta-skill + 6 step skills + assert len(skill_paths) == 7 + assert all(p.exists() for p in skill_paths) + + # Check all step skills are generated + expected_dirs = [ + "concurrent_workflow", # Meta-skill + "concurrent_workflow.setup", + "concurrent_workflow.research_web", + "concurrent_workflow.research_docs", + "concurrent_workflow.research_interviews", + "concurrent_workflow.compile_results", + "concurrent_workflow.final_review", + ] + actual_dirs = [p.parent.name for p in skill_paths] + assert actual_dirs == expected_dirs + + class TestDocSpecIntegration: """Tests for doc spec integration in skill generation.""" diff --git a/tests/unit/test_parser.py b/tests/unit/test_parser.py index aa1eae7e..0c968242 100644 --- a/tests/unit/test_parser.py +++ b/tests/unit/test_parser.py @@ -447,3 +447,119 @@ def test_raises_for_invalid_schema(self, fixtures_dir: Path) -> None: with pytest.raises(ParseError, match="validation failed"): parse_job_definition(job_dir) + + +class TestConcurrentSteps: + """Tests for concurrent step parsing in workflows.""" + + def test_parses_concurrent_steps_workflow(self, fixtures_dir: Path) -> None: + """Test parsing job with concurrent steps in workflow.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + assert job.name == "concurrent_workflow" + assert len(job.workflows) == 1 + assert job.workflows[0].name == "full_analysis" + + def test_workflow_step_entries(self, fixtures_dir: Path) -> None: + """Test workflow step_entries structure with concurrent steps.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + workflow = job.workflows[0] + assert len(workflow.step_entries) == 4 + + # First entry: sequential step + assert not workflow.step_entries[0].is_concurrent + assert workflow.step_entries[0].step_ids == ["setup"] + + # Second entry: concurrent steps + assert workflow.step_entries[1].is_concurrent + assert workflow.step_entries[1].step_ids == [ + "research_web", + "research_docs", + "research_interviews", + ] + + # Third entry: sequential step + assert not workflow.step_entries[2].is_concurrent + assert workflow.step_entries[2].step_ids == ["compile_results"] + + # Fourth entry: sequential step + assert not workflow.step_entries[3].is_concurrent + assert workflow.step_entries[3].step_ids == ["final_review"] + + def test_workflow_flattened_steps(self, fixtures_dir: Path) -> None: + """Test backward-compatible flattened steps list.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + workflow = job.workflows[0] + # Flattened list should include all step IDs + assert workflow.steps == [ + "setup", + "research_web", + "research_docs", + "research_interviews", + "compile_results", + "final_review", + ] + + def test_get_step_entry_for_step(self, fixtures_dir: Path) -> None: + """Test getting the step entry containing a step.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + workflow = job.workflows[0] + + # Sequential step + entry = workflow.get_step_entry_for_step("setup") + assert entry is not None + assert not entry.is_concurrent + assert entry.step_ids == ["setup"] + + # Concurrent step + entry = workflow.get_step_entry_for_step("research_web") + assert entry is not None + assert entry.is_concurrent + assert "research_web" in entry.step_ids + + def test_get_step_entry_position_in_workflow(self, fixtures_dir: Path) -> None: + """Test getting entry-based position in workflow.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + # Sequential step + result = job.get_step_entry_position_in_workflow("setup") + assert result is not None + entry_pos, total_entries, entry = result + assert entry_pos == 1 + assert total_entries == 4 + assert not entry.is_concurrent + + # Concurrent step - all share same entry position + for step_id in ["research_web", "research_docs", "research_interviews"]: + result = job.get_step_entry_position_in_workflow(step_id) + assert result is not None + entry_pos, total_entries, entry = result + assert entry_pos == 2 # All in second position + assert total_entries == 4 + assert entry.is_concurrent + + def test_get_concurrent_step_info(self, fixtures_dir: Path) -> None: + """Test getting info about position within concurrent group.""" + job_dir = fixtures_dir / "jobs" / "concurrent_steps_job" + job = parse_job_definition(job_dir) + + # Sequential step returns None + assert job.get_concurrent_step_info("setup") is None + + # Concurrent steps return their position in group + result = job.get_concurrent_step_info("research_web") + assert result == (1, 3) + + result = job.get_concurrent_step_info("research_docs") + assert result == (2, 3) + + result = job.get_concurrent_step_info("research_interviews") + assert result == (3, 3)