diff --git a/tests/shell_script_tests/README.md b/tests/shell_script_tests/README.md new file mode 100644 index 00000000..95bf0468 --- /dev/null +++ b/tests/shell_script_tests/README.md @@ -0,0 +1,76 @@ +# Shell Script Tests + +Automated tests for DeepWork shell scripts, with a focus on validating Claude Code hooks JSON response formats. + +## Scripts Tested + +| Script | Type | Description | +|--------|------|-------------| +| `policy_stop_hook.sh` | Stop Hook | Evaluates policies and blocks agent stop if policies are triggered | +| `user_prompt_submit.sh` | UserPromptSubmit Hook | Captures work tree state when user submits a prompt | +| `capture_prompt_work_tree.sh` | Helper | Records current git state for `compare_to: prompt` policies | +| `make_new_job.sh` | Utility | Creates directory structure for new DeepWork jobs | + +## Claude Code Hooks JSON Format + +Hook scripts must return valid JSON responses. The tests enforce these formats: + +### Stop Hooks (`hooks.after_agent`) +```json +{} // Allow stop +{"decision": "block", "reason": "..."} // Block stop with reason +``` + +### UserPromptSubmit Hooks (`hooks.before_prompt`) +```json +{} // No output or empty object (side-effect only hooks) +``` + +### All Hooks +- Must return valid JSON if producing output +- Non-JSON output on stdout is **not allowed** (stderr is ok) +- Exit code 0 indicates success (even when blocking) + +## Running Tests + +```bash +# Run all shell script tests +uv run pytest tests/shell_script_tests/ -v + +# Run tests for a specific script +uv run pytest tests/shell_script_tests/test_policy_stop_hook.py -v + +# Run with coverage +uv run pytest tests/shell_script_tests/ --cov=src/deepwork +``` + +## Test Structure + +``` +tests/shell_script_tests/ +├── conftest.py # Shared fixtures and helpers +├── test_policy_stop_hook.py # Stop hook blocking/allowing tests +├── test_user_prompt_submit.py # Prompt submission hook tests +├── test_capture_prompt_work_tree.py # Work tree capture tests +├── 
@pytest.fixture
def git_repo(tmp_path: Path) -> Path:
    """Provide a temporary git repository holding one committed README.

    The repository is initialised directly in pytest's ``tmp_path`` and
    receives a single commit ("Initial commit") that adds ``README.md``.

    Args:
        tmp_path: pytest's per-test temporary directory.

    Returns:
        The root of the repository's working tree (the same ``tmp_path``).
    """
    repository = Repo.init(tmp_path)

    (tmp_path / "README.md").write_text("# Test Project\n")

    index = repository.index
    index.add(["README.md"])
    index.commit("Initial commit")

    return tmp_path
quality. +""" + ) + + # Empty baseline so new files trigger + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir(exist_ok=True) + (deepwork_dir / ".last_work_tree").write_text("") + + return tmp_path + + +@pytest.fixture +def policy_hooks_dir() -> Path: + """Return the path to the policy hooks scripts directory.""" + return ( + Path(__file__).parent.parent.parent + / "src" + / "deepwork" + / "standard_jobs" + / "deepwork_policy" + / "hooks" + ) + + +@pytest.fixture +def jobs_scripts_dir() -> Path: + """Return the path to the jobs scripts directory.""" + return ( + Path(__file__).parent.parent.parent / "src" / "deepwork" / "standard_jobs" / "deepwork_jobs" + ) + + +def run_shell_script( + script_path: Path, + cwd: Path, + args: list[str] | None = None, + hook_input: dict | None = None, + env_extra: dict[str, str] | None = None, +) -> tuple[str, str, int]: + """ + Run a shell script and return its output. + + Args: + script_path: Path to the shell script + cwd: Working directory to run the script in + args: Optional list of arguments to pass to the script + hook_input: Optional JSON input to pass via stdin + env_extra: Optional extra environment variables + + Returns: + Tuple of (stdout, stderr, return_code) + """ + env = os.environ.copy() + env["PYTHONPATH"] = str(Path(__file__).parent.parent.parent / "src") + if env_extra: + env.update(env_extra) + + cmd = ["bash", str(script_path)] + if args: + cmd.extend(args) + + stdin_data = json.dumps(hook_input) if hook_input else "" + + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=True, + text=True, + input=stdin_data, + env=env, + ) + + return result.stdout, result.stderr, result.returncode diff --git a/tests/shell_script_tests/test_capture_prompt_work_tree.py b/tests/shell_script_tests/test_capture_prompt_work_tree.py new file mode 100644 index 00000000..4f187b13 --- /dev/null +++ b/tests/shell_script_tests/test_capture_prompt_work_tree.py @@ -0,0 +1,257 @@ +"""Tests for capture_prompt_work_tree.sh 
class TestCapturePromptWorkTreeBasic:
    """Basic functionality tests for capture_prompt_work_tree.sh.

    Every test runs the script once in a freshly initialised repository
    (the ``git_repo`` fixture) and inspects the exit code and the files the
    script is expected to leave under ``.deepwork/``.
    """

    def test_exits_successfully(self, policy_hooks_dir: Path, git_repo: Path) -> None:
        """Test that the script exits with code 0."""
        script_path = policy_hooks_dir / "capture_prompt_work_tree.sh"
        _stdout, stderr, code = run_capture_script(script_path, git_repo)

        assert code == 0, f"Expected exit code 0, got {code}. stderr: {stderr}"

    def test_creates_deepwork_directory(self, policy_hooks_dir: Path, git_repo: Path) -> None:
        """Test that the script creates .deepwork directory."""
        deepwork_dir = git_repo / ".deepwork"
        assert not deepwork_dir.exists(), "Precondition: .deepwork should not exist"

        script_path = policy_hooks_dir / "capture_prompt_work_tree.sh"
        _stdout, stderr, code = run_capture_script(script_path, git_repo)

        assert code == 0, f"Script failed with stderr: {stderr}"
        assert deepwork_dir.exists(), "Script should create .deepwork directory"

    def test_creates_last_work_tree_file(self, policy_hooks_dir: Path, git_repo: Path) -> None:
        """Test that the script creates .last_work_tree file."""
        script_path = policy_hooks_dir / "capture_prompt_work_tree.sh"
        _stdout, stderr, code = run_capture_script(script_path, git_repo)

        work_tree_file = git_repo / ".deepwork" / ".last_work_tree"
        assert code == 0, f"Script failed with stderr: {stderr}"
        assert work_tree_file.exists(), "Script should create .last_work_tree file"

    def test_empty_repo_produces_empty_file(self, policy_hooks_dir: Path, git_repo: Path) -> None:
        """Test that a clean repo produces an empty (or near-empty) work tree file.

        "Empty" here means a repository with no uncommitted changes. The
        only entries tolerated are the script's own bookkeeping files under
        ``.deepwork/``; the committed, unmodified README must not be listed.
        """
        script_path = policy_hooks_dir / "capture_prompt_work_tree.sh"
        _stdout, stderr, code = run_capture_script(script_path, git_repo)

        assert code == 0, f"Script failed with stderr: {stderr}"

        work_tree_file = git_repo / ".deepwork" / ".last_work_tree"
        assert work_tree_file.exists(), "Script should create .last_work_tree file"

        # Inspect the file, not just the exit code; otherwise this test
        # would duplicate test_exits_successfully.
        listed = [line for line in work_tree_file.read_text().splitlines() if line.strip()]
        unexpected = [path for path in listed if not path.startswith(".deepwork/")]
        assert not unexpected, f"Clean repo should list no changed files, got: {unexpected}"
