-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathmetrics.ts
More file actions
107 lines (94 loc) · 3.13 KB
/
metrics.ts
File metadata and controls
107 lines (94 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* Metrics — Prometheus metrics collection and HTTP exposition
*
* Demonstrates MetricsCollector + MetricsServer:
 * - Starts the metrics HTTP server manually on port 9090 (the collector is created without httpPort)
* - Collects poll, execution, and error metrics
* - Exposes /metrics (Prometheus text format) and /health endpoints
* - Supports optional prom-client integration
*
* Run:
* CONDUCTOR_SERVER_URL=http://localhost:8080 npx ts-node examples/metrics.ts
*
* Then visit:
* http://localhost:9090/metrics — Prometheus metrics
* http://localhost:9090/health — Health check
*/
import {
OrkesClients,
ConductorWorkflow,
TaskHandler,
worker,
simpleTask,
MetricsCollector,
MetricsServer,
} from "../src/sdk";
import type { Task } from "../src/open-api";
/**
 * Worker registered for the "metrics_task" task definition.
 *
 * Each invocation waits a random interval below 50 ms and then reports
 * completion, echoing the chosen interval back as `durationMs`.
 */
const _metricsTask = worker({
  taskDefName: "metrics_task",
  registerTaskDef: true,
})(async (_task: Task) => {
  // Random pause so repeated executions take varying amounts of time.
  const pauseMs = Math.random() * 50;
  await new Promise((wake) => setTimeout(wake, pauseMs));

  const outputData = { processed: true, durationMs: pauseMs };
  return { status: "COMPLETED", outputData };
});
/**
 * Runs the metrics demo end to end.
 *
 * Steps:
 *  1. Builds the Conductor clients and a MetricsCollector with the
 *     "conductor_worker" prefix.
 *  2. Starts a MetricsServer for that collector.
 *  3. Registers the "metrics_example" workflow (three "metrics_task" steps).
 *  4. Starts the decorated workers with the collector as an event listener.
 *  5. Executes the workflow three times and prints each run's status.
 *  6. Prints the poll/execution totals and the Prometheus text output.
 *
 * The workers and the metrics server are stopped in a `finally` block so they
 * are shut down even when one of the steps above throws; the error is then
 * re-thrown to the caller. On success the process exits with status 0.
 */
async function main() {
  // Single source of truth for the port used by the server and the log lines.
  const metricsPort = 9090;

  const clients = await OrkesClients.from();
  const workflowClient = clients.getWorkflowClient();
  const client = clients.getClient();

  // Create metrics collector (no httpPort — avoids dynamic import issue with ts-node)
  const metrics = new MetricsCollector({
    prefix: "conductor_worker",
  });

  // Start the HTTP server manually
  const server = new MetricsServer(metrics, metricsPort);
  await server.start();

  // Set once the handler exists, so cleanup knows whether there are workers to stop.
  let handler: TaskHandler | undefined;

  try {
    console.log(`Metrics server started on http://localhost:${metricsPort}`);
    console.log("  GET /metrics — Prometheus text format");
    console.log("  GET /health  — Health check");

    // Register workflow
    const wf = new ConductorWorkflow(workflowClient, "metrics_example")
      .description("Workflow for metrics collection demo")
      .add(simpleTask("m1_ref", "metrics_task", { step: 1 }))
      .add(simpleTask("m2_ref", "metrics_task", { step: 2 }))
      .add(simpleTask("m3_ref", "metrics_task", { step: 3 }))
      .outputParameters({ done: true });
    await wf.register(true);

    // Start workers with metrics listener
    handler = new TaskHandler({
      client,
      scanForDecorated: true,
      eventListeners: [metrics],
    });
    await handler.startWorkers();

    // Execute workflow multiple times
    console.log("\nExecuting workflow 3 times...");
    for (let i = 0; i < 3; i++) {
      const run = await wf.execute({ iteration: i });
      console.log(`  Run ${i + 1}: ${run.status}`);
    }

    // Print metrics snapshot
    console.log("\n--- Metrics Snapshot ---");
    const snapshot = metrics.getMetrics();
    console.log("Poll totals:");
    for (const [task, count] of snapshot.pollTotal) {
      console.log(`  ${task}: ${count}`);
    }
    console.log("Execution totals:");
    for (const [task, count] of snapshot.taskExecutionTotal) {
      console.log(`  ${task}: ${count}`);
    }

    // Print Prometheus text format
    console.log("\n--- Prometheus Text Format ---");
    console.log(metrics.toPrometheusText());
  } finally {
    // Always shut down what was started. The inner finally guarantees the
    // server is stopped even if stopping the workers fails.
    try {
      await handler?.stopWorkers();
    } finally {
      await server.stop();
    }
  }

  // Exit only after cleanup: process.exit() would skip the finally block above.
  process.exit(0);
}
// Entry point: run the demo; if main() rejects, log the error and exit with status 1.
main().then(undefined, (failure) => {
  console.error(failure);
  process.exit(1);
});