From 691860f34b120cb351b16419cc3f2aa6ae003dc7 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Tue, 14 Jan 2025 01:19:50 +0530 Subject: [PATCH 1/8] feat: process message chunks for kafka exporter Signed-off-by: Shivanshu Raj Shrivastava --- exporter/kafkaexporter/jaeger_marshaler.go | 15 ++++-- .../kafkaexporter/jaeger_marshaler_test.go | 4 +- exporter/kafkaexporter/kafka_exporter.go | 53 +++++++++++++++---- exporter/kafkaexporter/kafka_exporter_test.go | 10 ++-- exporter/kafkaexporter/marshaler.go | 34 ++++++++---- exporter/kafkaexporter/marshaler_test.go | 2 +- exporter/kafkaexporter/pdata_marshaler.go | 14 +++-- 7 files changed, 96 insertions(+), 36 deletions(-) diff --git a/exporter/kafkaexporter/jaeger_marshaler.go b/exporter/kafkaexporter/jaeger_marshaler.go index 50640c83763f..94e5e89e3d2c 100644 --- a/exporter/kafkaexporter/jaeger_marshaler.go +++ b/exporter/kafkaexporter/jaeger_marshaler.go @@ -16,21 +16,27 @@ import ( ) type jaegerMarshaler struct { - marshaler jaegerSpanMarshaler + marshaler jaegerSpanMarshaler + partitionedByTraceID bool + maxMessageBytes int } var _ TracesMarshaler = (*jaegerMarshaler)(nil) -func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { +func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { + // ToDo: implement partitionedByTraceID + batches := jaeger.ProtoFromTraces(traces) var messages []*sarama.ProducerMessage + // ToDo: effectively chunk the spans adhering to j.maxMessageBytes + var messageChunks []*ProducerMessageChunks + var errs error for _, batch := range batches { for _, span := range batch.Spans { span.Process = batch.Process bts, err := j.marshaler.marshal(span) - // continue to process spans that can be serialized if err != nil { errs = multierr.Append(errs, err) continue @@ -43,7 +49,8 @@ func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama. 
}) } } - return messages, errs + messageChunks = append(messageChunks, &ProducerMessageChunks{messages}) + return messageChunks, errs } func (j jaegerMarshaler) Encoding() string { diff --git a/exporter/kafkaexporter/jaeger_marshaler_test.go b/exporter/kafkaexporter/jaeger_marshaler_test.go index ca4cd7e7440e..1e6d2f0bc3a8 100644 --- a/exporter/kafkaexporter/jaeger_marshaler_test.go +++ b/exporter/kafkaexporter/jaeger_marshaler_test.go @@ -61,9 +61,9 @@ func TestJaegerMarshaler(t *testing.T) { } for _, test := range tests { t.Run(test.encoding, func(t *testing.T) { - messages, err := test.unmarshaler.Marshal(td, "topic") + msg, err := test.unmarshaler.Marshal(td, "topic") require.NoError(t, err) - assert.Equal(t, test.messages, messages) + assert.Equal(t, test.messages, msg[0]) assert.Equal(t, test.encoding, test.unmarshaler.Encoding()) }) } diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 6720237fabe2..229576769866 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "strings" "github.com/IBM/sarama" "go.opentelemetry.io/collector/component" @@ -42,20 +43,48 @@ func (ke kafkaErrors) Error() string { } func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error { - messages, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans())) + messageChunks, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans())) if err != nil { - return consumererror.NewPermanent(err) + return consumererror.NewPermanent( + fmt.Errorf("failed to marshal trace data: %w", err), + ) } - err = e.producer.SendMessages(messages) - if err != nil { - var prodErr sarama.ProducerErrors - if errors.As(err, &prodErr) { - if len(prodErr) > 0 { - return kafkaErrors{len(prodErr), prodErr[0].Err.Error()} + + var allErrors []string + + for i, chunk := range messageChunks { + sendErr := 
e.producer.SendMessages(chunk.msg) + if sendErr == nil { + continue + } + + var prodErrs sarama.ProducerErrors + if errors.As(sendErr, &prodErrs) { + for _, pErr := range prodErrs { + allErrors = append(allErrors, + fmt.Sprintf( + "chunk[%d] partition=%d offset=%d error=%v", + i, + pErr.Msg.Partition, + pErr.Msg.Offset, + pErr.Err, + ), + ) } + } else { + allErrors = append(allErrors, + fmt.Sprintf("chunk[%d] error=%v", i, sendErr), + ) } - return err } + + if len(allErrors) > 0 { + return fmt.Errorf("encountered %d errors sending Kafka messages: %s", + len(allErrors), + strings.Join(allErrors, "; "), + ) + } + return nil } @@ -73,8 +102,10 @@ func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) er e.cfg.Encoding, ); errExt == nil { e.marshaler = &tracesEncodingMarshaler{ - marshaler: *marshaler, - encoding: e.cfg.Encoding, + marshaler: *marshaler, + encoding: e.cfg.Encoding, + partitionedByTraceID: e.cfg.PartitionTracesByID, + maxMessageBytes: e.cfg.Producer.MaxMessageBytes, } } if marshaler, errInt := createTracesMarshaler(e.cfg); e.marshaler == nil && errInt == nil { diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index 80b56d05dbc8..bea46e4cd2de 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -174,7 +174,7 @@ func TestTracesPusher(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -193,7 +193,7 @@ func TestTracesPusher_attr(t *testing.T) { TopicFromAttribute: "kafka_topic", }, producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, 
defaultEncoding, false, 0), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -209,7 +209,7 @@ func TestTracesPusher_ctx(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -226,7 +226,7 @@ func TestTracesPusher_err(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0), logger: zap.NewNop(), } t.Cleanup(func() { @@ -432,7 +432,7 @@ func (e metricsErrorMarshaler) Encoding() string { var _ TracesMarshaler = (*tracesErrorMarshaler)(nil) -func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*sarama.ProducerMessage, error) { +func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*ProducerMessageChunks, error) { return nil, e.err } diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index ba8f00ff5cf4..bc1fb2b799c0 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -14,10 +14,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" ) +type ProducerMessageChunks struct { + msg []*sarama.ProducerMessage +} + // TracesMarshaler marshals traces into Message array. 
type TracesMarshaler interface { // Marshal serializes spans into sarama's ProducerMessages - Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) + Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) // Encoding returns encoding name Encoding() string @@ -45,20 +49,21 @@ type LogsMarshaler interface { func createTracesMarshaler(config Config) (TracesMarshaler, error) { encoding := config.Encoding partitionTracesByID := config.PartitionTracesByID + maxMessageBytes := config.Producer.MaxMessageBytes jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}} jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()} switch encoding { case defaultEncoding: - return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, partitionTracesByID), nil + return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, partitionTracesByID, maxMessageBytes), nil case "otlp_json": - return newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", partitionTracesByID), nil + return newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", partitionTracesByID, maxMessageBytes), nil case "zipkin_proto": - return newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto", partitionTracesByID), nil + return newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto", partitionTracesByID, maxMessageBytes), nil case "zipkin_json": - return newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json", partitionTracesByID), nil - case jaegerProtoSpanMarshaler{}.encoding(): + return newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json", partitionTracesByID, maxMessageBytes), nil + case jaegerProto.Encoding(): return jaegerProto, nil case jaegerJSON.Encoding(): return jaegerJSON, nil @@ -101,12 +106,20 @@ func createLogMarshaler(config Config) (LogsMarshaler, error) { // tracesEncodingMarshaler is a wrapper around 
ptrace.Marshaler that implements TracesMarshaler. type tracesEncodingMarshaler struct { - marshaler ptrace.Marshaler - encoding string + marshaler ptrace.Marshaler + encoding string + partitionedByTraceID bool + maxMessageBytes int } -func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { +func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { + // ToDo: implement partitionedByTraceID + var messages []*sarama.ProducerMessage + + // ToDo: effectively chunk the spans adhering to j.maxMessageBytes + var messageChunks []*ProducerMessageChunks + data, err := t.marshaler.MarshalTraces(traces) if err != nil { return nil, fmt.Errorf("failed to marshal traces: %w", err) @@ -115,7 +128,8 @@ func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([ Topic: topic, Value: sarama.ByteEncoder(data), }) - return messages, nil + messageChunks = append(messageChunks, &ProducerMessageChunks{msg: messages}) + return messageChunks, nil } func (t *tracesEncodingMarshaler) Encoding() string { diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index d68c6332768b..8be6eb9b13b7 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -466,7 +466,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { require.NoError(t, err, "Must have marshaled the data without error") require.Len(t, msg, test.numExpectedMessages, "Expected number of messages in the message") - for idx, singleMsg := range msg { + for idx, singleMsg := range msg[0].msg { data, err := singleMsg.Value.Encode() require.NoError(t, err, "Must not error when encoding value") require.NotNil(t, data, "Must have valid data to test") diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index ae9726f2cbe8..458e53b2f70a 100644 --- 
a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -125,9 +125,14 @@ type pdataTracesMarshaler struct { marshaler ptrace.Marshaler encoding string partitionedByTraceID bool + maxMessageBytes int } -func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { +func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { + + // ToDo: effectively chunk the spans adhering to j.maxMessageBytes + var messageChunks []*ProducerMessageChunks + var msgs []*sarama.ProducerMessage if p.partitionedByTraceID { for _, trace := range batchpersignal.SplitTraces(td) { @@ -152,17 +157,20 @@ func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*saram }) } - return msgs, nil + messageChunks = append(messageChunks, &ProducerMessageChunks{msg: msgs}) + + return messageChunks, nil } func (p *pdataTracesMarshaler) Encoding() string { return p.encoding } -func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partitionedByTraceID bool) TracesMarshaler { +func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partitionedByTraceID bool, maxMessageBytes int) TracesMarshaler { return &pdataTracesMarshaler{ marshaler: marshaler, encoding: encoding, partitionedByTraceID: partitionedByTraceID, + maxMessageBytes: maxMessageBytes, } } From 45c15b8e267759c5bab893d07e3c78fd97b88094 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 04:59:43 +0530 Subject: [PATCH 2/8] feat: implement size based chunking for jaeger marshaler Signed-off-by: Shivanshu Raj Shrivastava --- exporter/kafkaexporter/jaeger_marshaler.go | 54 ++++++++++++++++++---- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/exporter/kafkaexporter/jaeger_marshaler.go b/exporter/kafkaexporter/jaeger_marshaler.go index 94e5e89e3d2c..60e348a04b2d 100644 --- a/exporter/kafkaexporter/jaeger_marshaler.go +++ 
b/exporter/kafkaexporter/jaeger_marshaler.go @@ -5,6 +5,7 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "bytes" + "fmt" "github.com/IBM/sarama" "github.com/gogo/protobuf/jsonpb" @@ -24,32 +25,69 @@ type jaegerMarshaler struct { var _ TracesMarshaler = (*jaegerMarshaler)(nil) func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { - // ToDo: implement partitionedByTraceID + if j.maxMessageBytes <= 0 { + return nil, fmt.Errorf("maxMessageBytes must be positive, got %d", j.maxMessageBytes) + } batches := jaeger.ProtoFromTraces(traces) - var messages []*sarama.ProducerMessage + if len(batches) == 0 { + return []*ProducerMessageChunks{}, nil + } - // ToDo: effectively chunk the spans adhering to j.maxMessageBytes + var messages []*sarama.ProducerMessage var messageChunks []*ProducerMessageChunks - + var packetSize int var errs error + for _, batch := range batches { for _, span := range batch.Spans { span.Process = batch.Process + bts, err := j.marshaler.marshal(span) if err != nil { - errs = multierr.Append(errs, err) + errs = multierr.Append(errs, fmt.Errorf("failed to marshal span %s: %w", span.SpanID.String(), err)) continue } + key := []byte(span.TraceID.String()) - messages = append(messages, &sarama.ProducerMessage{ + msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), Key: sarama.ByteEncoder(key), - }) + } + + // Deriving version from sarama library + // https://github.com/IBM/sarama/blob/main/async_producer.go#L454 + currentMsgSize := msg.ByteSize(2) + + // Check if message itself exceeds size limit + if currentMsgSize > j.maxMessageBytes { + errs = multierr.Append(errs, fmt.Errorf("span %s exceeds maximum message size: %d > %d", + span.SpanID.String(), currentMsgSize, j.maxMessageBytes)) + continue + } + + // Check if adding this message would exceed the chunk size limit + if (packetSize + currentMsgSize) <= j.maxMessageBytes { + packetSize 
+= currentMsgSize + messages = append(messages, msg) + } else { + // Current chunk is full, create new chunk + if len(messages) > 0 { + messageChunks = append(messageChunks, &ProducerMessageChunks{messages}) + messages = make([]*sarama.ProducerMessage, 0, len(messages)) // Preallocate with previous size + } + messages = append(messages, msg) + packetSize = currentMsgSize + } } } - messageChunks = append(messageChunks, &ProducerMessageChunks{messages}) + + // Add final chunk if there are remaining messages + if len(messages) > 0 { + messageChunks = append(messageChunks, &ProducerMessageChunks{messages}) + } + return messageChunks, errs } From 52a48bc90563102b4fe235058de990368f243d98 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 05:00:33 +0530 Subject: [PATCH 3/8] test: add unit tests for jaeger marshaler Signed-off-by: Shivanshu Raj Shrivastava --- .../kafkaexporter/jaeger_marshaler_test.go | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/exporter/kafkaexporter/jaeger_marshaler_test.go b/exporter/kafkaexporter/jaeger_marshaler_test.go index 1e6d2f0bc3a8..993abffd0c47 100644 --- a/exporter/kafkaexporter/jaeger_marshaler_test.go +++ b/exporter/kafkaexporter/jaeger_marshaler_test.go @@ -38,32 +38,57 @@ func TestJaegerMarshaler(t *testing.T) { require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer, batches[0].Spans[0])) tests := []struct { - unmarshaler TracesMarshaler + name string + unmarshaler jaegerMarshaler encoding string - messages []*sarama.ProducerMessage + messages []*ProducerMessageChunks }{ { + name: "proto_marshaler", unmarshaler: jaegerMarshaler{ - marshaler: jaegerProtoSpanMarshaler{}, + marshaler: jaegerProtoSpanMarshaler{}, + maxMessageBytes: defaultProducerMaxMessageBytes, }, encoding: "jaeger_proto", - messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jaegerProtoBytes), Key: sarama.ByteEncoder(messageKey)}}, + messages: []*ProducerMessageChunks{ + { + 
Messages: []*sarama.ProducerMessage{ + { + Topic: "topic", + Value: sarama.ByteEncoder(jaegerProtoBytes), + Key: sarama.ByteEncoder(messageKey), + }, + }, + }, + }, }, { + name: "json_marshaler", unmarshaler: jaegerMarshaler{ marshaler: jaegerJSONSpanMarshaler{ pbMarshaler: &jsonpb.Marshaler{}, }, + maxMessageBytes: defaultProducerMaxMessageBytes, }, encoding: "jaeger_json", - messages: []*sarama.ProducerMessage{{Topic: "topic", Value: sarama.ByteEncoder(jsonByteBuffer.Bytes()), Key: sarama.ByteEncoder(messageKey)}}, + messages: []*ProducerMessageChunks{ + { + Messages: []*sarama.ProducerMessage{ + { + Topic: "topic", + Value: sarama.ByteEncoder(jsonByteBuffer.Bytes()), + Key: sarama.ByteEncoder(messageKey), + }, + }, + }, + }, }, } for _, test := range tests { - t.Run(test.encoding, func(t *testing.T) { + t.Run(test.name, func(t *testing.T) { msg, err := test.unmarshaler.Marshal(td, "topic") require.NoError(t, err) - assert.Equal(t, test.messages, msg[0]) + assert.Equal(t, test.messages, msg) assert.Equal(t, test.encoding, test.unmarshaler.Encoding()) }) } From 09bdc642f7672b2c494118cccedc3836aad31ddb Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 05:01:07 +0530 Subject: [PATCH 4/8] feat: implement size based chunking with binary search for marshaler Signed-off-by: Shivanshu Raj Shrivastava --- exporter/kafkaexporter/marshaler.go | 91 +++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 10 deletions(-) diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index bc1fb2b799c0..c6b423589d80 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -15,7 +15,7 @@ import ( ) type ProducerMessageChunks struct { - msg []*sarama.ProducerMessage + Messages []*sarama.ProducerMessage } // TracesMarshaler marshals traces into Message array. 
@@ -115,20 +115,91 @@ type tracesEncodingMarshaler struct { func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { // ToDo: implement partitionedByTraceID - var messages []*sarama.ProducerMessage - - // ToDo: effectively chunk the spans adhering to j.maxMessageBytes - var messageChunks []*ProducerMessageChunks + messageChunks := make([]*ProducerMessageChunks, 0) + spanCount := traces.SpanCount() + // If the entire trace data fits within maxMessageBytes, send it as a single message + msg := &sarama.ProducerMessage{ + Topic: topic, + } data, err := t.marshaler.MarshalTraces(traces) if err != nil { return nil, fmt.Errorf("failed to marshal traces: %w", err) } - messages = append(messages, &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(data), - }) - messageChunks = append(messageChunks, &ProducerMessageChunks{msg: messages}) + msg.Value = sarama.ByteEncoder(data) + + // Deriving version from sarama library + // https://github.com/IBM/sarama/blob/main/async_producer.go#L454 + currentMsgSize := msg.ByteSize(2) + + if currentMsgSize <= t.maxMessageBytes || spanCount <= 1 { + messageChunks = append(messageChunks, &ProducerMessageChunks{[]*sarama.ProducerMessage{msg}}) + return messageChunks, nil + } + + // Split traces using binary search + for i := 0; i < traces.ResourceSpans().Len(); i++ { + rs := traces.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + spans := ss.Spans() + start := 0 + + for start < spans.Len() { + // Binary search to find the maximum number of spans that fit within maxMessageBytes + left, right := 1, spans.Len()-start + for left < right { + mid := (left + right + 1) / 2 + chunk := ptrace.NewTraces() + rs.CopyTo(chunk.ResourceSpans().AppendEmpty()) + newSS := chunk.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + ss.Scope().CopyTo(newSS.Scope()) + for k := 0; k < mid; k++ { + spans.At(start + 
k).CopyTo(newSS.Spans().AppendEmpty()) + } + + chunkData, err := t.marshaler.MarshalTraces(chunk) + if err != nil { + return nil, fmt.Errorf("failed to marshal traces chunk: %w", err) + } + + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(chunkData), + } + currentMsgSize := msg.ByteSize(2) + + if currentMsgSize <= t.maxMessageBytes { + left = mid + } else { + right = mid - 1 + } + } + + // Create chunk with the found number of spans + chunk := ptrace.NewTraces() + rs.CopyTo(chunk.ResourceSpans().AppendEmpty()) + newSS := chunk.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + ss.Scope().CopyTo(newSS.Scope()) + for k := 0; k < left; k++ { + spans.At(start + k).CopyTo(newSS.Spans().AppendEmpty()) + } + + chunkData, err := t.marshaler.MarshalTraces(chunk) + if err != nil { + return nil, fmt.Errorf("failed to marshal traces chunk: %w", err) + } + + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(chunkData), + } + messageChunks = append(messageChunks, &ProducerMessageChunks{[]*sarama.ProducerMessage{msg}}) + start += left + } + } + } + return messageChunks, nil } From 8a6f66f1eaea870eddbf211f2a5d680349c492ea Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 05:02:04 +0530 Subject: [PATCH 5/8] test: add unit tests for marshaler Signed-off-by: Shivanshu Raj Shrivastava --- exporter/kafkaexporter/marshaler_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 8be6eb9b13b7..03331dc98eda 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -330,7 +330,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { }, } - unkeyedMessageKey := []sarama.Encoder{nil} + unkeyedMessageKey := []sarama.ByteEncoder{nil} keyedOtlpJSON2 := map[string]any{ "resourceSpans": []any{ @@ -363,7 +363,7 @@ func TestOTLPTracesJsonMarshaling(t 
*testing.T) { keyedOtlpJSONResult[0] = keyedOtlpJSON1 keyedOtlpJSONResult[1] = keyedOtlpJSON2 - keyedMessageKey := []sarama.Encoder{sarama.ByteEncoder("0102030405060708090a0b0c0d0e0f10"), sarama.ByteEncoder("1112131415161718191a1b1c1d1e1f20")} + keyedMessageKey := []sarama.ByteEncoder{sarama.ByteEncoder("0102030405060708090a0b0c0d0e0f10"), sarama.ByteEncoder("1112131415161718191a1b1c1d1e1f20")} unkeyedZipkinJSON := []any{ map[string]any{ @@ -446,7 +446,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { partitionTracesByID bool numExpectedMessages int expectedJSON []any - expectedMessageKey []sarama.Encoder + expectedMessageKey []sarama.ByteEncoder unmarshaled any }{ {encoding: "otlp_json", numExpectedMessages: 1, expectedJSON: unkeyedOtlpJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: map[string]any{}}, @@ -459,14 +459,19 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { marshaler, err := createTracesMarshaler(Config{ Encoding: test.encoding, PartitionTracesByID: test.partitionTracesByID, + Producer: Producer{MaxMessageBytes: defaultProducerMaxMessageBytes}, }) require.NoErrorf(t, err, "Must have %s marshaler", test.encoding) msg, err := marshaler.Marshal(traces, t.Name()) + var messages []*sarama.ProducerMessage + if len(msg) > 0 { + messages = msg[0].Messages + } require.NoError(t, err, "Must have marshaled the data without error") - require.Len(t, msg, test.numExpectedMessages, "Expected number of messages in the message") + require.Len(t, messages, test.numExpectedMessages, "Expected number of messages in the message") - for idx, singleMsg := range msg[0].msg { + for idx, singleMsg := range messages { data, err := singleMsg.Value.Encode() require.NoError(t, err, "Must not error when encoding value") require.NotNil(t, data, "Must have valid data to test") From 9e06f7d2609d55a272e484683af67c094e307220 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 05:02:27 +0530 Subject: [PATCH 6/8] feat: implement size 
based chunking with binary search for pdata marshaler Signed-off-by: Shivanshu Raj Shrivastava --- exporter/kafkaexporter/pdata_marshaler.go | 271 ++++++++++++++++++++-- 1 file changed, 251 insertions(+), 20 deletions(-) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 458e53b2f70a..bd668612fe25 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -4,6 +4,8 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" import ( + "fmt" + "github.com/IBM/sarama" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -12,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" + "go.uber.org/multierr" ) type pdataLogsMarshaler struct { @@ -128,38 +131,154 @@ type pdataTracesMarshaler struct { maxMessageBytes int } -func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { +func (p *pdataTracesMarshaler) processTrace( + trace ptrace.Traces, + topic string, + key []byte, + messages *[]*sarama.ProducerMessage, + messageChunks *[]*ProducerMessageChunks, + packetSize *int) error { - // ToDo: effectively chunk the spans adhering to j.maxMessageBytes - var messageChunks []*ProducerMessageChunks + if trace.ResourceSpans().Len() == 0 { + return nil + } - var msgs []*sarama.ProducerMessage - if p.partitionedByTraceID { - for _, trace := range batchpersignal.SplitTraces(td) { - bts, err := p.marshaler.MarshalTraces(trace) + bts, err := p.marshaler.MarshalTraces(trace) + if err != nil { + return fmt.Errorf("failed to marshal traces: %w", err) + } + + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: 
sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(key), + } + + // Deriving version from sarama library + // https://github.com/IBM/sarama/blob/main/async_producer.go#L454 + currentMsgSize := msg.ByteSize(2) + + if (*packetSize + currentMsgSize) <= p.maxMessageBytes { + *packetSize += currentMsgSize + *messages = append(*messages, msg) + return nil + } + + if len(*messages) > 0 { + *messageChunks = append(*messageChunks, &ProducerMessageChunks{*messages}) + *messages = make([]*sarama.ProducerMessage, 0) + *packetSize = 0 + } + + // If current message itself exceeds limit, split it + if currentMsgSize > p.maxMessageBytes { + remaining := trace + for { + splitPoint, err := p.findSplitPoint(remaining) if err != nil { - return nil, err + return fmt.Errorf("failed to find split point: %w", err) } - msgs = append(msgs, &sarama.ProducerMessage{ + + // If we can't split further (single span exceeds limit) + if splitPoint < 1 { + return fmt.Errorf("single span size %d exceeds maximum message size %d", currentMsgSize, p.maxMessageBytes) + } + + // Split and process first half + firstHalf, secondHalf := p.splitTraceBySpans(remaining, 0, splitPoint) + + bts, err := p.marshaler.MarshalTraces(firstHalf) + if err != nil { + return fmt.Errorf("failed to marshal first half: %w", err) + } + + splitMsg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - Key: sarama.ByteEncoder(traceutil.TraceIDToHexOrEmptyString(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID())), - }) + Key: sarama.ByteEncoder(key), + } + + *messageChunks = append(*messageChunks, &ProducerMessageChunks{[]*sarama.ProducerMessage{splitMsg}}) + + remaining = secondHalf + if remaining.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len() == 0 { + break + } + + bts, err = p.marshaler.MarshalTraces(remaining) + if err != nil { + return fmt.Errorf("failed to marshal remaining half: %w", err) + } + + remainingSize := (&sarama.ProducerMessage{ + Topic: topic, + Value: 
sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(key), + }).ByteSize(2) + + if remainingSize <= p.maxMessageBytes { + *messages = append(*messages, &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(key), + }) + *packetSize = remainingSize + break + } + } + return nil + } + + *messages = append(*messages, msg) + *packetSize = currentMsgSize + return nil +} + +func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) { + if td.ResourceSpans().Len() == 0 { + return []*ProducerMessageChunks{}, nil + } + + if p.maxMessageBytes <= 0 { + return nil, fmt.Errorf("maxMessageBytes must be positive, got %d", p.maxMessageBytes) + } + + var messageChunks []*ProducerMessageChunks + var messages []*sarama.ProducerMessage + var packetSize int + var errs error + + if p.partitionedByTraceID { + traces := batchpersignal.SplitTraces(td) + if len(traces) == 0 { + return []*ProducerMessageChunks{}, nil + } + + for _, trace := range traces { + if trace.ResourceSpans().Len() == 0 || + trace.ResourceSpans().At(0).ScopeSpans().Len() == 0 || + trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len() == 0 { + continue + } + + key := traceutil.TraceIDToHexOrEmptyString(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()) + if err := p.processTrace(trace, topic, []byte(key), &messages, &messageChunks, &packetSize); err != nil { + errs = multierr.Append(errs, fmt.Errorf("failed to process trace with ID %s: %w", key, err)) + continue + } } } else { - bts, err := p.marshaler.MarshalTraces(td) - if err != nil { - return nil, err + if err := p.processTrace(td, topic, nil, &messages, &messageChunks, &packetSize); err != nil { + errs = multierr.Append(errs, fmt.Errorf("failed to process batch: %w", err)) } - msgs = append(msgs, &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(bts), - }) } - messageChunks = append(messageChunks, &ProducerMessageChunks{msg: msgs}) 
+ // Add any remaining messages as final chunk + if len(messages) > 0 { + messageChunks = append(messageChunks, &ProducerMessageChunks{messages}) + } - return messageChunks, nil + return messageChunks, errs } func (p *pdataTracesMarshaler) Encoding() string { @@ -174,3 +293,115 @@ func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partit maxMessageBytes: maxMessageBytes, } } + +// findSplitPoint uses binary search to find the maximum number of spans that can fit within maxMessageBytes +func (p *pdataTracesMarshaler) findSplitPoint(trace ptrace.Traces) (int, error) { + if trace.ResourceSpans().Len() == 0 || + trace.ResourceSpans().At(0).ScopeSpans().Len() == 0 { + return 0, fmt.Errorf("trace contains no spans") + } + + spans := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + totalSpans := spans.Len() + + if totalSpans == 0 { + return 0, fmt.Errorf("scope contains no spans") + } + + // If single span, check if it fits + if totalSpans == 1 { + testTrace := ptrace.NewTraces() + trace.ResourceSpans().At(0).Resource().CopyTo(testTrace.ResourceSpans().AppendEmpty().Resource()) + scope := testTrace.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + trace.ResourceSpans().At(0).ScopeSpans().At(0).Scope().CopyTo(scope.Scope()) + spans.At(0).CopyTo(scope.Spans().AppendEmpty()) + + bts, err := p.marshaler.MarshalTraces(testTrace) + if err != nil { + return 0, err + } + + if (&sarama.ProducerMessage{Value: sarama.ByteEncoder(bts)}).ByteSize(2) <= p.maxMessageBytes { + return 1, nil + } + return 0, fmt.Errorf("single span exceeds maximum message size") + } + + left := 1 + right := totalSpans + + // Binary search to find the largest number of spans that fits within maxMessageBytes + for left < right { + mid := (left + right + 1) / 2 + + testTrace := ptrace.NewTraces() + trace.ResourceSpans().At(0).Resource().CopyTo(testTrace.ResourceSpans().AppendEmpty().Resource()) + scope := testTrace.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + 
+// splitTraceBySpans splits a trace into two halves: spans whose index falls in [low, high) go into the first half, all remaining spans into the second
04655095aa09497d99f3884bc51237405ccb0865 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 05:02:51 +0530 Subject: [PATCH 7/8] test: add unit tests for pdata marshaler Signed-off-by: Shivanshu Raj Shrivastava --- .../kafkaexporter/pdata_marshaler_test.go | 282 ++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 exporter/kafkaexporter/pdata_marshaler_test.go diff --git a/exporter/kafkaexporter/pdata_marshaler_test.go b/exporter/kafkaexporter/pdata_marshaler_test.go new file mode 100644 index 000000000000..405b0a077259 --- /dev/null +++ b/exporter/kafkaexporter/pdata_marshaler_test.go @@ -0,0 +1,282 @@ +package kafkaexporter + +import ( + "testing" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestPdataTracesMarshaler_Marshal(t *testing.T) { + tests := []struct { + name string + maxMessageBytes int + partitionedByTraceID bool + traces ptrace.Traces + wantChunks int + wantError bool + }{ + { + name: "empty_trace", + maxMessageBytes: 1000, + partitionedByTraceID: false, + traces: ptrace.NewTraces(), + wantChunks: 0, + wantError: false, + }, + { + name: "single_small_trace", + maxMessageBytes: 1000, + partitionedByTraceID: false, + traces: createTracesWithSize(1, 1, 10), // 1 resource, 1 span, small attributes + wantChunks: 1, + wantError: false, + }, + { + name: "single_large_trace_needs_splitting", + maxMessageBytes: 10000, + partitionedByTraceID: false, + traces: createTracesWithSize(1, 10, 50), // 1 resource, 10 spans, larger attributes + wantChunks: 2, // Exact number depends on size + wantError: false, + }, + { + name: "multiple_traces_partitioned", + maxMessageBytes: 1000, + partitionedByTraceID: true, + traces: createTracesWithMultipleTraceIDs(3, 1, 10), // 3 traces, 1 span each, small attributes + wantChunks: 1, + wantError: false, + }, + { + name: 
"oversized_single_span", + maxMessageBytes: 10, + partitionedByTraceID: false, + traces: createTracesWithSize(1, 1, 1000), // 1 resource, 1 span, very large attributes + wantChunks: 0, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + marshaler := newPdataTracesMarshaler( + &ptrace.ProtoMarshaler{}, + "proto", + tt.partitionedByTraceID, + tt.maxMessageBytes, + ) + + chunks, err := marshaler.Marshal(tt.traces, "test-topic") + + if tt.wantError { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.wantChunks, len(chunks)) + + for _, chunk := range chunks { + for _, msg := range chunk.Messages { + assert.LessOrEqual(t, msg.ByteSize(2), tt.maxMessageBytes) + } + } + }) + } +} + +func TestPdataTracesMarshaler_FindSplitPoint(t *testing.T) { + tests := []struct { + name string + maxMessageBytes int + trace ptrace.Traces + wantSplitPoint int + wantError bool + }{ + { + name: "empty_trace", + maxMessageBytes: 1000, + trace: ptrace.NewTraces(), + wantSplitPoint: 0, + wantError: true, + }, + { + name: "single_span_fits", + maxMessageBytes: 1000, + trace: createTracesWithSize(1, 1, 10), + wantSplitPoint: 1, + wantError: false, + }, + { + name: "single_span_too_large", + maxMessageBytes: 10, + trace: createTracesWithSize(1, 1, 1000), + wantSplitPoint: 0, + wantError: true, + }, + { + name: "multiple_spans_partial_fit", + maxMessageBytes: 3000, + trace: createTracesWithSize(1, 10, 100), + wantSplitPoint: 1, + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + marshaler := newPdataTracesMarshaler( + &ptrace.ProtoMarshaler{}, + "proto", + false, + tt.maxMessageBytes, + ) + + splitPoint, err := marshaler.(*pdataTracesMarshaler).findSplitPoint(tt.trace) + + if tt.wantError { + assert.Error(t, err) + return + } + + require.NoError(t, err) + if tt.wantSplitPoint > 0 { + assert.Equal(t, tt.wantSplitPoint, splitPoint) + } + + // Verify split point produces 
+// Helper functions
createTracesWithSize(resourceSpans, spansPerResource int, attrSize int) ptrace.Traces { + traces := ptrace.NewTraces() + + for i := 0; i < resourceSpans; i++ { + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("resource", "resource-"+string(rune(i))) + + ss := rs.ScopeSpans().AppendEmpty() + ss.Scope().SetName("scope-" + string(rune(i))) + + for j := 0; j < spansPerResource; j++ { + span := ss.Spans().AppendEmpty() + span.SetName("span-" + string(rune(j))) + attrs := span.Attributes() + for k := 0; k < attrSize; k++ { + attrs.PutStr("key-"+string(rune(k)), "value-"+string(rune(k))) + } + } + } + + return traces +} + +func createTracesWithMultipleTraceIDs(numTraces, spansPerTrace, attrSize int) ptrace.Traces { + traces := ptrace.NewTraces() + + for i := 0; i < numTraces; i++ { + rs := traces.ResourceSpans().AppendEmpty() + ss := rs.ScopeSpans().AppendEmpty() + + for j := 0; j < spansPerTrace; j++ { + span := ss.Spans().AppendEmpty() + traceID := pcommon.TraceID([16]byte{byte(i), byte(j)}) + span.SetTraceID(traceID) + + attrs := span.Attributes() + for k := 0; k < attrSize; k++ { + attrs.PutStr("key-"+string(rune(k)), "value-"+string(rune(k))) + } + } + } + + return traces +} From 90814f2f9154700d1a1da417058087af14f625d6 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 15 Jan 2025 05:03:55 +0530 Subject: [PATCH 8/8] chore: refactor --- exporter/kafkaexporter/kafka_exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 229576769866..98f979b895fc 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -53,7 +53,7 @@ func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces var allErrors []string for i, chunk := range messageChunks { - sendErr := e.producer.SendMessages(chunk.msg) + sendErr := e.producer.SendMessages(chunk.Messages) if 
sendErr == nil { continue }