This repository has been archived by the owner on Jun 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
measures.go
105 lines (90 loc) · 2.31 KB
/
measures.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/streadway/amqp"
)
// Package-level state used by the measure collector.
var (
	// host is the broker address spliced into the AMQP URI; it is set by
	// InitializeMeasureCollector.
	host string

	// conn is the connection getConnection inspects before dialing: when it
	// is non-nil and still open it is handed back instead of a new one.
	conn *amqp.Connection

	// exchangeName is the exchange the collector's queue is bound to.
	exchangeName = "messages"
)
// RMQMessage is the JSON payload decoded from each message body received by
// the measure collector.
type RMQMessage struct {
	// Timestamp is decoded from the "ts" key.
	// NOTE(review): the unit (s / ms / ns) is not visible here — confirm
	// against the publisher.
	Timestamp int64 `json:"ts"`

	// Value is the numeric reading, decoded from the "value" key.
	Value float64 `json:"value"`

	// Topic is decoded from the "topic" key.
	Topic string `json:"topic"`

	// Scope is decoded from the "scope" key; RunMeasureCollector forwards
	// only messages whose scope is "node".
	Scope string `json:"scope"`

	// Tags holds the free-form string key/value pairs from the "tags" key.
	Tags map[string]string `json:"tags"`
}
// InitializeMeasureCollector records hostUrl as the broker address that
// getConnection later dials. It only stores the value; no connection is
// opened here.
func InitializeMeasureCollector(hostUrl string) {
	host = hostUrl
}
// RunMeasureCollector consumes every message delivered through the exchange
// named by exchangeName and forwards those whose scope is "node" to
// toKnowledgebase.
//
// It never returns. Each pass through the loop runs one collection session;
// when the session ends (the broker is unreachable, the subscription cannot
// be set up, or the AMQP channel closes) the cause is logged and a new
// session is started after a five second pause. The pause applies to every
// failure, so an unreachable broker no longer causes a tight retry loop.
func RunMeasureCollector(toKnowledgebase chan RMQMessage) {
	const retryDelay = 5 * time.Second

	for {
		InfoLogger.Print("Measure collector (re)starts...")
		if err := collectMeasures(toKnowledgebase); err != nil {
			ErrorLogger.Print(err.Error())
		}
		InfoLogger.Print("Measure collector restarting in 5 seconds...")
		time.Sleep(retryDelay)
	}
}

// collectMeasures runs a single collection session: it opens a channel,
// declares an exclusive server-named queue, binds it to exchangeName, starts
// consuming and then blocks until the channel closes. The returned error
// describes why the session ended and is never nil.
func collectMeasures(toKnowledgebase chan RMQMessage) error {
	c, err := getConnection()
	if err != nil {
		return fmt.Errorf("connecting to message broker: %w", err)
	}

	ch, err := c.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	// Release the channel on every exit path. By the time this runs the
	// channel is either unused or already closed, so the error returned by
	// Close carries no extra information and is deliberately dropped.
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring queue: %w", err)
	}

	// NOTE(review): on a topic exchange "*" matches exactly one word, not
	// every routing key ("#" would) — confirm the exchange type.
	err = ch.QueueBind(
		q.Name,       // queue name
		"*",          // routing key
		exchangeName, // exchange
		false,
		nil)
	if err != nil {
		return fmt.Errorf("binding queue %q to exchange %q: %w", q.Name, exchangeName, err)
	}

	// Start subscription on everything
	msgs, err := ch.Consume(
		q.Name, // queue name
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		return fmt.Errorf("consuming from queue %q: %w", q.Name, err)
	}

	// Buffered so the library's close notification can be delivered even if
	// this goroutine is not yet parked on the receive below.
	closeNotifyChan := ch.NotifyClose(make(chan *amqp.Error, 1))

	// The forwarder stops on its own once msgs is closed.
	go forwardNodeMeasures(msgs, toKnowledgebase)

	// A nil *amqp.Error means the notification channel was closed without an
	// error value. It must be checked as a pointer: wrapping it in an error
	// interface first would yield a non-nil interface whose Error method
	// dereferences nil.
	if amqpErr := <-closeNotifyChan; amqpErr != nil {
		return fmt.Errorf("channel closed: %w", amqpErr)
	}
	return fmt.Errorf("channel closed without an error")
}

// forwardNodeMeasures decodes each delivery from msgs as an RMQMessage and
// sends the ones whose scope is "node" to toKnowledgebase. Deliveries whose
// body is not valid JSON for RMQMessage are logged and skipped. It returns
// when msgs is closed.
func forwardNodeMeasures(msgs <-chan amqp.Delivery, toKnowledgebase chan<- RMQMessage) {
	for msg := range msgs {
		// TODO: should drop messages going to Beehive
		InfoLogger.Printf("%s received", msg.Body)
		InfoLogger.Printf("%s", msg.RoutingKey)

		var rmqMessage RMQMessage
		if err := json.Unmarshal(msg.Body, &rmqMessage); err != nil {
			ErrorLogger.Print(fmt.Sprintf("dropping undecodable message with routing key %q: %s", msg.RoutingKey, err.Error()))
			continue
		}
		InfoLogger.Printf("%v", rmqMessage)

		// TODO: We want to filter out ones going to Beehive
		// TODO: We should do the filtering by setting a proper routingkey
		if rmqMessage.Scope == "node" {
			toKnowledgebase <- rmqMessage
		}
	}
}
// getCredential builds the AMQP connection URI for the given broker host and
// login, in the form amqp://<id>:<pw>@<host>. The arguments are inserted
// verbatim; no URL escaping is applied.
func getCredential(host string, id string, pw string) string {
	const uriFormat = "amqp://%s:%s@%s"

	uri := fmt.Sprintf(uriFormat, id, pw, host)
	return uri
}
// getConnection returns the package-level broker connection, dialing a new
// one when none exists yet or the existing one has been closed.
//
// A freshly dialed connection is stored in conn so later calls reuse it;
// without that assignment the reuse check could never succeed and every call
// would open (and leak) another connection. On a dial failure conn is left
// untouched and the error is returned with a nil connection.
//
// It dials host with the fixed "worker"/"worker" login. It does not
// synchronize access to conn.
func getConnection() (*amqp.Connection, error) {
	if conn != nil && !conn.IsClosed() {
		return conn, nil
	}

	c, err := amqp.Dial(getCredential(host, "worker", "worker"))
	if err != nil {
		return nil, err
	}
	conn = c
	return conn, nil
}