Skip to content

Commit ce696e8

Browse files
committed
feat: Add scripts and tests for task spawning and step execution in flow management
- Introduced collect_perf_data.sh for performance testing of large array handling - Updated start_ready_steps function to handle empty map steps and initialize task states - Added migration script to modify start_ready_steps for correct task spawning - Created tests for map step message queueing, delayed message scheduling, and task spawning - Ensured proper handling of initial_tasks, task indices, and step status transitions - Included tests for both map and single steps to verify correct task creation and message dispatching
1 parent c0fc2a1 commit ce696e8

File tree

8 files changed

+759
-80
lines changed

8 files changed

+759
-80
lines changed

PLAN.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,19 @@
1818
- Enhanced start_flow() for root map validation and count setting
1919
- Tests for root map scenarios
2020

21-
- [ ] **Task Spawning**
21+
- [x] **PR #210: Task Spawning** - `09-12-task-spawning` (COMPLETED)
2222

2323
- Enhanced start_ready_steps() for N task generation
2424
- Empty array auto-completion
2525
- Tests for batch task creation
2626

27+
- [ ] **Cascade Complete Taskless Steps**
28+
29+
- Extract taskless completion from start_ready_steps()
30+
- Add cascade capability for chains of taskless steps
31+
- Generic solution for all initial_tasks=0 steps
32+
- See PLAN_cascade_complete_taskless_steps.md for details
33+
2734
- [ ] **Array Element Extraction**
2835

