-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworkflows_runs.go
314 lines (258 loc) · 9.09 KB
/
workflows_runs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
package coze
import (
"bufio"
"context"
"encoding/json"
"net/http"
"strconv"
"strings"
)
// Create runs a workflow by POSTing req to /v1/workflow/run and returns the
// decoded result.
//
// Any error reported by the underlying client request is returned unchanged
// together with a nil result. On success the HTTPResponse recorded on the
// response envelope is handed to the result through setHTTPResponse before the
// result is returned.
func (r *workflowRuns) Create(ctx context.Context, req *RunWorkflowsReq) (*RunWorkflowsResp, error) {
	var envelope runWorkflowsResp
	if err := r.client.Request(ctx, http.MethodPost, "/v1/workflow/run", req, &envelope); err != nil {
		return nil, err
	}

	result := envelope.RunWorkflowsResp
	result.setHTTPResponse(envelope.HTTPResponse)
	return result, nil
}
// Resume continues a workflow run by POSTing req to
// /v1/workflow/stream_resume and returns a stream of workflow events.
//
// If the streaming request fails, its error is returned unchanged with a nil
// stream. Otherwise the response body is wrapped in a buffered reader and each
// frame is decoded with parseWorkflowEvent.
func (r *workflowRuns) Resume(ctx context.Context, req *ResumeRunWorkflowsReq) (Stream[WorkflowEvent], error) {
	httpResp, err := r.client.StreamRequest(ctx, http.MethodPost, "/v1/workflow/stream_resume", req)
	if err != nil {
		return nil, err
	}

	events := &streamReader[WorkflowEvent]{
		ctx:          ctx,
		response:     httpResp,
		reader:       bufio.NewReader(httpResp.Body),
		processor:    parseWorkflowEvent,
		httpResponse: newHTTPResponse(httpResp),
	}
	return events, nil
}
// Stream runs a workflow by POSTing req to /v1/workflow/stream_run and returns
// a stream of workflow events.
//
// If the streaming request fails, its error is returned unchanged with a nil
// stream. Otherwise the response body is wrapped in a buffered reader and each
// frame is decoded with parseWorkflowEvent.
func (r *workflowRuns) Stream(ctx context.Context, req *RunWorkflowsReq) (Stream[WorkflowEvent], error) {
	httpResp, err := r.client.StreamRequest(ctx, http.MethodPost, "/v1/workflow/stream_run", req)
	if err != nil {
		return nil, err
	}

	events := &streamReader[WorkflowEvent]{
		ctx:          ctx,
		response:     httpResp,
		reader:       bufio.NewReader(httpResp.Body),
		processor:    parseWorkflowEvent,
		httpResponse: newHTTPResponse(httpResp),
	}
	return events, nil
}
// workflowRuns groups the workflow-run operations (Create, Stream, Resume)
// that are issued through a shared client.
type workflowRuns struct {
// client is the core used by Create, Stream and Resume to send requests.
client *core
// Histories is built from the same core by newWorkflowRunsHistories.
// NOTE(review): presumably exposes the run-history endpoints; confirm against
// the workflowRunsHistories definition.
Histories *workflowRunsHistories
}
// newWorkflowRun builds a workflowRuns whose requests go through core and
// whose Histories field is created from the same core.
func newWorkflowRun(core *core) *workflowRuns {
	runs := new(workflowRuns)
	runs.client = core
	runs.Histories = newWorkflowRunsHistories(core)
	return runs
}
// workflowFrameError reports a stream line that does not carry the field the
// frame layout requires at that position.
type workflowFrameError struct {
	want string // field prefix that was expected, e.g. "event:"
	got  string // the line that was actually read
}

// Error implements the error interface.
func (e *workflowFrameError) Error() string {
	return "coze: malformed workflow stream frame: want field " +
		strconv.Quote(e.want) + ", got line " + strconv.Quote(e.got)
}

