forked from blind-oracle/cortex-tenant
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dispatcher.go
89 lines (79 loc) · 2.05 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"context"
"time"
"github.com/prometheus/prometheus/prompb"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type dispatcher struct {
clientset *kubernetes.Clientset
nstenant map[string]string // namespace_name: tenant_name
nstschan map[string]chan prompb.TimeSeries
labelName string
interval int
proc *processor
}
func newdispatcher(labelName string, interval int, proc *processor) (*dispatcher, error) {
k := &dispatcher{
nstenant: make(map[string]string),
nstschan: make(map[string]chan prompb.TimeSeries),
labelName: labelName,
proc: proc,
interval: interval,
}
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
// creates the clientset
k.clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return k, nil
}
func (d *dispatcher) updateMap() (err error) {
nsList, err := d.clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return (err)
}
for _, ns := range nsList.Items {
if ns.ObjectMeta.Labels[d.labelName] != "" {
d.nstenant[ns.ObjectMeta.Name] = ns.ObjectMeta.Labels[d.labelName]
} else {
delete(d.nstenant, ns.ObjectMeta.Name)
}
}
return nil
}
func (d *dispatcher) updateBatchers() {
set := make(map[string]struct{})
set[d.proc.cfg.Tenant.Default] = struct{}{} // create batcher for default tenant
for _, v := range d.nstenant {
set[v] = struct{}{}
}
for tenant := range set {
_, ok := d.nstschan[tenant]
if !ok {
wk := createWorker(tenant, d.proc)
tschan := make(chan prompb.TimeSeries)
log.Debugf("Create batcher for tenant: %s", tenant)
go wk.run(tschan)
d.nstschan[tenant] = tschan
}
}
}
func (d *dispatcher) run() {
ticker := time.NewTicker(time.Duration(d.interval) * time.Second)
for ; true; <-ticker.C {
log.Debug("Call k8s for update ns labels")
err := d.updateMap()
if err != nil {
log.Errorf("Unable to call Api-Server: %s", err)
}
d.updateBatchers()
}
}