Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func main() {
connectionTopic := connection_topic.NewUpdateTopic()
orderTopic := order_topic.NewSendTopic()
loggerTopic := logger_topic.NewEnableTopic()
loggerTopic.SetDataLogger(subloggers[data_logger.Name].(*data_logger.Logger))

messageTopic := message_topic.NewUpdateTopic()
stateOrderTopic := order_topic.NewState(idToBoard, trace.Logger)
Expand All @@ -191,6 +192,7 @@ func main() {
broker.AddTopic(order_topic.StateName, stateOrderTopic)
broker.AddTopic(logger_topic.EnableName, loggerTopic)
broker.AddTopic(logger_topic.ResponseName, loggerTopic)
broker.AddTopic(logger_topic.VariablesName, loggerTopic)
broker.AddTopic(message_topic.UpdateName, messageTopic)

connections := make(chan *websocket.Client)
Expand Down
23 changes: 23 additions & 0 deletions backend/pkg/broker/topics/logger/enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ import (
"sync/atomic"

"github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction"
data_logger "github.com/HyperloopUPV-H8/h9-backend/pkg/logger/data"
"github.com/HyperloopUPV-H8/h9-backend/pkg/websocket"
"github.com/google/uuid"
ws "github.com/gorilla/websocket"
)

const EnableName abstraction.BrokerTopic = "logger/enable"
const ResponseName abstraction.BrokerTopic = "logger/response"
const VariablesName abstraction.BrokerTopic = "logger/variables"

type Enable struct {
isRunning *atomic.Bool
pool *websocket.Pool
connectionMx *sync.Mutex
subscribers map[websocket.ClientId]struct{}
api abstraction.BrokerAPI
data_logger *data_logger.Logger
}

func NewEnableTopic() *Enable {
Expand Down Expand Up @@ -58,6 +61,12 @@ func (enable *Enable) ClientMessage(id websocket.ClientId, message *websocket.Me

fmt.Printf("logger/response subscribed %s\n", uuid.UUID(id).String())
enable.subscribers[id] = struct{}{}

case VariablesName:
err := enable.handleVariables(id, message)
if err != nil {
fmt.Printf("error handling logger/variables: %v\n", err)
}
default:
enable.connectionMx.Lock()
defer enable.connectionMx.Unlock()
Expand Down Expand Up @@ -85,6 +94,16 @@ func (enable *Enable) handleToggle(_ websocket.ClientId, message *websocket.Mess
return nil
}

func (enable *Enable) handleVariables(_ websocket.ClientId, message *websocket.Message) error {
var allowedVars []string
err := json.Unmarshal(message.Payload, &allowedVars)
if err != nil {
return err
}
enable.data_logger.SetAllowedVars(allowedVars)
return nil
}

func (enable *Enable) broadcastState() error {
payload, err := json.Marshal(enable.isRunning.Load())
if err != nil {
Expand Down Expand Up @@ -123,6 +142,10 @@ func (enable *Enable) SetAPI(api abstraction.BrokerAPI) {
enable.api = api
}

func (enable *Enable) SetDataLogger(logger *data_logger.Logger) {
enable.data_logger = logger
}

type Status struct {
request bool
response chan bool
Expand Down
28 changes: 23 additions & 5 deletions backend/pkg/logger/data/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Logger struct {
fileLock *sync.RWMutex
// saveFiles is a map that contains the file of each value
saveFiles map[data.ValueName]*file.CSV
// allowedVars contains the full names (board/valueName) to be logged
allowedVars map[string]struct{}
}

// Record is a struct that implements the abstraction.LoggerRecord interface
Expand All @@ -41,15 +43,25 @@ func (*Record) Name() abstraction.LoggerName { return Name }

func NewLogger() *Logger {
logger := &Logger{
saveFiles: make(map[data.ValueName]*file.CSV),
running: &atomic.Bool{},
fileLock: &sync.RWMutex{},
saveFiles: make(map[data.ValueName]*file.CSV),
running: &atomic.Bool{},
fileLock: &sync.RWMutex{},
allowedVars: nil, // no filter by default
}

logger.running.Store(false)
return logger
}

// SetAllowedVars allows updating the list of allowed variables at runtime
func (sublogger *Logger) SetAllowedVars(allowed []string) {
allowedMap := make(map[string]struct{}, len(allowed))
for _, v := range allowed {
allowedMap[v] = struct{}{}
}
sublogger.allowedVars = allowedMap
}

func (sublogger *Logger) Start() error {
if !sublogger.running.CompareAndSwap(false, true) {
fmt.Println("Logger already running")
Expand Down Expand Up @@ -85,7 +97,13 @@ func (sublogger *Logger) PushRecord(record abstraction.LoggerRecord) error {

writeErr := error(nil)
for valueName, value := range dataRecord.Packet.GetValues() {

// Filter: only log allowed variables
if sublogger.allowedVars != nil {
key := dataRecord.From + "/" + string(valueName)
if _, ok := sublogger.allowedVars[key]; !ok {
continue
}
}
var valueRepresentation string
switch value := value.(type) {
case numeric:
Expand All @@ -102,7 +120,7 @@ func (sublogger *Logger) PushRecord(record abstraction.LoggerRecord) error {
}

err = saveFile.Write([]string{
fmt.Sprint(dataRecord.Packet.Timestamp().UnixMilli()),
fmt.Sprint(dataRecord.Packet.Timestamp().Format(time.StampMicro)),
dataRecord.From,
dataRecord.To,
valueRepresentation,
Expand Down
Loading