-
Notifications
You must be signed in to change notification settings - Fork 2
/
context.ts
200 lines (183 loc) · 5.28 KB
/
context.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import {
WorkflowExecution,
WorkflowExecutionBase,
} from "./backends/backend.ts";
import { ClientOptions, signal, start } from "./client/init.ts";
import { PromiseOrValue } from "./promise.ts";
import { makeRandomWithSeed } from "./randomSeed.ts";
import {
AwaitableCommands,
InvokeHttpEndpointCommand,
LocalActivityCommand,
ScheduleActivityCommand,
SleepCommand,
WaitAllCommand,
WaitAnyCommand,
WaitForSignalCommand,
} from "./runtime/core/commands.ts";
import { Arg } from "./types.ts";
export type ActivityResult<T> = PromiseOrValue<T>;
/**
* Returns if the given activity result is a generator or not.
* @param value the activity result
* @returns a typeguard for activity result.
*/
export const isValue = <T>(value: ActivityResult<T>): value is T => {
return (
(value as Generator).next === undefined &&
(value as Promise<T>).then === undefined
);
};
/**
* Activity is the signature of any activity.
*/
export type Activity<TArgs extends Arg, TResult> = (
...args: [...TArgs]
) => ActivityResult<TResult>;
/**
* Activity executor receives an activity and executes it.
*/
export type ActivityExecutor<TArgs extends Arg, TResult> = (
activity: Activity<TArgs, TResult>,
...args: [...TArgs]
) => ActivityResult<TResult>;
// deno-lint-ignore no-empty-interface
export interface Metadata {
}
/**
* WorkflowContext is used for providing api access to the workflow engine.
*/
export class WorkflowContext<TMetadata extends Metadata = Metadata> {
private rand: () => number;
constructor(
public execution: WorkflowExecution<Arg, unknown, TMetadata>,
) {
this.rand = makeRandomWithSeed(execution.id);
}
/**
* Start a new workflow execution
*/
public startExecution(
exec: WorkflowExecutionBase,
opts?: ClientOptions,
): LocalActivityCommand {
return this.callLocalActivity(() => start(exec, true, opts));
}
/**
* Send a signal for the given workflow execution
*/
public sendSignal(
executionId: string,
_signal: string,
payload: unknown,
opts?: ClientOptions,
): LocalActivityCommand {
return this.callLocalActivity(() =>
signal(executionId, _signal, payload, opts)
);
}
/**
* waitForSignal wait for the given signal to be occurred.
* @param signal the signal name
*/
public waitForSignal(signal: string): WaitForSignalCommand {
return { name: "wait_signal", signal };
}
/**
* Executes the activity for the given context and args.
* @param activity the activity that should be executed
* @param input the activity args (optionally)
*/
public callActivity<TArgs extends Arg = Arg, TResult = unknown>(
activity: Activity<TArgs, TResult>,
...input: [...TArgs]
): ScheduleActivityCommand<TArgs, TResult> {
return { name: "schedule_activity", activity, input };
}
/**
* Executes the activity for the given context and args.
* @param activity the activity that should be executed
*/
public callLocalActivity<TResult = unknown>(
activity: () => PromiseOrValue<TResult>,
): LocalActivityCommand<TResult> {
return { name: "local_activity", fn: activity };
}
/**
* Executes the http request for the given context and args.
* @param url the fetch url
*/
public fetch(
url: string,
options?: {
body?: string;
headers?: Record<string, string>;
method?: string;
},
format?: InvokeHttpEndpointCommand["responseFormat"],
): InvokeHttpEndpointCommand {
return {
name: "invoke_http_endpoint",
url,
...options,
responseFormat: format,
};
}
/**
* stop the current workflow execution and sleep the given miliseconds time.
* @param sleepMs the time in miliseconds
*/
public sleep(sleepMs: number): SleepCommand {
// get the current date & time (as milliseconds since Epoch)
const currentTimeAsMs = Date.now();
const adjustedTimeAsMs = currentTimeAsMs + sleepMs;
return this.sleepUntil(new Date(adjustedTimeAsMs));
}
/**
* UNDER TEST, wait all has a bug where the items where delivered out of the order.
* Wait until all commands has completed and return an array of results.
*/
public _experimentalWaitAll(commands: AwaitableCommands[]): WaitAllCommand {
return {
name: "wait_all",
commands,
};
}
/**
* Wait until any of commands has completed and return its result.
*/
public waitAny(commands: AwaitableCommands[]): WaitAnyCommand {
return {
name: "wait_any",
commands,
};
}
/**
* stops the current workflow execution and sleep until the given date.
* @param until the date that should sleep.
*/
public sleepUntil(until: Date): SleepCommand {
return { name: "sleep", until };
}
/**
* Returns a random consistent with the given workflow execution
* @returns a random float value.
*/
public random(): number {
return this.rand();
}
/**
* Logs at least once with additional workflow information
*/
public log(message: any, ...optionalParams: any[]): LocalActivityCommand {
const executionId = this.execution.id;
const fn = function () {
console.log(
`[${new Date().toISOString()}][${executionId}]: ${message}`,
...optionalParams,
);
};
Object.defineProperty(fn, "name", { value: "log" });
return this.callLocalActivity(fn);
}
}