From 72d0a014ff08f42165a7c20eab31d5fd0c586abd Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Mon, 6 May 2024 07:57:44 +0200 Subject: [PATCH 1/2] [receiver/googlepubsub] fix span and trace id parsing crash --- .../googlepubsubreceiver-logenhance.yaml | 29 + receiver/googlecloudpubsubreceiver/README.md | 59 +- .../internal/log_entry.go | 622 +++++------------- .../internal/log_entry_test.go | 350 +++++++++- .../{common_protos.go => log_proto_common.go} | 0 .../internal/log_proto_payload.go | 323 +++++++++ .../internal/log_proto_payload_test.go | 70 ++ .../internal/log_translate_options.go | 40 ++ .../googlecloudpubsubreceiver/receiver.go | 2 +- 9 files changed, 1030 insertions(+), 465 deletions(-) create mode 100644 .chloggen/googlepubsubreceiver-logenhance.yaml rename receiver/googlecloudpubsubreceiver/internal/{common_protos.go => log_proto_common.go} (100%) create mode 100644 receiver/googlecloudpubsubreceiver/internal/log_proto_payload.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/log_proto_payload_test.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/log_translate_options.go diff --git a/.chloggen/googlepubsubreceiver-logenhance.yaml b/.chloggen/googlepubsubreceiver-logenhance.yaml new file mode 100644 index 0000000000000..9354d76b0aeac --- /dev/null +++ b/.chloggen/googlepubsubreceiver-logenhance.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: googlecloudpubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: fix span and trace id parsing crash when encountering corrupt data in `cloud_logging` encoding + +# Mandatory: One or more tracking issues related to the change. 
You can use the PR number here if no issue exists. +issues: [ 32007 ] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Fixed span and trace id parsing crash when encountering corrupt data. Enhanced the `cloud_logging` parsing so it's + easier to test and added additional tests. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/googlecloudpubsubreceiver/README.md b/receiver/googlecloudpubsubreceiver/README.md index 5e5ee6a2ccc9d..980115ed87f1d 100644 --- a/receiver/googlecloudpubsubreceiver/README.md +++ b/receiver/googlecloudpubsubreceiver/README.md @@ -48,10 +48,10 @@ must the `encoding` field in the configuration be set. 
|-----------------------------------|----------------------|-------------------|------------------------------------------------| | org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | | org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | -| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | +| org.opentelemetry.otlp.logs.v1 | application/protobuf | | Decode OTLP log message | | - | - | otlp_proto_trace | Decode OTLP trace message | -| - | - | otlp_proto_metric | Decode OTLP trace message | -| - | - | otlp_proto_log | Decode OTLP trace message | +| - | - | otlp_proto_metric | Decode OTLP metric message | +| - | - | otlp_proto_log | Decode OTLP log message | | - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type | | - | - | raw_text | Wrap in an OTLP log message | @@ -89,3 +89,56 @@ AND attributes.content-type = "application/protobuf" ``` +# Google Cloud Logging + +**Status**: Experimental + +Google Cloud Logging uses the [LogEntry] to carry log information. In this section the mapping of the fields to +OpenTelemetry fields and attributes is documented. + +## Semantic Mapping + +Some of the attributes can be moved to OpenTelemetry Semantic Conventions. Note however that all the attributes are +considered experimental and are subject to change. + +| Field | Type | Description | Maps to Unified Model Field | +|-------------|-----------|----------------------------------------|--------------------------------| +| `insertId`  | `string`  | A unique identifier for the log entry. | `Attributes["log.record.uid"]` | + +## Attribute and Field mapping + +The rest of the JSON body is either mapped to attributes with a `gcp` prefix or directly mapped to the LogRecord. 
+ +| Field | Type | Description | Maps to Unified Model Field | +|--------------------|--------------------------|------------------------------------------------------------------------------------------------------------------|-----------------------------------------------| +| `timestamp` | `string` | The time the event described by the log entry occurred. | `Timestamp` | +| `receiveTimestamp` | `string` | The time the log entry was received. | `ObservedTimestamp` | +| `logName` | `string` | The URL-encoded log ID suffix of the log_name field identifies which log stream this entry belongs to. | `Attributes["gcp.log_name"]` (string) | +| `jsonPayload` | `google.protobuf.Struct` | The log entry payload, represented as a structure that is expressed as a JSON object. | `Body` (KVList) | +| `protoPayload` | `google.protobuf.Any` | The log entry payload, represented as a protocol buffer. | `Body` (KVList, key from JSON representation) | +| `textPayload` | `string` | The log entry payload, represented as a Unicode string (UTF-8). | `Body` (string) | +| `trace` | `string` | The trace associated with the log entry, if any. | `TraceId` | +| `spanId` | `string` | The span ID within the trace associated with the log entry. | `SpanId` | +| `traceSampled` | `boolean` | The sampling decision of the trace associated with the log entry. | `TraceFlags.SAMPLED` | +| `labels` | `map` | A set of user-defined (key, value) data that provides additional information about the log entry. | `Attributes["gcp.*"]` | +| `resource` | `MonitoredResource` | The monitored resource that produced this log entry. | `Resource["gcp.*"]` | +| `httpRequest` | `HttpRequest` | The HTTP request associated with the log entry, if any. | `Attributes["gcp.http_request"]` (KVList) | +| `operation` | `LogEntryOperation` | Information about an operation associated with the log entry. 
| `Attributes["gcp.operation"]` (KVList) | +| `sourceLocation` | `LogEntrySourceLocation` | Source code location information associated with the log entry. | `Attributes["gcp.source_location"]` (KVList) | +| `split` | `LogSplit` | Information indicating this LogEntry is part of a sequence of multiple log entries split from a single LogEntry. | `Attributes["gcp.log_split"]` (KVList) | + +## Severity Mapping + +The severity is mapped from [Google Cloud Log Severity](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity) to the OpenTelemetry Severity Number. + +| CloudLog | Severity Number | CloudLog Description | +|------------------|------------------|----------------------------------------------------------------------------------------| +| `DEFAULT`(0) | `UNSPECIFIED`(0) | The log entry has no assigned severity level. | +| `DEBUG`(100) | `DEBUG`(5) | Debug or trace information. | +| `INFO`(200) | `INFO`(9) | Routine information, such as ongoing status or performance. | +| `NOTICE`(300) | `INFO2`(10) | Normal but significant events, such as start up, shut down, or a configuration change. | +| `WARNING`(400) | `WARN`(13) | Warning events might cause problems. | +| `ERROR`(500) | `ERROR`(17) | Error events are likely to cause problems. | +| `CRITICAL`(600) | `FATAL`(21) | Critical events cause more severe problems or outages. | +| `ALERT`(700) | `FATAL2`(22) | A person must take an action immediately. | +| `EMERGENCY`(800) | `FATAL4`(24) | One or more systems are unusable. 
| diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry.go b/receiver/googlecloudpubsubreceiver/internal/log_entry.go index e2b86f4ecf1d1..4eb6feca9c69e 100644 --- a/receiver/googlecloudpubsubreceiver/internal/log_entry.go +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry.go @@ -4,13 +4,11 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" import ( - "bytes" "context" "encoding/hex" stdjson "encoding/json" "errors" "fmt" - "strconv" "strings" "sync" "time" @@ -24,44 +22,45 @@ import ( "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/reflect/protoregistry" - "google.golang.org/protobuf/types/known/anypb" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary var invalidTraceID = [16]byte{} var invalidSpanID = [8]byte{} +var errorParsingLogItem = errors.New("error parsing log item") -func cloudLoggingTraceToTraceIDBytes(trace string) [16]byte { +func cloudLoggingTraceToTraceIDBytes(trace string) ([16]byte, error) { // Format: projects/my-gcp-project/traces/4ebc71f1def9274798cac4e8960d0095 lastSlashIdx := strings.LastIndex(trace, "/") if lastSlashIdx == -1 { - return invalidTraceID + return invalidTraceID, errorParsingLogItem } traceIDStr := trace[lastSlashIdx+1:] - return traceIDStrTotraceIDBytes(traceIDStr) + return traceIDStrToTraceIDBytes(traceIDStr) } -func traceIDStrTotraceIDBytes(traceIDStr string) [16]byte { - traceIDSlice := [16]byte{} - decoded, err := hex.Decode(traceIDSlice[:], []byte(traceIDStr)) - if err != nil || decoded != 16 { - return invalidTraceID +func traceIDStrToTraceIDBytes(traceIDStr string) ([16]byte, error) { + decoded, err := hex.DecodeString(traceIDStr) + if err != nil { + return invalidTraceID, err } - - return traceIDSlice + if len(decoded) != 16 { + return invalidTraceID, errorParsingLogItem + } 
+ return [16]byte(decoded), nil } -func spanIDStrToSpanIDBytes(spanIDStr string) [8]byte { - spanIDSlice := [8]byte{} - decoded, err := hex.Decode(spanIDSlice[:], []byte(spanIDStr)) - if err != nil || decoded != 8 { - return invalidSpanID +func spanIDStrToSpanIDBytes(spanIDStr string) ([8]byte, error) { + decoded, err := hex.DecodeString(spanIDStr) + if err != nil { + return invalidSpanID, err } - - return spanIDSlice + if len(decoded) != 8 { + return invalidSpanID, errorParsingLogItem + } + return [8]byte(decoded), nil } func cloudLoggingSeverityToNumber(severity string) plog.SeverityNumber { @@ -97,502 +96,231 @@ func getLogEntryDescriptor() protoreflect.MessageDescriptor { desc = logEntry.ProtoReflect().Descriptor() }) - return desc } -// TranslateLogEntry translates a JSON-encoded LogEntry message into a pair of -// pcommon.Resource and plog.LogRecord, trying to keep as close as possible to -// the semantic conventions. -// -// For maximum fidelity, the decoding is done according to the protobuf message -// schema; this ensures that a numeric value in the input is correctly -// translated to either an integer or a double in the output. It falls back to -// plain JSON decoding if payload type is not available in the proto registry. -func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) { - lr := plog.NewLogRecord() - res := pcommon.NewResource() - - var src map[string]stdjson.RawMessage - err := json.Unmarshal(data, &src) - - if err != nil { - return res, lr, err - } - - resAttrs := res.Attributes() - attrs := lr.Attributes() +type extractFn func(pcommon.Map, plog.LogRecord, pcommon.Map, string, stdjson.RawMessage, Options) error - for k, v := range src { - // Pick out some keys for special handling, and let the rest - // pass through to be translated according to the schema. 
- switch k { - // Unpack as suggested in the logs data model appendix - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model-appendix.md#google-cloud-logging - case "insertId": - // timestamp -> Attributes[“log.record.uid”] - // see: https://github.com/open-telemetry/semantic-conventions/blob/main/model/logs/general.yaml - var insertID string - err = json.Unmarshal(v, &insertID) - if err != nil { - return res, lr, err - } - attrs.PutStr("log.record.uid", insertID) - delete(src, k) - case "timestamp": - // timestamp -> Timestamp - var t time.Time - err = json.Unmarshal(v, &t) - if err != nil { - return res, lr, err - } - lr.SetTimestamp(pcommon.NewTimestampFromTime(t)) - delete(src, k) - case "receiveTimestamp": - // timestamp -> Timestamp - var t time.Time - err = json.Unmarshal(v, &t) - if err != nil { - return res, lr, err - } - lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(t)) - delete(src, k) - case "resource": - // resource -> Resource - // mapping type -> gcp.resource_type - // labels -> gcp.