forked from microsoft/durabletask-go
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathparallel.go
137 lines (114 loc) · 4.02 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package main
import (
"context"
"encoding/json"
"log"
"math/rand"
"time"
"github.com/google/uuid"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/backend/sqlite"
"github.com/dapr/durabletask-go/task"
)
func main() {
// Create a new task registry and add the orchestrator and activities
r := task.NewTaskRegistry()
r.AddOrchestrator(UpdateDevicesOrchestrator)
r.AddActivity(GetDevicesToUpdate)
r.AddActivity(UpdateDevice)
// Init the client
ctx := context.Background()
client, worker, err := Init(ctx, r)
if err != nil {
log.Fatalf("Failed to initialize the client: %v", err)
}
defer worker.Shutdown(ctx)
// Start a new orchestration
id, err := client.ScheduleNewOrchestration(ctx, UpdateDevicesOrchestrator)
if err != nil {
log.Fatalf("Failed to schedule new orchestration: %v", err)
}
// Wait for the orchestration to complete
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
if err != nil {
log.Fatalf("Failed to wait for orchestration to complete: %v", err)
}
// Print the results
metadataEnc, err := json.MarshalIndent(metadata, "", " ")
if err != nil {
log.Fatalf("Failed to encode result to JSON: %v", err)
}
log.Printf("Orchestration completed: %v", string(metadataEnc))
}
// Init creates and initializes an in-memory client and worker pair with default configuration.
func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, backend.TaskHubWorker, error) {
logger := backend.DefaultLogger()
// Create an executor
executor := task.NewTaskExecutor(r)
// Create a new backend
// Use the in-memory sqlite provider by specifying ""
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger)
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)
// Start the worker
err := taskHubWorker.Start(ctx)
if err != nil {
return nil, nil, err
}
// Get the client to the backend
taskHubClient := backend.NewTaskHubClient(be)
return taskHubClient, taskHubWorker, nil
}
// UpdateDevicesOrchestrator is an orchestrator that runs activities in parallel
func UpdateDevicesOrchestrator(ctx *task.OrchestrationContext) (any, error) {
// Get a dynamic list of devices to perform updates on
var devices []string
if err := ctx.CallActivity(GetDevicesToUpdate).Await(&devices); err != nil {
return nil, err
}
// Start a dynamic number of tasks in parallel, not waiting for any to complete (yet)
tasks := make([]task.Task, len(devices))
for i, id := range devices {
tasks[i] = ctx.CallActivity(UpdateDevice, task.WithActivityInput(id))
}
// Now that all are started, wait for them to complete and then return the success rate
successCount := 0
for _, task := range tasks {
var succeeded bool
if err := task.Await(&succeeded); err == nil && succeeded {
successCount++
}
}
return float32(successCount) / float32(len(devices)), nil
}
// GetDevicesToUpdate is an activity that returns a list of random device IDs to an orchestration.
func GetDevicesToUpdate(task.ActivityContext) (any, error) {
// Return a fake list of device IDs
const deviceCount = 10
deviceIDs := make([]string, deviceCount)
for i := 0; i < deviceCount; i++ {
deviceIDs[i] = uuid.NewString()
}
return deviceIDs, nil
}
// UpdateDevice is an activity that takes a device ID (string) and pretends to perform an update
// on the corresponding device, with a random 67% success rate.
func UpdateDevice(ctx task.ActivityContext) (any, error) {
var deviceID string
if err := ctx.GetInput(&deviceID); err != nil {
return nil, err
}
log.Printf("updating device: %s", deviceID)
// Delay and success results are randomly generated
delay := time.Duration(rand.Int31n(500)) * time.Millisecond
select {
case <-ctx.Context().Done():
return nil, ctx.Context().Err()
case <-time.After(delay):
// All good, continue
}
// Simulate random failures
success := rand.Intn(3) > 0
return success, nil
}