-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathworker-configuration.ts
More file actions
134 lines (122 loc) · 4.09 KB
/
worker-configuration.ts
File metadata and controls
134 lines (122 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* Worker Configuration — Environment variable hierarchy and per-worker settings
*
* Shows how worker configuration is resolved:
* 1. Decorator options (highest priority)
* 2. Environment variables (per-worker and global)
* 3. SDK defaults (lowest priority)
*
* Environment variables:
* CONDUCTOR_WORKER_POLL_INTERVAL — Global poll interval (ms)
* CONDUCTOR_WORKER_CONCURRENCY — Global concurrency
* CONDUCTOR_WORKER_DOMAIN — Global domain
* CONDUCTOR_WORKER_<TASK>_POLL_INTERVAL — Per-task poll interval
* CONDUCTOR_WORKER_<TASK>_CONCURRENCY — Per-task concurrency
* CONDUCTOR_WORKER_<TASK>_DOMAIN — Per-task domain
*
* Run:
* CONDUCTOR_SERVER_URL=http://localhost:8080 \
* CONDUCTOR_WORKER_POLL_INTERVAL=500 \
* CONDUCTOR_WORKER_CONCURRENCY=3 \
* npx ts-node examples/worker-configuration.ts
*/
import {
OrkesClients,
ConductorWorkflow,
TaskHandler,
worker,
simpleTask,
getRegisteredWorkers,
} from "../src/sdk";
import type { Task } from "../src/open-api";
// ── Worker with defaults (uses env vars or SDK defaults) ────────────
// Only the task name and registration flag are given here; no concurrency,
// poll interval, or domain is set in the options, so those settings are left
// to the environment variables / SDK defaults described in the file header.
const _defaultWorker = worker({
  taskDefName: "config_default_worker",
  registerTaskDef: true,
})(async ({ inputData }: Task) => ({
  // Echo the task input back so the workflow output shows which worker ran.
  status: "COMPLETED",
  outputData: { worker: "default", input: inputData },
}));
// ── Worker with explicit concurrency ────────────────────────────────
// Sets `concurrency` and `pollInterval` directly in the options. Per the
// resolution order in the file header, values given here take priority over
// environment variables and SDK defaults.
const _highConcurrencyWorker = worker({
  taskDefName: "config_high_concurrency",
  registerTaskDef: true,
  concurrency: 10,
  pollInterval: 200, // milliseconds, matching the unit documented for the env var
})(async ({ inputData }: Task) => ({
  status: "COMPLETED",
  outputData: { worker: "high_concurrency", input: inputData },
}));
// ── Worker with domain isolation ────────────────────────────────────
// Pins this worker to the "staging" domain via the options and reports that
// domain in its output alongside the echoed task input.
// NOTE(review): presumably only tasks routed to the "staging" domain reach
// this worker — confirm against the SDK's task-domain documentation.
const _domainWorker = worker({
  taskDefName: "config_domain_worker",
  registerTaskDef: true,
  domain: "staging",
  concurrency: 2,
  pollInterval: 1000,
})(async ({ inputData }: Task) => ({
  status: "COMPLETED",
  outputData: { worker: "domain", domain: "staging", input: inputData },
}));
// ── Worker with custom poll timeout ─────────────────────────────────
// Overrides `pollTimeout` and runs with a concurrency of 1.
// NOTE(review): the timeout is assumed to be in milliseconds (main() labels
// it "ms" when printing) — confirm against the SDK option docs.
const _longPollWorker = worker({
  taskDefName: "config_long_poll",
  registerTaskDef: true,
  pollTimeout: 5000,
  concurrency: 1,
})(async ({ inputData }: Task) => ({
  status: "COMPLETED",
  outputData: { worker: "long_poll", input: inputData },
}));
/**
 * Demo entry point.
 *
 * 1. Creates the SDK clients via `OrkesClients.from()`.
 * 2. Prints the configuration of every worker returned by
 *    `getRegisteredWorkers()`.
 * 3. Registers a one-step workflow that uses `config_default_worker`.
 * 4. Starts the workers, executes the workflow once, prints the result,
 *    and stops the workers again.
 *
 * Exits the process with code 0 on success. Failures propagate as a rejected
 * promise for the caller to handle.
 */
async function main(): Promise<void> {
  const clients = await OrkesClients.from();
  const workflowClient = clients.getWorkflowClient();
  const client = clients.getClient();

  // Print registered worker configurations
  console.log("Registered workers:");
  for (const w of getRegisteredWorkers()) {
    // Only append the "ms" unit when a value is actually set; otherwise an
    // unset option would be printed as the confusing "defaultms".
    const pollInterval =
      w.pollInterval != null ? `${w.pollInterval}ms` : "default";
    const pollTimeout =
      w.pollTimeout != null ? `${w.pollTimeout}ms` : "default";

    console.log(` ${w.taskDefName}:`);
    console.log(` concurrency: ${w.concurrency ?? "default"}`);
    console.log(` pollInterval: ${pollInterval}`);
    console.log(` domain: ${w.domain ?? "none"}`);
    console.log(` pollTimeout: ${pollTimeout}`);
  }

  // Build workflow using the default worker
  const wf = new ConductorWorkflow(workflowClient, "config_demo_workflow")
    .description("Demonstrates worker configuration options")
    .add(
      simpleTask("step_ref", "config_default_worker", {
        message: "${workflow.input.message}",
      })
    )
    .outputParameters({ result: "${step_ref.output.worker}" });
  await wf.register(true);

  // Start workers and execute
  const handler = new TaskHandler({ client, scanForDecorated: true });
  await handler.startWorkers();
  try {
    const run = await wf.execute({ message: "testing config" });
    console.log("\nWorkflow status:", run.status);
    console.log("Output:", JSON.stringify(run.output, null, 2));
  } finally {
    // Always stop the workers once they have been started, even when the
    // workflow execution fails, so they are asked to shut down before the
    // process exits instead of only being stopped on the success path.
    await handler.stopWorkers();
  }

  process.exit(0);
}
// Run the demo; any rejection from main() is logged and turned into a
// non-zero exit code.
main().catch((failure: unknown) => {
  console.error(failure);
  process.exit(1);
});