-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathhuman_interaction.ts
More file actions
135 lines (115 loc) · 4.27 KB
/
human_interaction.ts
File metadata and controls
135 lines (115 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import {
TaskHubGrpcClient,
TaskHubGrpcWorker,
ActivityContext,
OrchestrationContext,
Task,
TOrchestrator,
whenAny,
// Logger types: ConsoleLogger (default), NoOpLogger (silent)
ConsoleLogger,
} from "@microsoft/durabletask-js";
import * as readlineSync from "readline-sync";
// Wrap the entire code in an immediately-invoked async function
(async () => {
/**
 * Plain data holder for a purchase order.
 *
 * The three constructor arguments become public, mutable instance fields
 * in the order `cost`, `product`, `quantity`.
 */
class Order {
  constructor(
    public cost: number,
    public product: string,
    public quantity: number,
  ) {}
}
/**
 * Returns a promise that settles (with no value) once `ms` milliseconds
 * have elapsed on a `setTimeout` timer.
 *
 * @param ms Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
// Address (host:port) of the local gRPC endpoint shared by the client and the worker.
const grpcServerAddress = "localhost:4001";

// Optional: a custom logger may be supplied here (ConsoleLogger is the default).
const logger = new ConsoleLogger();

// The four `undefined` placeholders skip the intermediate positional
// parameters so that `logger` is passed as the sixth constructor argument.
const taskHubClient = new TaskHubGrpcClient(grpcServerAddress, undefined, undefined, undefined, undefined, logger);
const taskHubWorker = new TaskHubGrpcWorker(grpcServerAddress, undefined, undefined, undefined, undefined, logger);
// Activity: sends an approval request to the manager.
// Waits three seconds to stand in for real work, then logs the product name
// of the order it was given. The activity context is not used.
const sendApprovalRequest = async (_ctx: ActivityContext, order: Order): Promise<void> => {
  const simulatedWorkMs = 3000;
  await sleep(simulatedWorkMs);

  const { product } = order;
  logger.info(`Sending approval request for order: ${product}`);
};
// Activity: places the order.
// In this sample it only logs the product name; the activity context is not used.
const placeOrder = async (_ctx: ActivityContext, order: Order): Promise<void> => {
  const { product } = order;
  logger.info(`Placing order: ${product}`);
};
// Orchestrator function that represents a purchase order workflow.
//
// Input:  `order` - the purchase order under review.
// Result: "Auto-approved"          when order.cost is below 1000,
//         "Cancelled"              when the timer finishes before an approval arrives,
//         "Approved by <approver>" when an "approval_received" event wins the race
//                                  (the event payload is expected to carry `approver`).
const purchaseOrderWorkflow: TOrchestrator = async function* (ctx: OrchestrationContext, order: Order): any {
  // Orders under $1000 are auto-approved
  if (order.cost < 1000) {
    return "Auto-approved";
  }

  // Orders of $1000 or more require manager approval
  yield ctx.callActivity(sendApprovalRequest, order);

  // Approvals must be received within 24 hours or they will be cancelled.
  const approvalEvent = ctx.waitForExternalEvent("approval_received");
  const timeoutEvent = ctx.createTimer(24 * 60 * 60);
  const tasks: Task<any>[] = [approvalEvent, timeoutEvent];

  // The composite task has to be yielded. Without `yield` the orchestrator
  // never pauses for the race, `winner` is the composite task itself (so it can
  // never equal `timeoutEvent`), and the order would be placed before anyone
  // approved it. Yielding hands the task to the runtime, which resumes the
  // generator with whichever task completed first.
  const winner = yield whenAny(tasks);
  if (winner === timeoutEvent) {
    return "Cancelled";
  }

  yield ctx.callActivity(placeOrder, order);

  // Reaching this point means the approval event won the race, so its result is available.
  const approvalDetails = approvalEvent.getResult();
  return `Approved by ${approvalDetails.approver}`;
};
// Register the orchestrator and both activities with the worker before starting it.
taskHubWorker.addOrchestrator(purchaseOrderWorkflow);
for (const activity of [sendApprovalRequest, placeOrder]) {
  taskHubWorker.addActivity(activity);
}

// Start the worker. A startup failure is logged rather than rethrown, so
// execution continues past this block either way.
try {
  await taskHubWorker.start();
  logger.info("Worker started successfully");
} catch (startupError) {
  logger.error("Error starting worker:", startupError);
}
// Schedule a new orchestration, optionally approve it, and wait for its result.
// Shutdown of the worker and client lives in `finally` so that every exit path,
// including the early "Order rejected" return, stops them.
try {
  // readline-sync hands back raw strings. Convert numeric answers up front so
  // the order carries a numeric cost and the timeout arithmetic below is an
  // addition rather than a string concatenation ("30" + 2 === "302").
  const askNumber = (prompt: string): number => {
    const answer = Number.parseFloat(readlineSync.question(prompt));
    if (!Number.isFinite(answer)) {
      throw new Error(`Expected a numeric answer for "${prompt}"`);
    }
    return answer;
  };

  const cost = askNumber("Cost of your order:");
  const approver = readlineSync.question("Approver of your order:");
  const timeout = askNumber("Timeout for your order in seconds:");

  const order = new Order(cost, "MyProduct", 1);
  const id = await taskHubClient.scheduleNewOrchestration(purchaseOrderWorkflow, order);
  logger.info(`Orchestration scheduled with ID: ${id}`);

  if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
    const approvalEvent = { approver: approver };
    await taskHubClient.raiseOrchestrationEvent(id, "approval_received", approvalEvent);
  } else {
    // No approval is raised; the `finally` block below still runs before returning.
    return "Order rejected";
  }

  // Wait for orchestration completion, allowing two seconds beyond the entered timeout.
  const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, timeout + 2);
  logger.info(`Orchestration completed! Result: ${state?.serializedOutput}`);
} catch (error) {
  logger.error("Error scheduling or waiting for orchestration:", error);
} finally {
  // stop worker and client
  await taskHubWorker.stop();
  await taskHubClient.stop();
}
})();