-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathdynamic-workflow.ts
More file actions
157 lines (141 loc) · 4.21 KB
/
dynamic-workflow.ts
File metadata and controls
157 lines (141 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/**
* Dynamic Workflow — Build workflows programmatically at runtime
*
* Shows how to construct workflows using the ConductorWorkflow fluent builder
* with dynamic task composition, conditional logic, and looping.
*
* Run:
* CONDUCTOR_SERVER_URL=http://localhost:8080 npx ts-node examples/dynamic-workflow.ts
*/
import {
OrkesClients,
ConductorWorkflow,
TaskHandler,
worker,
simpleTask,
switchTask,
doWhileTask,
} from "../src/sdk";
import type { Task } from "../src/open-api";
// ── Workers ─────────────────────────────────────────────────────────
/**
 * Worker for the "fetch_data" task definition.
 *
 * Reads the optional `source` input (falling back to "default") and completes
 * with two fixed sample records plus the source that was requested.
 */
const _fetchData = worker({ taskDefName: "fetch_data", registerTaskDef: true })(
  async (task: Task) => {
    // Missing or nullish `source` input falls back to "default".
    const requestedSource = task.inputData?.source as string;
    const source = requestedSource ?? "default";

    return {
      status: "COMPLETED",
      outputData: {
        // Static sample payload; nothing is actually fetched here.
        records: [
          { id: 1, value: "alpha" },
          { id: 2, value: "beta" },
        ],
        source,
      },
    };
  }
);
/**
 * Worker for the "process_record" task definition.
 *
 * Takes the `record` input and completes with `processed: true`, the record's
 * `id`, and a `result` string of the form `processed-<value>`.
 */
const _processRecord = worker({ taskDefName: "process_record", registerTaskDef: true })(
  async (task: Task) => {
    const input = task.inputData?.record as Record<string, unknown>;

    // Optional chaining keeps a missing record from throwing; the fields
    // simply come through as undefined in that case.
    const id = input?.id;
    const result = `processed-${input?.value}`;

    return {
      status: "COMPLETED",
      outputData: { processed: true, id, result },
    };
  }
);
/**
 * Worker for the "send_notification" task definition.
 *
 * Logs the notification to the console (no real delivery happens) and
 * completes with `sent: true` and the channel that was used.
 * Defaults: channel "email", empty message.
 */
const _sendNotification = worker({ taskDefName: "send_notification", registerTaskDef: true })(
  async (task: Task) => {
    const input = task.inputData;
    const channel = (input?.channel as string) ?? "email";
    const message = (input?.message as string) ?? "";

    console.log(` [Notification] channel=${channel} message="${message}"`);

    return {
      status: "COMPLETED",
      outputData: { sent: true, channel },
    };
  }
);
// ── Build the workflow dynamically ──────────────────────────────────
/**
 * Builds the "dynamic_workflow_example" workflow task by task, registers it,
 * starts the decorated workers, executes the workflow once with sample input,
 * prints the run's status and output, then stops the workers and exits with
 * code 0.
 */
async function main() {
  const clients = await OrkesClients.from();
  const workflowClient = clients.getWorkflowClient();
  const client = clients.getClient();

  const builder = new ConductorWorkflow(
    workflowClient,
    "dynamic_workflow_example"
  );
  const workflow = builder.description(
    "Programmatically built workflow with conditional + loop"
  );

  // 1. Fetch data from the source named in the workflow input.
  const fetchStep = simpleTask("fetch_ref", "fetch_data", {
    source: "${workflow.input.dataSource}",
  });
  workflow.add(fetchStep);

  // 2. Do-while loop around record processing. The condition script yields
  //    false once process_record_ref reports processed == true.
  const loopCondition =
    'if ($.process_record_ref["processed"] == true) { false; } else { true; }';
  const loopBody = [
    simpleTask("process_record_ref", "process_record", {
      record: "${fetch_ref.output.records[0]}",
    }),
  ];
  workflow.add(doWhileTask("process_loop", loopCondition, loopBody));

  // 3. Pick the notification branch from the notifyChannel workflow input;
  //    anything other than "email" / "slack" takes the default branch.
  const channelBranches = {
    email: [
      simpleTask("notify_email_ref", "send_notification", {
        channel: "email",
        message: "Processing complete via email",
      }),
    ],
    slack: [
      simpleTask("notify_slack_ref", "send_notification", {
        channel: "slack",
        message: "Processing complete via Slack",
      }),
    ],
  };
  const defaultBranch = [
    simpleTask("notify_default_ref", "send_notification", {
      channel: "log",
      message: "Processing complete (default channel)",
    }),
  ];
  workflow.add(
    switchTask(
      "notification_switch",
      "${workflow.input.notifyChannel}",
      channelBranches,
      defaultBranch
    )
  );

  // 4. Summary step; reuses the fetch_data task definition.
  const summaryStep = simpleTask("summary_ref", "fetch_data", {
    source: "summary",
  });
  workflow.add(summaryStep);

  workflow.outputParameters({
    fetchResult: "${fetch_ref.output}",
    notifyChannel: "${workflow.input.notifyChannel}",
  });

  // Register the definition, then run it against live workers.
  // NOTE(review): the `true` argument is presumably "overwrite existing" — confirm.
  await workflow.register(true);
  console.log("Registered dynamic workflow:", workflow.getName());

  const handler = new TaskHandler({ client, scanForDecorated: true });
  await handler.startWorkers();

  const input = { dataSource: "api", notifyChannel: "email" };
  const run = await workflow.execute(input);

  console.log("Status:", run.status);
  console.log("Output:", JSON.stringify(run.output, null, 2));

  await handler.stopWorkers();
  process.exit(0);
}
// Script entry point: run main() and, on any rejection, print the error and
// exit with a non-zero status.
main().catch((error) => {
  console.error(error);
  process.exit(1);
});