stdout, stderr, code = run_capture_script(script_path, git_repo) + + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + content = work_tree_file.read_text() + + assert code == 0, f"Script failed with stderr: {stderr}" + assert "staged.py" in content, "Staged file should be in work tree" + + def test_captures_unstaged_changes(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that unstaged changes are captured (after staging by script).""" + # Create an unstaged file + unstaged = git_repo / "unstaged.py" + unstaged.write_text("# Unstaged file\n") + + script_path = policy_hooks_dir / "capture_prompt_work_tree.sh" + stdout, stderr, code = run_capture_script(script_path, git_repo) + + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + content = work_tree_file.read_text() + + assert code == 0, f"Script failed with stderr: {stderr}" + assert "unstaged.py" in content, "Unstaged file should be captured" + + def test_captures_files_in_subdirectories(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that files in subdirectories are captured.""" + # Create files in nested directories + src_dir = git_repo / "src" / "components" + src_dir.mkdir(parents=True) + (src_dir / "button.py").write_text("# Button component\n") + + script_path = policy_hooks_dir / "capture_prompt_work_tree.sh" + stdout, stderr, code = run_capture_script(script_path, git_repo) + + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + content = work_tree_file.read_text() + + assert code == 0, f"Script failed with stderr: {stderr}" + assert "src/components/button.py" in content, "Nested file should be captured" + + def test_captures_multiple_files( + self, policy_hooks_dir: Path, git_repo_with_changes: Path + ) -> None: + """Test that multiple files are captured.""" + script_path = policy_hooks_dir / "capture_prompt_work_tree.sh" + stdout, stderr, code = run_capture_script(script_path, git_repo_with_changes) + + work_tree_file = 
def test_file_list_is_sorted_and_unique(self, policy_hooks_dir: Path, git_repo: Path) -> None:
    """Test that the file list is sorted and deduplicated.

    Three files are created in non-alphabetical order. After the script
    runs, each of them must appear in ``.last_work_tree``, and the entries
    for them must be in sorted order with no duplicates.
    """
    # Deliberately created out of order so that a sorted result cannot be
    # an accident of creation order.
    created = {
        "z_file.py": "# Z file\n",
        "a_file.py": "# A file\n",
        "m_file.py": "# M file\n",
    }
    for name, body in created.items():
        (git_repo / name).write_text(body)

    script_path = policy_hooks_dir / "capture_prompt_work_tree.sh"
    _stdout, stderr, code = run_capture_script(script_path, git_repo)

    work_tree_file = git_repo / ".deepwork" / ".last_work_tree"
    lines = [line for line in work_tree_file.read_text().strip().split("\n") if line]

    # Extract just the test files we created (filter out .deepwork files)
    test_files = [f for f in lines if f.endswith("_file.py")]

    assert code == 0, f"Script failed with stderr: {stderr}"

    # Without this check an empty capture would satisfy both the "sorted"
    # and the "unique" assertions below.
    missing = [name for name in created if not any(f.endswith(name) for f in test_files)]
    assert not missing, f"Created files missing from work tree: {missing}"

    assert test_files == sorted(test_files), "Files should be sorted"
    assert len(test_files) == len(set(test_files)), "Files should be unique"
def test_updates_on_new_changes(self, policy_hooks_dir: Path, git_repo: Path) -> None:
    """Test that subsequent runs capture new changes.

    The script is run once on a clean repository and once more after a new
    file is added. The new file must be absent from the first capture and
    present in the second, showing that the file is refreshed per run.
    """
    script_path = policy_hooks_dir / "capture_prompt_work_tree.sh"
    work_tree_file = git_repo / ".deepwork" / ".last_work_tree"

    # First run: baseline capture, taken before the new file exists.
    _stdout, stderr, code = run_capture_script(script_path, git_repo)
    assert code == 0, f"First run failed with stderr: {stderr}"
    assert "new_file.py" not in work_tree_file.read_text(), (
        "New file should not be listed before it exists"
    )

    # Add a new file
    (git_repo / "new_file.py").write_text("# New\n")

    # Second run: must pick up the file added since the baseline.
    _stdout, stderr, code = run_capture_script(script_path, git_repo)
    assert code == 0, f"Second run failed with stderr: {stderr}"

    content = work_tree_file.read_text()
    assert "new_file.py" in content, "New file should be captured"
+ +Claude Code hooks have specific JSON response formats that must be followed: + +Stop hooks (hooks.after_agent): + - {} - Allow stop (empty object) + - {"decision": "block", "reason": "..."} - Block stop with reason + +UserPromptSubmit hooks (hooks.before_prompt): + - {} - No response needed (empty object) + - No output - Also acceptable + +BeforeTool hooks (hooks.before_tool): + - {} - Allow tool execution + - {"decision": "block", "reason": "..."} - Block tool execution + +All hooks: + - Must return valid JSON if producing output + - Must not contain non-JSON output on stdout (stderr is ok) + - Exit code 0 indicates success +""" + +import json +import os +import tempfile +from pathlib import Path + +import pytest +from git import Repo + +from .conftest import run_shell_script + + +def run_hook_script( + script_path: Path, + cwd: Path, + hook_input: dict | None = None, +) -> tuple[str, str, int]: + """Run a hook script and return its output.""" + return run_shell_script(script_path, cwd, hook_input=hook_input) + + +def validate_json_output(output: str) -> dict | None: + """ + Validate that output is valid JSON or empty. + + Args: + output: The stdout from a hook script + + Returns: + Parsed JSON dict, or None if empty/no output + + Raises: + AssertionError: If output is invalid JSON + """ + stripped = output.strip() + + if not stripped: + return None + + try: + result = json.loads(stripped) + assert isinstance(result, dict), "Hook output must be a JSON object" + return result + except json.JSONDecodeError as e: + pytest.fail(f"Invalid JSON output: {stripped!r}. Error: {e}") + + +def validate_stop_hook_response(response: dict | None) -> None: + """ + Validate a Stop hook response follows Claude Code format. 
+ + Args: + response: Parsed JSON response or None + + Raises: + AssertionError: If response format is invalid + """ + if response is None: + # No output is acceptable for stop hooks + return + + if response == {}: + # Empty object means allow stop + return + + # Must have decision and reason for blocking + assert "decision" in response, ( + f"Stop hook blocking response must have 'decision' key: {response}" + ) + assert response["decision"] == "block", ( + f"Stop hook decision must be 'block', got: {response['decision']}" + ) + assert "reason" in response, f"Stop hook blocking response must have 'reason' key: {response}" + assert isinstance(response["reason"], str), f"Stop hook reason must be a string: {response}" + + # Reason should not be empty when blocking + assert response["reason"].strip(), "Stop hook blocking reason should not be empty" + + +def validate_prompt_hook_response(response: dict | None) -> None: + """ + Validate a UserPromptSubmit hook response. + + Args: + response: Parsed JSON response or None + + Raises: + AssertionError: If response format is invalid + """ + if response is None: + # No output is acceptable + return + + # Empty object or valid JSON object is fine + assert isinstance(response, dict), f"Prompt hook output must be a JSON object: {response}" + + +class TestPolicyStopHookJsonFormat: + """Tests specifically for policy_stop_hook.sh JSON format compliance.""" + + def test_allow_response_is_empty_json(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that allow response is empty JSON object.""" + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo) + + response = validate_json_output(stdout) + validate_stop_hook_response(response) + + if response is not None: + assert response == {}, f"Allow response should be empty: {response}" + + def test_block_response_has_required_fields( + self, policy_hooks_dir: Path, git_repo_with_policy: Path + ) -> None: + 
"""Test that block response has decision and reason.""" + # Create a file that triggers the policy + py_file = git_repo_with_policy / "test.py" + py_file.write_text("# Python file\n") + repo = Repo(git_repo_with_policy) + repo.index.add(["test.py"]) + + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo_with_policy) + + response = validate_json_output(stdout) + validate_stop_hook_response(response) + + # Should be blocking + assert response is not None, "Expected blocking response" + assert response.get("decision") == "block", "Expected block decision" + assert "reason" in response, "Expected reason field" + + def test_block_reason_contains_policy_info( + self, policy_hooks_dir: Path, git_repo_with_policy: Path + ) -> None: + """Test that block reason contains policy information.""" + py_file = git_repo_with_policy / "test.py" + py_file.write_text("# Python file\n") + repo = Repo(git_repo_with_policy) + repo.index.add(["test.py"]) + + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo_with_policy) + + response = validate_json_output(stdout) + + assert response is not None, "Expected blocking response" + reason = response.get("reason", "") + + # Should contain useful policy information + assert "Policy" in reason or "policy" in reason, f"Reason should mention policy: {reason}" + + def test_no_extraneous_keys_in_response( + self, policy_hooks_dir: Path, git_repo_with_policy: Path + ) -> None: + """Test that response only contains expected keys.""" + py_file = git_repo_with_policy / "test.py" + py_file.write_text("# Python file\n") + repo = Repo(git_repo_with_policy) + repo.index.add(["test.py"]) + + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo_with_policy) + + response = validate_json_output(stdout) + + if response and response != {}: + # Only decision and reason 
def test_output_is_single_line_json(
    self, policy_hooks_dir: Path, git_repo_with_policy: Path
) -> None:
    """Test that JSON output is single-line (no pretty printing).

    A Python file is staged in a repository whose policy triggers on
    ``**/*.py``, then the stop hook is run. Any stdout it produces must be
    exactly one line, and that line must decode to a JSON object. Extra
    lines would indicate pretty-printing or stray print/log statements.
    """
    py_file = git_repo_with_policy / "test.py"
    py_file.write_text("# Python file\n")
    Repo(git_repo_with_policy).index.add(["test.py"])

    script_path = policy_hooks_dir / "policy_stop_hook.sh"
    stdout, _stderr, _code = run_hook_script(script_path, git_repo_with_policy)

    # Surrounding whitespace (e.g. the trailing newline) is not significant.
    output = stdout.strip()
    if not output:
        # No output at all is tolerated here; other tests in this class
        # assert that a blocking response is present.
        return

    lines = output.split("\n")
    assert len(lines) == 1, f"Hook stdout should be a single JSON line, got: {lines}"

    parsed = json.loads(lines[0])
    assert isinstance(parsed, dict), f"Hook output must be a JSON object: {parsed!r}"