// readWorkflowField reads the next line from reader and returns the
// whitespace-trimmed value that follows prefix. A read error is returned
// unchanged; a line that does not start with prefix yields a
// *workflowFrameError.
func readWorkflowField(reader *bufio.Reader, prefix string) (string, error) {
	line, err := reader.ReadString('\n')
	if err != nil {
		return "", err
	}
	if !strings.HasPrefix(line, prefix) {
		return "", &workflowFrameError{want: prefix, got: line}
	}
	return strings.TrimSpace(line[len(prefix):]), nil
}

// parseWorkflowEvent decodes one workflow stream frame.
//
// lineBytes is the line already consumed by the caller. When it does not start
// with "id:" the line is skipped and (nil, false, nil) is returned. Otherwise
// the next two lines are read from reader and must be an "event:" line followed
// by a "data:" line; the three values are handed to doParseWorkflowEvent.
//
// The boolean result reports whether the decoded event is a Done event. Read
// errors and decode errors are returned unchanged. A follow-up line that is
// missing its expected prefix (including a blank or truncated line) is reported
// as an error instead of being sliced blindly, which previously could panic
// with a slice-bounds error or silently drop leading characters.
func parseWorkflowEvent(lineBytes []byte, reader *bufio.Reader) (*WorkflowEvent, bool, error) {
	line := string(lineBytes)
	if !strings.HasPrefix(line, "id:") {
		return nil, false, nil
	}
	id := strings.TrimSpace(line[len("id:"):])

	event, err := readWorkflowField(reader, "event:")
	if err != nil {
		return nil, false, err
	}
	data, err := readWorkflowField(reader, "data:")
	if err != nil {
		return nil, false, err
	}

	parsed, err := doParseWorkflowEvent(map[string]string{
		"id":    id,
		"event": event,
		"data":  data,
	})
	if err != nil {
		return nil, false, err
	}
	return parsed, parsed.IsDone(), nil
}
// WorkflowRunResult represents the result of a workflow run.
type WorkflowRunResult struct {
// DebugUrl is decoded from the "debug_url" JSON field.
// NOTE(review): presumably a link for inspecting the run; confirm against the
// API documentation.
DebugUrl string `json:"debug_url"`
// Data is the workflow execution result, usually a JSON serialized string. In
// some scenarios, a string with a non-JSON structure may be returned.
Data string `json:"data"`
// ExecuteID is the execution ID of an asynchronous execution. It is only
// returned when the workflow is executed asynchronously (is_async=true). You
// can use execute_id to call the Query Workflow Asynchronous Execution Result
// API to obtain the final execution result of the workflow.
ExecuteID string `json:"execute_id"`
}
// WorkflowEvent represents one event in a workflow stream. The parsers in this
// file populate at most one of Message, Interrupt and Error, chosen by the
// event type; a Done event carries none of them.
type WorkflowEvent struct {
// ID is the event ID of this message in the interface response. It starts
// from 0.
ID int `json:"id"`
// Event is the current streaming data packet event.
Event WorkflowEventType `json:"event"`
// Message is set for events decoded as messages.
Message *WorkflowEventMessage `json:"message,omitempty"`
// Interrupt is set for Interrupt events.
Interrupt *WorkflowEventInterrupt `json:"interrupt,omitempty"`
// Error is set for Error events.
Error *WorkflowEventError `json:"error,omitempty"`
}
// parseWorkflowEventMessage decodes data as a JSON WorkflowEventMessage and
// wraps it in a WorkflowEvent with the given id and the Message event type.
// A JSON decoding error is returned unchanged with a nil event.
func parseWorkflowEventMessage(id int, data string) (*WorkflowEvent, error) {
	payload := new(WorkflowEventMessage)
	err := json.Unmarshal([]byte(data), payload)
	if err != nil {
		return nil, err
	}

	event := &WorkflowEvent{ID: id, Event: WorkflowEventTypeMessage}
	event.Message = payload
	return event, nil
}
// parseWorkflowEventInterrupt decodes data as a JSON WorkflowEventInterrupt and
// wraps it in a WorkflowEvent with the given id and the Interrupt event type.
// A JSON decoding error is returned unchanged with a nil event.
func parseWorkflowEventInterrupt(id int, data string) (*WorkflowEvent, error) {
	payload := new(WorkflowEventInterrupt)
	err := json.Unmarshal([]byte(data), payload)
	if err != nil {
		return nil, err
	}

	event := &WorkflowEvent{ID: id, Event: WorkflowEventTypeInterrupt}
	event.Interrupt = payload
	return event, nil
}
// parseWorkflowEventError decodes data as a JSON WorkflowEventError and wraps
// it in a WorkflowEvent with the given id and the Error event type.
// A JSON decoding error is returned unchanged with a nil event.
func parseWorkflowEventError(id int, data string) (*WorkflowEvent, error) {
	payload := new(WorkflowEventError)
	err := json.Unmarshal([]byte(data), payload)
	if err != nil {
		return nil, err
	}

	event := &WorkflowEvent{ID: id, Event: WorkflowEventTypeError}
	event.Error = payload
	return event, nil
}
// parseWorkflowEventDone builds the Done event for the given id. A Done event
// carries no message, interrupt or error payload.
func parseWorkflowEventDone(id int) *WorkflowEvent {
	done := new(WorkflowEvent)
	done.ID = id
	done.Event = WorkflowEventTypeDone
	return done
}
// doParseWorkflowEvent converts the raw "id", "event" and "data" fields of one
// stream frame into a typed WorkflowEvent.
//
// Interrupt, Error and Done frames are routed to their dedicated parsers.
// Message frames and frames with any other event name are decoded as messages.
// The error from converting "id" to an integer is discarded, so an id that is
// not a number does not fail the frame (strconv.Atoi yields 0 for such input).
func doParseWorkflowEvent(eventLine map[string]string) (*WorkflowEvent, error) {
	payload := eventLine["data"]
	// Conversion error is discarded; see the function comment.
	seq, _ := strconv.Atoi(eventLine["id"])

	switch kind := WorkflowEventType(eventLine["event"]); kind {
	case WorkflowEventTypeDone:
		return parseWorkflowEventDone(seq), nil
	case WorkflowEventTypeError:
		return parseWorkflowEventError(seq, payload)
	case WorkflowEventTypeInterrupt:
		return parseWorkflowEventInterrupt(seq, payload)
	default:
		// Covers WorkflowEventTypeMessage as well as unrecognized event names.
		return parseWorkflowEventMessage(seq, payload)
	}
}
// IsDone reports whether e is a Done event, that is, whether its Event field
// equals WorkflowEventTypeDone.
func (e *WorkflowEvent) IsDone() bool {
	switch e.Event {
	case WorkflowEventTypeDone:
		return true
	}
	return false
}
// WorkflowEventError represents the payload of an error event in a workflow
// stream.
type WorkflowEventError struct {
// ErrorCode is the status code. 0 represents a successful API call. Other
// values indicate that the call has failed. You can determine the detailed
// reason for the error through the error_message field.
ErrorCode int `json:"error_code"`
// ErrorMessage is the status message. You can get detailed error information
// from it when the API call fails.
ErrorMessage string `json:"error_message"`
}
// WorkflowEventInterrupt represents the payload of an interruption event in a
// workflow stream.
type WorkflowEventInterrupt struct {
// InterruptData is the content of the interruption event.
InterruptData *WorkflowEventInterruptData `json:"interrupt_data"`
// NodeTitle is the name of the node that outputs the message, such as
// "Question".
NodeTitle string `json:"node_title"`
}
// WorkflowEventInterruptData represents the data of an interruption event.
// Both fields are meant to be passed back when resuming the workflow.
type WorkflowEventInterruptData struct {
// EventID is the workflow interruption event ID, which should be passed back
// when resuming the workflow.
EventID string `json:"event_id"`
// Type is the type of workflow interruption, which should be passed back when
// resuming the workflow.
Type int `json:"type"`
}
// ParseWorkflowEventError decodes a JSON string into a WorkflowEventError.
// It returns the decoded value, or nil and the JSON decoding error when data
// cannot be unmarshalled.
func ParseWorkflowEventError(data string) (*WorkflowEventError, error) {
	parsed := new(WorkflowEventError)
	if unmarshalErr := json.Unmarshal([]byte(data), parsed); unmarshalErr != nil {
		return nil, unmarshalErr
	}
	return parsed, nil
}
// ParseWorkflowEventInterrupt decodes a JSON string into a
// WorkflowEventInterrupt. It returns the decoded value, or nil and the JSON
// decoding error when data cannot be unmarshalled.
func ParseWorkflowEventInterrupt(data string) (*WorkflowEventInterrupt, error) {
	parsed := new(WorkflowEventInterrupt)
	if unmarshalErr := json.Unmarshal([]byte(data), parsed); unmarshalErr != nil {
		return nil, unmarshalErr
	}
	return parsed, nil
}
// WorkflowEventMessage represents the payload of a message event in a workflow
// stream.
type WorkflowEventMessage struct {
// Content is the content of the streamed output message.
Content string `json:"content"`
// NodeTitle is the name of the node that outputs the message, such as the
// message node or end node.
NodeTitle string `json:"node_title"`
// NodeSeqID is the message ID of this message within the node, starting at 0.
NodeSeqID string `json:"node_seq_id"`
// NodeIsFinish reports whether the current message is the last data packet
// for this node.
NodeIsFinish bool `json:"node_is_finish"`
// Ext holds additional fields.
Ext map[string]any `json:"ext,omitempty"`
}
// WorkflowEventType represents the type of workflow event, as carried in the
// "event" field of a stream frame.
type WorkflowEventType string
const (
// WorkflowEventTypeMessage is the output message from a workflow node, such as
// the output message from the message node or end node. You can view the
// specific message content in data.
WorkflowEventTypeMessage WorkflowEventType = "Message"
// WorkflowEventTypeError indicates that an error has occurred. You can view
// the error_code and error_message in data to troubleshoot the issue.
WorkflowEventTypeError WorkflowEventType = "Error"
// WorkflowEventTypeDone indicates the end of the workflow execution, where
// data is empty.
WorkflowEventTypeDone WorkflowEventType = "Done"
// WorkflowEventTypeInterrupt indicates the workflow has been interrupted,
// where the data field contains specific interruption information.
WorkflowEventTypeInterrupt WorkflowEventType = "Interrupt"
)
// RunWorkflowsReq represents the request for running a workflow. It is the
// request type accepted by workflowRuns.Create and workflowRuns.Stream.
type RunWorkflowsReq struct {
// WorkflowID is the ID of the workflow, which should have been published.
WorkflowID string `json:"workflow_id"`
// Parameters holds the input parameters and their values for the starting
// node of the workflow.
Parameters map[string]any `json:"parameters,omitempty"`
// BotID is the associated Bot ID required for some workflow executions.
BotID string `json:"bot_id,omitempty"`
// Ext is used to specify some additional fields.
Ext map[string]string `json:"ext,omitempty"`
// IsAsync selects whether to run asynchronously.
IsAsync bool `json:"is_async,omitempty"`
// AppID is sent as the "app_id" JSON field when non-empty.
// NOTE(review): presumably the ID of the app the workflow belongs to; confirm
// against the API documentation.
AppID string `json:"app_id,omitempty"`
}
// ResumeRunWorkflowsReq represents the request for resuming a workflow run. It
// is the request type accepted by workflowRuns.Resume.
type ResumeRunWorkflowsReq struct {
// WorkflowID is the ID of the workflow, which should have been published.
WorkflowID string `json:"workflow_id"`
// EventID is the event ID.
// NOTE(review): presumably the EventID received in WorkflowEventInterruptData;
// confirm against the API documentation.
EventID string `json:"event_id"`
// ResumeData is the resume data.
// NOTE(review): expected content is not visible here; confirm against the API
// documentation.
ResumeData string `json:"resume_data"`
// InterruptType is the interrupt type.
// NOTE(review): presumably the Type received in WorkflowEventInterruptData;
// confirm against the API documentation.
InterruptType int `json:"interrupt_type"`
}