Skip to content

Commit

Permalink
fix: parser for azureeventhubs message without time field (#1990)
Browse files Browse the repository at this point in the history
* fix: parser for azureeventhubs message without time field

* update CHANGELOG.md

* Update CHANGELOG.md

Co-authored-by: William Dumont <william.dumont@grafana.com>

* update CHANGELOG.md

* update link to the corresponded latest change in the Loki project

---------

Co-authored-by: William Dumont <william.dumont@grafana.com>
  • Loading branch information
andriikushch and wildum authored Oct 30, 2024
1 parent ca0ba7b commit 1c2ec56
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Main (unreleased)

- Add Prometheus bearer authentication to a `prometheus.write.queue` component (@freak12techno)

- Support logs that have a `timestamp` field instead of a `time` field for the `loki.source.azure_event_hubs` component. (@andriikushch)

- Add `proxy_url` to `otelcol.exporter.otlphttp`. (@wildum)

### Bugfixes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package parser

// This code is copied from Promtail (https://github.com/grafana/loki/commit/065bee7e72b00d800431f4b70f0d673d6e0e7a2b). The parser package is used to
// This code is copied from Promtail (https://github.com/grafana/loki/commit/2e62abbf47c47041027baf240722b3d76e7bd9a3). The parser package is used to
// enable parsing entries from Azure Event Hubs entries and forward them
// to other loki components.

Expand Down Expand Up @@ -36,15 +36,17 @@ func (l azureMonitorResourceLogs) validate() error {
// azureMonitorResourceLog used to unmarshal common schema for Azure resource logs
// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema
type azureMonitorResourceLog struct {
Time string `json:"time"`
Time string `json:"time"`
// Some logs have `time` field, some have `timeStamp` field : https://github.com/grafana/loki/issues/14176
TimeStamp string `json:"timeStamp"`
Category string `json:"category"`
ResourceID string `json:"resourceId"`
OperationName string `json:"operationName"`
}

// validate check if fields marked as required by schema for Azure resource log are not empty
func (l azureMonitorResourceLog) validate() error {
valid := len(l.Time) != 0 &&
valid := l.isTimeOrTimeStampFieldSet() &&
len(l.Category) != 0 &&
len(l.ResourceID) != 0 &&
len(l.OperationName) != 0
Expand All @@ -56,6 +58,34 @@ func (l azureMonitorResourceLog) validate() error {
return nil
}

func (l azureMonitorResourceLog) isTimeOrTimeStampFieldSet() bool {
return len(l.Time) != 0 || len(l.TimeStamp) != 0
}

// getTime returns time from `time` or `timeStamp` field. If both fields are set, `time` is used. If both fields are empty, error is returned.
func (l azureMonitorResourceLog) getTime() (time.Time, error) {
if len(l.Time) == 0 && len(l.TimeStamp) == 0 {
var t time.Time
return t, errors.New("time and timeStamp fields are empty")
}

if len(l.Time) != 0 {
t, err := time.Parse(time.RFC3339, l.Time)
if err != nil {
return t, err
}

return t.UTC(), nil
}

t, err := time.Parse(time.RFC3339, l.TimeStamp)
if err != nil {
return t, err
}

return t.UTC(), nil
}

type AzureEventHubsTargetMessageParser struct {
DisallowCustomMessages bool
}
Expand Down Expand Up @@ -156,11 +186,11 @@ func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet
}

func (e *AzureEventHubsTargetMessageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time {
if !useIncomingTimestamp || logRecord.Time == "" {
if !useIncomingTimestamp || !logRecord.isTimeOrTimeStampFieldSet() {
return messageTime
}

recordTime, err := time.Parse(time.RFC3339, logRecord.Time)
recordTime, err := logRecord.getTime()
if err != nil {
return messageTime
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,40 @@ func Test_parseMessage_custom_message_and_logic_app_logs_disallowCustomMessages(
assert.Error(t, err)
}

func Test_parseMessage_message_without_time_with_time_stamp(t *testing.T) {
messageParser := &AzureEventHubsTargetMessageParser{
DisallowCustomMessages: true,
}

message := &sarama.ConsumerMessage{
Value: readFile(t, "testdata/message_without_time_with_time_stamp.json"),
Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC),
}

entries, err := messageParser.Parse(message, nil, nil, true)
assert.NoError(t, err)
assert.Len(t, entries, 1)

expectedLine1 := "{\n \"timeStamp\": \"2024-09-18T00:45:09+00:00\",\n \"resourceId\": \"/RESOURCE_ID\",\n \"operationName\": \"ApplicationGatewayAccess\",\n \"category\": \"ApplicationGatewayAccessLog\"\n }"
assert.Equal(t, expectedLine1, entries[0].Line)

assert.Equal(t, time.Date(2024, time.September, 18, 00, 45, 9, 0, time.UTC), entries[0].Timestamp)
}

func Test_parseMessage_message_without_time_and_time_stamp(t *testing.T) {
messageParser := &AzureEventHubsTargetMessageParser{
DisallowCustomMessages: true,
}

message := &sarama.ConsumerMessage{
Value: readFile(t, "testdata/message_without_time_and_time_stamp.json"),
Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC),
}

_, err := messageParser.Parse(message, nil, nil, true)
assert.EqualError(t, err, "required field or fields is empty")
}

func readFile(t *testing.T, filename string) []byte {
data, err := os.ReadFile(filename)
assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"records": [
{
"resourceId": "/RESOURCE_ID",
"operationName": "ApplicationGatewayAccess",
"category": "ApplicationGatewayAccessLog"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"records": [
{
"timeStamp": "2024-09-18T00:45:09+00:00",
"resourceId": "/RESOURCE_ID",
"operationName": "ApplicationGatewayAccess",
"category": "ApplicationGatewayAccessLog"
}
]
}

0 comments on commit 1c2ec56

Please sign in to comment.