-
Notifications
You must be signed in to change notification settings - Fork 0
/
progress_map.go
127 lines (116 loc) · 2.4 KB
/
progress_map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package promise
import "context"
// ProgressMap is map of Progress es for bulk parallel routines
type ProgressMap[T any] interface {
Cancel()
Await() *map[interface{}]Output[T]
Race() (key interface{}, pay T, err error)
await() *map[interface{}]Output[T]
race() (key interface{}, pay T, err error)
}
type progressMap[T any] struct {
data map[interface{}]Progress[T]
fulfilmentChannel chan Output[T]
cancelableContext context.Context
cancel context.CancelFunc
}
func (pm *progressMap[T]) Cancel() {
defer pm.cancel()
}
func (pm *progressMap[T]) Await() *map[interface{}]Output[T] {
return pm.await()
}
func (pm *progressMap[T]) Race() (key interface{}, pay T, err error) {
return pm.race()
}
func (pm *progressMap[T]) await() *map[interface{}]Output[T] {
defer pm.cancel()
var z T
om := map[interface{}]Output[T]{}
var d bool
var o bool
var nio Output[T]
som := len(pm.data)
for {
for k, p := range pm.data {
c := p.(cancelableContextProvider).getContext()
select {
case <-c.Done():
_, ok := om[k]
if !ok {
e := c.Err()
switch e {
case context.Canceled:
p.abandon()
case context.DeadlineExceeded:
p.leave()
}
om[k] = newOutput(z, e)
}
if len(om) >= som {
return &om
}
case nio, o = <-pm.fulfilmentChannel:
if o {
if !d {
d = true
defer close(pm.fulfilmentChannel)
}
}
om[nio.(keyProvider).Key()] = newOutput(
nio.Payload(),
nio.Error(),
)
if len(om) >= som {
return &om
}
}
}
}
}
func (pm *progressMap[T]) race() (key interface{}, pay T, err error) {
defer pm.cancel()
om := map[interface{}]bool{}
var d bool
som := len(pm.data)
for {
for k, p := range pm.data {
c := p.(cancelableContextProvider).getContext()
select {
case <-c.Done():
key = nil
err = c.Err()
switch err {
case context.Canceled:
p.abandon()
case context.DeadlineExceeded:
p.leave()
}
om[k] = true
if len(om) >= som {
return
}
case nio, o := <-pm.fulfilmentChannel:
if o {
if !d {
d = true
defer close(pm.fulfilmentChannel)
}
}
key = nio.(keyProvider).Key()
pay = nio.Payload()
err = nio.Error()
om[key] = true
for kk, pp := range pm.data {
if kk != key {
pp.eliminate()
om[kk] = true
}
}
if len(om) >= som {
return
}
}
}
}
}