From a4226708f1b723cd73c4082666a5d84a6521308e Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 15 Sep 2025 19:54:04 +0200 Subject: [PATCH] chore: update settings and improve cascade_complete_taskless_steps safety and performance - Added new Bash pattern for PGPASSWORD in settings.json - Included new mcp__sequentialthinking__sequentialthinking in settings - Enhanced cascade_complete_taskless_steps with iteration limit to prevent infinite loops - Added safety counter and exception handling for robustness - Improved documentation with detailed comments on safety and performance considerations --- .claude/settings.json | 4 +- PLAN.md | 382 +++++------------- PLAN_cascade_complete_taskless_steps.md | 275 ------------- .../PLAN_use_null_for_map_initial_tasks.md | 195 +++++++++ ...nction_cascade_complete_taskless_steps.sql | 91 +++++ .../schemas/0100_function_complete_task.sql | 21 +- .../core/schemas/0100_function_start_flow.sql | 3 + pkgs/core/src/database-types.ts | 4 + ...93518_pgflow_temp_add_cascade_complete.sql | 321 +++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../completing_taskless_steps/basic.test.sql | 65 +++ .../cascade_iteration_limit.test.sql | 85 ++++ .../cascade_performance.test.sql | 226 +++++++++++ .../normal_to_map_empty.test.sql | 115 ++++++ .../taskless_map_to_two_normals.test.sql | 109 +++++ .../taskless_sequence.test.sql | 91 +++++ .../two_taskless_maps_to_normal.test.sql | 117 ++++++ .../map_queue_messages.test.sql | 26 +- .../map_task_spawning.test.sql | 47 +-- 19 files changed, 1565 insertions(+), 615 deletions(-) delete mode 100644 PLAN_cascade_complete_taskless_steps.md create mode 100644 pkgs/core/PLAN_use_null_for_map_initial_tasks.md create mode 100644 pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql create mode 100644 pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/basic.test.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/cascade_iteration_limit.test.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/cascade_performance.test.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/normal_to_map_empty.test.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/taskless_map_to_two_normals.test.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/taskless_sequence.test.sql create mode 100644 pkgs/core/supabase/tests/completing_taskless_steps/two_taskless_maps_to_normal.test.sql diff --git a/.claude/settings.json b/.claude/settings.json index 9d2bb0cfd..eea61cc00 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -4,6 +4,7 @@ "Bash(./scripts/atlas-migrate-diff:*)", "Bash(./scripts/atlas-migrate-hash:*)", "Bash(./scripts/run-test-with-colors:*)", + "Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -c:*)", "Bash(bin/run-test-with-colors:*)", "Bash(cat:*)", "Bash(cd:*)", @@ -52,7 +53,8 @@ "mcp__nx-mcp__nx_docs", "mcp__nx-mcp__nx_project_details", "mcp__nx-mcp__nx_workspace", - "mcp__nx-mcp__nx_workspace_path" + "mcp__nx-mcp__nx_workspace_path", + "mcp__sequentialthinking__sequentialthinking" ], "deny": [] }, diff --git a/PLAN.md b/PLAN.md index 203851f91..fd27acef7 100644 --- a/PLAN.md +++ b/PLAN.md @@ -2,321 +2,149 @@ **NOTE: This PLAN.md file should be removed in the final PR once all map infrastructure is complete.** -## Implementation Status - -### Sequential Child PR Plan - -- [x] **PR #207: Add .array() to DSL** - `feature-map-and-array` - - TypeScript DSL enhancement for array creation - - Foundation for map step functionality -- [x] **PR #208: Foundation - Schema & add_step()** - `09-10-feat_add_map_step_type_in_sql` - - Schema changes (initial_tasks, remaining_tasks, constraints) - - add_step() function with map step validation - - Basic tests for map step creation -- [x] **PR #209: Root Map Support** - `09-11-root-map-support` (COMPLETED) - - - Enhanced start_flow() for root map validation and count setting - - Tests for root map scenarios - -- [x] **PR #210: Task Spawning** - `09-12-task-spawning` (COMPLETED) - - - Enhanced start_ready_steps() for N task generation - - Empty array auto-completion - - Tests for batch task creation - -- [ ] **Cascade Complete Taskless Steps** - - - Extract taskless completion from start_ready_steps() - - Add cascade capability for chains of taskless steps - - Generic solution for all initial_tasks=0 steps - - See PLAN_cascade_complete_taskless_steps.md for details - -- [ ] **Array Element Extraction** - - - Enhanced start_tasks() for map input extraction - - Support for root and dependent maps - - Tests for element extraction - -- [ ] **Dependent Map Support** - - - Enhanced complete_task() for map dependency handling - - Array validation and count propagation - - Tests for dependency scenarios - -- [ ] **Output Aggregation** - - - Enhanced maybe_complete_run() for array aggregation - - Ordered output collection - - Tests for aggregation - -- [ ] **Integration Tests** - - End-to-end test suite - - Edge case coverage - - Performance validation - -- [ ] **Performance Benchmarking Suite** - - Dedicated benchmark functions separate from tests - - Measure task spawning at various scales (100, 1K, 10K, 100K elements) - - Track performance metrics: spawn time, memory usage, queue throughput - - Non-blocking CI workflow that posts results as PR comment - - Runs independently from test suite to avoid timeouts - - Provides visibility without blocking merges - -## Overview - -This implementation establishes the SQL-level foundation for map step functionality, building on PR #207's completed `.array()` method. It focuses exclusively on the database schema and SQL Core layer, providing the infrastructure needed for parallel task spawning, execution, and result aggregation. - -**Dependencies**: PR #207 (`.array()` method) must be completed -**Milestone**: Can create map steps and spawn/complete tasks via direct SQL calls - -## Core Value Proposition - -- **Parallel Task Spawning**: Map steps spawn N tasks based on array dependency length -- **Task Counting Infrastructure**: Robust task counting with complete set of invariants -- **Result Aggregation**: Ordered collection of task results into final array output -- **Empty Array Handling**: Auto-completion for zero-task scenarios - -## Hybrid Strategy: "Fresh Data" vs "Dumb Spawning" - -The implementation uses a hybrid approach that separates **"count determination"** from **"task spawning"** for optimal performance and maintainability. - -### Key Principle - -**Determine task counts where data is naturally available, spawn tasks where it's most efficient.** - -### Fresh Data Functions - -Functions that have access to fresh array data handle validation and count setting: - -- **`start_flow()`**: Has fresh `runs.input` → validate and set `initial_tasks` for root maps -- **`complete_task()`**: Has fresh `output` → validate and set `initial_tasks` for dependent maps - -### Dumb Functions - -Functions that use pre-computed counts without JSON parsing: - -- **`start_ready_steps()`**: Copies `initial_tasks → remaining_tasks` and spawns N tasks efficiently -- **`start_tasks()`**: Extracts array elements using `task_index` -- **`maybe_complete_run()`**: Aggregates results into ordered arrays - -### Benefits - -1. **Minimal JSON Parsing**: Array parsing happens exactly twice - once in each "fresh data" function -2. **Performance Predictable**: No duplicate work or re-reading large arrays -3. **Clean Separation**: Each function has focused responsibility -4. **Atomic Operations**: Count setting happens under existing locks - -## Implementation Components - -The implementation is split across multiple PRs as shown in the Sequential Child PR Plan above. Each PR builds on the previous one to deliver complete map step functionality. - -## Database Schema Changes +### Current State -For detailed schema development workflow, migration generation, and regeneration instructions, see: +- ✅ **WORKING**: Empty array maps (taskless) cascade and complete correctly +- ✅ **WORKING**: Task spawning creates N tasks with correct indices +- ✅ **WORKING**: Dependency count propagation for map steps +- ❌ **MISSING**: Array element extraction - tasks get full array instead of individual items +- ❌ **MISSING**: Output aggregation - no way to combine map task outputs for dependents -- `.claude/schema_development.md` - Concise workflow guide +### What Needs to Be Done -### Schema Updates (DONE) +1. **Array Element Extraction in `start_tasks()`** -#### 1. Enable Map Step Type (DONE) + - Each map task must receive `array[task_index]` not the entire array + - Requires modifying the input assembly logic to use `jsonb_array_element()` -**File**: `pkgs/core/schemas/0050_tables_definitions.sql` +2. **Output Aggregation When Map Completes** + - When all map tasks finish, aggregate outputs: `jsonb_agg(output ORDER BY task_index)` + - Store this somewhere accessible to dependent steps + - Options: Add column to step_states, compute on-demand, or temporary storage -- Updated constraint to allow 'map' step type +### Example: What Should Happen (But Doesn't) -#### 2. Remove Single Task Constraint (DONE) +```sql +-- Given a flow: normalStep -> mapStep -> finalStep -**File**: `pkgs/core/schemas/0060_tables_runtime.sql` +-- 1. normalStep completes with output: +'["apple", "banana", "cherry"]' -- Removed `only_single_task_per_step` constraint +-- 2. mapStep should spawn 3 tasks: +-- Task 0 receives: {"normalStep": "apple"} ← NOT WORKING (gets full array) +-- Task 1 receives: {"normalStep": "banana"} ← NOT WORKING (gets full array) +-- Task 2 receives: {"normalStep": "cherry"} ← NOT WORKING (gets full array) -#### 3. Add Initial Tasks Column and Update Remaining Tasks (DONE) +-- 3. Each task processes and outputs: +-- Task 0 outputs: {"processed": "APPLE"} +-- Task 1 outputs: {"processed": "BANANA"} +-- Task 2 outputs: {"processed": "CHERRY"} -**File**: `pkgs/core/schemas/0060_tables_runtime.sql` +-- 4. When mapStep completes, aggregate outputs: +'[{"processed": "APPLE"}, {"processed": "BANANA"}, {"processed": "CHERRY"}]' ← NOT WORKING -- Added `initial_tasks` column with DEFAULT 1 -- Made `remaining_tasks` nullable -- Added `remaining_tasks_state_consistency` constraint +-- 5. finalStep receives the aggregated array as input +``` -## Function Changes - -### 1. `add_step()` - Map Step Creation (DONE) - -**File**: `pkgs/core/schemas/0100_function_add_step.sql` - -**Completed Changes:** - -- Added `step_type TEXT DEFAULT 'single'` parameter -- Added validation for map steps (max 1 dependency) -- Function now stores step_type in database - -### 2. `start_flow()` - Root Map Count Setting (CURRENT PR) - -**File**: `pkgs/core/schemas/0100_function_start_flow.sql` - -**Required Changes:** - -- Detect root map steps (step_type='map' AND deps_count=0) -- Validate that `runs.input` is an array for root maps -- Set `initial_tasks = jsonb_array_length(input)` for root maps -- Fail with clear error if input is not array for root map - -### 3. `complete_task()` - Dependent Map Count Setting (TODO) - -**File**: `pkgs/core/schemas/0100_function_complete_task.sql` - -**Required Changes:** - -- Detect map dependents when a step completes -- For single→map: validate output is array, set `initial_tasks = array_length` -- For map→map: count completed tasks, set `initial_tasks = task_count` -- Fail with clear error if dependency output is not array when needed - -### 4. `start_ready_steps()` - Task Spawning (TODO) - -**File**: `pkgs/core/schemas/0100_function_start_ready_steps.sql` - -**Required Changes:** - -- Generate N tasks using `generate_series(0, initial_tasks-1)` for map steps -- Handle empty arrays: direct transition to 'completed' when initial_tasks=0 -- Send appropriate realtime events (step:started or step:completed) -- Insert multiple step_tasks records with proper task_index values - -**Key Decision - Empty Array Handling:** - -- Transition directly `created` → `completed` for initial_tasks=0 -- Send single `step:completed` event with `output: []` - -### 5. `start_tasks()` - Array Element Extraction (TODO) - -**File**: `pkgs/core/schemas/0120_function_start_tasks.sql` - -**Required Changes:** - -- Extract array elements based on task_index for map steps -- Root maps: extract from `runs.input[task_index]` -- Dependent maps: extract from aggregated dependency output -- Single steps: unchanged behavior (keep existing logic) - -### 6. `maybe_complete_run()` - Output Aggregation (TODO) - -**File**: `pkgs/core/schemas/0100_function_maybe_complete_run.sql` - -**Required Changes:** - -- Aggregate map step outputs into arrays ordered by `task_index` -- Single steps: unchanged (single output value) -- Maintain run output structure: `{step1: output1, mapStep: [item1, item2, ...]}` - -## Testing Strategy - -### Tests by PR - -**PR #208 (DONE):** - -- map_step_creation.test.sql -- step_type_validation.test.sql -- map_dependency_limit.test.sql -- map_step_with_no_deps.test.sql - -**PR #209 (CURRENT):** - -- root_map_array_validation.test.sql -- root_map_initial_tasks.test.sql -- mixed_step_types.test.sql -- multiple_root_maps.test.sql -- null_input_validation.test.sql -- large_array_handling.test.sql -- nested_array_handling.test.sql -- mixed_type_arrays.test.sql -- invalid_json_types.test.sql -- flow_only_maps.test.sql - -**Subsequent PRs:** Each will include its own comprehensive test suite covering: - -- Function-specific tests -- Edge cases and error handling -- Integration with existing functionality - -**Final PR:** Integration test suite covering end-to-end workflows +## Implementation Status -## Edge Cases Handled +### Sequential Child PR Plan -### 1. Empty Arrays +#### ✅ Completed PRs -- Root maps: detected in `start_flow()`, `remaining_tasks = 0` → auto-complete in `start_ready_steps()` -- Dependent maps: `remaining_tasks = 0` set in `complete_task()` → auto-complete in `start_ready_steps()` +- [x] **PR #207: Add .array() to DSL** - `feature-map-and-array` -### 2. Array Validation + - TypeScript DSL enhancement for array creation + - Foundation for map step functionality -- Root maps: validate `runs.input` is array in `start_flow()` -- Dependent maps: validate dependency output is array in `complete_task()` -- Both: fail fast with clear error messages +- [x] **PR #208: Foundation - Schema & add_step()** - `09-10-feat_add_map_step_type_in_sql` -### 3. Map→Map Dependencies + - Schema changes (initial_tasks, remaining_tasks, constraints) + - add_step() function with map step validation + - Basic tests for map step creation -- Parent map has N completed tasks -- Child map gets `initial_tasks = N` (count of parent tasks) -- Each child task reads from aggregated parent array using `task_index` -- Simple aggregation approach (no optimization needed for MVP) +- [x] **PR #209: Root Map Support** - `09-11-root-map-support` -### 4. Non-Map Dependents of Maps + - Enhanced start_flow() for root map validation and count setting + - Tests for root map scenarios -- Single step depending on map step gets aggregated array -- Built on-demand in `start_tasks()`: `jsonb_agg(output ORDER BY task_index)` -- Preserves array ordering +- [x] **PR #210: Task Spawning** - `09-12-task-spawning` -### 5. Failure Semantics + - Enhanced start_ready_steps() for N task generation + - Empty array auto-completion + - Tests for batch task creation -- Array validation failures: immediately fail step with clear error -- Individual task failures: follow normal retry → task fail → step fail → run fail -- Empty arrays: auto-complete successfully (not failures) +- [x] **PR #211: Cascade Complete Taskless Steps** - `09-15-complete-cascade` -## Performance Optimizations + - Extracted taskless completion from start_ready_steps() + - Added cascade_complete_taskless_steps() function with iteration safety + - Generic solution for all initial_tasks=0 steps + - Fixed flow_slug matching bug in dep_updates CTE + - All taskless cascade tests passing (7/7 test files) -1. **Minimal JSON Parsing**: Array parsing happens exactly twice - once in each "fresh data" function -2. **Batch Operations**: Use `generate_series()` for efficient task creation -3. **Atomic Updates**: Leverage existing locks and transactions -4. **On-Demand Aggregation**: Only aggregate when needed for non-map dependents -5. **Simple Aggregation**: Map→map uses consistent aggregation approach for clarity +- [x] **PR #212: Dependent Map Count Propagation** + - Enhanced complete_task() sets initial_tasks for dependent maps + - Array validation and count propagation working + - Cascade handles taskless dependent maps -## Success Criteria +#### ❌ Remaining Work -### Functional Requirements +- [ ] **Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE) -1. ✅ **Map Step Creation**: `add_step` accepts `step_type='map'` parameter -2. ✅ **Dynamic Task Spawning**: Map steps spawn N tasks based on array length -3. ✅ **Empty Array Handling**: Zero-length arrays auto-complete with `[]` output -4. ✅ **Result Aggregation**: Task outputs aggregated in task_index order -5. ✅ **Task Count Propagation**: Map dependents get correct task counts + - Enhanced start_tasks() to distribute array elements to map tasks + - Each map task receives its specific array element based on task_index + - Handles both root maps (from run input) and dependent maps (from step outputs) + - Tests with actual array data processing -### Data Integrity Requirements +- [ ] **Output Aggregation** (CRITICAL - BLOCKS MAP OUTPUT CONSUMPTION) -1. ✅ **Consistent State Transitions**: Task counts maintained correctly -2. ✅ **Ordered Aggregation**: Results maintain task_index ordering + - Aggregate map task outputs when step completes + - Store aggregated output for dependent steps to consume + - Maintain task_index ordering in aggregated arrays + - Tests for aggregation with actual map task outputs -### Performance Requirements +- [ ] **DSL Support for .map() Step Type** -1. ✅ **Batch Operations**: Task spawning uses efficient generate_series approach + - Add `.map()` method to Flow DSL for defining map steps + - Constraints: + - Locked to exactly one dependency (enforced at compile time) + - Dependency must return an array (type-checked) + - Syntax design: + - Dependent maps: `flow.map({ slug: 'stepName', array: 'arrayReturningStep' }, handler)` + - Root maps: Decide between `{ array: 'run' }` or omitting array property + - Return type always inferred as array + - Comprehensive tests: + - Runtime validation of array dependencies + - Type safety for input/output types + - Compile-time enforcement of single dependency rule -### Testing Requirements +- [ ] **Integration Tests** -1. ✅ **Invariant Testing**: Task counting constraints thoroughly tested -2. ✅ **Edge Case Coverage**: Empty arrays, large arrays, error scenarios -3. ✅ **Integration Testing**: Multi-step workflows validated + - End-to-end workflows with real array data + - Basic happy path coverage + - This should be minimal and added to the Edge Worker integration test suite for now -## Risk Mitigation +- [ ] **Semantic Improvement: NULL for Unknown initial_tasks** (OPTIONAL - Can be deferred) -### Identified Risks + - Change initial_tasks from "1 as placeholder" to NULL for dependent map steps + - Benefits: Semantic correctness (NULL = unknown, not "1 task") + - Scope: Schema change to allow NULL, update 5+ SQL functions + - See detailed plan in `pkgs/core/PLAN_use_null_for_map_initial_tasks.md` + - **Note**: This is a semantic improvement only - current approach works functionally + - **Warning**: If deferred, new tests for Array Distribution and Output Aggregation will + assume initial_tasks = 1 for dependent maps, making this change harder later -**Risk 1: Performance Impact** +- [ ] **Migration Consolidation** -- **Mitigation**: Efficient SQL patterns (generate_series, batch operations) -- **Testing**: Performance validation with large arrays (1000+ elements) + - Remove all temporary/incremental migrations from feature branches + - Generate a single consolidated migration for the entire map infrastructure + - Ensure clean migration path from current production schema + - If NULL improvement is done, include it in the consolidated migration -**Risk 2: Empty Array Edge Cases** +- [ ] **Graphite Stack Merge** -- **Mitigation**: Explicit auto-completion logic and dedicated testing -- **Testing**: Comprehensive empty array scenario coverage -- **Validation**: End-to-end empty array workflow testing + - Configure Graphite merge queue for the complete PR stack + - Ensure all PRs in sequence can be merged together + - Final validation before merge to main + - Merge queue to be set such that it verifies only the top PR + (it is because of CI check for temp migrations) diff --git a/PLAN_cascade_complete_taskless_steps.md b/PLAN_cascade_complete_taskless_steps.md deleted file mode 100644 index 7880b3f74..000000000 --- a/PLAN_cascade_complete_taskless_steps.md +++ /dev/null @@ -1,275 +0,0 @@ -# PLAN: Cascade Complete Taskless Steps - -## Problem Statement - -Steps with `initial_tasks = 0` need immediate completion without task execution. When such a step completes, its dependents may become ready - and if those dependents are also taskless, they should complete immediately as well, creating a cascade effect. - -Currently, this cascade doesn't happen, leaving taskless steps in a "ready but not completed" state. - -## Current State - -`start_ready_steps` currently contains logic to complete empty map steps (taskless), but: -- It only handles the immediate step, not cascading to dependents -- This logic is mixed with task spawning concerns -- It can't handle chains of taskless steps - -This plan extracts that logic into a dedicated function and adds cascade capability. - -## Taskless Step Types - -### Current -- **Empty array maps**: Map steps receiving `[]` input - -### Future (generic design) -- **Condition gates**: Evaluate JSONP conditions, route without execution -- **Validators**: Check constraints, pass/fail instantly -- **Aggregators**: Might receive 0 inputs to aggregate -- **Routers**: Direct flow based on input, no processing needed - -The solution must be **generic** - not checking `step_type` but relying on `initial_tasks = 0`. - -## Edge Cases & Patterns - -### Chain cascade -``` -A (taskless) → B (taskless) → C (taskless) → D (normal) -``` -All taskless steps complete instantly, then D starts. - -### Fan-in pattern -``` -A (normal) ⟋ - → C (taskless) → D (normal) -B (normal) ⟌ -``` -C completes only when BOTH A and B complete. - -### Mixed cascade -``` -A (normal) → B (taskless) → C (taskless) → D (normal) → E (taskless) -``` -- B,C cascade when A completes -- E completes instantly when D completes -- Two separate cascade events - -### Entire flow taskless -``` -Validate → Route → Log -``` -Entire flow completes synchronously in `start_flow` call. - -## Proposed Solution - -### Performance Analysis - Corrected - -Initial analysis was **incorrect**. After code review, the actual situation: - -1. **complete_task** calls **start_ready_steps** on EVERY task completion - - For 10k tasks = 10,000 calls to start_ready_steps - -2. **BUT** dependent steps' `remaining_deps` only decrements when STEP completes - - Happens ONCE when all tasks done, not 10,000 times - -3. **Cascade would check 10k times but find nothing 9,999 times** - - Tasks 1-9,999: cascade checks, finds no ready taskless steps - - Task 10,000: cascade finds chain ready, runs 50 iterations ONCE - -**Real impact**: 10,000 wasted checks + 50 iterations = **10,050 operations** (not 500,000!) - -### Call Site Heat Analysis - -| Call Site | Heat Level | When Cascade Needed | Actual Frequency | -|-----------|------------|---------------------|------------------| -| **start_flow()** | 🧊 COLD | Always check | Once per workflow | -| **complete_task()** | 🔥🔥🔥 HOT | Only when step completes | Once per step (not task!) | -| **start_ready_steps()** | 🔥 HOT | Never - wrong place | N/A | - -### PRIMARY SOLUTION: Simple Conditional Cascade - -Only call cascade when a step actually completes: - -```sql --- In complete_task, after line 91 -IF v_step_state.status = 'completed' THEN - -- Step just completed, cascade any ready taskless steps - PERFORM cascade_complete_taskless_steps(run_id); - - -- Send broadcast event (existing code) - PERFORM realtime.send(...); -END IF; - --- Remove cascade from start_ready_steps entirely -``` - -This reduces cascade calls from 10,000 (every task) to 1 (when step completes)! - -### The Cascade Function - -Use a simple loop that completes all ready taskless steps: - -```sql -CREATE OR REPLACE FUNCTION pgflow.cascade_complete_taskless_steps(run_id uuid) -RETURNS int -LANGUAGE plpgsql -AS $$ -DECLARE - v_total_completed int := 0; - v_iteration_completed int; -BEGIN - LOOP - WITH completed AS ( - UPDATE pgflow.step_states - SET status = 'completed', - started_at = now(), - completed_at = now(), - remaining_tasks = 0 - WHERE step_states.run_id = cascade_complete_taskless_steps.run_id - AND status = 'created' - AND remaining_deps = 0 - AND initial_tasks = 0 - RETURNING * - ), - dep_updates AS ( - UPDATE pgflow.step_states ss - SET remaining_deps = ss.remaining_deps - 1 - FROM completed c - JOIN pgflow.deps d ON d.flow_slug = c.flow_slug - AND d.dep_slug = c.step_slug - WHERE ss.run_id = c.run_id - AND ss.step_slug = d.step_slug - ), - -- Send realtime events and update run count... - SELECT COUNT(*) INTO v_iteration_completed FROM completed; - - EXIT WHEN v_iteration_completed = 0; - v_total_completed := v_total_completed + v_iteration_completed; - END LOOP; - - RETURN v_total_completed; -END; -$$; -``` - -**Performance**: 50 iterations once per step completion is acceptable - -### Integration Points - -```sql --- In start_flow (COLD PATH) -PERFORM cascade_complete_taskless_steps(run_id); -PERFORM start_ready_steps(run_id); - --- In complete_task (HOT PATH - but only when step completes) -IF step_completed THEN - PERFORM cascade_complete_taskless_steps(run_id); -END IF; - --- NOT in start_ready_steps - that was the wrong place -``` - -### Why Other Approaches Fail - -#### Recursive CTE: PostgreSQL Limitations -- ❌ Cannot use subqueries referencing recursive CTE -- ❌ Cannot use NOT EXISTS with recursive reference -- ❌ Cannot use aggregates on recursive reference -- ❌ Cannot check "all dependencies satisfied" condition - -#### One-Wave Approach: Weird Coupling -- ❌ Creates strange dependencies between unrelated steps -- ❌ Filter2 would complete when some UNRELATED step's task completes -- ❌ Confusing semantics and hard to debug - -#### Calling from start_ready_steps: Wrong Layer -- ❌ Would check for cascade on EVERY task (10,000 times) -- ❌ 9,999 wasted checks finding nothing -- ❌ Wrong separation of concerns - -#### No Cascade: Steps Never Complete -- ❌ Taskless steps have no tasks to complete them -- ❌ Would remain stuck in 'created' state forever - -### Performance Summary - -| Approach | Calls | Operations | Result | -|----------|-------|------------|--------| -| **Initial (wrong) analysis** | 10,000 | 500,000 | 🔴 Catastrophic | -| **Cascade in start_ready_steps** | 10,000 | 10,050 | 🟡 Wasteful | -| **Conditional cascade (solution)** | 1 | 50 | 🟢 Optimal | - -The simple conditional approach is **200x better** than calling from start_ready_steps and **10,000x better** than the initially feared scenario. - -### Realtime Events - -Each completed step needs to send a realtime event. Add to the loop: - -```sql --- Send realtime events for completed steps -broadcast AS ( - SELECT realtime.send( - jsonb_build_object( - 'event_type', 'step:completed', - 'run_id', c.run_id, - 'step_slug', c.step_slug, - 'status', 'completed', - 'started_at', c.started_at, - 'completed_at', c.completed_at, - 'output', '[]'::jsonb -- Empty output for taskless - ), - concat('step:', c.step_slug, ':completed'), - concat('pgflow:run:', c.run_id), - false - ) - FROM completed c -) -``` - -### Integration Points - -```sql --- In start_flow -PERFORM cascade_complete_taskless_steps(run_id) -- First -PERFORM start_ready_steps(run_id) -- Second - --- In complete_task --- After completing task and updating dependents: -PERFORM cascade_complete_taskless_steps(run_id) -- First -PERFORM start_ready_steps(run_id) -- Second -``` - -## Testing Strategy - -Create dedicated test folder: `pkgs/core/supabase/tests/cascade_taskless/` - -### Test cases needed - -1. **Basic cascade**: Chain of 3 taskless steps -2. **Fan-in**: Multiple deps converging on taskless step -3. **Mixed flow**: Alternating taskless and normal steps -4. **Empty array maps**: Current use case -5. **Entire taskless flow**: Should complete synchronously -6. **No cascade**: Single taskless step with normal dependent -7. **Realtime events**: Verify each completed step sends event - -### Test-First Development - -1. Write failing test for simplest case -2. Implement minimal cascade logic -3. Add complex pattern test -4. Extend implementation -5. Repeat until all patterns covered - -## Benefits - -- **Generic**: Handles all taskless step types, current and future -- **Decoupled**: Clear separation of concerns -- **Efficient**: Batch operations, minimal queries -- **Future-proof**: Ready for worker process separation -- **Testable**: Each function has single responsibility - -## Migration Notes - -- No schema changes needed -- Pure function additions -- Backward compatible -- Can be deployed independently \ No newline at end of file diff --git a/pkgs/core/PLAN_use_null_for_map_initial_tasks.md b/pkgs/core/PLAN_use_null_for_map_initial_tasks.md new file mode 100644 index 000000000..d4ab5923f --- /dev/null +++ b/pkgs/core/PLAN_use_null_for_map_initial_tasks.md @@ -0,0 +1,195 @@ +# Plan: Use NULL for Unknown initial_tasks in Dependent Map Steps + +## Motivation +Currently, dependent map steps have `initial_tasks = 1` as a placeholder until their dependencies complete. This is semantically incorrect and confusing: +- `1` implies "will spawn exactly 1 task" but that's false +- `NULL` correctly means "unknown until dependencies complete" +- Reduces cognitive load - see NULL, know it's unknown + +## Critical Considerations +Before implementing, these issues must be addressed: + +### 1. ~~The "Last Dependency" Problem~~ RESOLVED +**Map steps can have at most 1 dependency** (enforced in add_step.sql:24): +```sql +-- This constraint simplifies everything: +IF step_type = 'map' AND array_length(deps_slugs, 1) > 1 THEN + RAISE EXCEPTION 'Map step can have at most one dependency' +``` +This means we always know exactly when to resolve `initial_tasks` - when the single dependency completes! + +### 2. Non-Array Output Handling +What if a dependency doesn't produce an array? +```sql +-- Must handle both cases: +CASE + WHEN jsonb_typeof(output) = 'array' THEN jsonb_array_length(output) + ELSE 1 -- Treat non-array as single item to map +END +``` + +### 3. ~~Race Condition Prevention~~ RESOLVED +Since map steps have at most 1 dependency, no race conditions possible! +The single dependency completes once, updates initial_tasks atomically. + +### 4. The Start Ready Steps Problem +**CRITICAL**: We cannot use COALESCE - steps must NOT start with NULL initial_tasks +```sql +-- WRONG: COALESCE(initial_tasks, 1) +-- RIGHT: Add assertion before starting +IF started_step.initial_tasks IS NULL THEN + RAISE EXCEPTION 'Cannot start step % with unknown initial_tasks', step_slug; +END IF; +``` + +## Implementation Plan + +### 1. Schema Change +```sql +-- In 0060_tables_runtime.sql +ALTER TABLE pgflow.step_states +ALTER COLUMN initial_tasks DROP NOT NULL, +ALTER COLUMN initial_tasks DROP DEFAULT; + +-- Update constraint +ALTER TABLE pgflow.step_states +DROP CONSTRAINT step_states_initial_tasks_check, +ADD CONSTRAINT step_states_initial_tasks_check + CHECK (initial_tasks IS NULL OR initial_tasks >= 0); +``` + +### 2. Update start_flow Function +```sql +-- In 0100_function_start_flow.sql +-- Change initial_tasks assignment logic: +CASE + WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN + -- Root map: get array length from input + CASE + WHEN jsonb_typeof(start_flow.input) = 'array' THEN + jsonb_array_length(start_flow.input) + ELSE + 1 + END + WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN + -- Dependent map: unknown until dependencies complete + NULL + ELSE + -- Single steps: always 1 task + 1 +END +``` + +### 3. Update start_ready_steps Function +```sql +-- In 0100_function_start_ready_steps.sql + +-- Empty map detection remains unchanged: +AND step_state.initial_tasks = 0 -- NULL != 0, so NULL maps won't match + +-- Add NULL check BEFORE starting steps: +ready_steps AS ( + SELECT * + FROM pgflow.step_states + WHERE remaining_deps = 0 + AND status = 'created' + AND initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count +) + +-- Task generation stays the same (no COALESCE needed): +CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) +``` + +### 4. Update complete_task Function +```sql +-- In 0100_function_complete_task.sql + +-- Simplified: map steps have exactly 1 dependency +initial_tasks = CASE + WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL THEN + -- Resolve NULL to actual value based on output + CASE + WHEN jsonb_typeof(complete_task.output) = 'array' + THEN jsonb_array_length(complete_task.output) + ELSE 1 -- Non-array treated as single item + END + ELSE ss.initial_tasks -- Keep existing value +END + +-- Note: This already works for single->map! +-- Just need to extend for map->map when we aggregate outputs +``` + +### 5. Update cascade_complete_taskless_steps Function +```sql +-- In 0100_function_cascade_complete_taskless_steps.sql + +-- Update the initial_tasks setting for cascade: +initial_tasks = CASE + WHEN s.step_type = 'map' AND dep_count.has_zero_tasks + THEN 0 -- Empty array propagation + ELSE ss.initial_tasks -- Keep NULL as NULL +END + +-- The BOOL_OR(c.initial_tasks = 0) already handles NULL correctly +-- (NULL = 0 returns false, which is what we want) +``` + +### 6. Add Safety Assertions +```sql +-- Add check constraint or trigger to ensure: +-- When status changes from 'created' to 'started', +-- initial_tasks must NOT be NULL + +ALTER TABLE pgflow.step_states +ADD CONSTRAINT initial_tasks_known_when_started + CHECK ( + status != 'started' + OR initial_tasks IS NOT NULL + ); +``` + +### 7. Update Tests +- Update test expectations to check for NULL instead of 1 +- Add specific tests for NULL -> actual value transitions +- Test that steps can't start with NULL initial_tasks + +### 8. Migration Strategy +```sql +-- Create migration to update existing data: +UPDATE pgflow.step_states +SET initial_tasks = NULL +WHERE step_slug IN ( + SELECT s.step_slug + FROM pgflow.steps s + WHERE s.step_type = 'map' + AND EXISTS ( + SELECT 1 FROM pgflow.deps d + WHERE d.flow_slug = s.flow_slug + AND d.step_slug = s.step_slug + ) +) +AND status = 'created'; +``` + +## Benefits +1. **Semantic correctness**: NULL = unknown, not "1 task" placeholder +2. **Clearer mental model**: No translation needed when reading state +3. **Easier debugging**: Can immediately see which values are unresolved +4. **Type safety**: TypeScript `number | null` enforces proper handling +5. **Simpler than expected**: Map steps having max 1 dependency eliminates complexity + +## Simplified Implementation Path +Since map steps can only have 0 or 1 dependency: +1. Root maps (0 deps): Get initial_tasks from flow input immediately +2. Dependent maps (1 dep): Start with NULL, resolve when dependency completes +3. No multi-dependency complexity or race conditions! + +## Testing Checklist +- [ ] Root map steps get correct initial_tasks from input array +- [ ] Dependent map steps start with NULL initial_tasks +- [ ] Single -> Map updates NULL to array length +- [ ] Map -> Map updates NULL to aggregated array length (future) +- [ ] Empty array propagation sets 0, not NULL +- [ ] Steps cannot start with NULL initial_tasks +- [ ] All arithmetic operations handle NULL safely \ No newline at end of file diff --git a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql new file mode 100644 index 000000000..6f13e6b4b --- /dev/null +++ b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql @@ -0,0 +1,91 @@ +create or replace function pgflow.cascade_complete_taskless_steps(run_id uuid) +returns int +language plpgsql +as $$ +DECLARE + v_total_completed int := 0; + v_iteration_completed int; + v_iterations int := 0; + v_max_iterations int := 50; +BEGIN + LOOP + -- Safety counter to prevent infinite loops + v_iterations := v_iterations + 1; + IF v_iterations > v_max_iterations THEN + RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; + END IF; + + WITH completed AS ( + -- Complete all ready taskless steps in topological order + UPDATE pgflow.step_states ss + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = s.flow_slug + AND ss.step_slug = s.step_slug + AND ss.status = 'created' + AND ss.remaining_deps = 0 + AND ss.initial_tasks = 0 + -- Process in topological order to ensure proper cascade + RETURNING ss.* + ), + dep_updates AS ( + -- Update remaining_deps and initial_tasks for dependents of completed steps + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - dep_count.count, + -- If the dependent is a map step and its dependency completed with 0 tasks, + -- set its initial_tasks to 0 as well + initial_tasks = CASE + WHEN s.step_type = 'map' AND dep_count.has_zero_tasks + THEN 0 + ELSE ss.initial_tasks + END + FROM ( + -- Count how many completed steps are dependencies of each dependent + SELECT + d.flow_slug, + d.step_slug as dependent_slug, + COUNT(*) as count, + BOOL_OR(c.initial_tasks = 0) as has_zero_tasks + FROM completed c + JOIN pgflow.deps d ON d.flow_slug = c.flow_slug + AND d.dep_slug = c.step_slug + GROUP BY d.flow_slug, d.step_slug + ) dep_count, + pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = dep_count.flow_slug + AND ss.step_slug = dep_count.dependent_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug + ), + run_updates AS ( + -- Update run's remaining_steps count + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - c.completed_count, + status = CASE + WHEN r.remaining_steps - c.completed_count = 0 + THEN 'completed' + ELSE r.status + END, + completed_at = CASE + WHEN r.remaining_steps - c.completed_count = 0 + THEN now() + ELSE r.completed_at + END + FROM (SELECT COUNT(*) AS completed_count FROM completed) c + WHERE r.run_id = cascade_complete_taskless_steps.run_id + AND c.completed_count > 0 + ) + SELECT COUNT(*) INTO v_iteration_completed FROM completed; + + EXIT WHEN v_iteration_completed = 0; + v_total_completed := v_total_completed + v_iteration_completed; + END LOOP; + + RETURN v_total_completed; +END; +$$; diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index 4bce7b934..da9227c49 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -70,11 +70,19 @@ dependent_steps_lock AS ( ), -- Update all dependent steps dependent_steps_update AS ( - UPDATE pgflow.step_states - SET remaining_deps = pgflow.step_states.remaining_deps - 1 - FROM dependent_steps - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = dependent_steps.dependent_step_slug + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - 1, + -- For map dependents of single steps producing arrays, set initial_tasks + initial_tasks = CASE + WHEN s.step_type = 'map' AND jsonb_typeof(complete_task.output) = 'array' + THEN jsonb_array_length(complete_task.output) + ELSE ss.initial_tasks + END + FROM dependent_steps ds, pgflow.steps s + WHERE ss.run_id = complete_task.run_id + AND ss.step_slug = ds.dependent_step_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug ) -- Only decrement remaining_steps, don't update status UPDATE pgflow.runs @@ -89,6 +97,9 @@ WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.st -- Send broadcast event for step completed if the step is completed IF v_step_state.status = 'completed' THEN + -- Step just completed, cascade any ready taskless steps + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:completed', diff --git a/pkgs/core/schemas/0100_function_start_flow.sql b/pkgs/core/schemas/0100_function_start_flow.sql index f831c0b31..efcae0428 100644 --- a/pkgs/core/schemas/0100_function_start_flow.sql +++ b/pkgs/core/schemas/0100_function_start_flow.sql @@ -93,6 +93,9 @@ PERFORM realtime.send( false ); +-- Complete any taskless steps that are ready (e.g., empty array maps) +PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); + PERFORM pgflow.start_ready_steps(v_created_run.run_id); RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index bf91818ea..8841d74a2 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -374,6 +374,10 @@ export type Database = { Args: { base_delay: number; attempts_count: number } Returns: number } + cascade_complete_taskless_steps: { + Args: { run_id: string } + Returns: number + } complete_task: { Args: { run_id: string diff --git a/pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql b/pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql new file mode 100644 index 000000000..a706760dd --- /dev/null +++ b/pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql @@ -0,0 +1,321 @@ +-- Create "cascade_complete_taskless_steps" function +CREATE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_total_completed int := 0; + v_iteration_completed int; + v_iterations int := 0; + v_max_iterations int := 50; +BEGIN + LOOP + -- Safety counter to prevent infinite loops + v_iterations := v_iterations + 1; + IF v_iterations > v_max_iterations THEN + RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; + END IF; + + WITH completed AS ( + -- Complete all ready taskless steps in topological order + UPDATE pgflow.step_states ss + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = s.flow_slug + AND ss.step_slug = s.step_slug + AND ss.status = 'created' + AND ss.remaining_deps = 0 + AND ss.initial_tasks = 0 + -- Process in topological order to ensure proper cascade + RETURNING ss.* + ), + dep_updates AS ( + -- Update remaining_deps and initial_tasks for dependents of completed steps + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - dep_count.count, + -- If the dependent is a map step and its dependency completed with 0 tasks, + -- set its initial_tasks to 0 as well + initial_tasks = CASE + WHEN s.step_type = 'map' AND dep_count.has_zero_tasks + THEN 0 + ELSE ss.initial_tasks + END + FROM ( + -- Count how many completed steps are dependencies of each dependent + SELECT + d.flow_slug, + d.step_slug as dependent_slug, + COUNT(*) as count, + BOOL_OR(c.initial_tasks = 0) as has_zero_tasks + FROM completed c + JOIN pgflow.deps d ON d.flow_slug = c.flow_slug + AND d.dep_slug = c.step_slug + GROUP BY d.flow_slug, d.step_slug + ) dep_count, + pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = dep_count.flow_slug + AND ss.step_slug = dep_count.dependent_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug + ), + run_updates AS ( + -- Update run's remaining_steps count + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - c.completed_count, + status = CASE + WHEN r.remaining_steps - c.completed_count = 0 + THEN 'completed' + ELSE r.status + END, + completed_at = CASE + WHEN r.remaining_steps - c.completed_count = 0 + THEN now() + ELSE r.completed_at + END + FROM (SELECT COUNT(*) AS completed_count FROM completed) c + WHERE r.run_id = cascade_complete_taskless_steps.run_id + AND c.completed_count > 0 + ) + SELECT COUNT(*) INTO v_iteration_completed FROM completed; + + EXIT WHEN v_iteration_completed = 0; + v_total_completed := v_total_completed + v_iteration_completed; + END LOOP; + + RETURN v_total_completed; +END; +$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; +begin + +WITH run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = complete_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + FOR UPDATE +), +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1 + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + RETURNING pgflow.step_states.* +), +-- Find all dependent steps if the current step was completed +dependent_steps AS ( + SELECT d.step_slug AS dependent_step_slug + FROM pgflow.deps d + JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug + WHERE d.dep_slug = complete_task.step_slug + ORDER BY d.step_slug -- Ensure consistent ordering +), +-- Lock dependent steps before updating +dependent_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps) + FOR UPDATE +), +-- Update all dependent steps +dependent_steps_update AS ( + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - 1, + -- For map dependents of single steps producing arrays, set initial_tasks + initial_tasks = CASE + WHEN s.step_type = 'map' AND jsonb_typeof(complete_task.output) = 'array' + THEN jsonb_array_length(complete_task.output) + ELSE ss.initial_tasks + END + FROM dependent_steps ds, pgflow.steps s + WHERE ss.run_id = complete_task.run_id + AND ss.step_slug = ds.dependent_step_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug +) +-- Only decrement remaining_steps, don't update status +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- Get the updated step state for broadcasting +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + +-- Send broadcast event for step completed if the step is completed +IF v_step_state.status = 'completed' THEN + -- Step just completed, cascade any ready taskless steps + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', complete_task.output, + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); +END IF; + +-- For completed tasks: archive the message +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; +-- Modify "start_flow" function +CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_created_run pgflow.runs%ROWTYPE; + v_root_map_count int; +begin + +-- Check for root map steps and validate input +WITH root_maps AS ( + SELECT step_slug + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + AND steps.step_type = 'map' + AND steps.deps_count = 0 +) +SELECT COUNT(*) INTO v_root_map_count FROM root_maps; + +-- If we have root map steps, validate that input is an array +IF v_root_map_count > 0 THEN + -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) + IF start_flow.input IS NULL THEN + RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; + END IF; + + -- Then check if it's not an array + IF jsonb_typeof(start_flow.input) != 'array' THEN + RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', + start_flow.flow_slug, jsonb_typeof(start_flow.input); + END IF; +END IF; + +WITH + flow_steps AS ( + SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + ), + created_run AS ( + INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) + VALUES ( + COALESCE(start_flow.run_id, gen_random_uuid()), + start_flow.flow_slug, + start_flow.input, + (SELECT count(*) FROM flow_steps) + ) + RETURNING * + ), + created_step_states AS ( + INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) + SELECT + fs.flow_slug, + (SELECT created_run.run_id FROM created_run), + fs.step_slug, + fs.deps_count, + -- For root map steps (map with no deps), set initial_tasks to array length + -- For all other steps, set initial_tasks to 1 + CASE + WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN + CASE + WHEN jsonb_typeof(start_flow.input) = 'array' THEN + jsonb_array_length(start_flow.input) + ELSE + 1 + END + ELSE + 1 + END + FROM flow_steps fs + ) +SELECT * FROM created_run INTO v_created_run; + +-- Send broadcast event for run started +PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:started', + 'run_id', v_created_run.run_id, + 'flow_slug', v_created_run.flow_slug, + 'input', v_created_run.input, + 'status', 'started', + 'remaining_steps', v_created_run.remaining_steps, + 'started_at', v_created_run.started_at + ), + 'run:started', + concat('pgflow:run:', v_created_run.run_id), + false +); + +-- Complete any taskless steps that are ready (e.g., empty array maps) +PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); + +PERFORM pgflow.start_ready_steps(v_created_run.run_id); + +RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; + +end; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 0ce668083..492cdcf53 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:B1TUYLUWFgJLPH1+HbGlvevYntnO8YIG4ysa9K1dKkE= +h1:ftS8hMfs4DMg/E9bV4r14WzDpKvPKLCLtpqg1bdybXg= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -11,3 +11,4 @@ h1:B1TUYLUWFgJLPH1+HbGlvevYntnO8YIG4ysa9K1dKkE= 20250912075001_pgflow_temp_pr1_schema.sql h1:zVvGuRX/m8uPFCuJ7iAqOQ71onkCtze6P9d9ZsOgs98= 20250912080800_pgflow_temp_pr2_root_maps.sql h1:v2KdChKBPBOIq3nCVVtKWy1OVcIROV+tPtaTUPQujSo= 20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto= +20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc= diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/basic.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/basic.test.sql new file mode 100644 index 000000000..599ced03c --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/basic.test.sql @@ -0,0 +1,65 @@ +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Test: Single map step with empty array auto-completes via start_flow +select diag('Testing single map step with empty array completes automatically'); + +-- Create a flow with a single map step +select pgflow.create_flow('single_map_flow'); +select pgflow.add_step( + flow_slug => 'single_map_flow', + step_slug => 'map_step', + step_type => 'map' +); + +-- Start flow with empty array - this should trigger cascade completion +WITH flow AS ( + SELECT * FROM pgflow.start_flow('single_map_flow', '[]'::jsonb) +) +SELECT run_id INTO TEMPORARY test_run_id FROM flow; + +-- Verify the run was created +SELECT isnt( + (SELECT run_id FROM test_run_id), + null, + 'Run should be created' +); + +-- Verify the step state exists +select is( + (select count(*) from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'map_step'), + 1::bigint, + 'Step state should exist for map_step' +); + +-- Verify the step is completed (this is the key test) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'map_step'), + 'completed', + 'Map step with empty array should be automatically completed' +); + +-- Verify no tasks were created +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug = 'map_step'), + 0::bigint, + 'No tasks should be created for empty array map step' +); + +-- Verify the run is also completed (since it's the only step) +select is( + (select r.status from pgflow.runs r + join test_run_id t on r.run_id = t.run_id), + 'completed', + 'Run should be completed when only step is taskless and completed' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/cascade_iteration_limit.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/cascade_iteration_limit.test.sql new file mode 100644 index 000000000..4c26d0196 --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/cascade_iteration_limit.test.sql @@ -0,0 +1,85 @@ +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Test: Cascade iteration safety limit (50 iterations) +select diag('Testing cascade iteration safety limit prevents infinite loops'); + +-- Create a flow with 51 chained map steps to exceed the 50 iteration limit +select pgflow.create_flow('long_cascade_flow'); + +-- Create first map step (root) +select pgflow.add_step( + flow_slug => 'long_cascade_flow', + step_slug => 'map_0', + step_type => 'map' +); + +-- Create 50 more chained map steps (map_1 through map_50) +DO $$ +DECLARE + i INT; +BEGIN + FOR i IN 1..50 LOOP + PERFORM pgflow.add_step( + flow_slug => 'long_cascade_flow', + step_slug => 'map_' || i, + deps_slugs => ARRAY['map_' || (i-1)], + step_type => 'map' + ); + END LOOP; +END $$; + +-- Verify we created 51 steps +select is( + (select count(*) from pgflow.steps where flow_slug = 'long_cascade_flow'), + 51::bigint, + '51 map steps should be created' +); + +-- Try to start flow with empty array - should hit the iteration limit +-- The cascade function should raise an exception after 50 iterations +-- We need to catch this exception in the test +DO $$ +DECLARE + v_error_caught BOOLEAN := FALSE; + v_error_message TEXT; +BEGIN + BEGIN + PERFORM pgflow.start_flow('long_cascade_flow', '[]'::jsonb); + EXCEPTION + WHEN OTHERS THEN + v_error_caught := TRUE; + v_error_message := SQLERRM; + END; + + -- Store results in a temp table for the test to check + CREATE TEMP TABLE cascade_error_test ( + error_caught BOOLEAN, + error_message TEXT + ); + INSERT INTO cascade_error_test VALUES (v_error_caught, v_error_message); +END $$; + +-- Verify an error was caught +select is( + (select error_caught from cascade_error_test), + true, + 'Should catch an exception when cascade exceeds iteration limit' +); + +-- Verify the error message mentions the iteration limit +select ok( + (select error_message from cascade_error_test) LIKE '%Cascade loop exceeded safety limit of%iterations%', + 'Error message should mention exceeding 50 iteration safety limit' +); + +-- Verify no run was created (transaction should have rolled back) +select is( + (select count(*) from pgflow.runs where flow_slug = 'long_cascade_flow'), + 0::bigint, + 'No run should be created when cascade fails with iteration limit' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/cascade_performance.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/cascade_performance.test.sql new file mode 100644 index 000000000..473807eac --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/cascade_performance.test.sql @@ -0,0 +1,226 @@ +begin; +select plan(7); +select pgflow_tests.reset_db(); + +-- Test: Measure cascade performance for different chain lengths +select diag('Testing cascade performance with increasing chain lengths'); + +-- Helper function to create a chain of N map steps +CREATE OR REPLACE FUNCTION create_map_chain(flow_name text, chain_length int) +RETURNS void AS $$ +DECLARE + i INT; +BEGIN + PERFORM pgflow.create_flow(flow_name); + + -- Create root map + PERFORM pgflow.add_step( + flow_name, + 'map_0', + step_type => 'map' + ); + + -- Create chain of dependent maps + FOR i IN 1..(chain_length - 1) LOOP + PERFORM pgflow.add_step( + flow_name, + 'map_' || i, + ARRAY['map_' || (i-1)], + step_type => 'map' + ); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- Test different chain lengths and measure performance +CREATE TEMP TABLE cascade_performance ( + chain_length int, + execution_time_ms numeric, + time_per_step_ms numeric +); + +-- Test 5-step cascade +DO $$ +DECLARE + v_start_time timestamptz; + v_end_time timestamptz; + v_duration interval; + v_ms numeric; +BEGIN + PERFORM create_map_chain('cascade_5', 5); + + v_start_time := clock_timestamp(); + PERFORM pgflow.start_flow('cascade_5', '[]'::jsonb); + v_end_time := clock_timestamp(); + + v_duration := v_end_time - v_start_time; + v_ms := EXTRACT(EPOCH FROM v_duration) * 1000; + + INSERT INTO cascade_performance VALUES (5, v_ms, v_ms / 5); + + RAISE NOTICE 'CASCADE 5 steps: % ms total, % ms per step', + round(v_ms, 2), round(v_ms / 5, 2); +END $$; + +-- Test 10-step cascade +DO $$ +DECLARE + v_start_time timestamptz; + v_end_time timestamptz; + v_duration interval; + v_ms numeric; +BEGIN + PERFORM create_map_chain('cascade_10', 10); + + v_start_time := clock_timestamp(); + PERFORM pgflow.start_flow('cascade_10', '[]'::jsonb); + v_end_time := clock_timestamp(); + + v_duration := v_end_time - v_start_time; + v_ms := EXTRACT(EPOCH FROM v_duration) * 1000; + + INSERT INTO cascade_performance VALUES (10, v_ms, v_ms / 10); + + RAISE NOTICE 'CASCADE 10 steps: % ms total, % ms per step', + round(v_ms, 2), round(v_ms / 10, 2); +END $$; + +-- Test 25-step cascade +DO $$ +DECLARE + v_start_time timestamptz; + v_end_time timestamptz; + v_duration interval; + v_ms numeric; +BEGIN + PERFORM create_map_chain('cascade_25', 25); + + v_start_time := clock_timestamp(); + PERFORM pgflow.start_flow('cascade_25', '[]'::jsonb); + v_end_time := clock_timestamp(); + + v_duration := v_end_time - v_start_time; + v_ms := EXTRACT(EPOCH FROM v_duration) * 1000; + + INSERT INTO cascade_performance VALUES (25, v_ms, v_ms / 25); + + RAISE NOTICE 'CASCADE 25 steps: % ms total, % ms per step', + round(v_ms, 2), round(v_ms / 25, 2); +END $$; + +-- Test 49-step cascade (just under the limit) +DO $$ +DECLARE + v_start_time timestamptz; + v_end_time timestamptz; + v_duration interval; + v_ms numeric; +BEGIN + PERFORM create_map_chain('cascade_49', 49); + + v_start_time := clock_timestamp(); + PERFORM pgflow.start_flow('cascade_49', '[]'::jsonb); + v_end_time := clock_timestamp(); + + v_duration := v_end_time - v_start_time; + v_ms := EXTRACT(EPOCH FROM v_duration) * 1000; + + INSERT INTO cascade_performance VALUES (49, v_ms, v_ms / 49); + + RAISE NOTICE 'CASCADE 49 steps: % ms total, % ms per step', + round(v_ms, 2), round(v_ms / 49, 2); +END $$; + +-- Display performance summary +DO $$ +DECLARE + perf_row RECORD; +BEGIN + RAISE NOTICE ''; + RAISE NOTICE '========================================'; + RAISE NOTICE '🚀 CASCADE PERFORMANCE SUMMARY'; + RAISE NOTICE '========================================'; + + FOR perf_row IN + SELECT * FROM cascade_performance ORDER BY chain_length + LOOP + RAISE NOTICE ' % steps: % ms (% ms/step)', + LPAD(perf_row.chain_length::text, 2), + LPAD(round(perf_row.execution_time_ms, 1)::text, 7), + round(perf_row.time_per_step_ms, 2); + END LOOP; + + -- Calculate scaling factor + WITH scaling AS ( + SELECT + MAX(execution_time_ms) / MIN(execution_time_ms) as time_ratio, + MAX(chain_length)::numeric / MIN(chain_length) as length_ratio + FROM cascade_performance + ) + SELECT + CASE + WHEN time_ratio < length_ratio * 1.5 THEN 'LINEAR or better' + WHEN time_ratio < length_ratio * length_ratio * 0.5 THEN 'SUB-QUADRATIC' + ELSE 'QUADRATIC or worse' + END as scaling_behavior + INTO perf_row + FROM scaling; + + RAISE NOTICE '========================================'; + RAISE NOTICE 'Scaling behavior: %', perf_row.scaling_behavior; + RAISE NOTICE '========================================'; + RAISE NOTICE ''; +END $$; + +-- Verify all cascades completed successfully +select is( + (select count(*) from pgflow.runs where status = 'completed'), + 4::bigint, + 'All 4 cascade test runs should complete' +); + +-- Verify 5-step cascade completed all steps +select is( + (select count(*) from pgflow.step_states + where flow_slug = 'cascade_5' and status = 'completed'), + 5::bigint, + '5-step cascade should complete all 5 steps' +); + +-- Verify 10-step cascade completed all steps +select is( + (select count(*) from pgflow.step_states + where flow_slug = 'cascade_10' and status = 'completed'), + 10::bigint, + '10-step cascade should complete all 10 steps' +); + +-- Verify 25-step cascade completed all steps +select is( + (select count(*) from pgflow.step_states + where flow_slug = 'cascade_25' and status = 'completed'), + 25::bigint, + '25-step cascade should complete all 25 steps' +); + +-- Verify 49-step cascade completed all steps +select is( + (select count(*) from pgflow.step_states + where flow_slug = 'cascade_49' and status = 'completed'), + 49::bigint, + '49-step cascade should complete all 49 steps' +); + +-- Performance assertions +select ok( + (select MAX(execution_time_ms) from cascade_performance) < 5000, + 'Even 49-step cascade should complete in under 5 seconds' +); + +select ok( + (select AVG(time_per_step_ms) from cascade_performance) < 100, + 'Average time per step should be under 100ms' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/normal_to_map_empty.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/normal_to_map_empty.test.sql new file mode 100644 index 000000000..25a85eb96 --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/normal_to_map_empty.test.sql @@ -0,0 +1,115 @@ +begin; +select plan(8); +select pgflow_tests.reset_db(); + +-- Test: Normal step producing empty array -> map dependent +select diag('Testing normal step producing [] -> map dependent'); + +-- Create a flow with normal step followed by map step +select pgflow.create_flow('normal_to_map_flow'); + +-- Add normal root step +select pgflow.add_step( + flow_slug => 'normal_to_map_flow', + step_slug => 'normal_step', + step_type => 'single' +); + +-- Add dependent map step +select pgflow.add_step( + 'normal_to_map_flow', + 'dependent_map', + ARRAY['normal_step'], + step_type => 'map' +); + +-- Start flow with some input +WITH flow AS ( + SELECT * FROM pgflow.start_flow('normal_to_map_flow', '{"data": "test"}'::jsonb) +) +SELECT run_id INTO TEMPORARY test_run_id FROM flow; + +-- Verify the run was created +SELECT isnt( + (SELECT run_id FROM test_run_id), + null, + 'Run should be created' +); + +-- Verify normal step is started (has a task) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'normal_step'), + 'started', + 'Normal step should be started' +); + +-- Verify a task was created for normal step +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug = 'normal_step'), + 1::bigint, + 'One task should be created for normal step' +); + +-- Start and complete the task with empty array output +-- First start the task +WITH task AS ( + SELECT * FROM pgflow_tests.read_and_start('normal_to_map_flow', 1, 1) LIMIT 1 +) +SELECT pgflow.complete_task( + (SELECT run_id FROM task), + (SELECT step_slug FROM task), + 0, + '[]'::jsonb -- empty array output +) +FROM task; + +-- Verify normal step is completed +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'normal_step'), + 'completed', + 'Normal step should be completed' +); + +-- Verify dependent map is completed (cascade should have triggered) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'dependent_map'), + 'completed', + 'Dependent map step should be automatically completed via cascade' +); + +-- Verify dependent map has initial_tasks = 0 +select is( + (select ss.initial_tasks from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'dependent_map'), + 0, + 'Dependent map should have initial_tasks = 0' +); + +-- Verify no tasks were created for dependent map +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug = 'dependent_map'), + 0::bigint, + 'No tasks should be created for dependent map with empty array' +); + +-- Verify the run is completed +select is( + (select r.status from pgflow.runs r + join test_run_id t on r.run_id = t.run_id), + 'completed', + 'Run should be completed when all steps are done' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/taskless_map_to_two_normals.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/taskless_map_to_two_normals.test.sql new file mode 100644 index 000000000..43935a280 --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/taskless_map_to_two_normals.test.sql @@ -0,0 +1,109 @@ +begin; +select plan(8); +select pgflow_tests.reset_db(); + +-- Test: Taskless map root -> two normal dependents +select diag('Testing taskless map root -> two normal dependents'); + +-- Create a flow with taskless map root and two normal dependents +select pgflow.create_flow('map_to_normals_flow'); + +-- Add root map step +select pgflow.add_step( + flow_slug => 'map_to_normals_flow', + step_slug => 'root_map', + step_type => 'map' +); + +-- Add first normal dependent +select pgflow.add_step( + 'map_to_normals_flow', + 'normal_step_1', + ARRAY['root_map'], + step_type => 'single' +); + +-- Add second normal dependent +select pgflow.add_step( + 'map_to_normals_flow', + 'normal_step_2', + ARRAY['root_map'], + step_type => 'single' +); + +-- Start flow with empty array - root map should complete, normal steps should be ready +WITH flow AS ( + SELECT * FROM pgflow.start_flow('map_to_normals_flow', '[]'::jsonb) +) +SELECT run_id INTO TEMPORARY test_run_id FROM flow; + +-- Verify the run was created +SELECT isnt( + (SELECT run_id FROM test_run_id), + null, + 'Run should be created' +); + +-- Verify all three step states exist +select is( + (select count(*) from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id), + 3::bigint, + 'All three step states should exist' +); + +-- Verify root map is completed (taskless) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'root_map'), + 'completed', + 'Root map step should be automatically completed' +); + +-- Verify first normal step is started (has remaining_deps = 0) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'normal_step_1'), + 'started', + 'First normal step should be started' +); + +-- Verify second normal step is started (has remaining_deps = 0) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'normal_step_2'), + 'started', + 'Second normal step should be started' +); + +-- Verify tasks were created for both normal steps +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug IN ('normal_step_1', 'normal_step_2')), + 2::bigint, + 'Two tasks should be created for normal steps' +); + +-- Verify no tasks for the root map +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug = 'root_map'), + 0::bigint, + 'No tasks should be created for taskless root map' +); + +-- Verify the run is NOT completed (normal steps still need to run) +select is( + (select r.status from pgflow.runs r + join test_run_id t on r.run_id = t.run_id), + 'started', + 'Run should still be started (waiting for normal steps)' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/taskless_sequence.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/taskless_sequence.test.sql new file mode 100644 index 000000000..1041c14cd --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/taskless_sequence.test.sql @@ -0,0 +1,91 @@ +begin; +select plan(7); +select pgflow_tests.reset_db(); + +-- Test: Sequence of taskless map steps (root map -> dependent map) +select diag('Testing taskless sequence: map root -> map dependent'); + +-- Create a flow with two map steps in sequence +select pgflow.create_flow('taskless_sequence_flow'); + +-- Add root map step +select pgflow.add_step( + flow_slug => 'taskless_sequence_flow', + step_slug => 'root_map', + step_type => 'map' +); + +-- Add dependent map step +select pgflow.add_step( + 'taskless_sequence_flow', + 'dependent_map', + ARRAY['root_map'], + step_type => 'map' +); + +-- Start flow with empty array - both steps should cascade to completion +WITH flow AS ( + SELECT * FROM pgflow.start_flow('taskless_sequence_flow', '[]'::jsonb) +) +SELECT run_id INTO TEMPORARY test_run_id FROM flow; + +-- Verify the run was created +SELECT isnt( + (SELECT run_id FROM test_run_id), + null, + 'Run should be created' +); + +-- Verify both step states exist +select is( + (select count(*) from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id), + 2::bigint, + 'Both step states should exist' +); + +-- Verify root map is completed +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'root_map'), + 'completed', + 'Root map step should be automatically completed' +); + +-- Verify dependent map is completed (this tests the cascade) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'dependent_map'), + 'completed', + 'Dependent map step should be automatically completed via cascade' +); + +-- Verify no tasks were created for either step +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id), + 0::bigint, + 'No tasks should be created for taskless steps' +); + +-- Verify the dependent map has remaining_deps = 0 (was decremented) +select is( + (select ss.remaining_deps from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'dependent_map'), + 0, + 'Dependent map should have remaining_deps decremented to 0' +); + +-- Verify the run is completed +select is( + (select r.status from pgflow.runs r + join test_run_id t on r.run_id = t.run_id), + 'completed', + 'Run should be completed when all steps are taskless and completed' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/completing_taskless_steps/two_taskless_maps_to_normal.test.sql b/pkgs/core/supabase/tests/completing_taskless_steps/two_taskless_maps_to_normal.test.sql new file mode 100644 index 000000000..20e3605b8 --- /dev/null +++ b/pkgs/core/supabase/tests/completing_taskless_steps/two_taskless_maps_to_normal.test.sql @@ -0,0 +1,117 @@ +begin; +select plan(9); +select pgflow_tests.reset_db(); + +-- Test: Two parallel taskless maps -> one normal dependent (fan-in) +select diag('Testing two parallel taskless maps -> one normal dependent'); + +-- Create a flow with two root taskless maps converging on a normal step +select pgflow.create_flow('maps_to_normal_flow'); + +-- Add first root map step +select pgflow.add_step( + flow_slug => 'maps_to_normal_flow', + step_slug => 'root_map_1', + step_type => 'map' +); + +-- Add second root map step +select pgflow.add_step( + flow_slug => 'maps_to_normal_flow', + step_slug => 'root_map_2', + step_type => 'map' +); + +-- Add normal step that depends on both maps +select pgflow.add_step( + 'maps_to_normal_flow', + 'normal_step', + ARRAY['root_map_1', 'root_map_2'], + step_type => 'single' +); + +-- Start flow with empty array - both root maps should complete, normal step should be ready +WITH flow AS ( + SELECT * FROM pgflow.start_flow('maps_to_normal_flow', '[]'::jsonb) +) +SELECT run_id INTO TEMPORARY test_run_id FROM flow; + +-- Verify the run was created +SELECT isnt( + (SELECT run_id FROM test_run_id), + null, + 'Run should be created' +); + +-- Verify all three step states exist +select is( + (select count(*) from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id), + 3::bigint, + 'All three step states should exist' +); + +-- Verify first root map is completed +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'root_map_1'), + 'completed', + 'First root map should be automatically completed' +); + +-- Verify second root map is completed +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'root_map_2'), + 'completed', + 'Second root map should be automatically completed' +); + +-- Verify normal step is started (both dependencies completed) +select is( + (select ss.status from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'normal_step'), + 'started', + 'Normal step should be started after both maps complete' +); + +-- Verify normal step has remaining_deps = 0 +select is( + (select ss.remaining_deps from pgflow.step_states ss + join test_run_id t on ss.run_id = t.run_id + where ss.step_slug = 'normal_step'), + 0, + 'Normal step should have remaining_deps = 0' +); + +-- Verify task was created for normal step +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug = 'normal_step'), + 1::bigint, + 'One task should be created for normal step' +); + +-- Verify no tasks for the root maps +select is( + (select count(*) from pgflow.step_tasks st + join test_run_id t on st.run_id = t.run_id + where st.step_slug IN ('root_map_1', 'root_map_2')), + 0::bigint, + 'No tasks should be created for taskless maps' +); + +-- Verify the run is NOT completed (normal step still needs to run) +select is( + (select r.status from pgflow.runs r + join test_run_id t on r.run_id = t.run_id), + 'started', + 'Run should still be started (waiting for normal step)' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/start_ready_steps/map_queue_messages.test.sql b/pkgs/core/supabase/tests/start_ready_steps/map_queue_messages.test.sql index 3a5eadf20..be357f923 100644 --- a/pkgs/core/supabase/tests/start_ready_steps/map_queue_messages.test.sql +++ b/pkgs/core/supabase/tests/start_ready_steps/map_queue_messages.test.sql @@ -13,17 +13,8 @@ select pgflow.add_step( step_type => 'map' ); --- Start a flow with an array input of 3 items -insert into pgflow.runs (flow_slug, status, input) -values ('test_map_queue', 'started', '["a", "b", "c"]'::jsonb) -returning run_id as test_run_id \gset - --- Initialize step state -insert into pgflow.step_states (flow_slug, run_id, step_slug, initial_tasks, remaining_deps) -values ('test_map_queue', :'test_run_id', 'map_step', 3, 0); - --- Call start_ready_steps -select pgflow.start_ready_steps(:'test_run_id'); +-- Start flow with array input - this will handle everything including calling start_ready_steps +select run_id as test_run_id from pgflow.start_flow('test_map_queue', '["a", "b", "c"]'::jsonb) \gset -- Check messages in the queue with messages as ( @@ -81,17 +72,8 @@ select pgflow.add_step( start_delay => 5 -- 5 second delay ); --- Start flow -insert into pgflow.runs (flow_slug, status, input) -values ('test_delayed_map', 'started', '[1, 2]'::jsonb) -returning run_id as delayed_run_id \gset - --- Initialize step state -insert into pgflow.step_states (flow_slug, run_id, step_slug, initial_tasks, remaining_deps) -values ('test_delayed_map', :'delayed_run_id', 'delayed_map', 2, 0); - --- Call start_ready_steps -select pgflow.start_ready_steps(:'delayed_run_id'); +-- Start flow with array input - this will handle everything +select run_id as delayed_run_id from pgflow.start_flow('test_delayed_map', '[1, 2]'::jsonb) \gset -- Verify messages are scheduled with delay select is( diff --git a/pkgs/core/supabase/tests/start_ready_steps/map_task_spawning.test.sql b/pkgs/core/supabase/tests/start_ready_steps/map_task_spawning.test.sql index 9f2a7af56..cea925350 100644 --- a/pkgs/core/supabase/tests/start_ready_steps/map_task_spawning.test.sql +++ b/pkgs/core/supabase/tests/start_ready_steps/map_task_spawning.test.sql @@ -13,17 +13,8 @@ select pgflow.add_step( step_type => 'map' ); --- Start a flow with an array input of 3 items -insert into pgflow.runs (flow_slug, status, input) -values ('test_map_spawning', 'started', '[1, 2, 3]'::jsonb) -returning run_id as test_run_id \gset - --- Initialize step states (simulating what start_flow does) -insert into pgflow.step_states (flow_slug, run_id, step_slug, initial_tasks, remaining_deps) -values ('test_map_spawning', :'test_run_id', 'map_step', 3, 0); - --- Call start_ready_steps to spawn tasks -select pgflow.start_ready_steps(:'test_run_id'); +-- Start flow with array input - this will initialize step_states properly +select run_id as test_run_id from pgflow.start_flow('test_map_spawning', '[1, 2, 3]'::jsonb) \gset -- Verify step status changed to 'started' select is( @@ -69,32 +60,29 @@ select is( -- Test: Single step still spawns only 1 task select diag('Testing single step spawns only 1 task'); --- Add a single step +-- Create a new flow with a single root step for this test +select pgflow.create_flow('test_single_spawning'); select pgflow.add_step( - flow_slug => 'test_map_spawning', + flow_slug => 'test_single_spawning', step_slug => 'single_step', step_type => 'single' ); --- Initialize single step state -insert into pgflow.step_states (flow_slug, run_id, step_slug, initial_tasks, remaining_deps) -values ('test_map_spawning', :'test_run_id', 'single_step', 1, 0); - --- Call start_ready_steps again -select pgflow.start_ready_steps(:'test_run_id'); +-- Start flow with any input +select run_id as single_run_id from pgflow.start_flow('test_single_spawning', '{}'::jsonb) \gset -- Verify single step spawns only 1 task select is( - (select count(*) from pgflow.step_tasks - where run_id = :'test_run_id' and step_slug = 'single_step'), + (select count(*) from pgflow.step_tasks + where run_id = :'single_run_id' and step_slug = 'single_step'), 1::bigint, 'Single step should create only 1 task' ); -- Verify single step task has task_index = 0 select is( - (select task_index from pgflow.step_tasks - where run_id = :'test_run_id' and step_slug = 'single_step'), + (select task_index from pgflow.step_tasks + where run_id = :'single_run_id' and step_slug = 'single_step'), 0, 'Single step task should have task_index = 0' ); @@ -110,17 +98,8 @@ select pgflow.add_step( step_type => 'map' ); --- Start flow with empty array -insert into pgflow.runs (flow_slug, status, input) -values ('test_empty_map', 'started', '[]'::jsonb) -returning run_id as empty_run_id \gset - --- Initialize step state with initial_tasks = 0 -insert into pgflow.step_states (flow_slug, run_id, step_slug, initial_tasks, remaining_deps) -values ('test_empty_map', :'empty_run_id', 'empty_map_step', 0, 0); - --- Call start_ready_steps -select pgflow.start_ready_steps(:'empty_run_id'); +-- Start flow with empty array - this will handle everything +select run_id as empty_run_id from pgflow.start_flow('test_empty_map', '[]'::jsonb) \gset -- Verify step went directly to 'completed' status select is(