+class TestHooksJsonFormatWithTranscript: + """Tests for hook JSON format when using transcript input.""" + + def test_stop_hook_with_transcript_input( + self, policy_hooks_dir: Path, git_repo_with_policy: Path + ) -> None: + """Test stop hook JSON format when transcript is provided.""" + py_file = git_repo_with_policy / "test.py" + py_file.write_text("# Python file\n") + repo = Repo(git_repo_with_policy) + repo.index.add(["test.py"]) + + # Create mock transcript + with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: + transcript_path = f.name + f.write( + json.dumps( + { + "role": "assistant", + "message": {"content": [{"type": "text", "text": "Hello"}]}, + } + ) + ) + f.write("\n") + + try: + script_path = policy_hooks_dir / "policy_stop_hook.sh" + hook_input = {"transcript_path": transcript_path} + stdout, stderr, code = run_hook_script(script_path, git_repo_with_policy, hook_input) + + response = validate_json_output(stdout) + validate_stop_hook_response(response) + + finally: + os.unlink(transcript_path) + + def test_stop_hook_with_promise_returns_empty( + self, policy_hooks_dir: Path, git_repo_with_policy: Path + ) -> None: + """Test that promised policies return empty JSON.""" + py_file = git_repo_with_policy / "test.py" + py_file.write_text("# Python file\n") + repo = Repo(git_repo_with_policy) + repo.index.add(["test.py"]) + + # Create transcript with promise tag + with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: + transcript_path = f.name + f.write( + json.dumps( + { + "role": "assistant", + "message": { + "content": [ + { + "type": "text", + "text": "✓ Python File Policy", + } + ] + }, + } + ) + ) + f.write("\n") + + try: + script_path = policy_hooks_dir / "policy_stop_hook.sh" + hook_input = {"transcript_path": transcript_path} + stdout, stderr, code = run_hook_script(script_path, git_repo_with_policy, hook_input) + + response = validate_json_output(stdout) + validate_stop_hook_response(response) 
+ + # Should be empty (allow) because policy was promised + if response is not None: + assert response == {}, f"Expected empty response: {response}" + + finally: + os.unlink(transcript_path) + + +class TestHooksExitCodes: + """Tests for hook script exit codes.""" + + def test_stop_hook_exits_zero_on_allow(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that stop hook exits 0 when allowing.""" + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo) + + assert code == 0, f"Allow should exit 0. stderr: {stderr}" + + def test_stop_hook_exits_zero_on_block( + self, policy_hooks_dir: Path, git_repo_with_policy: Path + ) -> None: + """Test that stop hook exits 0 even when blocking.""" + py_file = git_repo_with_policy / "test.py" + py_file.write_text("# Python file\n") + repo = Repo(git_repo_with_policy) + repo.index.add(["test.py"]) + + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo_with_policy) + + # Hooks should exit 0 and communicate via JSON + assert code == 0, f"Block should still exit 0. stderr: {stderr}" + + def test_user_prompt_hook_exits_zero(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that user prompt hook always exits 0.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo) + + assert code == 0, f"User prompt hook should exit 0. stderr: {stderr}" + + def test_capture_script_exits_zero(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that capture script exits 0.""" + script_path = policy_hooks_dir / "capture_prompt_work_tree.sh" + stdout, stderr, code = run_hook_script(script_path, git_repo) + + assert code == 0, f"Capture script should exit 0. 
def run_make_new_job(
    script_path: Path,
    cwd: Path,
    job_name: str | None = None,
) -> tuple[str, str, int]:
    """Invoke make_new_job.sh, optionally with a job name, with NO_COLOR=1 set.

    Args:
        script_path: Path to make_new_job.sh.
        cwd: Directory to run the script in.
        job_name: Job name passed as the script's only argument. When
            omitted (or empty) the script is run with no arguments.

    Returns:
        Tuple of (stdout, stderr, return_code) from the shared runner.
    """
    script_args = None
    if job_name:
        script_args = [job_name]

    return run_shell_script(
        script_path,
        cwd,
        args=script_args,
        env_extra={"NO_COLOR": "1"},
    )
