9 changes: 9 additions & 0 deletions .claude/commands/help-review.md
@@ -0,0 +1,9 @@
Your job is to help me understand changes made to $ARGUMENTS by line/section changed.
**Current branch:** !`git branch --show-current`
If there is a `PLAN.md` or `PLAN_<something_related_to_current_branch>.md`, read it before starting.

Here is the diff of the changes:

<changes>
!`git show -p --no-ext-diff -- $ARGUMENTS`
</changes>
3 changes: 3 additions & 0 deletions .claude/settings.json
@@ -5,6 +5,7 @@
"Bash(./scripts/atlas-migrate-hash:*)",
"Bash(./scripts/run-test-with-colors:*)",
"Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -c:*)",
"Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -f:*)",
"Bash(bin/run-test-with-colors:*)",
"Bash(cat:*)",
"Bash(cd:*)",
@@ -17,6 +18,8 @@
"Bash(gh run list:*)",
"Bash(gh run view:*)",
"Bash(git rm:*)",
"Bash(git show -p --no-ext-diff --:*)",
"Bash(git whatchanged:*)",
"Bash(grep:*)",
"Bash(ls:*)",
"Bash(mkdir:*)",
3 changes: 2 additions & 1 deletion .claude/sql_style.md
@@ -18,4 +18,5 @@ Always qualify columns and arguments:
- `start_flow.run_id` not just `run_id` in functions

## Keyword Arguments
Use `param => "value"` NOT `param := "value"`
Use `param => "value"` NOT `param := "value"`
- Note on aliasing tables: when writing SQL functions that work with dependencies/dependents, steps, and states, build aliases from `parent`/`child` prefixes and `_step` (for `pgflow.steps`) or `_state` (for `pgflow.step_states`) suffixes. `dep` should mean a row in `pgflow.deps`, not a parent dependency. Do not use `dep` for a row from `steps` or `step_states`.
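
A minimal sketch of the aliasing convention above. The `demo_*` temp tables and their columns are hypothetical stand-ins for `pgflow.steps`, `pgflow.deps` and `pgflow.step_states`, so the query runs on plain PostgreSQL; the real column names may differ.

```sql
-- Hypothetical stand-ins for pgflow.steps, pgflow.deps and pgflow.step_states.
CREATE TEMP TABLE demo_steps       (flow_slug text, step_slug text);
CREATE TEMP TABLE demo_deps        (flow_slug text, dep_slug text, step_slug text);
CREATE TEMP TABLE demo_step_states (run_id int, step_slug text, status text);

INSERT INTO demo_steps       VALUES ('flow', 'fetch'), ('flow', 'process');
INSERT INTO demo_deps        VALUES ('flow', 'fetch', 'process');
INSERT INTO demo_step_states VALUES (1, 'fetch', 'completed'), (1, 'process', 'created');

-- dep          = a row in the deps table
-- parent_step  = the step being depended on, child_step = the dependent step
-- parent_state / child_state = the matching step_states rows
SELECT
  parent_step.step_slug AS parent_slug,
  parent_state.status   AS parent_status,
  child_step.step_slug  AS child_slug,
  child_state.status    AS child_status
FROM demo_deps AS dep
JOIN demo_steps AS parent_step
  ON parent_step.flow_slug = dep.flow_slug
 AND parent_step.step_slug = dep.dep_slug
JOIN demo_steps AS child_step
  ON child_step.flow_slug = dep.flow_slug
 AND child_step.step_slug = dep.step_slug
JOIN demo_step_states AS parent_state
  ON parent_state.step_slug = parent_step.step_slug
JOIN demo_step_states AS child_state
  ON child_state.step_slug = child_step.step_slug
 AND child_state.run_id = parent_state.run_id
WHERE parent_state.run_id = 1;
```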
59 changes: 45 additions & 14 deletions PLAN.md
@@ -2,13 +2,21 @@

**NOTE: This PLAN.md file should be removed in the final PR once all map infrastructure is complete.**

### Current State
### Features

- ✅ **WORKING**: Empty array maps (taskless) cascade and complete correctly
- ✅ **WORKING**: Task spawning creates N tasks with correct indices
- ✅ **WORKING**: Dependency count propagation for map steps
- ✅ **WORKING**: Array element extraction - tasks get full array instead of individual items
- ❌ **MISSING**: Output aggregation - no way to combine map task outputs for dependents
- ✅ **DONE**: Empty array maps (taskless) cascade and complete correctly
- ✅ **DONE**: Task spawning creates N tasks with correct indices
- ✅ **DONE**: Dependency count propagation for map steps
- ✅ **DONE**: Array element extraction - tasks receive individual array elements
- ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
- ⏳ **NEXT**: DSL support for `.map()` for defining map steps

### Chores

- ⏳ **WAITING**: Integration tests for map steps
- ⏳ **WAITING**: Consolidated migration for map steps
- ⏳ **WAITING**: Documentation for map steps
- ⏳ **WAITING**: Graphite stack merge for map steps

## Implementation Status

@@ -67,16 +75,15 @@
- Handles both root maps (from run input) and dependent maps (from step outputs)
- Tests with actual array data processing

#### ❌ Remaining Work
- [x] **PR #217: Output Aggregation** - `09-17-add-map-step-output-aggregation` (THIS PR)

- [ ] **Output Aggregation** (CRITICAL - BLOCKS MAP OUTPUT CONSUMPTION)
- Inline aggregation implementation in complete_task, start_tasks, maybe_complete_run
- Full test coverage (17 tests) for all aggregation scenarios
- Handles NULL preservation, empty arrays, order preservation
- Validates non-array outputs to map steps fail correctly
- Fixed broadcast aggregation to send full array not individual task output

- Aggregate map task outputs when step completes
- Store aggregated output for dependent steps to consume
- Maintain task_index ordering in aggregated arrays
- Tests for aggregation with actual map task outputs
- **IMPORTANT**: Must add test for map->map NULL propagation when this is implemented
- **IMPORTANT**: Must handle non-array outputs to map steps (should fail the run)
#### ❌ Remaining Work

- [ ] **DSL Support for .map() Step Type**

@@ -93,6 +100,30 @@
- Type safety for input/output types
- Compile-time enforcement of single dependency rule

- [ ] **Fix Orphaned Messages on Run Failure**

- Archive all pending messages when run fails
- Handle map sibling tasks specially
- Fix type constraint violations to fail immediately without retries
- See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
- Critical for production: prevents queue performance degradation
- Tests already written (stashed) that document the problem

- [ ] **Performance Optimization: step_states.output Column**

- Migrate from inline aggregation to storing outputs in step_states
- See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
- Benefits:
- Eliminate redundant aggregation queries
- 30-70% performance improvement for map chains
- Cleaner architecture with single source of truth
- Implementation:
- Add output column to step_states table
- Update complete_task to populate output on completion
- Simplify consumers (start_tasks, maybe_complete_run, broadcasts)
- Update all aggregation tests (~17 files)
- **Note**: This is an optimization that should be done after core functionality is stable

- [ ] **Integration Tests**

- End-to-end workflows with real array data
184 changes: 184 additions & 0 deletions PLAN_orphaned_messages.md
@@ -0,0 +1,184 @@
# Plan: Fix Orphaned Messages on Run Failure

## Problem Statement

When a run fails, messages for pending tasks remain in the queue indefinitely, causing:
1. **Resource waste**: Workers continuously poll orphaned messages
2. **Performance degradation**: Queue operations slow down over time
3. **Map step issues**: Failing one map task leaves N-1 sibling messages orphaned
4. **Type violations**: Deterministic errors retry unnecessarily

## Current Behavior

### When fail_task is called
```sql
-- Only archives the single failing task's message
SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id);
-- Leaves all other queued messages orphaned
```

### When type constraint violation occurs
```sql
-- Raises exception, causes retries
RAISE EXCEPTION 'Map step % expects array input...';
-- Transaction rolls back, but retries will hit same error
```

## Implementation Plan

### 1. Update fail_task Function
**File**: `pkgs/core/schemas/0100_function_fail_task.sql`

Add after marking run as failed (around line 47):
```sql
-- Archive all pending messages for this run.
-- PERFORM rather than a bare SELECT: PL/pgSQL requires a destination for
-- query results (assumes fail_task is a PL/pgSQL function, like complete_task).
PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
FROM pgflow.step_tasks t
WHERE t.run_id = fail_task.run_id
  AND t.status = 'pending'
  AND t.msg_id IS NOT NULL;
```
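
The intended effect can be sketched on plain PostgreSQL without pgmq. Everything named `demo_*` below is a hypothetical stand-in: `demo_queue` and `demo_archive` model a queue and its archive, and the data-modifying CTE only approximates what `pgmq.archive` does for each message. It is an illustration of the behavior, not the actual implementation.

```sql
-- Hypothetical stand-ins: demo_tasks models pgflow.step_tasks,
-- demo_queue / demo_archive model a pgmq queue and its archive.
CREATE TEMP TABLE demo_tasks (
  run_id     int,
  task_index int,
  status     text,
  msg_id     bigint
);
CREATE TEMP TABLE demo_queue   (msg_id bigint PRIMARY KEY);
CREATE TEMP TABLE demo_archive (msg_id bigint PRIMARY KEY);

INSERT INTO demo_tasks VALUES
  (1, 0, 'failed',  101),  -- the task that failed; its message is already archived
  (1, 1, 'pending', 102),  -- sibling map tasks that would be orphaned
  (1, 2, 'pending', 103),
  (2, 0, 'pending', 201);  -- a different run; must stay queued
INSERT INTO demo_queue   VALUES (102), (103), (201);
INSERT INTO demo_archive VALUES (101);

-- Move every pending message of run 1 from the queue to the archive.
WITH moved AS (
  DELETE FROM demo_queue q
  USING demo_tasks t
  WHERE t.msg_id = q.msg_id
    AND t.run_id = 1
    AND t.status = 'pending'
  RETURNING q.msg_id
)
INSERT INTO demo_archive (msg_id)
SELECT msg_id FROM moved;

SELECT
  (SELECT count(*) FROM demo_queue)   AS still_queued,  -- 1 (run 2 only)
  (SELECT count(*) FROM demo_archive) AS archived;      -- 3 (101, 102, 103)
```

Running the CTE a second time moves nothing, because the `DELETE` no longer matches any queued row. That is the idempotency the plan relies on when several tasks of the same run fail.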

### 2. Update complete_task for Type Violations
**File**: `pkgs/core/schemas/0100_function_complete_task.sql`

Replace the current RAISE EXCEPTION block (lines 115-120) with:
```sql
IF v_dependent_map_slug IS NOT NULL THEN
  -- Mark run as failed immediately (no retries for type violations)
  UPDATE pgflow.runs
  SET status = 'failed',
      failed_at = now(),
      error = format('Type contract violation: Map step %s expects array input but dependency %s produced %s (output: %s)',
                     v_dependent_map_slug,
                     complete_task.step_slug,
                     CASE WHEN complete_task.output IS NULL THEN 'null'
                          ELSE jsonb_typeof(complete_task.output) END,
                     complete_task.output)
  WHERE run_id = complete_task.run_id;

  -- Archive ALL pending messages for this run
  PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
  FROM pgflow.step_tasks t
  WHERE t.run_id = complete_task.run_id
    AND t.status = 'pending'
    AND t.msg_id IS NOT NULL;

  -- Mark the current task as failed (not completed)
  UPDATE pgflow.step_tasks
  SET status = 'failed',
      failed_at = now(),
      error_message = format('Type contract violation: produced %s instead of array',
                             CASE WHEN complete_task.output IS NULL THEN 'null'
                                  ELSE jsonb_typeof(complete_task.output) END)
  WHERE run_id = complete_task.run_id
    AND step_slug = complete_task.step_slug
    AND task_index = complete_task.task_index;

  -- Return empty result set (task not completed)
  RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
  RETURN;
END IF;
```
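
To illustrate why the block above needs the `CASE` wrapper around `jsonb_typeof`, here is a small standalone query. It classifies a few sample outputs the same way the error message does. The `is_valid_map_input` column is an illustrative name, not part of pgflow.

```sql
-- jsonb_typeof returns SQL NULL for a SQL NULL input, so the CASE maps it to
-- the text 'null'. A JSON null value is reported as 'null' by jsonb_typeof itself.
SELECT
  v.label,
  CASE WHEN v.output IS NULL THEN 'null'
       ELSE jsonb_typeof(v.output) END                 AS produced,
  coalesce(jsonb_typeof(v.output) = 'array', false)    AS is_valid_map_input
FROM (VALUES
  ('SQL NULL',    NULL::jsonb),
  ('JSON null',   'null'::jsonb),
  ('object',      '{"a": 1}'::jsonb),
  ('string',      '"abc"'::jsonb),
  ('array',       '[1, 2, 3]'::jsonb),
  ('empty array', '[]'::jsonb)
) AS v(label, output);
```

Only the two array rows are valid map input. Both kinds of null are reported as `null`, which keeps the error text readable instead of interpolating an empty value.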

### 3. Add Supporting Index
**File**: New migration, or add to an existing one

```sql
-- Speed up the archiving query
CREATE INDEX IF NOT EXISTS idx_step_tasks_pending_with_msg
ON pgflow.step_tasks(run_id, status)
WHERE status = 'pending' AND msg_id IS NOT NULL;
```

## Testing

### Tests Already Written (Stashed)

1. **`supabase/tests/fail_task/archive_sibling_map_tasks.test.sql`**
- Verifies all map task messages are archived when one fails
- Tests: 8 assertions about message archiving and status

2. **`supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql`**
- Verifies type violations archive all pending messages
- Tests: 8 assertions about queue cleanup and run status

### How to Run Tests
```bash
# After unstashing and implementing the fixes:
pnpm nx test:pgtap core -- supabase/tests/fail_task/archive_sibling_map_tasks.test.sql
pnpm nx test:pgtap core -- supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql
```

## Migration Considerations

### Backward Compatibility
- New behavior only affects failed runs (safe)
- Archiving preserves messages (can be recovered if needed)
- No schema changes to existing tables

### Performance Impact
- One-time cost during failure (acceptable)
- Prevents ongoing performance degradation (improvement)
- Index ensures archiving query is efficient

### Rollback Plan
If issues arise:
1. Remove the archiving logic
2. Messages remain in queue (old behavior)
3. No data loss since we archive, not delete

## Edge Cases to Consider

### 1. Concurrent Task Completion
If multiple tasks complete/fail simultaneously:
- PostgreSQL row locks ensure consistency
- Each failure archives all pending messages
- Idempotent: archiving already-archived messages is safe

### 2. Very Large Map Steps
For maps with 1000+ tasks:
- Archiving might take several seconds
- Consider batching if performance issues arise
- Current approach should handle up to ~10k tasks reasonably
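
If batching does become necessary, one possible shape is to group pending message ids into fixed-size arrays first. The sketch below uses a hypothetical `demo_pending_msgs` table and a batch size of 1000 chosen only for illustration. Each resulting array could then be handed to a batch archive call, assuming the installed pgmq version provides the array form `pgmq.archive(queue_name, msg_ids bigint[])`.

```sql
-- Hypothetical stand-in for the pending msg_ids of one failed run.
CREATE TEMP TABLE demo_pending_msgs (msg_id bigint PRIMARY KEY);
INSERT INTO demo_pending_msgs
SELECT generate_series(1, 2500);

-- Split the ids into batches of at most 1000, preserving msg_id order.
SELECT
  numbered.batch_no,
  count(*)                                            AS batch_size,
  array_agg(numbered.msg_id ORDER BY numbered.msg_id) AS msg_ids
FROM (
  SELECT
    p.msg_id,
    (row_number() OVER (ORDER BY p.msg_id) - 1) / 1000 AS batch_no
  FROM demo_pending_msgs p
) AS numbered
GROUP BY numbered.batch_no
ORDER BY numbered.batch_no;
```

For 2500 ids this yields three batches of 1000, 1000 and 500 messages.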

### 3. Mixed Step Types
When run has both map and single steps:
- Archive logic handles all pending tasks regardless of type
- Correctly archives both map siblings and unrelated pending tasks

## Future Enhancements (Not for this PR)

1. **Selective Archiving**: Only archive tasks that can't proceed
2. **Batch Operations**: Archive in chunks for very large runs
3. **Recovery Mechanism**: Function to unarchive and retry
4. **Monitoring**: Track archived message counts for alerting

## Success Criteria

- [ ] All tests pass (both new test files)
- [ ] No orphaned messages after run failure
- [ ] Type violations don't retry
- [ ] Performance acceptable for maps with 100+ tasks
- [ ] No impact on successful run performance

## Implementation Checklist

- [ ] Update `fail_task` function
- [ ] Update `complete_task` function
- [ ] Add database index
- [ ] Unstash and run tests
- [ ] Test with large map steps (100+ tasks)
- [ ] Update migration file
- [ ] Document behavior change in function comments

## Notes

- This fix is **critical for production** - without it, queue performance will degrade over time
- Type violations are **deterministic** - retrying them is always wasteful
- Archiving (vs deleting) preserves debugging capability
- The fix is relatively simple (~30 lines of SQL) but high impact