-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathcomposite_unit.go
129 lines (112 loc) · 3.06 KB
/
composite_unit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
Copyright © 2024 Acronis International GmbH.
Released under MIT license.
*/
package service
import (
"strings"
"sync"
"sync/atomic"
)
// CompositeUnit represents a composition of service units and implements the Composite design pattern:
// its methods apply the corresponding operation to every unit in Units.
type CompositeUnit struct {
// Units holds the composed units that the CompositeUnit methods iterate over.
Units []Unit
}
// NewCompositeUnit builds a CompositeUnit that wraps the given units.
// The variadic slice is stored as-is in the Units field (it is not copied).
func NewCompositeUnit(units ...Unit) *CompositeUnit {
	composite := new(CompositeUnit)
	composite.Units = units
	return composite
}
// Start starts all units in the composition (each in its own separate goroutine) and blocks
// until either every unit's Start has returned without reporting a fatal error,
// or at least one unit has reported a fatal error.
//
// If a fatal error occurs in any unit, Start tries to stop (not gracefully) all units
// and sends a single *CompositeUnitError to fatalError. That error contains the fatal errors
// collected from the units followed by any errors returned while stopping them.
// The send on fatalError is a plain blocking send.
//
// If the composition contains no units, Start returns immediately.
func (cu *CompositeUnit) Start(fatalError chan<- error) {
	if len(cu.Units) == 0 {
		// No goroutine would ever signal completion, so waiting would block forever.
		return
	}

	// One private error channel per unit; capacity 1 lets a unit report its error
	// without needing a concurrent reader.
	fatalErrs := make([]chan error, len(cu.Units))
	for i := range fatalErrs {
		fatalErrs[i] = make(chan error, 1)
	}

	// done receives false when a unit has failed and true once ALL units have returned cleanly.
	// It is buffered so that goroutines finishing after Start has returned never block.
	done := make(chan bool, len(cu.Units))

	// notCleanlyFinished counts the units that have not (yet) returned without an error.
	// A failed unit never decrements it, so it can only reach zero when no unit failed;
	// this guarantees that a clean finisher cannot signal success ahead of a failing unit.
	notCleanlyFinished := int32(len(cu.Units))

	for i := range fatalErrs {
		go func(i int) {
			cu.Units[i].Start(fatalErrs[i])
			if len(fatalErrs[i]) != 0 {
				done <- false
				return
			}
			// Use the value returned by the decrement itself so exactly one goroutine
			// (the last clean finisher) observes zero.
			if atomic.AddInt32(&notCleanlyFinished, -1) == 0 {
				done <- true
			}
		}(i)
	}

	if <-done {
		return
	}

	stopErr := cu.Stop(false)

	// Drain whatever fatal errors have been reported so far without blocking.
	var errs []error
	for _, fatalErr := range fatalErrs {
		select {
		case err := <-fatalErr:
			errs = append(errs, err)
		default:
		}
	}

	if stopErr != nil {
		if cue, ok := stopErr.(*CompositeUnitError); ok {
			errs = append(errs, cue.UnitErrors...)
		} else {
			// Defensive: keep the error even if Stop ever returns another error type.
			errs = append(errs, stopErr)
		}
	}

	if len(errs) > 0 {
		fatalError <- &CompositeUnitError{errs}
	}
}
// Stop stops every unit of the composition concurrently (one goroutine per unit),
// passing the gracefully flag through to each unit's Stop, and waits for all of them.
// Non-nil errors returned by the units are gathered into a single *CompositeUnitError;
// nil is returned when no unit reported an error.
func (cu *CompositeUnit) Stop(gracefully bool) error {
	// Buffered for one result per unit so no stopping goroutine ever blocks on the send.
	stopResults := make(chan error, len(cu.Units))

	var pending sync.WaitGroup
	for _, unit := range cu.Units {
		pending.Add(1)
		go func(u Unit) {
			defer pending.Done()
			stopResults <- u.Stop(gracefully)
		}(unit)
	}
	pending.Wait()
	// All senders are finished, so the channel can be closed and drained with range.
	close(stopResults)

	var failures []error
	for result := range stopResults {
		if result != nil {
			failures = append(failures, result)
		}
	}
	if len(failures) == 0 {
		return nil
	}
	return &CompositeUnitError{UnitErrors: failures}
}
// MustRegisterMetrics calls MustRegisterMetrics on every unit of the composition that
// implements MetricsRegisterer (registering its metrics in the Prometheus client);
// units that do not implement the interface are skipped.
// Nothing is recovered here, so a panic raised by a unit propagates to the caller.
func (cu *CompositeUnit) MustRegisterMetrics() {
	for i := range cu.Units {
		registerer, supportsMetrics := cu.Units[i].(MetricsRegisterer)
		if !supportsMetrics {
			continue
		}
		registerer.MustRegisterMetrics()
	}
}
// UnregisterMetrics calls UnregisterMetrics on every unit of the composition that
// implements MetricsRegisterer (unregistering its metrics in the Prometheus client);
// units that do not implement the interface are skipped.
func (cu *CompositeUnit) UnregisterMetrics() {
	for i := range cu.Units {
		registerer, supportsMetrics := cu.Units[i].(MetricsRegisterer)
		if !supportsMetrics {
			continue
		}
		registerer.UnregisterMetrics()
	}
}
// CompositeUnitError is an error that may occur in CompositeUnit's methods.
// It aggregates the errors reported by the individual units.
type CompositeUnitError struct {
// UnitErrors holds the collected unit errors.
UnitErrors []error
}
// Error implements the error interface. The result is the messages of all
// UnitErrors, in slice order, joined with "; " (an empty string when there are none).
func (cue *CompositeUnitError) Error() string {
	parts := make([]string, len(cue.UnitErrors))
	for i, unitErr := range cue.UnitErrors {
		parts[i] = unitErr.Error()
	}
	return strings.Join(parts, "; ")
}