Skip to content

Commit

Permalink
Merge pull request #72 from reubenmiller/fix-websocket-timeout
Browse files Browse the repository at this point in the history
fix(notification2): erroneous message parsing due to lack of allocations
  • Loading branch information
reubenmiller authored Oct 2, 2024
2 parents 6acc3d8 + 5bef2cb commit cc6a0cf
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
44 changes: 33 additions & 11 deletions pkg/c8y/notification2/notification2.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,33 @@ func parseMessage(raw []byte) *Message {

i := 0
for scanner.Scan() {
// Note: .Bytes() does not allocate memory, so you need to allocate the data
// and copy the data to another variable if you want it to persist
line := scanner.Bytes()
if len(line) == 0 {
inHeader = false
// empty line is the border between the header and body
continue
}
if inHeader {
if i == 0 {
message.Identifier = line
message.Identifier = make([]byte, len(line))
copy(message.Identifier, line)
} else if i == 1 {
message.Description = line
message.Description = make([]byte, len(line))
copy(message.Description, line)
} else if i == 2 {
message.Action = line
message.Action = make([]byte, len(line))
copy(message.Action, line)
}
// Ignore unknown header indexes
} else {
message.Payload = line
// Copy payload
message.Payload = make([]byte, len(line))
copy(message.Payload, line)
// TODO: Check if a single websocket message can continue multiple messages
// Stop processing further messages
break
}
i++
}
Expand Down Expand Up @@ -374,7 +386,7 @@ func (c *Notification2Client) writeHandler() {
if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil {
Logger.Warnf("Failed to send ping message to server. %s", err)
// go c.reconnect()
return
continue
}
Logger.Debug("Sent ping successfully")
}
Expand Down Expand Up @@ -413,28 +425,38 @@ func (c *Notification2Client) worker() error {
for {
messageType, rawMessage, err := c.ws.ReadMessage()

Logger.Debugf("Received message: type=%d, message=%s, err=%s", messageType, rawMessage, err)
if err == nil {
Logger.Debugf("Received websocket message: type=%d, len=%d", messageType, len(rawMessage))
}

if err != nil {
// Taken from https://github.com/gorilla/websocket/blob/main/examples/chat/client.go
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
Logger.Warnf("unexpected websocket error. %v", err)
} else {
Logger.Warnf("websocket error. %v", err)
Logger.Infof("websocket error. %v", err)
}
go c.reconnect()
break
}

if messageType == websocket.TextMessage {
switch messageType {
case websocket.TextMessage:
Logger.Debugf("Raw notification2 message (len=%d):\n%s", len(rawMessage), rawMessage)
message := parseMessage(rawMessage)

c.hub.broadcast <- *message

Logger.Debugf("message id: %s", message.Identifier)
Logger.Debugf("message description: %s", message.Description)
Logger.Debugf("message action: %s", message.Action)
Logger.Debugf("message payload: %s", message.Payload)
c.hub.broadcast <- *message

case websocket.CloseMessage:
Logger.Warnf("Received close message. %v", rawMessage)
case websocket.PingMessage:
Logger.Debugf("Received ping message. %v", rawMessage)

case websocket.PongMessage:
Logger.Debugf("Received pong message. %v", rawMessage)
}
}
}()
Expand Down
22 changes: 22 additions & 0 deletions pkg/c8y/notification2/notification2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package notification2

import (
"testing"

"github.com/reubenmiller/go-c8y/internal/pkg/testingutils"
)

func Test_ParseMessage(t *testing.T) {
raw := []byte(`CLJuEJgjIAAwAQ==
/t123456/measurements/12345
CREATE
{"self":"https://example.com/measurement/measurements/12345","time":"2024-10-02T12:11:00.000Z","id":"12345","source":{"self":"https://example.com/inventory/managedObjects/11111","id":"11111"},"type":"temperature"}`)

message := parseMessage(raw)

testingutils.Equals(t, "CLJuEJgjIAAwAQ==", string(message.Identifier))
testingutils.Equals(t, "CREATE", string(message.Action))
testingutils.Equals(t, "/t123456/measurements/12345", string(message.Description))
testingutils.Assert(t, len(message.Payload) > 0, "payload size should be larger than zero")
}

0 comments on commit cc6a0cf

Please sign in to comment.