2936
- Enhanced start_tasks() for map input extraction
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# PLAN: Cascade Complete Taskless Steps
2+
3+
## Problem Statement
4+
5+
Steps with `initial_tasks = 0` need immediate completion without task execution. When such a step completes, its dependents may become ready - and if those dependents are also taskless, they should complete immediately as well, creating a cascade effect.
6+
7+
Currently, this cascade doesn't happen, leaving taskless steps in a "ready but not completed" state.
8+
9+
## Current State
10+
11+
`start_ready_steps` currently contains logic to complete empty map steps (taskless), but:
12+
- It only handles the immediate step, not cascading to dependents
13+
- This logic is mixed with task spawning concerns
14+
- It can't handle chains of taskless steps
15+
16+
This plan extracts that logic into a dedicated function and adds cascade capability.
17+
18+
## Taskless Step Types
19+
20+
### Current
21+
- **Empty array maps**: Map steps receiving `[]` input
22+
23+
### Future (generic design)
24+
- **Condition gates**: Evaluate JSONP conditions, route without execution
25+
- **Validators**: Check constraints, pass/fail instantly
26+
- **Aggregators**: Might receive 0 inputs to aggregate
27+
- **Routers**: Direct flow based on input, no processing needed
28+
29+
The solution must be **generic** - not checking `step_type` but relying on `initial_tasks = 0`.
30+
31+
## Edge Cases & Patterns
32+
33+
### Chain cascade
34+
```
35+
A (taskless) → B (taskless) → C (taskless) → D (normal)
36+
```
37+
All taskless steps complete instantly, then D starts.
38+
39+
### Fan-in pattern
40+
```
41+
A (normal) ⟋
42+
→ C (taskless) → D (normal)
43+
B (normal) ⟌
44+
```
45+
C completes only when BOTH A and B complete.
46+
47+
### Mixed cascade
48+
```
49+
A (normal) → B (taskless) → C (taskless) → D (normal) → E (taskless)
50+
```
51+
- B,C cascade when A completes
52+
- E completes instantly when D completes
53+
- Two separate cascade events
54+
55+
### Entire flow taskless
56+
```
57+
Validate → Route → Log
58+
```
59+
Entire flow completes synchronously in `start_flow` call.
60+
61+
## Proposed Solution
62+
63+
### Architecture: Complete Separation
64+
65+
Two independent functions with clear boundaries:
66+
67+
1. **`cascade_complete_taskless_steps(run_id)`**
68+
- Completes ALL ready taskless steps recursively
69+
- Updates dependent step counts
70+
- Handles entire cascade in one call
71+
- Returns count of completed steps
72+
73+
2. **`start_ready_steps(run_id)`**
74+
- ONLY starts steps with `initial_tasks > 0`
75+
- Explicitly excludes taskless steps
76+
- Spawns tasks and sends messages
77+
78+
### Implementation Strategy
79+
80+
```sql
81+
-- Pseudo-code for cascade function
82+
FUNCTION cascade_complete_taskless_steps(run_id)
83+
LOOP
84+
-- Complete ready taskless steps
85+
UPDATE step_states
86+
WHERE remaining_deps = 0
87+
AND initial_tasks = 0
88+
AND status = 'created'
89+
RETURNING * INTO completed_steps
90+
91+
-- Update their dependents
92+
UPDATE step_states
93+
SET remaining_deps = remaining_deps - 1
94+
FROM completed_steps
95+
96+
-- Update run remaining_steps
97+
UPDATE runs
98+
SET remaining_steps = remaining_steps - COUNT(completed_steps)
99+
100+
EXIT WHEN COUNT(completed_steps) = 0
101+
END LOOP
102+
```
103+
104+
### Integration Points
105+
106+
```sql
107+
-- In start_flow
108+
PERFORM cascade_complete_taskless_steps(run_id) -- First
109+
PERFORM start_ready_steps(run_id) -- Second
110+
111+
-- In complete_task
112+
-- After completing task and updating dependents:
113+
PERFORM cascade_complete_taskless_steps(run_id) -- First
114+
PERFORM start_ready_steps(run_id) -- Second
115+
```
116+
117+
### Why Loop Instead of Recursive CTE
118+
119+
- **Snapshot isolation**: Each loop iteration sees previous updates
120+
- **Simpler debugging**: Can log/track each cascade level
121+
- **Clearer semantics**: Each iteration is a "wave" of completions
122+
- **No RETURNING complexity**: Don't need to thread state through recursion
123+
124+
## Testing Strategy
125+
126+
Create dedicated test folder: `pkgs/core/supabase/tests/cascade_taskless/`
127+
128+
### Test cases needed
129+
130+
1. **Basic cascade**: Chain of 3 taskless steps
131+
2. **Fan-in**: Multiple deps converging on taskless step
132+
3. **Mixed flow**: Alternating taskless and normal steps
133+
4. **Empty array maps**: Current use case
134+
5. **Entire taskless flow**: Should complete synchronously
135+
6. **No cascade**: Single taskless step with normal dependent
136+
7. **Realtime events**: Verify each completed step sends event
137+
138+
### Test-First Development
139+
140+
1. Write failing test for simplest case
141+
2. Implement minimal cascade logic
142+
3. Add complex pattern test
143+
4. Extend implementation
144+
5. Repeat until all patterns covered
145+
146+
## Benefits
147+
148+
- **Generic**: Handles all taskless step types, current and future
149+
- **Decoupled**: Clear separation of concerns
150+
- **Efficient**: Batch operations, minimal queries
151+
- **Future-proof**: Ready for worker process separation
152+
- **Testable**: Each function has single responsibility
153+
154+
## Migration Notes
155+
156+
- No schema changes needed
157+
- Pure function additions
158+
- Backward compatible
159+
- Can be deployed independently

pkgs/core/schemas/0100_function_start_ready_steps.sql

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,66 @@ language sql
44
set search_path to ''
55
as $$
66

