-
Notifications
You must be signed in to change notification settings - Fork 0
/
statusreceiver.go
102 lines (85 loc) · 2.82 KB
/
statusreceiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/*
* Copyright 2024 Johan Stenstam, johan.stenstam@internetstiftelsen.se
*/
package main
import (
"encoding/json"
"log"
"path/filepath"
"strings"
"github.com/dnstapir/tapir"
"github.com/spf13/viper"
)
type StatusReceiver struct {
engine *tapir.MqttEngine
logger *Logger
StatusCh chan tapir.MqttPkgIn
PopStatus map[string]tapir.TapirFunctionStatus // map[id]status
EdmStatus map[string]tapir.TapirFunctionStatus // map[id]status
// ...
}
func NewStatusReceiver(config *Config, logger *Logger) (*StatusReceiver, error) {
statusCh := make(chan tapir.MqttPkgIn, 100)
return &StatusReceiver{
engine: config.MqttEngine,
logger: logger,
StatusCh: statusCh,
PopStatus: make(map[string]tapir.TapirFunctionStatus),
EdmStatus: make(map[string]tapir.TapirFunctionStatus),
}, nil
}
func (h *StatusReceiver) Start() {
statusTopic := viper.GetString("tapir.status.topic")
if statusTopic == "" {
TEMExiter("MQTT Engine %s: MQTT status topic not set", h.engine.Creator)
}
keyfile := viper.GetString("tapir.status.validatorkey")
if keyfile == "" {
TEMExiter("MQTT Engine %s: MQTT status validator key not set", h.engine.Creator)
}
keyfile = filepath.Clean(keyfile)
validatorkey, err := tapir.FetchMqttValidatorKey(statusTopic, keyfile)
if err != nil {
TEMExiter("MQTT Engine %s: Error fetching MQTT validator key for topic %s: %v", h.engine.Creator, statusTopic, err)
}
log.Printf("MQTT Engine %s: Adding topic '%s' to MQTT Engine", h.engine.Creator, statusTopic)
subch := make(chan tapir.MqttPkgIn, 100)
_, err = h.engine.SubToTopic(statusTopic, validatorkey, h.StatusCh, "struct", true)
if err != nil {
TEMExiter("Error adding sub topic %s to MQTT Engine: %v", statusTopic, err)
}
for pkg := range subch {
log.Printf("TAPIR-SLOGGER Status Receiver: Received message on topic %s", pkg.Topic)
switch {
case strings.HasPrefix(pkg.Topic, "status/up/"):
parts := strings.Split(pkg.Topic, "/")
if len(parts) == 4 {
edgeId := parts[2]
edgeComponent := parts[3]
var tfs tapir.TapirFunctionStatus
err := json.Unmarshal(pkg.Payload, &tfs)
if err != nil {
log.Printf("MQTT: failed to decode json: %v", err)
continue
}
log.Printf("Received status update from sender: %s, component: %s", edgeId, edgeComponent)
h.logger.LogStatus(edgeId, edgeComponent, tfs)
h.updateStatus(tfs)
} else {
log.Printf("TAPIR-SLOGGER MQTT Handler: Invalid topic format: %s", pkg.Topic)
}
default:
log.Printf("TAPIR-SLOGGER MQTT Handler: Received message on unknown topic %s", pkg.Topic)
}
}
}
func (h *StatusReceiver) updateStatus(status tapir.TapirFunctionStatus) {
h.PopStatus[status.FunctionID] = status
}
func (h *StatusReceiver) GetStatus() tapir.SloggerCmdResponse {
resp := tapir.SloggerCmdResponse{
PopStatus: h.PopStatus,
EdmStatus: h.EdmStatus,
}
return resp
}