|
| 1 | +# Output Aggregation Implementation Plan |
| 2 | + |
| 3 | +## Overview |
| 4 | +Implement output aggregation for map steps with performance-focused, test-first approach. |
| 5 | + |
| 6 | +## Stage 1: Baseline Performance Measurement |
| 7 | + |
| 8 | +### Tasks |
| 9 | +- Run existing performance tests multiple times (3-5 runs) |
| 10 | +- Calculate average values for each metric |
| 11 | +- Document results in `PERFORMANCE.md` |
| 12 | + |
| 13 | +### Commands |
| 14 | +```bash |
| 15 | +# Run performance tests (repeat 3-5 times) |
| 16 | +pnpm nx test:pgtap core -- pkgs/core/tests/performance/*.sql |
| 17 | + |
| 18 | +# Document results in PERFORMANCE.md with format: |
| 19 | +# - Test name |
| 20 | +# - Average execution time |
| 21 | +# - Min/Max values |
| 22 | +# - Standard deviation if significant |
| 23 | +``` |
| 24 | + |
| 25 | +## Stage 2: Test-First Development (Naive Implementation) |
| 26 | + |
| 27 | +### Approach |
| 28 | +Write failing tests one at a time, implement inline solution to make them pass. |
| 29 | + |
| 30 | +### Test Scenarios (in order of complexity) |
| 31 | +1. **Basic map output aggregation** |
| 32 | + - Single map step with 3 tasks |
| 33 | + - Verify outputs aggregated in task_index order |
| 34 | + |
| 35 | +2. **Empty map output** |
| 36 | + - Map step with 0 tasks |
| 37 | + - Should return `[]` as output |
| 38 | + |
| 39 | +3. **Map feeding into single step** |
| 40 | + - Map step output aggregated as array |
| 41 | + - Single step receives full array as dependency input |
| 42 | + |
| 43 | +4. **Map feeding into another map** |
| 44 | + - First map outputs array |
| 45 | + - Second map processes each element |
| 46 | + |
| 47 | +5. **Edge case: NULL outputs** |
| 48 | + - Some tasks return NULL |
| 49 | + - Aggregation should include NULLs in array |
| 50 | + |
| 51 | +6. **Run completion with map leaf step** |
| 52 | + - Map step as leaf (no dependents) |
| 53 | + - Run output should contain aggregated array |
| 54 | + |
| 55 | +### Development Workflow |
| 56 | +```bash |
| 57 | +# 1. Write test |
| 58 | +vim pkgs/core/tests/map_output_aggregation_test.sql |
| 59 | + |
| 60 | +# 2. Run test (should fail) |
| 61 | +pkgs/core/scripts/run-test-with-colors pkgs/core/tests/map_output_aggregation_test.sql |
| 62 | + |
| 63 | +# 3. Update functions in database |
| 64 | +psql $DATABASE_URL -f updated_function.sql |
| 65 | + |
| 66 | +# 4. Re-run test (iterate until passing) |
| 67 | +pkgs/core/scripts/run-test-with-colors pkgs/core/tests/map_output_aggregation_test.sql |
| 68 | + |
| 69 | +# 5. Repeat for next test scenario |
| 70 | +``` |
| 71 | + |
| 72 | +### Implementation Notes |
| 73 | +**Naive approach**: Inline aggregation directly in the affected functions |
| 74 | +- **`start_tasks`**: Aggregate map outputs inline in deps CTE |
| 75 | +- **`maybe_complete_run`**: Aggregate map outputs for leaf steps |
| 76 | +- **`complete_task`**: Aggregate for broadcast events |
| 77 | + |
| 78 | +## Stage 3: Performance Measurement (Naive) |
| 79 | + |
| 80 | +### Tasks |
| 81 | +- Run performance tests with naive implementation |
| 82 | +- Compare with baseline |
| 83 | +- Document in `PERFORMANCE.md` |
| 84 | + |
| 85 | +### Expected Impact |
| 86 | +- `start_tasks`: Moderate overhead (aggregation per dependency) |
| 87 | +- `maybe_complete_run`: Minimal (only at run completion) |
| 88 | +- `complete_task`: Minimal (only for broadcasts) |
| 89 | + |
| 90 | +## Stage 4: Map-to-Map Optimization |
| 91 | + |
| 92 | +### Concept |
| 93 | +Optimize the map->map case where we aggregate outputs only to immediately decompose them: |
| 94 | +- Map A task[i] → output[i] |
| 95 | +- Currently: Aggregate to array → decompose in Map B |
| 96 | +- Optimized: Map A task[i] → Map B task[i] directly |
| 97 | + |
| 98 | +### Implementation Strategy |
| 99 | +```sql |
| 100 | +-- In start_tasks deps CTE, add special case: |
| 101 | +CASE |
| 102 | + WHEN step.step_type = 'map' AND dep_step.step_type = 'map' THEN |
| 103 | + -- Direct task-to-task transfer |
| 104 | + (SELECT output FROM pgflow.step_tasks |
| 105 | + WHERE run_id = st.run_id |
| 106 | + AND step_slug = dep.dep_slug |
| 107 | + AND task_index = st.task_index |
| 108 | + AND status = 'completed') |
| 109 | + ELSE |
| 110 | + -- Standard aggregation for non-map dependents |
| 111 | + ... |
| 112 | +END |
| 113 | +``` |
| 114 | + |
| 115 | +### Tests |
| 116 | +1. **Map-to-map direct transfer** |
| 117 | + - Verify task[i] gets output[i] without aggregation |
| 118 | + |
| 119 | +2. **Map-to-map with different sizes** |
| 120 | + - Source map: 5 tasks |
| 121 | + - Target map: 5 tasks (should work) |
| 122 | + - Error handling if sizes mismatch |
| 123 | + |
| 124 | +## Stage 5: Final Performance Measurement |
| 125 | + |
| 126 | +### Tasks |
| 127 | +- Run all performance tests |
| 128 | +- Compare baseline vs naive vs optimized |
| 129 | +- Document final results and recommendations |
| 130 | + |
| 131 | +### Metrics to Track |
| 132 | +- Execution time per function |
| 133 | +- Memory usage (if measurable) |
| 134 | +- Query complexity (EXPLAIN ANALYZE) |
| 135 | + |
| 136 | +## Stage 6: Function Extraction Decision |
| 137 | + |
| 138 | +### Evaluation Criteria |
| 139 | +After measuring performance of inline implementation: |
| 140 | +1. **Performance overhead**: Is function call cost acceptable? |
| 141 | +2. **Code duplication**: How much repetition exists? |
| 142 | +3. **Maintainability**: Would function improve code clarity? |
| 143 | + |
| 144 | +### If extracting to function: |
| 145 | +```sql |
| 146 | +-- Create pgflow.get_step_output() helper |
| 147 | +-- Update all three locations to use helper |
| 148 | +-- Re-run performance tests |
| 149 | +-- Document final decision and rationale |
| 150 | +``` |
| 151 | + |
| 152 | +## Notes for Implementation |
| 153 | + |
| 154 | +### Key Files to Modify |
| 155 | +1. `pkgs/core/schemas/0120_function_start_tasks.sql` (lines 46-53) |
| 156 | +2. `pkgs/core/schemas/0100_function_maybe_complete_run.sql` (lines 16-27) |
| 157 | +3. `pkgs/core/schemas/0100_function_complete_task.sql` (line 156) |
| 158 | + |
| 159 | +### Testing Database Access |
| 160 | +```bash |
| 161 | +# Get database URL |
| 162 | +source .env.local |
| 163 | +echo $DATABASE_URL |
| 164 | + |
| 165 | +# Direct psql access for function updates |
| 166 | +psql $DATABASE_URL |
| 167 | + |
| 168 | +# View current function |
| 169 | +\sf pgflow.start_tasks |
| 170 | +``` |
| 171 | + |
| 172 | +### Performance Testing Tips |
| 173 | +- Run tests when system is idle |
| 174 | +- Use consistent hardware/environment |
| 175 | +- Warm up database before measurements |
| 176 | +- Consider connection pooling effects |
| 177 | + |
| 178 | +## Success Criteria |
| 179 | +- [ ] All map output aggregation tests passing |
| 180 | +- [ ] Performance impact < 10% for typical workflows |
| 181 | +- [ ] Map-to-map optimization shows measurable improvement |
| 182 | +- [ ] Documentation complete with performance analysis |
| 183 | +- [ ] Decision made on function extraction based on data |
0 commit comments