Skip to content

Commit d02d969

Browse files
authored
Merge pull request #28 from redBorder/release/1.2.0
Release/1.2.0
2 parents ab2aaa0 + afe0951 commit d02d969

File tree

8 files changed

+184
-66
lines changed

8 files changed

+184
-66
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ coverage:
3737
@printf "$(MKL_BLUE)[COVERAGE]$(MKL_CLR_RESET) Computing coverage\n"
3838
@echo "mode: count" > coverage.out
3939
@go test -covermode=count -coverprofile=counter.part ./counter
40+
@go test -covermode=count -coverprofile=monitor.part ./monitor
41+
@go test -covermode=count -coverprofile=producer.part ./producer
4042
@grep -h -v "mode: count" *.part >> coverage.out
4143
@go tool cover -func coverage.out
4244

cmd/counters_monitor.go

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package main
2020

2121
import (
22+
"os"
23+
"os/signal"
24+
"syscall"
2225
"time"
2326

2427
rdkafka "github.com/confluentinc/confluent-kafka-go/kafka"
@@ -33,6 +36,12 @@ var reset = make(chan struct{})
3336
func CountersMonitor(config *AppConfig) {
3437
log := log.WithField("prefix", "monitor")
3538

39+
reload := make(chan os.Signal, 1)
40+
signal.Notify(reload, syscall.SIGHUP)
41+
42+
usr := make(chan os.Signal, 1)
43+
signal.Notify(usr, syscall.SIGUSR1)
44+
3645
wg.Add(1)
3746
go func() {
3847
for {
@@ -48,29 +57,59 @@ func CountersMonitor(config *AppConfig) {
4857
log.Fatalln("Error loading licenses: " + err.Error())
4958
}
5059

60+
for k, v := range limitBytes.getOrganizationLimits() {
61+
log.
62+
WithField("Total bytes", v).
63+
Infof("Organization %s", k)
64+
}
65+
5166
pipeline := BootstrapMonitorPipeline(config, limitBytes)
5267
StartConsumingMonitor(pipeline, config)
5368

5469
log.
5570
WithField("Time", intervalEnd.String()).
5671
Infof("Next reset set")
5772

58-
<-time.After(remaining)
73+
loop:
74+
for {
75+
select {
76+
case <-usr:
77+
pipeline.Produce(nil, map[string]interface{}{"show_total": true}, nil)
78+
continue loop
5979

60-
for org, bytes := range limitBytes {
61-
if bytes > 0 {
62-
pipeline.Produce(nil, map[string]interface{}{
63-
"reset_notification": true,
64-
"organization_uuid": org,
65-
}, nil)
80+
case <-time.After(remaining):
81+
notify(pipeline, limitBytes, true)
82+
break loop
83+
84+
case <-reload:
85+
notify(pipeline, limitBytes, false)
86+
break loop
6687
}
6788
}
68-
69-
reset <- struct{}{}
7089
}
7190
}()
7291
}
7392

93+
func notify(
94+
pipeline *rbforwarder.RBForwarder, limitBytes LimitBytes, resetCounters bool,
95+
) {
96+
var uuids []string
97+
98+
for uuid, license := range limitBytes {
99+
if !license.Expired {
100+
uuids = append(uuids, uuid)
101+
}
102+
}
103+
104+
pipeline.Produce(nil, map[string]interface{}{
105+
"allowed_licenses": true,
106+
"licenses": uuids,
107+
"reset_counters": resetCounters,
108+
}, nil)
109+
110+
reset <- struct{}{}
111+
}
112+
74113
// BootstrapMonitorPipeline bootstrap a RBForwarder pipeline
75114
func BootstrapMonitorPipeline(config *AppConfig, limits LimitBytes) *rbforwarder.RBForwarder {
76115
p, err := BootstrapRdKafkaProducer(config.Monitor.Kafka.Attributes)
@@ -90,7 +129,7 @@ func BootstrapMonitorPipeline(config *AppConfig, limits LimitBytes) *rbforwarder
90129
components = append(components, &monitor.CountersMonitor{
91130
Config: monitor.Config{
92131
Workers: 1,
93-
Limits: limits,
132+
Limits: limits.getOrganizationLimits(),
94133
Period: config.Monitor.Timer.Period,
95134
Offset: config.Monitor.Timer.Offset,
96135
Log: log,

cmd/kafka.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ func BootstrapRdKafkaConsumer(
2525
attributes map[string]string, topicAttributes map[string]string,
2626
) (*rdkafka.Consumer, error) {
2727

28-
ta := rdkafka.ConfigMap{}
28+
ta := rdkafka.ConfigMap{
29+
"auto.offset.reset": "smallest",
30+
}
2931
for key, value := range topicAttributes {
3032
ta.SetKey(key, value)
3133
}

cmd/license.go

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,26 @@ import (
3838
// PubKey should be set on compile time
3939
var PubKey string
4040

41+
type licenseStatus struct {
42+
OrganizationUUID string
43+
Amount uint64
44+
Expired bool
45+
}
46+
4147
// LimitBytes is a map containing the max bytes for each organization
42-
type LimitBytes map[string]uint64
48+
type LimitBytes map[string]*licenseStatus
49+
50+
func (l LimitBytes) getOrganizationLimits() map[string]uint64 {
51+
total := make(map[string]uint64)
52+
53+
for _, v := range l {
54+
if !v.Expired {
55+
total[v.OrganizationUUID] += v.Amount
56+
}
57+
}
58+
59+
return total
60+
}
4361

4462
// License contains information about the limits of bytes that an organization
4563
// can send.
@@ -55,7 +73,8 @@ type License struct {
5573
// LoadLicenses reads a redBorder license from a file and returns a License
5674
// struct holding the decoded information.
5775
func LoadLicenses(config *AppConfig) (LimitBytes, error) {
58-
var licenses []License
76+
limits := make(LimitBytes)
77+
5978
log := log.WithField("prefix", "license")
6079

6180
files, err := ioutil.ReadDir(config.LicensesDirectory)
@@ -93,47 +112,36 @@ func LoadLicenses(config *AppConfig) (LimitBytes, error) {
93112
return nil, err
94113
}
95114

115+
limits[license.UUID] = &licenseStatus{}
116+
96117
expires := time.Unix(license.ExpireAt, 0)
97118
if expires.Before(time.Now()) {
98119
log.Warnf("License %s has expired", license.UUID)
120+
limits[license.UUID].Expired = true
99121
continue
100122
}
101123

102124
log.Infoln(FormatLicense(license))
103125

104-
licenses = append(licenses, *license)
105-
}
106-
107-
limits := make(LimitBytes)
108-
if !config.OrganizationMode {
109-
var totalBytes uint64
110-
for _, license := range licenses {
111-
if license.Organization != "" {
112-
log.Warnf("Ignoring license WITH organization %s", license.UUID)
126+
if config.OrganizationMode {
127+
if license.Organization == "" {
128+
log.Warnf("Ignoring license WITH NO organization %s", license.UUID)
113129
continue
114130
}
115131

116-
totalBytes += license.LimitBytes
117-
}
118-
119-
limits["*"] = totalBytes
120-
} else {
121-
for _, license := range licenses {
122-
if license.Organization == "" {
123-
log.Warnf("Ignoring license WITH NO organization %s", license.UUID)
132+
limits[license.UUID].Amount += license.LimitBytes
133+
limits[license.UUID].OrganizationUUID = license.Organization
134+
} else {
135+
if license.Organization != "" {
136+
log.Warnf("Ignoring license WITH organization %s", license.UUID)
124137
continue
125138
}
126139

127-
limits[license.Organization] = limits[license.Organization] + license.LimitBytes
140+
limits[license.UUID].Amount += license.LimitBytes
141+
limits[license.UUID].OrganizationUUID = "*"
128142
}
129143
}
130144

131-
for k, v := range limits {
132-
log.
133-
WithField("Total bytes", v).
134-
Infof("Organization %s", k)
135-
}
136-
137145
return limits, nil
138146
}
139147

@@ -237,12 +245,14 @@ func FormatLicense(license *License) string {
237245
keyColor := color.New(color.FgYellow, color.Bold).SprintFunc()
238246

239247
return fmt.Sprintf("Using license \n"+
240-
keyColor("\tUUID: ")+"%s\n"+
241-
keyColor("\tCluster UUID: ")+"%s\n"+
242-
keyColor("\tExpires: ")+"%s\n"+
243-
keyColor("\tLimit bytes: ")+"%d\n",
248+
keyColor("\tUUID: ")+"%s\n"+
249+
keyColor("\tCluster UUID: ")+"%s\n"+
250+
keyColor("\tOrganization UUID: ")+"%s\n"+
251+
keyColor("\tExpires: ")+"%s\n"+
252+
keyColor("\tLimit bytes: ")+"%d",
244253
license.UUID,
245254
license.ClusterUUID,
255+
license.Organization,
246256
time.Unix(license.ExpireAt, 0).String(),
247257
license.LimitBytes,
248258
)

monitor/messages.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ type Count struct {
3030
// Alert contains the information abot a message alerting that the
3131
// maximum number of messages has been reached.
3232
type Alert struct {
33-
Monitor string `json:"monitor"`
34-
Type string `json:"type"`
35-
UUID string `json:"uuid,omitempty"`
36-
CurrentBytes uint64 `json:"current_bytes,omitempty"`
37-
Limit uint64 `json:"limit"`
38-
Timestamp int64 `json:"timestamp"`
33+
Monitor string `json:"monitor"`
34+
Type string `json:"type"`
35+
UUID string `json:"uuid,omitempty"`
36+
CurrentBytes uint64 `json:"current_bytes,omitempty"`
37+
Limit uint64 `json:"limit,omitempty"`
38+
Timestamp int64 `json:"timestamp"`
39+
Licenses []string `json:"licenses,omitempty"`
3940
}

monitor/monitor.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@ import (
2525

2626
type logger interface {
2727
Debugf(format string, args ...interface{})
28+
Infof(format string, args ...interface{})
29+
Infoln(args ...interface{})
2830
}
2931

3032
type nullLogger struct{}
3133

3234
func (n *nullLogger) Debugf(format string, args ...interface{}) {}
35+
func (n *nullLogger) Infof(format string, args ...interface{}) {}
36+
func (n *nullLogger) Infoln(args ...interface{}) {}
3337

3438
// Config contains the configuration for a Monitor.
3539
type Config struct {
@@ -64,6 +68,7 @@ func (mon *CountersMonitor) Spawn(id int) utils.Composer {
6468
monitor.Log = new(nullLogger)
6569
}
6670
monitor.db = bootstrapDB(mon.Limits)
71+
6772
return monitor
6873
}
6974

@@ -76,19 +81,40 @@ func (mon *CountersMonitor) Spawn(id int) utils.Composer {
7681
// does, send an alert to Kafka.
7782
func (mon *CountersMonitor) OnMessage(m *utils.Message, done utils.Done) {
7883
var (
79-
ok bool
8084
payload []byte
8185
err error
8286
bytes uint64
8387
)
8488

85-
if _, ok = m.Opts.Get("reset_notification"); ok {
86-
org, _ := m.Opts.Get("organization_uuid")
87-
mon.db[org.(string)] = 0
89+
if showTotal, ok := m.Opts.Get("show_total"); ok {
90+
if showTotalBool, ok := showTotal.(bool); ok {
91+
if showTotalBool {
92+
for k, v := range mon.db {
93+
mon.Log.Infof("[%s] Consumed bytes %d", k, v)
94+
}
95+
}
96+
}
97+
98+
done(m, 0, "Show total")
99+
return
100+
}
88101

89-
m.PushPayload(createResetNotificationMessage(org.(string)))
90-
mon.Log.Debugf("Sending reset notification")
91-
done(m, 0, "Reset notification")
102+
if _, ok := m.Opts.Get("allowed_licenses"); ok {
103+
if resetCounters, ok := m.Opts.Get("reset_counters"); ok {
104+
if shouldReset, ok := resetCounters.(bool); ok {
105+
if shouldReset {
106+
for organization := range mon.db {
107+
mon.db[organization] = 0
108+
}
109+
mon.Log.Infoln("Counters has been reset")
110+
}
111+
}
112+
}
113+
114+
licenses, _ := m.Opts.Get("licenses")
115+
// FIXME check assertion
116+
m.PushPayload(createLicensesAllowedMessage(licenses.([]string)))
117+
done(m, 0, "Allowed licenses")
92118
return
93119
}
94120

@@ -103,11 +129,12 @@ func (mon *CountersMonitor) OnMessage(m *utils.Message, done utils.Done) {
103129
return
104130
}
105131

106-
if ok = belongsToInterval(count.Timestamp, mon.Period, mon.Offset, mon.clk.Now().Unix()); !ok {
132+
if ok := belongsToInterval(count.Timestamp, mon.Period, mon.Offset, mon.clk.Now().Unix()); !ok {
107133
done(m, 0, "Message too old")
108134
return
109135
}
110136

137+
var ok bool
111138
if bytes, ok = mon.db[count.UUID]; !ok {
112139
m.PushPayload(createUknownUUIDMessage(count.UUID))
113140
done(m, 0, "Unknown UUID: \""+count.UUID+"\"")

0 commit comments

Comments
 (0)