"Should mention job_name argument" + + def test_shows_example_in_usage(self, jobs_scripts_dir: Path, project_dir: Path) -> None: + """Test that the usage includes an example.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_dir) + + assert "Example:" in stdout, "Should show example usage" + + +class TestMakeNewJobNameValidation: + """Tests for job name validation in make_new_job.sh.""" + + def test_accepts_lowercase_name( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that lowercase names are accepted.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "valid_job") + + assert code == 0, f"Should accept lowercase name. stderr: {stderr}" + + def test_accepts_name_with_numbers( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that names with numbers are accepted.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "job123") + + assert code == 0, f"Should accept name with numbers. stderr: {stderr}" + + def test_accepts_name_with_underscores( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that names with underscores are accepted.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "my_new_job") + + assert code == 0, f"Should accept underscores. 
def test_rejects_name_with_spaces(
    self, jobs_scripts_dir: Path, project_with_deepwork: Path
) -> None:
    """Test that names with spaces are rejected.

    The name is handed to the runner as one list element, so the script
    receives ``"my job"`` as a single argument. It is not split into two
    words by a shell. The script must fail and must not create a job
    directory with that name.
    """
    script_path = jobs_scripts_dir / "make_new_job.sh"
    _stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "my job")

    # A space is outside the allowed lowercase/digit/underscore set, so the
    # script is expected to fail like the other invalid-name cases.
    assert code != 0, f"Should reject name with spaces. stderr: {stderr}"

    bad_dir = project_with_deepwork / ".deepwork" / "jobs" / "my job"
    assert not bad_dir.exists(), "Should not create directory with space in name"
TestMakeNewJobDirectoryStructure: + """Tests for directory structure creation in make_new_job.sh.""" + + def test_creates_main_job_directory( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that the main job directory is created.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + job_dir = project_with_deepwork / ".deepwork" / "jobs" / "test_job" + assert job_dir.exists(), "Job directory should be created" + assert job_dir.is_dir(), "Job path should be a directory" + + def test_creates_steps_directory( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that steps/ subdirectory is created.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + steps_dir = project_with_deepwork / ".deepwork" / "jobs" / "test_job" / "steps" + assert steps_dir.exists(), "steps/ directory should be created" + assert steps_dir.is_dir(), "steps/ should be a directory" + + def test_creates_hooks_directory( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that hooks/ subdirectory is created.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + hooks_dir = project_with_deepwork / ".deepwork" / "jobs" / "test_job" / "hooks" + assert hooks_dir.exists(), "hooks/ directory should be created" + assert hooks_dir.is_dir(), "hooks/ should be a directory" + + def test_creates_templates_directory( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that templates/ subdirectory is created.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + templates_dir = project_with_deepwork / ".deepwork" / "jobs" / "test_job" / "templates" + assert templates_dir.exists(), "templates/ directory 
should be created" + assert templates_dir.is_dir(), "templates/ should be a directory" + + def test_creates_gitkeep_files( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that .gitkeep files are created in empty directories.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + job_dir = project_with_deepwork / ".deepwork" / "jobs" / "test_job" + + hooks_gitkeep = job_dir / "hooks" / ".gitkeep" + templates_gitkeep = job_dir / "templates" / ".gitkeep" + + assert hooks_gitkeep.exists(), "hooks/.gitkeep should be created" + assert templates_gitkeep.exists(), "templates/.gitkeep should be created" + + def test_creates_agents_md(self, jobs_scripts_dir: Path, project_with_deepwork: Path) -> None: + """Test that AGENTS.md file is created.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + agents_md = project_with_deepwork / ".deepwork" / "jobs" / "test_job" / "AGENTS.md" + assert agents_md.exists(), "AGENTS.md should be created" + + content = agents_md.read_text() + assert "Job Management" in content, "AGENTS.md should have job management content" + assert "deepwork_jobs" in content, "AGENTS.md should reference deepwork_jobs" + + +class TestMakeNewJobAgentsMdContent: + """Tests for AGENTS.md content in make_new_job.sh.""" + + def test_agents_md_contains_slash_commands( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that AGENTS.md lists recommended slash commands.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + agents_md = project_with_deepwork / ".deepwork" / "jobs" / "test_job" / "AGENTS.md" + content = agents_md.read_text() + + assert "/deepwork_jobs.define" in content, "Should mention define command" + assert "/deepwork_jobs.implement" in content, "Should mention implement 
command" + assert "/deepwork_jobs.learn" in content, "Should mention learn command" + + def test_agents_md_contains_directory_structure( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that AGENTS.md documents the directory structure.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "test_job") + + agents_md = project_with_deepwork / ".deepwork" / "jobs" / "test_job" / "AGENTS.md" + content = agents_md.read_text() + + assert "job.yml" in content, "Should mention job.yml" + assert "steps/" in content, "Should document steps directory" + assert "hooks/" in content, "Should document hooks directory" + assert "templates/" in content, "Should document templates directory" + + +class TestMakeNewJobErrorHandling: + """Tests for error handling in make_new_job.sh.""" + + def test_fails_if_job_already_exists( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that creating a job that already exists fails.""" + # First create the job + script_path = jobs_scripts_dir / "make_new_job.sh" + run_make_new_job(script_path, project_with_deepwork, "existing_job") + + # Try to create it again + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "existing_job") + + assert code != 0, "Should fail when job already exists" + output = stdout + stderr + assert "exist" in output.lower() or "error" in output.lower(), ( + "Should mention that job exists" + ) + + def test_creates_deepwork_directory_if_missing( + self, jobs_scripts_dir: Path, project_dir: Path + ) -> None: + """Test that .deepwork/jobs is created if it doesn't exist.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_dir, "new_job") + + assert code == 0, f"Should succeed even without .deepwork. 
stderr: {stderr}" + + job_dir = project_dir / ".deepwork" / "jobs" / "new_job" + assert job_dir.exists(), "Should create .deepwork/jobs/new_job" + + +class TestMakeNewJobOutput: + """Tests for output messages in make_new_job.sh.""" + + def test_shows_success_message( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that success message is shown.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "new_job") + + assert code == 0, f"Should succeed. stderr: {stderr}" + # Check for informational output + assert "new_job" in stdout, "Output should mention job name" + + def test_shows_next_steps(self, jobs_scripts_dir: Path, project_with_deepwork: Path) -> None: + """Test that next steps are shown after creation.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "new_job") + + assert code == 0, f"Should succeed. stderr: {stderr}" + # Should mention what to do next + assert "next" in stdout.lower() or "step" in stdout.lower(), "Should show next steps" + + def test_shows_directory_structure_created( + self, jobs_scripts_dir: Path, project_with_deepwork: Path + ) -> None: + """Test that created directory structure is shown.""" + script_path = jobs_scripts_dir / "make_new_job.sh" + stdout, stderr, code = run_make_new_job(script_path, project_with_deepwork, "new_job") + + assert code == 0, f"Should succeed. 
stderr: {stderr}" + # Should show what was created + assert "AGENTS.md" in stdout or "steps" in stdout, "Should show created structure" diff --git a/tests/shell_script_tests/test_policy_stop_hook.py b/tests/shell_script_tests/test_policy_stop_hook.py index 1134b267..07a2d221 100644 --- a/tests/shell_script_tests/test_policy_stop_hook.py +++ b/tests/shell_script_tests/test_policy_stop_hook.py @@ -6,40 +6,25 @@ import json import os -import subprocess import tempfile from pathlib import Path import pytest from git import Repo - -@pytest.fixture -def shell_scripts_dir() -> Path: - """Return the path to the source shell scripts directory.""" - return ( - Path(__file__).parent.parent.parent - / "src" - / "deepwork" - / "standard_jobs" - / "deepwork_policy" - / "hooks" - ) +from .conftest import run_shell_script @pytest.fixture -def git_repo_with_policy(tmp_path: Path) -> Path: - """Create a git repo with a policy file and trigger a policy.""" - # Initialize git repo +def git_repo_with_src_policy(tmp_path: Path) -> Path: + """Create a git repo with a policy file that triggers on src/** changes.""" repo = Repo.init(tmp_path) - # Create initial commit readme = tmp_path / "README.md" readme.write_text("# Test Project\n") repo.index.add(["README.md"]) repo.index.commit("Initial commit") - # Create a policy file that triggers on src/** changes # Use compare_to: prompt since test repos don't have origin remote policy_file = tmp_path / ".deepwork.policy.yml" policy_file.write_text( @@ -52,82 +37,42 @@ def git_repo_with_policy(tmp_path: Path) -> Path: """ ) - # Create .deepwork directory with empty baseline - # (so new files are detected as "changed since prompt") + # Empty baseline means all current files are "new" deepwork_dir = tmp_path / ".deepwork" deepwork_dir.mkdir(exist_ok=True) - # Empty baseline means all current files are "new" (deepwork_dir / ".last_work_tree").write_text("") return tmp_path -@pytest.fixture -def git_repo_no_policy(tmp_path: Path) -> Path: - 
"""Create a git repo without a policy file.""" - repo = Repo.init(tmp_path) - - readme = tmp_path / "README.md" - readme.write_text("# Test Project\n") - repo.index.add(["README.md"]) - repo.index.commit("Initial commit") - - return tmp_path - - def run_stop_hook( script_path: Path, cwd: Path, hook_input: dict | None = None, ) -> tuple[str, str, int]: - """ - Run the policy_stop_hook.sh script and return its output. - - Args: - script_path: Path to the policy_stop_hook.sh script - cwd: Working directory to run the script in - hook_input: Optional JSON input to pass via stdin - - Returns: - Tuple of (stdout, stderr, return_code) - """ - env = os.environ.copy() - # Ensure Python can find the deepwork module - env["PYTHONPATH"] = str(Path(__file__).parent.parent.parent / "src") - - stdin_data = json.dumps(hook_input) if hook_input else "" - - result = subprocess.run( - ["bash", str(script_path)], - cwd=cwd, - capture_output=True, - text=True, - input=stdin_data, - env=env, - ) - - return result.stdout, result.stderr, result.returncode + """Run the policy_stop_hook.sh script and return its output.""" + return run_shell_script(script_path, cwd, hook_input=hook_input) class TestPolicyStopHookBlocking: """Tests for policy_stop_hook.sh blocking behavior.""" def test_outputs_block_json_when_policy_fires( - self, shell_scripts_dir: Path, git_repo_with_policy: Path + self, policy_hooks_dir: Path, git_repo_with_src_policy: Path ) -> None: """Test that the hook outputs blocking JSON when a policy fires.""" # Create a file that triggers the policy - src_dir = git_repo_with_policy / "src" + src_dir = git_repo_with_src_policy / "src" src_dir.mkdir(exist_ok=True) (src_dir / "main.py").write_text("# New file\n") # Stage the change - repo = Repo(git_repo_with_policy) + repo = Repo(git_repo_with_src_policy) repo.index.add(["src/main.py"]) # Run the stop hook - script_path = shell_scripts_dir / "policy_stop_hook.sh" - stdout, stderr, code = run_stop_hook(script_path, 
git_repo_with_policy) + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_stop_hook(script_path, git_repo_with_src_policy) # Parse the output as JSON output = stdout.strip() @@ -145,15 +90,15 @@ def test_outputs_block_json_when_policy_fires( assert "Test Policy" in result["reason"], f"Policy name not in reason: {result}" def test_outputs_empty_json_when_no_policy_fires( - self, shell_scripts_dir: Path, git_repo_with_policy: Path + self, policy_hooks_dir: Path, git_repo_with_src_policy: Path ) -> None: """Test that the hook outputs empty JSON when no policy fires.""" # Don't create any files that would trigger the policy # (policy triggers on src/** but we haven't created anything in src/) # Run the stop hook - script_path = shell_scripts_dir / "policy_stop_hook.sh" - stdout, stderr, code = run_stop_hook(script_path, git_repo_with_policy) + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_stop_hook(script_path, git_repo_with_src_policy) # Parse the output as JSON output = stdout.strip() @@ -167,12 +112,10 @@ def test_outputs_empty_json_when_no_policy_fires( # Should be empty JSON (no blocking) assert result == {}, f"Expected empty JSON when no policies fire, got: {result}" - def test_exits_early_when_no_policy_file( - self, shell_scripts_dir: Path, git_repo_no_policy: Path - ) -> None: + def test_exits_early_when_no_policy_file(self, policy_hooks_dir: Path, git_repo: Path) -> None: """Test that the hook exits cleanly when no policy file exists.""" - script_path = shell_scripts_dir / "policy_stop_hook.sh" - stdout, stderr, code = run_stop_hook(script_path, git_repo_no_policy) + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_stop_hook(script_path, git_repo) # Should exit with code 0 and produce no output (or empty) assert code == 0, f"Expected exit code 0, got {code}. 
stderr: {stderr}" @@ -188,16 +131,16 @@ def test_exits_early_when_no_policy_file( pass def test_respects_promise_tags( - self, shell_scripts_dir: Path, git_repo_with_policy: Path + self, policy_hooks_dir: Path, git_repo_with_src_policy: Path ) -> None: """Test that promised policies are not re-triggered.""" # Create a file that triggers the policy - src_dir = git_repo_with_policy / "src" + src_dir = git_repo_with_src_policy / "src" src_dir.mkdir(exist_ok=True) (src_dir / "main.py").write_text("# New file\n") # Stage the change - repo = Repo(git_repo_with_policy) + repo = Repo(git_repo_with_src_policy) repo.index.add(["src/main.py"]) # Create a mock transcript with the promise tag @@ -223,9 +166,9 @@ def test_respects_promise_tags( try: # Run the stop hook with transcript path - script_path = shell_scripts_dir / "policy_stop_hook.sh" + script_path = policy_hooks_dir / "policy_stop_hook.sh" hook_input = {"transcript_path": transcript_path} - stdout, stderr, code = run_stop_hook(script_path, git_repo_with_policy, hook_input) + stdout, stderr, code = run_stop_hook(script_path, git_repo_with_src_policy, hook_input) # Parse the output output = stdout.strip() @@ -238,7 +181,7 @@ def test_respects_promise_tags( finally: os.unlink(transcript_path) - def test_safety_pattern_prevents_firing(self, shell_scripts_dir: Path, tmp_path: Path) -> None: + def test_safety_pattern_prevents_firing(self, policy_hooks_dir: Path, tmp_path: Path) -> None: """Test that safety patterns prevent policies from firing.""" # Initialize git repo repo = Repo.init(tmp_path) @@ -279,7 +222,7 @@ def test_safety_pattern_prevents_firing(self, shell_scripts_dir: Path, tmp_path: repo.index.add(["src/main.py", "docs/api.md"]) # Run the stop hook - script_path = shell_scripts_dir / "policy_stop_hook.sh" + script_path = policy_hooks_dir / "policy_stop_hook.sh" stdout, stderr, code = run_stop_hook(script_path, tmp_path) # Parse the output @@ -296,19 +239,19 @@ class TestPolicyStopHookJsonFormat: """Tests for 
the JSON output format of policy_stop_hook.sh.""" def test_json_has_correct_structure( - self, shell_scripts_dir: Path, git_repo_with_policy: Path + self, policy_hooks_dir: Path, git_repo_with_src_policy: Path ) -> None: """Test that blocking JSON has the correct Claude Code structure.""" # Create a file that triggers the policy - src_dir = git_repo_with_policy / "src" + src_dir = git_repo_with_src_policy / "src" src_dir.mkdir(exist_ok=True) (src_dir / "main.py").write_text("# New file\n") - repo = Repo(git_repo_with_policy) + repo = Repo(git_repo_with_src_policy) repo.index.add(["src/main.py"]) - script_path = shell_scripts_dir / "policy_stop_hook.sh" - stdout, stderr, code = run_stop_hook(script_path, git_repo_with_policy) + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_stop_hook(script_path, git_repo_with_src_policy) result = json.loads(stdout.strip()) @@ -322,18 +265,18 @@ def test_json_has_correct_structure( assert len(result["reason"]) > 0 def test_reason_contains_policy_instructions( - self, shell_scripts_dir: Path, git_repo_with_policy: Path + self, policy_hooks_dir: Path, git_repo_with_src_policy: Path ) -> None: """Test that the reason includes the policy instructions.""" - src_dir = git_repo_with_policy / "src" + src_dir = git_repo_with_src_policy / "src" src_dir.mkdir(exist_ok=True) (src_dir / "main.py").write_text("# New file\n") - repo = Repo(git_repo_with_policy) + repo = Repo(git_repo_with_src_policy) repo.index.add(["src/main.py"]) - script_path = shell_scripts_dir / "policy_stop_hook.sh" - stdout, stderr, code = run_stop_hook(script_path, git_repo_with_policy) + script_path = policy_hooks_dir / "policy_stop_hook.sh" + stdout, stderr, code = run_stop_hook(script_path, git_repo_with_src_policy) result = json.loads(stdout.strip()) diff --git a/tests/shell_script_tests/test_user_prompt_submit.py b/tests/shell_script_tests/test_user_prompt_submit.py new file mode 100644 index 00000000..b503727b --- /dev/null +++ 
b/tests/shell_script_tests/test_user_prompt_submit.py @@ -0,0 +1,166 @@ +"""Tests for user_prompt_submit.sh shell script. + +This script is called as a Claude Code UserPromptSubmit hook. +It should: +1. Execute successfully (exit code 0) +2. Output valid JSON or no output (hooks allow both) +3. Capture work tree state by calling capture_prompt_work_tree.sh +""" + +import json +from pathlib import Path + +import pytest +from git import Repo + +from .conftest import run_shell_script + + +def run_user_prompt_submit_hook( + script_path: Path, + cwd: Path, + hook_input: dict | None = None, +) -> tuple[str, str, int]: + """Run the user_prompt_submit.sh script and return its output.""" + return run_shell_script(script_path, cwd, hook_input=hook_input) + + +class TestUserPromptSubmitHookExecution: + """Tests for user_prompt_submit.sh execution behavior.""" + + def test_exits_successfully(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook exits with code 0.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + assert code == 0, f"Expected exit code 0, got {code}. 
stderr: {stderr}" + + def test_creates_deepwork_directory(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook creates .deepwork directory if it doesn't exist.""" + deepwork_dir = git_repo / ".deepwork" + assert not deepwork_dir.exists(), "Precondition: .deepwork should not exist" + + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + assert code == 0, f"Script failed with stderr: {stderr}" + assert deepwork_dir.exists(), "Hook should create .deepwork directory" + + def test_creates_last_work_tree_file(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook creates .deepwork/.last_work_tree file.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + assert code == 0, f"Script failed with stderr: {stderr}" + assert work_tree_file.exists(), "Hook should create .last_work_tree file" + + def test_captures_staged_changes(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook captures staged file changes.""" + # Create and stage a new file + new_file = git_repo / "new_file.py" + new_file.write_text("# New file\n") + repo = Repo(git_repo) + repo.index.add(["new_file.py"]) + + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + assert code == 0, f"Script failed with stderr: {stderr}" + + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + content = work_tree_file.read_text() + assert "new_file.py" in content, "Staged file should be captured" + + def test_captures_untracked_files(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook captures untracked files.""" + # Create an untracked file (don't stage it) + untracked = git_repo / "untracked.txt" + 
untracked.write_text("untracked content\n") + + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + assert code == 0, f"Script failed with stderr: {stderr}" + + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + content = work_tree_file.read_text() + # After running the hook, files are staged, so check for the file + assert "untracked.txt" in content, "Untracked file should be captured" + + +class TestUserPromptSubmitHookJsonOutput: + """Tests for user_prompt_submit.sh JSON output format. + + Claude Code UserPromptSubmit hooks can output: + - Empty output (most common for side-effect-only hooks) + - Valid JSON (if the hook needs to communicate something) + + Either is acceptable; invalid JSON is NOT acceptable. + """ + + def test_output_is_empty_or_valid_json(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that output is either empty or valid JSON.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + output = stdout.strip() + + if output: + # If there's output, it must be valid JSON + try: + result = json.loads(output) + assert isinstance(result, dict), "JSON output should be an object" + except json.JSONDecodeError as e: + pytest.fail(f"Output is not valid JSON: {output!r}. 
Error: {e}") + + def test_does_not_block_prompt(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook does not return a blocking response.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + + output = stdout.strip() + + if output: + try: + result = json.loads(output) + # UserPromptSubmit hooks should not block + assert result.get("decision") != "block", ( + "UserPromptSubmit hook should not block prompt submission" + ) + except json.JSONDecodeError: + pass # Empty or non-JSON output is fine + + +class TestUserPromptSubmitHookIdempotence: + """Tests for idempotent behavior of user_prompt_submit.sh.""" + + def test_multiple_runs_succeed(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that the hook can be run multiple times successfully.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + + # Run multiple times + for i in range(3): + stdout, stderr, code = run_user_prompt_submit_hook(script_path, git_repo) + assert code == 0, f"Run {i + 1} failed with stderr: {stderr}" + + def test_updates_work_tree_on_new_changes(self, policy_hooks_dir: Path, git_repo: Path) -> None: + """Test that subsequent runs update the work tree state.""" + script_path = policy_hooks_dir / "user_prompt_submit.sh" + repo = Repo(git_repo) + + # First run - capture initial state + run_user_prompt_submit_hook(script_path, git_repo) + work_tree_file = git_repo / ".deepwork" / ".last_work_tree" + assert work_tree_file.exists(), "Work tree file should exist after first run" + + # Create and stage a new file + new_file = git_repo / "another_file.py" + new_file.write_text("# Another file\n") + repo.index.add(["another_file.py"]) + + # Second run - should capture new file + run_user_prompt_submit_hook(script_path, git_repo) + updated_content = work_tree_file.read_text() + + assert "another_file.py" in updated_content, "New file should be captured" diff --git 
a/uv.lock b/uv.lock index d780f0db..ccd8b049 100644 --- a/uv.lock +++ b/uv.lock @@ -126,7 +126,7 @@ toml = [ [[package]] name = "deepwork" -version = "0.1.0" +version = "0.1.1" source = { editable = "." } dependencies = [ { name = "click" },