diff --git a/.chloggen/pr25909-kafka-exporter-by-traceid.yaml b/.chloggen/pr25909-kafka-exporter-by-traceid.yaml new file mode 100644 index 000000000000..5c711704c176 --- /dev/null +++ b/.chloggen/pr25909-kafka-exporter-by-traceid.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add the ability to publish Kafka messages keyed by TraceID, which allows the Kafka topic to be partitioned by trace and consumed concurrently. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [12318] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index d2a9d40a1612..94d9781cb505 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -34,6 +34,9 @@ The following settings can be optionally configured: - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\ - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. +- `key_data` (default = none): selects the key of the trace messages sent to Kafka. Available options: + - `none`: messages are sent without a key. + - `traceID`: the TraceID from the `ExportTraceServiceRequest` is used as a string key. Note that the key affects which partition a message is assigned to, which allows the topic to be consumed by concurrent consumers. - `auth` - `plain_text` - `username`: The username to use. @@ -98,4 +101,5 @@ exporters: brokers: - localhost:9092 protocol_version: 2.0.0 + key_data: "traceID" ``` diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 4ddffe6d9b41..80301e8f0869 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -28,6 +28,9 @@ type Config struct { // Encoding of messages (default "otlp_proto") Encoding string `mapstructure:"encoding"` + // KeyData of messages (default "none") + KeyData string `mapstructure:"key_data"` + // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. 
Metadata Metadata `mapstructure:"metadata"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index e3fd449d3482..8408976cfec8 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -55,6 +55,7 @@ func TestLoadConfig(t *testing.T) { }, Topic: "spans", Encoding: "otlp_proto", + KeyData: "none", Brokers: []string{"foo:123", "bar:456"}, Authentication: Authentication{ PlainText: &PlainTextConfig{ @@ -107,6 +108,7 @@ func TestLoadConfig(t *testing.T) { }, Topic: "spans", Encoding: "otlp_proto", + KeyData: "none", Brokers: []string{"foo:123", "bar:456"}, Authentication: Authentication{ PlainText: &PlainTextConfig{ diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 15d317652048..41c6d02a8670 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -21,6 +21,7 @@ const ( defaultMetricsTopic = "otlp_metrics" defaultLogsTopic = "otlp_logs" defaultEncoding = "otlp_proto" + defaultKeyData = "none" defaultBroker = "localhost:9092" // default from sarama.NewConfig() defaultMetadataRetryMax = 3 @@ -45,7 +46,7 @@ type FactoryOption func(factory *kafkaExporterFactory) func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption { return func(factory *kafkaExporterFactory) { for _, marshaler := range tracesMarshalers { - factory.tracesMarshalers[marshaler.Encoding()] = marshaler + factory.tracesMarshalers[KeyOfTracerMarshaller(marshaler)] = marshaler } } } @@ -96,6 +97,7 @@ func createDefaultConfig() component.Config { // using an empty topic to track when it has not been set by user, default is based on traces or metrics. 
Topic: "", Encoding: defaultEncoding, + KeyData: defaultKeyData, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ @@ -118,6 +120,16 @@ type kafkaExporterFactory struct { logsMarshalers map[string]LogsMarshaler } +// KeyOfTracerMarshaller returns the lookup key for the given traces marshaler, built from its Encoding and KeyData. +func KeyOfTracerMarshaller(marshaler TracesMarshaler) string { + return KeyOfTracerMarshallerBy(marshaler.Encoding(), marshaler.KeyData()) +} + +// KeyOfTracerMarshallerBy returns the traces marshaler lookup key for an encoding and key data pair, in the form "<encoding>#<keyData>". +func KeyOfTracerMarshallerBy(encoding string, keyData string) string { + return encoding + "#" + keyData +} + func (f *kafkaExporterFactory) createTracesExporter( ctx context.Context, set exporter.CreateSettings, diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 4451dbb4de68..62f649f4b4d5 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -32,6 +32,8 @@ type mockMarshaler[Data data] struct { func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding } +func (mm *mockMarshaler[Data]) KeyData() string { return "none" } + func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) { if mm.consume != nil { return mm.consume(d, topic) @@ -56,6 +58,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig().(*Config) assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + assert.Equal(t, defaultKeyData, cfg.KeyData) assert.Equal(t, []string{defaultBroker}, cfg.Brokers) assert.Equal(t, "", cfg.Topic) } diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 534b973e8ef2..695d90325a24 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -9,6 +9,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/jaegertracing/jaeger v1.41.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.86.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.86.0 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.86.0 github.com/stretchr/testify v1.8.4 github.com/xdg-go/scram v1.1.2 @@ -84,6 +85,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger retract ( diff --git a/exporter/kafkaexporter/jaeger_marshaler.go b/exporter/kafkaexporter/jaeger_marshaler.go index abc73c22f18a..eed21b73e1b3 100644 --- a/exporter/kafkaexporter/jaeger_marshaler.go +++ b/exporter/kafkaexporter/jaeger_marshaler.go @@ -53,9 +53,14 @@ func (j jaegerMarshaler) Encoding() string { return j.marshaler.encoding() } +func (j jaegerMarshaler) KeyData() string { + return j.marshaler.keyData() +} + type jaegerSpanMarshaler interface { marshal(span *jaegerproto.Span) ([]byte, error) encoding() string + keyData() string } type jaegerProtoSpanMarshaler struct { @@ -71,6 +76,10 @@ func (p jaegerProtoSpanMarshaler) encoding() string { return "jaeger_proto" } +func (p jaegerProtoSpanMarshaler) keyData() string { + return "none" +} + type jaegerJSONSpanMarshaler struct { pbMarshaler *jsonpb.Marshaler } @@ -92,3 +101,7 @@ func (p jaegerJSONSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error) func (p jaegerJSONSpanMarshaler) encoding() string { return "jaeger_json" } + +func (p jaegerJSONSpanMarshaler) keyData() string { + return "none" +} diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 638dfc439570..d99ec9b6d6b5 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -18,6 +18,7 @@ import ( ) var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") +var errUnrecognizedKeyData = fmt.Errorf("unrecognized 
key_data") // kafkaTracesProducer uses sarama to produce trace messages to Kafka. type kafkaTracesProducer struct { @@ -178,8 +179,11 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m // newTracesExporter creates Kafka exporter. func newTracesExporter(config Config, set exporter.CreateSettings, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) { - marshaler := marshalers[config.Encoding] + marshaler := marshalers[KeyOfTracerMarshallerBy(config.Encoding, config.KeyData)] if marshaler == nil { + if config.KeyData != "none" && config.KeyData != "traceID" { + return nil, errUnrecognizedKeyData + } return nil, errUnrecognizedEncoding } producer, err := newSaramaProducer(config) diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index f812842b8a7a..276dfbcc0537 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -30,12 +30,19 @@ func TestNewExporter_err_version(t *testing.T) { } func TestNewExporter_err_encoding(t *testing.T) { - c := Config{Encoding: "foo"} + c := Config{Encoding: "foo", KeyData: defaultKeyData} texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, texp) } +func TestNewExporter_err_keyData(t *testing.T) { + c := Config{Encoding: defaultEncoding, KeyData: "foo"} + texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) + assert.EqualError(t, err, errUnrecognizedKeyData.Error()) + assert.Nil(t, texp) +} + func TestNewMetricsExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers()) @@ -89,6 +96,7 @@ func TestNewExporter_err_auth_type(t *testing.T) { }, }, Encoding: defaultEncoding, + KeyData: defaultKeyData, 
Metadata: Metadata{ Full: false, }, @@ -114,6 +122,7 @@ func TestNewExporter_err_auth_type(t *testing.T) { func TestNewExporter_err_compression(t *testing.T) { c := Config{ Encoding: defaultEncoding, + KeyData: defaultKeyData, Producer: Producer{ Compression: "idk", }, @@ -295,6 +304,10 @@ func (e tracesErrorMarshaler) Encoding() string { panic("implement me") } +func (e tracesErrorMarshaler) KeyData() string { + panic("implement me") +} + func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMessage, error) { return nil, e.err } diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index 38525f9fbec7..ff7405642a56 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -17,6 +17,9 @@ type TracesMarshaler interface { // Encoding returns encoding name Encoding() string + + // KeyData returns keyData name + KeyData() string } // MetricsMarshaler marshals metrics into Message array @@ -40,14 +43,18 @@ type LogsMarshaler interface { // tracesMarshalers returns map of supported encodings with TracesMarshaler. 
func tracesMarshalers() map[string]TracesMarshaler { otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding) + otlpPbByTraceId := newPdataTracesMarshalerByTraceId(&ptrace.ProtoMarshaler{}, defaultEncoding) otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json") + otlpJsonByTraceId := newPdataTracesMarshalerByTraceId(&ptrace.JSONMarshaler{}, "otlp_json") jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}} jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()} return map[string]TracesMarshaler{ - otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, - jaegerProto.Encoding(): jaegerProto, - jaegerJSON.Encoding(): jaegerJSON, + KeyOfTracerMarshaller(otlpPb): otlpPb, + KeyOfTracerMarshaller(otlpPbByTraceId): otlpPbByTraceId, + KeyOfTracerMarshaller(otlpJSON): otlpJSON, + KeyOfTracerMarshaller(otlpJsonByTraceId): otlpJsonByTraceId, + KeyOfTracerMarshaller(jaegerProto): jaegerProto, + KeyOfTracerMarshaller(jaegerJSON): jaegerJSON, } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index d1a70f47a04d..55989ff63be7 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -17,15 +17,17 @@ import ( ) func TestDefaultTracesMarshalers(t *testing.T) { - expectedEncodings := []string{ - "otlp_proto", - "otlp_json", - "jaeger_proto", - "jaeger_json", + expectedKeys := []string{ + KeyOfTracerMarshallerBy("otlp_proto", "none"), + KeyOfTracerMarshallerBy("otlp_proto", "traceID"), + KeyOfTracerMarshallerBy("otlp_json", "none"), + KeyOfTracerMarshallerBy("otlp_json", "traceID"), + KeyOfTracerMarshallerBy("jaeger_proto", "none"), + KeyOfTracerMarshallerBy("jaeger_json", "none"), } marshalers := tracesMarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) - for _, e := range expectedEncodings { + assert.Equal(t, len(expectedKeys), len(marshalers)) + for _, e := range expectedKeys { t.Run(e, func(t 
*testing.T) { m, ok := marshalers[e] require.True(t, ok) @@ -91,7 +93,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14}) - marshaler, ok := tracesMarshalers()["otlp_json"] + marshaler, ok := tracesMarshalers()[KeyOfTracerMarshallerBy("otlp_json", defaultKeyData)] require.True(t, ok, "Must have otlp json marshaller") msg, err := marshaler.Marshal(traces, t.Name()) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index d4511946b3bc..579ca76d397e 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -8,6 +8,9 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) type pdataLogsMarshaler struct { @@ -90,9 +93,52 @@ func (p pdataTracesMarshaler) Encoding() string { return p.encoding } +func (p pdataTracesMarshaler) KeyData() string { + return "none" +} + +// pdataTracesMarshalerByTraceId marshals traces into one Kafka message per trace, keyed by the trace ID. +type pdataTracesMarshalerByTraceId pdataTracesMarshaler + +// Marshal splits td by trace ID and returns one message per trace, each keyed by the hex-encoded trace ID. +func (p pdataTracesMarshalerByTraceId) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { + var messages []*sarama.ProducerMessage + + for _, tracesByID := range batchpersignal.SplitTraces(td) { + bts, err := p.marshaler.MarshalTraces(tracesByID) + if err != nil { + return nil, err + } + // SplitTraces yields one batch per trace, so the first span's ID identifies the whole batch. + traceID := tracesByID.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + key := traceutil.TraceIDToHexOrEmptyString(traceID) + messages = append(messages, &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(key), + }) + } + return messages, nil +} + +func (p pdataTracesMarshalerByTraceId) Encoding() string { + 
return p.encoding +} + +func (p pdataTracesMarshalerByTraceId) KeyData() string { + return "traceID" +} + func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { return pdataTracesMarshaler{ marshaler: marshaler, encoding: encoding, } } + +func newPdataTracesMarshalerByTraceId(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { + return pdataTracesMarshalerByTraceId{ + marshaler: marshaler, + encoding: encoding, + } +}