-
Notifications
You must be signed in to change notification settings - Fork 20
/
dag.go
83 lines (62 loc) · 1.21 KB
/
dag.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package dag
// Dag represents a directed acyclic graph as an ordered list of jobs.
//
// Jobs are appended by Pipeline and Spawns and are handed to run, in the
// order they were added, when Run is called.
type Dag struct {
// jobs holds the registered jobs in insertion order.
jobs []*Job
}
// New returns a pointer to a fresh Dag whose job list is empty but non-nil.
func New() *Dag {
	graph := new(Dag)
	graph.jobs = []*Job{}
	return graph
}
// lastJob returns the most recently appended job, or nil when the Dag has
// no jobs.
func (dag *Dag) lastJob() *Job {
	if n := len(dag.jobs); n > 0 {
		return dag.jobs[n-1]
	}
	return nil
}
// Run hands every registered job to run, one after another, in the order the
// jobs were added. It returns once run has returned for the last job.
func (dag *Dag) Run() {
	// Iterate over a snapshot of the slice header: jobs appended while Run is
	// in progress are not visited by this call.
	pending := dag.jobs
	for idx := range pending {
		run(pending[idx])
	}
}
// RunAsync starts a new goroutine that calls Run and then, if onComplete is
// non-nil, calls onComplete from that same goroutine. RunAsync itself returns
// right after the goroutine has been started.
func (dag *Dag) RunAsync(onComplete func()) {
	worker := func() {
		dag.Run()
		if onComplete == nil {
			return
		}
		onComplete()
	}
	go worker()
}
// Pipeline registers tasks as a single job, flagged sequential, at the end of
// the DAG.
//
// The tasks are copied into the new job, so later changes to the caller's
// slice do not affect it. Nothing is executed here: the job is handed to run
// when Run is called. How the sequential flag is honoured is decided by run,
// which is defined elsewhere (presumably the tasks run one after another in
// the order given; confirm against run).
//
// The returned pipelineResult wraps this Dag.
func (dag *Dag) Pipeline(tasks ...func()) *pipelineResult {
	job := &Job{
		tasks:      make([]func(), len(tasks)),
		sequential: true,
	}
	// The built-in copy replaces the element-by-element loop.
	copy(job.tasks, tasks)

	dag.jobs = append(dag.jobs, job)

	return &pipelineResult{
		dag,
	}
}
// Spawns registers tasks as a single job, flagged non-sequential, at the end
// of the DAG.
//
// The tasks are copied into the new job, so later changes to the caller's
// slice do not affect it. Nothing is executed here: the job is handed to run
// when Run is called. How the non-sequential flag is honoured is decided by
// run, which is defined elsewhere (presumably the tasks run concurrently;
// confirm against run).
//
// The returned spawnsResult wraps this Dag.
func (dag *Dag) Spawns(tasks ...func()) *spawnsResult {
	job := &Job{
		tasks:      make([]func(), len(tasks)),
		sequential: false,
	}
	// The built-in copy replaces the element-by-element loop.
	copy(job.tasks, tasks)

	dag.jobs = append(dag.jobs, job)

	return &spawnsResult{
		dag,
	}
}