Feature/kafka exporter - kafka key by TraceID #25909

Closed
27 commits
3c3b675
kafka exporter - using batchpersignal to SplitTraces by traceId
arik-dig Aug 13, 2023
3f0ccbb
Merge remote-tracking branch 'origin/main' into feat/kakfa-exporter-k…
arik-dig Aug 20, 2023
19b29d2
using traceutil TraceIDToHexOrEmptyString
arik-dig Aug 21, 2023
21421bc
add KeyData
arik-dig Aug 21, 2023
371ae71
init trace marshalers by KeyData
arik-dig Aug 21, 2023
9bd2428
verify default value for KeyData
arik-dig Aug 21, 2023
6d26b8b
using dedicated pdataTracesMarshalerByTraceId
arik-dig Aug 21, 2023
175b24b
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Aug 21, 2023
c1f045a
create newPdataTracesMarshalerByTraceId
arik-dig Aug 21, 2023
2f7c207
add comment about key_data = traceID
arik-dig Aug 21, 2023
3b9388e
using key_data of traceID
arik-dig Aug 21, 2023
ae8fefe
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Aug 22, 2023
9c7bb5a
add change log yaml file
arik-dig Aug 22, 2023
f577e95
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Aug 23, 2023
ca210ca
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Aug 27, 2023
e09138e
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 3, 2023
6568027
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 4, 2023
709bc5a
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 5, 2023
cb25535
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 6, 2023
2789405
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 7, 2023
dde0fa3
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 10, 2023
495de39
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 12, 2023
62c50c6
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 12, 2023
39c3467
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 13, 2023
4134f9d
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 13, 2023
8550caf
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Sep 14, 2023
e5c230e
Merge branch 'main' into feat/kakfa-exporter-key-by-traceid
arik-dig Oct 1, 2023
27 changes: 27 additions & 0 deletions .chloggen/pr25909-kafka-exporter-by-traceid.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the ability to publish Kafka messages keyed by TraceID, which allows the Kafka topic to be partitioned and consumed concurrently.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [12318]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
4 changes: 4 additions & 0 deletions exporter/kafkaexporter/README.md
@@ -34,6 +34,9 @@ The following settings can be optionally configured:
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.\
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `key_data` (default = none): controls the key of trace messages sent to Kafka. Available options:
- `none`: messages are sent without a key.
- `traceID`: uses the TraceID from the `ExportTraceServiceRequest` as a string key. Note that the key affects partition assignment, which allows consumers to read the topic concurrently.
- `auth`
- `plain_text`
- `username`: The username to use.
@@ -98,4 +101,5 @@ exporters:
brokers:
- localhost:9092
protocol_version: 2.0.0
key_data: "traceID"
```
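The README note that the key "affects the partition number" can be illustrated with a minimal sketch. It assumes a hash-based partitioner; `partitionFor` and the sample trace IDs are hypothetical, and real Kafka clients ship their own partitioners whose hash functions and edge-case handling may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index by hashing the key.
// Illustrative only: this is not the partitioner used by any specific client.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	const partitions = 4
	// Hypothetical hex-encoded trace IDs used as message keys.
	keys := []string{
		"0102030405060708090a0b0c0d0e0f10",
		"0102030405060708090a0b0c0d0e0f10",
		"ffeeddccbbaa99887766554433221100",
	}
	for _, k := range keys {
		fmt.Printf("key=%s partition=%d\n", k, partitionFor(k, partitions))
	}
}
```

Because the partition is a pure function of the key, all messages that share a TraceID land on the same partition, so each consumer in a group sees every message of the traces assigned to it.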
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/config.go
@@ -28,6 +28,9 @@ type Config struct {
// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

// KeyData of messages (default "none")
KeyData string `mapstructure:"key_data"`
Contributor

I would say that "" is also the same as none.

Contributor

Please add this to the `Validate` method as well.


// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config_test.go
@@ -55,6 +55,7 @@ func TestLoadConfig(t *testing.T) {
},
Topic: "spans",
Encoding: "otlp_proto",
KeyData: "none",
Brokers: []string{"foo:123", "bar:456"},
Authentication: Authentication{
PlainText: &PlainTextConfig{
@@ -107,6 +108,7 @@ func TestLoadConfig(t *testing.T) {
},
Topic: "spans",
Encoding: "otlp_proto",
KeyData: "none",
Brokers: []string{"foo:123", "bar:456"},
Authentication: Authentication{
PlainText: &PlainTextConfig{
12 changes: 11 additions & 1 deletion exporter/kafkaexporter/factory.go
@@ -21,6 +21,7 @@ const (
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
defaultKeyData = "none"
defaultBroker = "localhost:9092"
// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
@@ -45,7 +46,7 @@ type FactoryOption func(factory *kafkaExporterFactory)
func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range tracesMarshalers {
- factory.tracesMarshalers[marshaler.Encoding()] = marshaler
+ factory.tracesMarshalers[KeyOfTracerMarshaller(marshaler)] = marshaler
}
}
}
@@ -96,6 +97,7 @@ func createDefaultConfig() component.Config {
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
KeyData: defaultKeyData,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
@@ -118,6 +120,14 @@ type kafkaExporterFactory struct {
logsMarshalers map[string]LogsMarshaler
}

func KeyOfTracerMarshaller(marshaler TracesMarshaler) string {
Contributor

This function doesn't make its intent clear, and it should not be exported.

Could I ask you to rename it and add some comments around it to help explain its usage?

Author

The key used to be only the Encoding (see line 48); now the key is a combination of Encoding and KeyData.

return KeyOfTracerMarshallerBy(marshaler.Encoding(), marshaler.KeyData())
}

func KeyOfTracerMarshallerBy(encoding string, keyData string) string {
return encoding + "#" + keyData
}
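One way to address the review feedback on these helpers is sketched below: an unexported struct serves as the registry key instead of a `"#"`-joined string, so the two parts cannot collide and the intent is visible in the type. All names here (`marshalerKey`, `keyOf`, `newRegistry`, `stubMarshaler`) are hypothetical and not part of the PR; the interface is reduced to the two methods that identify a marshaler.

```go
package main

import "fmt"

// marshalerKey is a hypothetical, unexported composite key that identifies
// a traces marshaler by its encoding and its key_data setting.
type marshalerKey struct {
	encoding string
	keyData  string
}

// tracesMarshaler is a stand-in for the exporter's TracesMarshaler interface.
type tracesMarshaler interface {
	Encoding() string
	KeyData() string
}

type stubMarshaler struct{ encoding, keyData string }

func (s stubMarshaler) Encoding() string { return s.encoding }
func (s stubMarshaler) KeyData() string  { return s.keyData }

// keyOf builds the registry key for a marshaler.
func keyOf(m tracesMarshaler) marshalerKey {
	return marshalerKey{encoding: m.Encoding(), keyData: m.KeyData()}
}

// newRegistry indexes marshalers by (encoding, keyData).
func newRegistry(ms ...tracesMarshaler) map[marshalerKey]tracesMarshaler {
	reg := make(map[marshalerKey]tracesMarshaler, len(ms))
	for _, m := range ms {
		reg[keyOf(m)] = m
	}
	return reg
}

func main() {
	reg := newRegistry(
		stubMarshaler{"otlp_proto", "none"},
		stubMarshaler{"otlp_proto", "traceID"},
		stubMarshaler{"jaeger_json", "none"},
	)
	_, ok := reg[marshalerKey{"otlp_proto", "traceID"}]
	fmt.Println("otlp_proto keyed by traceID registered:", ok)
	_, ok = reg[marshalerKey{"jaeger_json", "traceID"}]
	fmt.Println("jaeger_json keyed by traceID registered:", ok)
}
```

Go structs with comparable fields are valid map keys, so no string formatting or separator choice is needed.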

func (f *kafkaExporterFactory) createTracesExporter(
ctx context.Context,
set exporter.CreateSettings,
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/factory_test.go
@@ -32,6 +32,8 @@ type mockMarshaler[Data data] struct {

func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding }

func (mm *mockMarshaler[Data]) KeyData() string { return "none" }

func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) {
if mm.consume != nil {
return mm.consume(d, topic)
@@ -56,6 +58,7 @@ func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, defaultKeyData, cfg.KeyData)
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, "", cfg.Topic)
}
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/go.mod
@@ -9,6 +9,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger v1.41.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.86.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.86.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.86.0
github.com/stretchr/testify v1.8.4
github.com/xdg-go/scram v1.1.2
@@ -84,6 +85,8 @@ require (

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger

retract (
13 changes: 13 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaler.go
@@ -53,9 +53,14 @@ func (j jaegerMarshaler) Encoding() string {
return j.marshaler.encoding()
}

func (j jaegerMarshaler) KeyData() string {
return j.marshaler.keyData()
}

type jaegerSpanMarshaler interface {
marshal(span *jaegerproto.Span) ([]byte, error)
encoding() string
keyData() string
}

type jaegerProtoSpanMarshaler struct {
@@ -71,6 +76,10 @@ func (p jaegerProtoSpanMarshaler) encoding() string {
return "jaeger_proto"
}

func (p jaegerProtoSpanMarshaler) keyData() string {
return "none"
}

type jaegerJSONSpanMarshaler struct {
pbMarshaler *jsonpb.Marshaler
}
@@ -92,3 +101,7 @@ func (p jaegerJSONSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error)
func (p jaegerJSONSpanMarshaler) encoding() string {
return "jaeger_json"
}

func (p jaegerJSONSpanMarshaler) keyData() string {
return "none"
}
6 changes: 5 additions & 1 deletion exporter/kafkaexporter/kafka_exporter.go
@@ -18,6 +18,7 @@ import (
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
var errUnrecognizedKeyData = fmt.Errorf("unrecognized key_data")

// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
type kafkaTracesProducer struct {
@@ -178,8 +179,11 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m

// newTracesExporter creates Kafka exporter.
func newTracesExporter(config Config, set exporter.CreateSettings, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) {
- marshaler := marshalers[config.Encoding]
+ marshaler := marshalers[KeyOfTracerMarshallerBy(config.Encoding, config.KeyData)]
if marshaler == nil {
if config.KeyData != "none" && config.KeyData != "traceID" {
Contributor

Ideally this should be done within config.Validate as well, to make it easier for users to understand the issue.

Author

arik-dig Oct 26, 2023

I wanted to make the fewest changes needed, and the validation mechanism was already in this place, so I kept it here.
I'll try to move it to the suggested config.Validate.

return nil, errUnrecognizedKeyData
}
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)
15 changes: 14 additions & 1 deletion exporter/kafkaexporter/kafka_exporter_test.go
@@ -30,12 +30,19 @@ func TestNewExporter_err_version(t *testing.T) {
}

func TestNewExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "foo"}
+ c := Config{Encoding: "foo", KeyData: defaultKeyData}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
assert.Nil(t, texp)
}

func TestNewExporter_err_keyData(t *testing.T) {
c := Config{Encoding: defaultEncoding, KeyData: "foo"}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
assert.EqualError(t, err, errUnrecognizedKeyData.Error())
assert.Nil(t, texp)
}

func TestNewMetricsExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
@@ -89,6 +96,7 @@ func TestNewExporter_err_auth_type(t *testing.T) {
},
},
Encoding: defaultEncoding,
KeyData: defaultKeyData,
Metadata: Metadata{
Full: false,
},
@@ -114,6 +122,7 @@ func TestNewExporter_err_auth_type(t *testing.T) {
func TestNewExporter_err_compression(t *testing.T) {
c := Config{
Encoding: defaultEncoding,
KeyData: defaultKeyData,
Producer: Producer{
Compression: "idk",
},
@@ -295,6 +304,10 @@ func (e tracesErrorMarshaler) Encoding() string {
panic("implement me")
}

func (e tracesErrorMarshaler) KeyData() string {
panic("implement me")
}

func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMessage, error) {
return nil, e.err
}
15 changes: 11 additions & 4 deletions exporter/kafkaexporter/marshaler.go
@@ -17,6 +17,9 @@ type TracesMarshaler interface {

// Encoding returns encoding name
Encoding() string

// KeyData returns keyData name
KeyData() string
}

// MetricsMarshaler marshals metrics into Message array
@@ -40,14 +43,18 @@ type LogsMarshaler interface {
// tracesMarshalers returns map of supported encodings with TracesMarshaler.
func tracesMarshalers() map[string]TracesMarshaler {
otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding)
otlpPbByTraceId := newPdataTracesMarshalerByTraceId(&ptrace.ProtoMarshaler{}, defaultEncoding)
otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json")
otlpJsonByTraceId := newPdataTracesMarshalerByTraceId(&ptrace.JSONMarshaler{}, "otlp_json")
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
return map[string]TracesMarshaler{
- otlpPb.Encoding(): otlpPb,
- otlpJSON.Encoding(): otlpJSON,
- jaegerProto.Encoding(): jaegerProto,
- jaegerJSON.Encoding(): jaegerJSON,
+ KeyOfTracerMarshaller(otlpPb): otlpPb,
+ KeyOfTracerMarshaller(otlpPbByTraceId): otlpPbByTraceId,
+ KeyOfTracerMarshaller(otlpJSON): otlpJSON,
+ KeyOfTracerMarshaller(otlpJsonByTraceId): otlpJsonByTraceId,
+ KeyOfTracerMarshaller(jaegerProto): jaegerProto,
+ KeyOfTracerMarshaller(jaegerJSON): jaegerJSON,
}
}

18 changes: 10 additions & 8 deletions exporter/kafkaexporter/marshaler_test.go
@@ -17,15 +17,17 @@ import (
)

func TestDefaultTracesMarshalers(t *testing.T) {
- expectedEncodings := []string{
- "otlp_proto",
- "otlp_json",
- "jaeger_proto",
- "jaeger_json",
+ expectedKeys := []string{
+ KeyOfTracerMarshallerBy("otlp_proto", "none"),
+ KeyOfTracerMarshallerBy("otlp_proto", "traceID"),
+ KeyOfTracerMarshallerBy("otlp_json", "none"),
+ KeyOfTracerMarshallerBy("otlp_json", "traceID"),
+ KeyOfTracerMarshallerBy("jaeger_proto", "none"),
+ KeyOfTracerMarshallerBy("jaeger_json", "none"),
}
marshalers := tracesMarshalers()
- assert.Equal(t, len(expectedEncodings), len(marshalers))
- for _, e := range expectedEncodings {
+ assert.Equal(t, len(expectedKeys), len(marshalers))
+ for _, e := range expectedKeys {
t.Run(e, func(t *testing.T) {
m, ok := marshalers[e]
require.True(t, ok)
@@ -91,7 +93,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
span.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})
span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14})

- marshaler, ok := tracesMarshalers()["otlp_json"]
+ marshaler, ok := tracesMarshalers()[KeyOfTracerMarshallerBy("otlp_json", defaultKeyData)]
require.True(t, ok, "Must have otlp json marshaller")

msg, err := marshaler.Marshal(traces, t.Name())
43 changes: 43 additions & 0 deletions exporter/kafkaexporter/pdata_marshaler.go
@@ -8,6 +8,9 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
)

type pdataLogsMarshaler struct {
@@ -90,9 +93,49 @@ func (p pdataTracesMarshaler) Encoding() string {
return p.encoding
}

func (p pdataTracesMarshaler) KeyData() string {
return "none"
}

type pdataTracesMarshalerByTraceId pdataTracesMarshaler

func (p pdataTracesMarshalerByTraceId) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
var messages []*sarama.ProducerMessage

for _, tracesById := range batchpersignal.SplitTraces(td) {
bts, err := p.marshaler.MarshalTraces(tracesById)
if err != nil {
return nil, err
}
var traceID = tracesById.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
key := traceutil.TraceIDToHexOrEmptyString(traceID)
messages = append(messages, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
Key: sarama.ByteEncoder(key),
})
}
return messages, nil
}

func (p pdataTracesMarshalerByTraceId) Encoding() string {
return p.encoding
}

func (p pdataTracesMarshalerByTraceId) KeyData() string {
return "traceID"
}

func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler {
return pdataTracesMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

func newPdataTracesMarshalerByTraceId(marshaler ptrace.Marshaler, encoding string) TracesMarshaler {
return pdataTracesMarshalerByTraceId{
marshaler: marshaler,
encoding: encoding,
}
Comment on lines +100 to +140

Contributor

Please move these new code contributions to a new file to make them easier to maintain.

}
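The core idea of `pdataTracesMarshalerByTraceId` (split a batch by trace, then emit one keyed message per trace) can be sketched with the standard library alone. This is a simplified illustration under stated assumptions: `span`, `message`, `traceIDKey`, and `splitByTraceID` are hypothetical stand-ins for the pdata and sarama types, and the real `batchpersignal.SplitTraces` operates on `ptrace.Traces` and may group differently (for example, per resource).

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// span and message are hypothetical stand-ins for ptrace spans and
// sarama.ProducerMessage; they carry only what this sketch needs.
type span struct {
	TraceID [16]byte
	Name    string
}

type message struct {
	Key   string
	Spans []span
}

// traceIDKey hex-encodes a trace ID, returning "" for the all-zero ID.
func traceIDKey(id [16]byte) string {
	if id == ([16]byte{}) {
		return ""
	}
	return hex.EncodeToString(id[:])
}

// splitByTraceID groups spans by trace ID, preserving first-seen order,
// and emits one keyed message per trace.
func splitByTraceID(spans []span) []message {
	index := make(map[[16]byte]int)
	var out []message
	for _, s := range spans {
		i, seen := index[s.TraceID]
		if !seen {
			i = len(out)
			index[s.TraceID] = i
			out = append(out, message{Key: traceIDKey(s.TraceID)})
		}
		out[i].Spans = append(out[i].Spans, s)
	}
	return out
}

func main() {
	a := [16]byte{1}
	b := [16]byte{2}
	msgs := splitByTraceID([]span{{a, "root"}, {b, "other"}, {a, "child"}})
	for _, m := range msgs {
		fmt.Printf("key=%s spans=%d\n", m.Key, len(m.Spans))
	}
}
```

Keying each message by the hex trace ID is what lets a hash-based partitioner route all messages of one trace to the same partition.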