Skip to content

Commit df1acae

Browse files
committed
feat: add task_index attribute to step_task_record and start_tasks function
- Updated the step_task_record type to include task_index - Modified start_tasks function to return task_index alongside msg_id - Adjusted database schema and TypeScript types to support task_index - Enhanced input construction logic for map and non-map steps based on task_index - Added migration to modify start_tasks function and type to handle task_index - Updated test to complete a task with the new task_index attribute
1 parent 842f929 commit df1acae

File tree

6 files changed

+191
-13
lines changed

6 files changed

+191
-13
lines changed

pkgs/core/schemas/0040_types.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ create type pgflow.step_task_record as (
44
run_id uuid,
55
step_slug text,
66
input jsonb,
7-
msg_id bigint
7+
msg_id bigint,
8+
task_index int
89
);

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ as $$
2525
),
2626
start_tasks_update as (
2727
update pgflow.step_tasks
28-
set
28+
set
2929
attempts_count = attempts_count + 1,
3030
status = 'started',
3131
started_at = now(),
@@ -171,7 +171,8 @@ as $$
171171
-- If no dependencies, defaults to empty object
172172
coalesce(dep_out.deps_output, '{}'::jsonb)
173173
END as input,
174-
st.message_id as msg_id
174+
st.message_id as msg_id,
175+
st.task_index as task_index
175176
from tasks st
176177
join runs r on st.run_id = r.run_id
177178
join pgflow.steps step on

pkgs/core/src/database-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ export type Database = {
515515
step_slug: string | null
516516
input: Json | null
517517
msg_id: number | null
518+
task_index: number | null
518519
}
519520
}
520521
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
-- Modify "step_task_record" composite type
2+
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer;
3+
-- Modify "start_tasks" function
4+
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
5+
with tasks as (
6+
select
7+
task.flow_slug,
8+
task.run_id,
9+
task.step_slug,
10+
task.task_index,
11+
task.message_id
12+
from pgflow.step_tasks as task
13+
join pgflow.runs r on r.run_id = task.run_id
14+
where task.flow_slug = start_tasks.flow_slug
15+
and task.message_id = any(msg_ids)
16+
and task.status = 'queued'
17+
-- MVP: Don't start tasks on failed runs
18+
and r.status != 'failed'
19+
),
20+
start_tasks_update as (
21+
update pgflow.step_tasks
22+
set
23+
attempts_count = attempts_count + 1,
24+
status = 'started',
25+
started_at = now(),
26+
last_worker_id = worker_id
27+
from tasks
28+
where step_tasks.message_id = tasks.message_id
29+
and step_tasks.flow_slug = tasks.flow_slug
30+
and step_tasks.status = 'queued'
31+
),
32+
runs as (
33+
select
34+
r.run_id,
35+
r.input
36+
from pgflow.runs r
37+
where r.run_id in (select run_id from tasks)
38+
),
39+
deps as (
40+
select
41+
st.run_id,
42+
st.step_slug,
43+
dep.dep_slug,
44+
-- Aggregate map outputs or use single output
45+
CASE
46+
WHEN dep_step.step_type = 'map' THEN
47+
-- Aggregate all task outputs ordered by task_index
48+
-- Use COALESCE to return empty array if no tasks
49+
(SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
50+
FROM pgflow.step_tasks dt
51+
WHERE dt.run_id = st.run_id
52+
AND dt.step_slug = dep.dep_slug
53+
AND dt.status = 'completed')
54+
ELSE
55+
-- Single step: use the single task output
56+
dep_task.output
57+
END as dep_output
58+
from tasks st
59+
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
60+
join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
61+
left join pgflow.step_tasks dep_task on
62+
dep_task.run_id = st.run_id and
63+
dep_task.step_slug = dep.dep_slug and
64+
dep_task.status = 'completed'
65+
and dep_step.step_type = 'single' -- Only join for single steps
66+
),
67+
deps_outputs as (
68+
select
69+
d.run_id,
70+
d.step_slug,
71+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
72+
count(*) as dep_count
73+
from deps d
74+
group by d.run_id, d.step_slug
75+
),
76+
timeouts as (
77+
select
78+
task.message_id,
79+
task.flow_slug,
80+
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
81+
from tasks task
82+
join pgflow.flows flow on flow.flow_slug = task.flow_slug
83+
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
84+
),
85+
-- Batch update visibility timeouts for all messages
86+
set_vt_batch as (
87+
select pgflow.set_vt_batch(
88+
start_tasks.flow_slug,
89+
array_agg(t.message_id order by t.message_id),
90+
array_agg(t.vt_delay order by t.message_id)
91+
)
92+
from timeouts t
93+
)
94+
select
95+
st.flow_slug,
96+
st.run_id,
97+
st.step_slug,
98+
-- ==========================================
99+
-- INPUT CONSTRUCTION LOGIC
100+
-- ==========================================
101+
-- This nested CASE statement determines how to construct the input
102+
-- for each task based on the step type (map vs non-map).
103+
--
104+
-- The fundamental difference:
105+
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
106+
-- - Non-map steps: Receive structured objects with named keys
107+
-- (e.g., {"run": {...}, "dependency1": {...}})
108+
-- ==========================================
109+
CASE
110+
-- -------------------- MAP STEPS --------------------
111+
-- Map steps process arrays element-by-element.
112+
-- Each task receives ONE element from the array at its task_index position.
113+
WHEN step.step_type = 'map' THEN
114+
-- Map steps get raw array elements without any wrapper object
115+
CASE
116+
-- ROOT MAP: Gets array from run input
117+
-- Example: run input = [1, 2, 3]
118+
-- task 0 gets: 1
119+
-- task 1 gets: 2
120+
-- task 2 gets: 3
121+
WHEN step.deps_count = 0 THEN
122+
-- Root map (deps_count = 0): no dependencies, reads from run input.
123+
-- Extract the element at task_index from the run's input array.
124+
-- Note: If run input is not an array, this will return NULL
125+
-- and the flow will fail (validated in start_flow).
126+
jsonb_array_element(r.input, st.task_index)
127+
128+
-- DEPENDENT MAP: Gets array from its single dependency
129+
-- Example: dependency output = ["a", "b", "c"]
130+
-- task 0 gets: "a"
131+
-- task 1 gets: "b"
132+
-- task 2 gets: "c"
133+
ELSE
134+
-- Has dependencies (should be exactly 1 for map steps).
135+
-- Extract the element at task_index from the dependency's output array.
136+
--
137+
-- Why the subquery with jsonb_each?
138+
-- - The dependency outputs a raw array: [1, 2, 3]
139+
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
140+
-- - We need to unwrap and get just the array value
141+
-- - Map steps have exactly 1 dependency (enforced by add_step)
142+
-- - So jsonb_each will return exactly 1 row
143+
-- - We extract the 'value' which is the raw array [1, 2, 3]
144+
-- - Then get the element at task_index from that array
145+
(SELECT jsonb_array_element(value, st.task_index)
146+
FROM jsonb_each(dep_out.deps_output)
147+
LIMIT 1)
148+
END
149+
150+
-- -------------------- NON-MAP STEPS --------------------
151+
-- Regular (non-map) steps receive ALL inputs as a structured object.
152+
-- This includes the original run input plus all dependency outputs.
153+
ELSE
154+
-- Non-map steps get structured input with named keys
155+
-- Example output: {
156+
-- "run": {"original": "input"},
157+
-- "step1": {"output": "from_step1"},
158+
-- "step2": {"output": "from_step2"}
159+
-- }
160+
--
161+
-- Build object with 'run' key containing original input
162+
jsonb_build_object('run', r.input) ||
163+
-- Merge with deps_output which already has dependency outputs
164+
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
165+
-- If no dependencies, defaults to empty object
166+
coalesce(dep_out.deps_output, '{}'::jsonb)
167+
END as input,
168+
st.message_id as msg_id,
169+
st.task_index as task_index
170+
from tasks st
171+
join runs r on st.run_id = r.run_id
172+
join pgflow.steps step on
173+
step.flow_slug = st.flow_slug and
174+
step.step_slug = st.step_slug
175+
left join deps_outputs dep_out on
176+
dep_out.run_id = st.run_id and
177+
dep_out.step_slug = st.step_slug
178+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg=
1+
h1:46a22RkBGrdfb3veJG3ZlyUkS3us2qfEFGn5cjh2W+Q=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -16,3 +16,4 @@ h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg=
1616
20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4=
1717
20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60=
1818
20250919101802_pgflow_temp_orphaned_messages_index.sql h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8=
19+
20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql h1:DguPK41IfsMykzodXqZq0BmW1IXZW8ZTj6rkw4LaHFE=

pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,17 @@ select is(
7575
do $$
7676
declare
7777
v_run_id uuid;
78-
v_task pgflow.step_task_record;
7978
v_start_time timestamp;
8079
v_duration interval;
8180
begin
8281
select run_id into v_run_id from pgflow.runs limit 1;
8382

84-
-- Get consumer task
85-
select * into v_task from pgflow.step_tasks
86-
where run_id = v_run_id and step_slug = 'consumer' and status = 'started';
87-
88-
-- Complete it
83+
-- The consumer task was already started in the previous test assertion (line 69)
84+
-- We just need to complete it
8985
perform pgflow.complete_task(
90-
v_task.run_id,
91-
v_task.step_slug,
92-
0,
86+
v_run_id,
87+
'consumer',
88+
0, -- consumer is a single task, so task_index is 0
9389
jsonb_build_object('processed', 100)
9490
);
9591

0 commit comments

Comments
 (0)