forked from francescopepe/formigo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
110 lines (89 loc) · 3.07 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package formigo
import (
"context"
"errors"
"sync"
"github.com/Pod-Point/go-queue-worker/internal/client"
"github.com/Pod-Point/go-queue-worker/internal/messages"
)
type worker struct {
client client.Client
concurrency int
retrievers int
errorConfig ErrorConfiguration
consumer consumer
deleterConfig DeleterConfiguration
}
func (w worker) Run(ctx context.Context) error {
// Create a new context with a cancel function used to stop the worker from the
// controller in case too many errors occur.
ctx, cancel := context.WithCancelCause(ctx)
// Create controller
ctrl := newController(w.errorConfig, cancel)
// Run retrievers and get the message channel
messageCh := w.runRetrievers(ctx, ctrl)
// Run consumer and get the deletion channel.
// Note that the context is not given to them because they will only stop once
// all the messages in the pipeline have been consumed.
deleteCh := w.runConsumer(ctrl, messageCh)
// Run deleter.
// Note that the context is not given to the deleter because it will only stop once
// all the consumed messages in the pipeline have been deleted.
var wg sync.WaitGroup // WaitGroup for the deleter
deleter(&wg, w.client, w.deleterConfig, ctrl, deleteCh)
// Wait for deleter to exit
wg.Wait()
// Get the cause of the cancellation
// If the context was cancelled by the controller, there must be a cause containing
// the error, otherwise the context error by default is `context.Canceled`, which
// means that it was a normal stop request (probably SIGTERM).
err := context.Cause(ctx)
if errors.Is(err, context.Canceled) {
return nil
}
return err
}
// runRetrievers will run a number of retrievers (Go routines) based on the worker's
// configuration.
// It returns a channel where the messages will be published and, only when all the
// retrievers have stopped, it will close it to broadcast the signal to stop to the
// consumers.
func (w worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan messages.Message {
messageCh := make(chan messages.Message)
var wg sync.WaitGroup
wg.Add(w.retrievers)
for i := 0; i < w.retrievers; i++ {
go func() {
defer wg.Done()
retriever(ctx, w.client, ctrl, messageCh)
}()
}
go func() {
wg.Wait()
close(messageCh)
}()
return messageCh
}
// runConsumer runs the worker's consumer.
// It returns a channel where the messages will be published for deletion and,
// only when the consumer has stopped, it will close it to broadcast the
// signal to stop to the deleter.
func (w worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message) <-chan messages.Message {
deleteCh := make(chan messages.Message)
go func() {
w.consumer.consume(w.concurrency, ctrl, messageCh, deleteCh)
close(deleteCh)
}()
return deleteCh
}
func NewWorker(config Configuration) worker {
config = setWorkerConfigValues(config)
return worker{
client: config.Client,
concurrency: config.Concurrency,
retrievers: config.Retrievers,
errorConfig: config.ErrorConfig,
consumer: config.Consumer,
deleterConfig: config.DeleterConfig,
}
}