7-
WITH ready_steps AS (
7+
-- First handle empty array map steps (initial_tasks = 0) - direct transition to completed
8+
WITH empty_map_steps AS (
9+
SELECT step_state.*
10+
FROM pgflow.step_states AS step_state
11+
JOIN pgflow.steps AS step
12+
ON step.flow_slug = step_state.flow_slug
13+
AND step.step_slug = step_state.step_slug
14+
WHERE step_state.run_id = start_ready_steps.run_id
15+
AND step_state.status = 'created'
16+
AND step_state.remaining_deps = 0
17+
AND step.step_type = 'map'
18+
AND step_state.initial_tasks = 0
19+
ORDER BY step_state.step_slug
20+
FOR UPDATE OF step_state
21+
),
22+
completed_empty_steps AS (
23+
UPDATE pgflow.step_states
24+
SET status = 'completed',
25+
started_at = now(),
26+
completed_at = now(),
27+
remaining_tasks = 0
28+
FROM empty_map_steps
29+
WHERE pgflow.step_states.run_id = start_ready_steps.run_id
30+
AND pgflow.step_states.step_slug = empty_map_steps.step_slug
31+
RETURNING pgflow.step_states.*
32+
),
33+
broadcast_empty_completed AS (
34+
SELECT
35+
realtime.send(
36+
jsonb_build_object(
37+
'event_type', 'step:completed',
38+
'run_id', completed_step.run_id,
39+
'step_slug', completed_step.step_slug,
40+
'status', 'completed',
41+
'started_at', completed_step.started_at,
42+
'completed_at', completed_step.completed_at,
43+
'remaining_tasks', 0,
44+
'remaining_deps', 0,
45+
'output', '[]'::jsonb
46+
),
47+
concat('step:', completed_step.step_slug, ':completed'),
48+
concat('pgflow:run:', completed_step.run_id),
49+
false
50+
)
51+
FROM completed_empty_steps AS completed_step
52+
),
53+
54+
-- Now handle non-empty steps (both single and map with initial_tasks > 0)
55+
ready_steps AS (
856
SELECT *
957
FROM pgflow.step_states AS step_state
1058
WHERE step_state.run_id = start_ready_steps.run_id
1159
AND step_state.status = 'created'
1260
AND step_state.remaining_deps = 0
61+
-- Exclude empty map steps already handled
62+
AND NOT EXISTS (
63+
SELECT 1 FROM empty_map_steps
64+
WHERE empty_map_steps.run_id = step_state.run_id
65+
AND empty_map_steps.step_slug = step_state.step_slug
66+
)
1367
ORDER BY step_state.step_slug
1468
FOR UPDATE
1569
),
@@ -23,26 +77,48 @@ started_step_states AS (
2377
AND pgflow.step_states.step_slug = ready_steps.step_slug
2478
RETURNING pgflow.step_states.*
2579
),
26-
sent_messages AS (
80+
81+
-- Generate tasks based on initial_tasks count
82+
-- For single steps: initial_tasks = 1, so generate_series(0, 0) = single task with index 0
83+
-- For map steps: initial_tasks = N, so generate_series(0, N-1) = N tasks with indices 0..N-1
84+
-- Group messages by step for batch sending
85+
message_batches AS (
2786
SELECT
2887
started_step.flow_slug,
2988
started_step.run_id,
3089
started_step.step_slug,
31-
pgmq.send(
32-
started_step.flow_slug,
90+
COALESCE(step.opt_start_delay, 0) as delay,
91+
array_agg(
3392
jsonb_build_object(
3493
'flow_slug', started_step.flow_slug,
3594
'run_id', started_step.run_id,
3695
'step_slug', started_step.step_slug,
37-
'task_index', 0
38-
),
39-
COALESCE(step.opt_start_delay, 0)
40-
) AS msg_id
96+
'task_index', task_idx.task_index
97+
) ORDER BY task_idx.task_index
98+
) AS messages,
99+
array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
41100
FROM started_step_states AS started_step
42101
JOIN pgflow.steps AS step
43102
ON step.flow_slug = started_step.flow_slug
44103
AND step.step_slug = started_step.step_slug
104+
-- Generate task indices from 0 to initial_tasks-1
105+
CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
106+
GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
45107
),
108+
-- Send messages in batch for better performance with large arrays
109+
sent_messages AS (
110+
SELECT
111+
mb.flow_slug,
112+
mb.run_id,
113+
mb.step_slug,
114+
task_indices.task_index,
115+
msg_ids.msg_id
116+
FROM message_batches mb
117+
CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
118+
CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
119+
WHERE task_indices.idx_ord = msg_ids.msg_ord
120+
),
121+
46122
broadcast_events AS (
47123
SELECT
48124
realtime.send(
@@ -61,11 +137,14 @@ broadcast_events AS (
61137
)
62138
FROM started_step_states AS started_step
63139
)
64-
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, message_id)
140+
141+
-- Insert all generated tasks with their respective task_index values
142+
INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
65143
SELECT
66144
sent_messages.flow_slug,
67145
sent_messages.run_id,
68146
sent_messages.step_slug,
147+
sent_messages.task_index,
69148
sent_messages.msg_id
70149
FROM sent_messages;
71150

0 commit comments

Comments
 (0)