Commit 90350c4

docs: improve documentation and READMEs to explain map/array steps

1 parent eed4e5e commit 90350c4

File tree

8 files changed

+1135 −91 lines changed

pkgs/core/README.md

Lines changed: 148 additions & 72 deletions
@@ -94,6 +94,106 @@ The SQL Core handles the workflow lifecycle through these key operations:
<a href="./assets/flow-lifecycle.svg"><img src="./assets/flow-lifecycle.svg" alt="Flow Lifecycle" width="25%" height="25%"></a>

## Step Types

pgflow supports two fundamental step types that control how tasks are created and executed:

### Single Steps (Default)

Single steps are the standard step type: each step creates exactly one task when started. These steps process their input as a whole and return a single output value.

```sql
-- Regular single step definition
SELECT pgflow.add_step('my_flow', 'process_data');
```

### Map Steps

Map steps enable parallel processing of arrays by automatically creating multiple tasks, one for each array element. The system handles task distribution, parallel execution, and output aggregation transparently.

```sql
-- Map step definition (step_type => 'map')
SELECT pgflow.add_step(
  flow_slug => 'my_flow',
  step_slug => 'process_items',
  deps_slugs => ARRAY['fetch_items'],
  step_type => 'map'
);
```
#### Key Characteristics

- **Multiple Task Creation**: The SQL core creates N tasks for a map step (one per array element), unlike single steps, which create one task
- **Element Distribution**: The SQL core distributes individual array elements to tasks based on `task_index`
- **Output Aggregation**: The SQL core aggregates task outputs back into an array for dependent steps
- **Constraint**: Map steps can have at most one dependency (which must return an array), or zero dependencies (in which case the flow input must be an array)
#### Map Step Execution Flow

1. **Array Input Validation**: The SQL core validates that the input is an array
2. **Task Creation**: The SQL core creates N tasks with indices 0 to N-1
3. **Element Distribution**: The SQL core assigns `array[task_index]` as input to each task
4. **Parallel Execution**: Edge workers execute tasks independently in parallel
5. **Output Collection**: The SQL core aggregates outputs, preserving array order
6. **Dependent Activation**: The SQL core passes the aggregated array to dependent steps
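As a mental model, these phases can be sketched in plain TypeScript. This is an illustration only, not pgflow's SQL implementation; the name `runMapStep` and the in-memory task records are invented for the sketch:

```typescript
// Illustrative simulation of a map step's lifecycle (not pgflow code)
type TaskRecord = { taskIndex: number; input: number; output?: number };

function runMapStep(input: unknown, handler: (item: number) => number): number[] {
  // 1. Array input validation
  if (!Array.isArray(input)) throw new Error('map step input must be an array');

  // 2-3. Task creation with indices 0..N-1; each task receives array[task_index]
  const tasks: TaskRecord[] = input.map((element, taskIndex) => ({
    taskIndex,
    input: element,
  }));

  // 4. Execution (pgflow runs these in parallel on edge workers; sequential here)
  for (const task of tasks) task.output = handler(task.input);

  // 5-6. Aggregation preserving array order, then handed to dependent steps
  return tasks
    .sort((a, b) => a.taskIndex - b.taskIndex)
    .map((t) => t.output as number);
}

runMapStep([1, 2, 3], (n) => n * 10); // → [10, 20, 30]
```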
#### Root Map vs Dependent Map

**Root Map Steps** process the flow's input array directly:

```sql
-- Root map: no dependencies, processes flow input
SELECT pgflow.add_step(
  flow_slug => 'batch_processor',
  step_slug => 'process_each',
  step_type => 'map'
);

-- Starting the flow with array input
SELECT pgflow.start_flow(
  flow_slug => 'batch_processor',
  input => '[1, 2, 3, 4, 5]'::jsonb
);
```

**Dependent Map Steps** process another step's array output:

```sql
-- Dependent map: processes the array from 'fetch_items'
SELECT pgflow.add_step(
  flow_slug => 'data_pipeline',
  step_slug => 'transform_each',
  deps_slugs => ARRAY['fetch_items'],
  step_type => 'map'
);
```
#### Edge Cases and Special Behaviors

1. **Empty Array Cascade**: When a map step receives an empty array (`[]`):
   - The SQL core completes it immediately without creating tasks
   - The completed map step outputs an empty array
   - Any dependent map steps also receive empty arrays and complete immediately
   - This cascades through the entire chain of map steps in a single transaction
   - Example: `[] → map1 → [] → map2 → [] → map3 → []` all complete together

2. **NULL Values**: NULL array elements are preserved and distributed to their respective tasks

3. **Non-Array Input**: The SQL core fails the step when its input is not an array

4. **Type Violations**: When a single step outputs non-array data to a map step, the SQL core fails the entire run (it stores the invalid output for debugging, archives all queued messages, and prevents orphaned tasks)
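The empty-array cascade can be illustrated with a small sketch (assumed names, not pgflow code): a map step given `[]` completes with `[]` and creates no tasks, so a whole chain of map steps collapses in one pass:

```typescript
// Illustrative sketch of the empty-array cascade (not pgflow's implementation)
type MapStep = { slug: string; transform: (item: number) => number };

function runMapChain(input: number[], chain: MapStep[]): number[] {
  let current = input;
  for (const step of chain) {
    if (current.length === 0) {
      // No tasks to create; the step completes immediately with []
      current = [];
      continue;
    }
    current = current.map(step.transform); // one task per element
  }
  return current;
}

const chain: MapStep[] = [
  { slug: 'map1', transform: (n) => n * 2 },
  { slug: 'map2', transform: (n) => n + 1 },
];
runMapChain([], chain);     // → [] (both steps complete with no tasks created)
runMapChain([1, 2], chain); // → [3, 5]
```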
#### Implementation Details

Map steps use several database fields for state management:
- `initial_tasks`: Number of tasks to create (NULL until the array size is known)
- `remaining_tasks`: Tracks incomplete tasks for the step
- `task_index`: Identifies which array element each task processes
- `step_type`: The column value 'map' triggers map behavior

The aggregation process ensures:
- **Order Preservation**: Task outputs maintain array element ordering
- **NULL Handling**: NULL outputs are included in the aggregated array
- **Atomicity**: Aggregation occurs within the same transaction as task completion
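For intuition, here is a hypothetical snapshot of those fields for a map step over a 3-element array, after one task has completed (the values are illustrative, not taken from a real run):

```typescript
// Illustrative state snapshot; field names from the list above, values assumed
const mapStepState = {
  step_type: 'map',     // triggers map behavior
  initial_tasks: 3,     // was NULL until the array size became known
  remaining_tasks: 2,   // decremented as each task completes
};
const completedTasks = mapStepState.initial_tasks - mapStepState.remaining_tasks; // 1
```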
## Example flow and its life
Let's walk through creating and running a workflow that fetches a website,
@@ -274,81 +374,17 @@ delay = base_delay * (2 ^ attempts_count)
Timeouts are enforced by setting the message visibility timeout to the step's timeout value plus a small buffer. If a worker doesn't acknowledge completion or failure within this period, the task becomes visible again and can be retried.
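The two timing rules can be sketched as follows (the backoff formula comes from the hunk header above; the 2-second buffer value is an assumption for illustration, not pgflow's actual constant):

```typescript
// delay = base_delay * (2 ^ attempts_count) — exponential retry backoff
function retryDelay(baseDelay: number, attemptsCount: number): number {
  return baseDelay * 2 ** attemptsCount;
}

// Visibility timeout = step timeout + small buffer, so unacknowledged tasks
// become visible again and can be retried (buffer value assumed)
const VISIBILITY_BUFFER = 2; // seconds
function visibilityTimeout(stepTimeout: number): number {
  return stepTimeout + VISIBILITY_BUFFER;
}

retryDelay(5, 0);      // → 5
retryDelay(5, 3);      // → 40
visibilityTimeout(10); // → 12
```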

-## TypeScript Flow DSL
+## Workflow Definition with TypeScript DSL

-> [!NOTE]
-> TypeScript Flow DSL is a Work In Progress and is not ready yet!
-
-### Overview
-
-While the SQL Core engine handles workflow definitions and state management, the primary way to define and work with your workflow logic is via the Flow DSL in TypeScript. This DSL offers a fluent API that makes it straightforward to outline the steps in your flow with full type safety.
-
-### Type Inference System
-
-The most powerful feature of the Flow DSL is its **automatic type inference system**:
-
-1. You only need to annotate the initial Flow input type
-2. The return type of each step is automatically inferred from your handler function
-3. These return types become available in the payload of dependent steps
-4. The TypeScript compiler builds a complete type graph matching your workflow DAG
-
-This means you get full IDE autocompletion and type checking throughout your workflow without manual type annotations.
-
-### Basic Example
-
-Here's an example that matches our website analysis workflow:
-
-```ts
-// Provide a type for the input of the Flow
-type Input = {
-  url: string;
-};
-
-const AnalyzeWebsite = new Flow<Input>({
-  slug: 'analyze_website',
-  maxAttempts: 3,
-  baseDelay: 5,
-  timeout: 10,
-})
-  .step(
-    { slug: 'website' },
-    async (input) => await scrapeWebsite(input.run.url)
-  )
-  .step(
-    { slug: 'sentiment', dependsOn: ['website'], timeout: 30, maxAttempts: 5 },
-    async (input) => await analyzeSentiment(input.website.content)
-  )
-  .step(
-    { slug: 'summary', dependsOn: ['website'] },
-    async (input) => await summarizeWithAI(input.website.content)
-  )
-  .step(
-    { slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
-    async (input) =>
-      await saveToDb({
-        websiteUrl: input.run.url,
-        sentiment: input.sentiment.score,
-        summary: input.summary,
-      }).status
-  );
-```
-
-### How Payload Types Are Built
-
-The payload object for each step is constructed dynamically based on:
+The SQL Core is the DAG orchestration engine that handles dependency resolution, step state management, and task spawning. However, workflows are defined using the TypeScript Flow DSL, which compiles user intent into the SQL primitives that populate the definition tables (`flows`, `steps`, `deps`).

-1. **The `run` property**: Always contains the original workflow input
-2. **Dependency outputs**: Each dependency's output is available under a key matching the dependency's ID
-3. **DAG structure**: Only outputs from direct dependencies are included in the payload
+See the [@pgflow/dsl package](../dsl/README.md) for complete documentation on:
+- Expressing workflows with type-safe method chaining
+- Step types (`.step()`, `.array()`, `.map()`)
+- Compilation to SQL migrations
+- Type inference and handler context

-This means your step handlers receive exactly the data they need, properly typed, without any manual type declarations beyond the initial Flow input type.
-
-### Benefits of Automatic Type Inference
-
-- **Refactoring safety**: Change a step's output, and TypeScript will flag all dependent steps that need updates
-- **Discoverability**: IDE autocompletion shows exactly what data is available in each step
-- **Error prevention**: Catch typos and type mismatches at compile time, not runtime
-- **Documentation**: The types themselves serve as living documentation of your workflow's data flow
+The SQL Core executes these compiled definitions, managing when steps are ready, how many tasks to create (1 for single steps, N for map steps), and how to aggregate results.

## Data Flow

@@ -385,6 +421,46 @@ The `saveToDb` step depends on both `sentiment` and `summary`:
}
```

### Map Step Handler Inputs

Map step tasks receive a fundamentally different input structure than single step tasks. Instead of receiving an object with `run` and dependency keys, **map tasks receive only their assigned array element**:

#### Example: Processing user IDs

```json
// Flow input (for root map) or dependency output:
["user123", "user456", "user789"]

// What each map task receives:
// Task 0: "user123"
// Task 1: "user456"
// Task 2: "user789"

// NOT this:
// { "run": {...}, "dependency": [...] }
```

This means:
- Map handlers process individual elements in isolation
- Map handlers cannot access the original flow input (`run`)
- Map handlers cannot access other dependencies
- Map handlers focus solely on transforming their assigned element
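The contrast in handler inputs can be sketched with two hypothetical handlers (the shapes follow the description above; the handler bodies and the `fetch_users` dependency name are invented for illustration):

```typescript
// Single step handler: receives an object with `run` plus one key per dependency
const singleHandler = (input: { run: { batch: string }; fetch_users: string[] }) =>
  input.fetch_users.length;

// Map step handler: receives one array element, nothing else
const mapHandler = (item: string) => item.toUpperCase();

singleHandler({ run: { batch: 'b1' }, fetch_users: ['user123', 'user456'] }); // → 2
mapHandler('user123'); // → "USER123"
```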
#### Map Step Outputs Become Arrays

When a step depends on a map step, it receives the aggregated array output:

```json
// If 'process_users' is a map step that processed ["user1", "user2"]
// and output [{"name": "Alice"}, {"name": "Bob"}]

// A step depending on 'process_users' receives:
{
  "run": { /* original flow input */ },
  "process_users": [{"name": "Alice"}, {"name": "Bob"}] // Full array
}
```
### Run Completion
When all steps in a run are completed, the run status is automatically updated to 'completed' and its output is set. The output is an aggregation of all the outputs from final steps (steps that have no dependents):
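As an illustrative sketch of that aggregation (step slugs borrowed from the website-analysis example; shapes and values assumed):

```typescript
// Run output = outputs of final steps only (steps no other step depends on)
const stepOutputs: Record<string, unknown> = {
  website: { content: '...' },
  sentiment: { score: 0.9 },
  summary: 'Short summary',
  saveToDb: { status: 'ok' },
};
const finalSteps = ['saveToDb']; // every other step has dependents
const runOutput = Object.fromEntries(finalSteps.map((s) => [s, stepOutputs[s]]));
// runOutput: { saveToDb: { status: 'ok' } }
```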

pkgs/dsl/README.md

Lines changed: 73 additions & 7 deletions
@@ -93,35 +93,71 @@ The standard method for adding steps to a flow. Each step processes input and re

#### `.array()` - Array-Returning Steps

-A semantic wrapper around `.step()` that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps.
+A semantic wrapper around `.step()` that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps that will be processed by map steps.

```typescript
// Fetch an array of items to be processed
.array(
  { slug: 'fetch_items' },
  async () => [1, 2, 3, 4, 5]
)

// With dependencies - combining data from multiple sources
.array(
  { slug: 'combine_results', dependsOn: ['source1', 'source2'] },
  async (input) => [...input.source1, ...input.source2]
)
```

**Key Points:**
- Return type is enforced to be an array at compile time
- Commonly used as input for subsequent map steps
- Can depend on other steps just like regular steps

#### `.map()` - Array Processing Steps

Processes arrays element-by-element, similar to JavaScript's `Array.map()`. The handler receives individual items instead of the full input object.

**Two Modes of Operation:**

1. **Root Map** (no `array:` property): Processes the flow's input array directly
   - The flow input MUST be an array when using root maps
   - Omitting the `array:` property tells pgflow to use the flow input

2. **Dependent Map** (with `array:` property): Processes another step's array output
   - The `array:` property specifies which step's output to process
   - That step must return an array

```typescript
-// Root map - processes flow input array
+// ROOT MAP - No array: property means use flow input
+// Flow input MUST be an array (e.g., ["hello", "world"])
new Flow<string[]>({ slug: 'process_strings' })
-  .map({ slug: 'uppercase' }, (item) => item.toUpperCase());
+  .map(
+    { slug: 'uppercase' }, // No array: property!
+    (item) => item.toUpperCase()
+  );
+// Each string in the input array gets uppercased in parallel

-// Dependent map - processes another step's output
+// DEPENDENT MAP - array: property specifies the source step
new Flow<{}>({ slug: 'data_pipeline' })
  .array({ slug: 'numbers' }, () => [1, 2, 3])
-  .map({ slug: 'double', array: 'numbers' }, (n) => n * 2)
-  .map({ slug: 'square', array: 'double' }, (n) => n * n);
+  .map(
+    { slug: 'double', array: 'numbers' }, // Processes 'numbers' output
+    (n) => n * 2
+  )
+  .map(
+    { slug: 'square', array: 'double' }, // Chains from 'double'
+    (n) => n * n
+  );
+// Results: numbers: [1,2,3] → double: [2,4,6] → square: [4,16,36]
```
120154

121155
**Key differences from regular steps:**
122-
- Uses `array:` instead of `dependsOn:` for specifying the single array dependency
156+
- Uses `array:` to specify dependency (not `dependsOn:`)
157+
- When `array:` is omitted, uses flow input array (root map)
123158
- Handler signature is `(item, context) => result` instead of `(input, context) => result`
124159
- Return type is always an array
160+
- Map steps can have at most one dependency (the array source)
125161
- Generates SQL with `step_type => 'map'` parameter for pgflow's map processing
126162

127163
**Type Safety:**
@@ -141,9 +177,39 @@ new Flow<{}>({ slug: 'user_flow' })
});
```

**Common Patterns:**

```typescript
// Batch processing - process multiple items in parallel
new Flow<number[]>({ slug: 'batch_processor' })
  .map({ slug: 'validate' }, (item) => {
    if (item < 0) throw new Error('Invalid item');
    return item;
  })
  .map({ slug: 'process', array: 'validate' }, async (item) => {
    // Each item processed in its own task
    return await expensiveOperation(item);
  });

// Data transformation pipeline
new Flow<{}>({ slug: 'etl_pipeline' })
  .step({ slug: 'fetch_urls' }, () => ['url1', 'url2', 'url3'])
  .map({ slug: 'scrape', array: 'fetch_urls' }, async (url) => {
    return await fetchContent(url);
  })
  .map({ slug: 'extract', array: 'scrape' }, (html) => {
    return extractData(html);
  })
  .step({ slug: 'aggregate', dependsOn: ['extract'] }, (input) => {
    // input.extract is the aggregated array from all map tasks
    return consolidateResults(input.extract);
  });
```

**Limitations:**
- Can only depend on a single array-returning step
- TypeScript may not track type transformations between chained maps (use type assertions if needed)
- Root maps require the entire flow input to be an array

### Context Object
