-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkerpool.go
55 lines (44 loc) · 863 Bytes
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main
import (
"fmt"
"log/slog"
"sync"
)
// Worker describes a task runner with an unexported worker method and an
// exported Run method. Because worker is unexported, only types declared in
// this package can satisfy the interface.
type Worker interface {
// worker is the per-goroutine processing loop.
worker()
// Run is the exported entry point that drives the workers.
Run()
}
// DefaultWorker runs a fixed slice of tasks on a pool of goroutines.
// It holds a sync.WaitGroup, so it must be used through a pointer and not
// copied after first use.
type DefaultWorker struct {
// Tasks are the tasks that Run hands to the workers.
Tasks []Task
// Concurrency is the requested number of worker goroutines used by Run.
Concurrency int
// taskch carries tasks from Run to the worker goroutines; Run creates it.
taskch chan Task
// wg counts the tasks that have not finished yet; Run waits on it.
wg sync.WaitGroup
// NOTE(review): this embedded interface is never assigned in the visible
// code, and *DefaultWorker declares worker and Run itself — it appears to
// be unused; confirm before relying on or removing it.
Worker
}
// NewDefaultWorker returns a pointer to a DefaultWorker whose Tasks and
// Concurrency fields are set from the arguments. The tasks slice is stored
// as given (not copied); every other field keeps its zero value.
func NewDefaultWorker(tasks []Task, concurrency int) *DefaultWorker {
	pool := new(DefaultWorker)
	pool.Tasks = tasks
	pool.Concurrency = concurrency
	return pool
}
// worker receives tasks from cw.taskch until the channel is closed and
// drained. Each task's Process method is called once; a non-nil error is
// logged through slog and otherwise ignored. cw.wg.Done is called once per
// received task, whether or not Process failed.
func (cw *DefaultWorker) worker() {
	// Read the channel field once, as a range loop would.
	queue := cw.taskch
	for {
		job, open := <-queue
		if !open {
			return
		}
		err := job.Process()
		if err != nil {
			msg := fmt.Sprintf("error while processing task: %v", err)
			slog.Error(msg)
		}
		cw.wg.Done()
	}
}
// Run hands every task in cw.Tasks to a pool of worker goroutines and blocks
// until each one has been processed. Task errors are dealt with inside
// worker; Run itself returns nothing.
//
// At least one worker is always started, so a Concurrency of zero or less
// cannot leave Run waiting forever on tasks nobody picks up. No more workers
// are started than there are tasks. With no tasks, Run returns immediately.
//
// Run may be called again after it returns, but must not be called
// concurrently on the same DefaultWorker.
func (cw *DefaultWorker) Run() {
	tasks := cw.Tasks
	if len(tasks) == 0 {
		return
	}

	workers := cw.Concurrency
	if workers < 1 {
		// Without a worker nothing would ever call wg.Done and Wait would hang.
		workers = 1
	}
	if workers > len(tasks) {
		workers = len(tasks)
	}

	// The buffer holds every task, so the sends below never block.
	cw.taskch = make(chan Task, len(tasks))
	// Closing the channel lets the workers leave their receive loop.
	defer close(cw.taskch)

	// Register all tasks before any worker can call Done. The counter is
	// already zero here: a previous Run only returns after Wait has returned.
	cw.wg.Add(len(tasks))
	for i := 0; i < workers; i++ {
		go cw.worker()
	}
	for _, task := range tasks {
		cw.taskch <- task
	}
	cw.wg.Wait()
}