From 460893f5d05061d0616b56fc63731916372fada7 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 13:11:58 +0200 Subject: [PATCH 1/9] log all types of websocket messages --- pkg/c8y/notification2/notification2.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 2b81e1e..102beb7 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -426,15 +426,24 @@ func (c *Notification2Client) worker() error { break } - if messageType == websocket.TextMessage { + switch messageType { + case websocket.TextMessage: message := parseMessage(rawMessage) - c.hub.broadcast <- *message - Logger.Debugf("message id: %s", message.Identifier) Logger.Debugf("message description: %s", message.Description) Logger.Debugf("message action: %s", message.Action) Logger.Debugf("message payload: %s", message.Payload) + + case websocket.CloseMessage: + Logger.Debugf("Received close message. %v", rawMessage) + go c.reconnect() + break + case websocket.PingMessage: + Logger.Debugf("Received ping message. %v", rawMessage) + + case websocket.PongMessage: + Logger.Debugf("Received pong message. %v", rawMessage) } } }() From b2cbc4b4cc3d2e45b0d55009e2f9495b8d9b7c0d Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 13:13:16 +0200 Subject: [PATCH 2/9] don't stop websocket writehandler on ping failures --- pkg/c8y/notification2/notification2.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 102beb7..7374e0c 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -374,7 +374,7 @@ func (c *Notification2Client) writeHandler() { if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil { Logger.Warnf("Failed to send ping message to server. %s", err) // go c.reconnect() - return + continue } Logger.Debug("Sent ping successfully") } From 9888fe03ed5134491552b876b0500314522d3d7b Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 13:38:22 +0200 Subject: [PATCH 3/9] log message before publishing it to hub --- pkg/c8y/notification2/notification2.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 7374e0c..658d3c6 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -429,11 +429,12 @@ func (c *Notification2Client) worker() error { switch messageType { case websocket.TextMessage: message := parseMessage(rawMessage) - c.hub.broadcast <- *message + Logger.Debugf("message len: %d", len(rawMessage)) Logger.Debugf("message id: %s", message.Identifier) Logger.Debugf("message description: %s", message.Description) Logger.Debugf("message action: %s", message.Action) Logger.Debugf("message payload: %s", message.Payload) + c.hub.broadcast <- *message case websocket.CloseMessage: Logger.Debugf("Received close message. %v", rawMessage) From 3ce59558faa969dbf0f09416258fe563f18a058e Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 13:52:00 +0200 Subject: [PATCH 4/9] improve logging of raw messages --- pkg/c8y/notification2/notification2.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 658d3c6..29e542a 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -413,7 +413,9 @@ func (c *Notification2Client) worker() error { for { messageType, rawMessage, err := c.ws.ReadMessage() - Logger.Debugf("Received message: type=%d, message=%s, err=%s", messageType, rawMessage, err) + if err == nil { + Logger.Debugf("Received websocket message: type=%d, len=%d", messageType, len(rawMessage)) + } if err != nil { // Taken from https://github.com/gorilla/websocket/blob/main/examples/chat/client.go @@ -428,8 +430,8 @@ func (c *Notification2Client) worker() error { switch messageType { case websocket.TextMessage: + Logger.Debugf("Raw notification2 message (len=%d):\n%s", len(rawMessage), rawMessage) message := parseMessage(rawMessage) - Logger.Debugf("message len: %d", len(rawMessage)) Logger.Debugf("message id: %s", message.Identifier) Logger.Debugf("message description: %s", message.Description) Logger.Debugf("message action: %s", message.Action) From b16fc54543bdd9eb2c125a932cb1651ae3a28a17 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 13:52:32 +0200 Subject: [PATCH 5/9] stop parsing after first message --- pkg/c8y/notification2/notification2.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 29e542a..f244186 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -324,6 +324,8 @@ func parseMessage(raw []byte) *Message { line := scanner.Bytes() if len(line) == 0 { inHeader = false + // empty line is the border between the header and body + continue } if inHeader { if i == 0 { @@ -336,6 +338,9 @@ func parseMessage(raw []byte) *Message { // Ignore unknown header indexes } else { message.Payload = line + // TODO: Check if a single websocket message can continue multiple messages + // Stop processing further messages + break } i++ } From 7ec2483fb3713c5a3d04b519b48ac8c3e1f7d9ea Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 14:04:47 +0200 Subject: [PATCH 6/9] allocate new byte slice when parsing raw input --- pkg/c8y/notification2/notification2.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index f244186..2e1f46f 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -337,7 +337,9 @@ func parseMessage(raw []byte) *Message { } // Ignore unknown header indexes } else { - message.Payload = line + // Copy payload + message.Payload = make([]byte, len(line)) + copy(message.Payload, line) // TODO: Check if a single websocket message can continue multiple messages // Stop processing further messages break From 20fada630411346c68742c95952805b8904c9be3 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 14:31:49 +0200 Subject: [PATCH 7/9] fix notification2 parsing bug due not allocating new memory before assigning the values --- pkg/c8y/notification2/notification2.go | 11 ++++++++--- pkg/c8y/notification2/notification2_test.go | 22 +++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 pkg/c8y/notification2/notification2_test.go diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 2e1f46f..9375059 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -321,6 +321,8 @@ func parseMessage(raw []byte) *Message { i := 0 for scanner.Scan() { + // Note: .Bytes() does not allocate memory, so you need to allocate the data + // and copy the data to another variable if you want it to persist line := scanner.Bytes() if len(line) == 0 { inHeader = false @@ -329,11 +331,14 @@ func parseMessage(raw []byte) *Message { } if inHeader { if i == 0 { - message.Identifier = line + message.Identifier = make([]byte, len(line)) + copy(message.Identifier, line) } else if i == 1 { - message.Description = line + message.Description = make([]byte, len(line)) + copy(message.Description, line) } else if i == 2 { - message.Action = line + message.Action = make([]byte, len(line)) + copy(message.Action, line) } // Ignore unknown header indexes } else { diff --git a/pkg/c8y/notification2/notification2_test.go b/pkg/c8y/notification2/notification2_test.go new file mode 100644 index 0000000..d868162 --- /dev/null +++ b/pkg/c8y/notification2/notification2_test.go @@ -0,0 +1,22 @@ +package notification2 + +import ( + "testing" + + "github.com/reubenmiller/go-c8y/internal/pkg/testingutils" +) + +func Test_ParseMessage(t *testing.T) { + raw := []byte(`CLJuEJgjIAAwAQ== +/t123456/measurements/12345 +CREATE + +{"self":"https://example.com/measurement/measurements/12345","time":"2024-10-02T12:11:00.000Z","id":"12345","source":{"self":"https://example.com/inventory/managedObjects/11111","id":"11111"},"type":"temperature"}`) + + message := parseMessage(raw) + + testingutils.Equals(t, "CLJuEJgjIAAwAQ==", string(message.Identifier)) + testingutils.Equals(t, "CREATE", string(message.Action)) + testingutils.Equals(t, "/t123456/measurements/12345", string(message.Description)) + testingutils.Assert(t, len(message.Payload) > 0, "payload size should be larger than zero") +} From 2fd1a6d00ba77f0cdba4b787c85ceca2e6fbd624 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 14:32:04 +0200 Subject: [PATCH 8/9] fix linting --- pkg/c8y/notification2/notification2.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 9375059..30868d5 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -453,7 +453,6 @@ func (c *Notification2Client) worker() error { case websocket.CloseMessage: Logger.Debugf("Received close message. %v", rawMessage) go c.reconnect() - break case websocket.PingMessage: Logger.Debugf("Received ping message. %v", rawMessage) From 5bef2cb8ec413926fd0bab4e586b8feab23e3702 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Wed, 2 Oct 2024 14:51:02 +0200 Subject: [PATCH 9/9] log expected errors to info level --- pkg/c8y/notification2/notification2.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/c8y/notification2/notification2.go b/pkg/c8y/notification2/notification2.go index 30868d5..f73ffab 100644 --- a/pkg/c8y/notification2/notification2.go +++ b/pkg/c8y/notification2/notification2.go @@ -434,7 +434,7 @@ func (c *Notification2Client) worker() error { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { Logger.Warnf("unexpected websocket error. %v", err) } else { - Logger.Warnf("websocket error. %v", err) + Logger.Infof("websocket error. %v", err) } go c.reconnect() break @@ -451,8 +451,7 @@ func (c *Notification2Client) worker() error { c.hub.broadcast <- *message case websocket.CloseMessage: - Logger.Debugf("Received close message. %v", rawMessage) - go c.reconnect() + Logger.Warnf("Received close message. %v", rawMessage) case websocket.PingMessage: Logger.Debugf("Received ping message. %v", rawMessage)