[exporter/kafka] Enable Trace batch chunking before exporting to kafka #37176

Draft: wants to merge 8 commits into base: main
61 changes: 53 additions & 8 deletions exporter/kafkaexporter/jaeger_marshaler.go
@@ -5,6 +5,7 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"bytes"
"fmt"

"github.com/IBM/sarama"
"github.com/gogo/protobuf/jsonpb"
@@ -16,34 +17,78 @@ import (
)

type jaegerMarshaler struct {
    marshaler            jaegerSpanMarshaler
    partitionedByTraceID bool
    maxMessageBytes      int
}

var _ TracesMarshaler = (*jaegerMarshaler)(nil)

func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) {

Review comment (Member): why does this type need to change? The exporter is getting an array of messages and calls SendMessages(msgs []*ProducerMessage) on sarama.Producer. At that point, is the max message size applied to the whole batch or to each individual message? Intuitively it's the latter (I don't see evidence to the contrary), and since each message contains exactly one span per the previous logic, there's really nothing else to do here.

And if my assumption is correct, I don't think the introduction of ProducerMessageChunks struct is necessary - just make sure that each message is not larger than max.
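For reference, a minimal sketch of the alternative this comment describes: keep the flat []*sarama.ProducerMessage return type and enforce the cap per message. The name marshalFlat and the exact error wording are illustrative, not code from this PR; everything else reuses calls that appear in the diff below.

// Illustrative sketch of the reviewer's suggestion; marshalFlat is a
// hypothetical name, not part of this PR.
func (j jaegerMarshaler) marshalFlat(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
    var messages []*sarama.ProducerMessage
    var errs error
    for _, batch := range jaeger.ProtoFromTraces(traces) {
        for _, span := range batch.Spans {
            span.Process = batch.Process
            bts, err := j.marshaler.marshal(span)
            if err != nil {
                errs = multierr.Append(errs, err)
                continue
            }
            msg := &sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.ByteEncoder(bts),
                Key:   sarama.ByteEncoder([]byte(span.TraceID.String())),
            }
            // Enforce the cap per message; anything larger is reported and dropped.
            if j.maxMessageBytes > 0 && msg.ByteSize(2) > j.maxMessageBytes {
                errs = multierr.Append(errs, fmt.Errorf("span %s exceeds max message size", span.SpanID.String()))
                continue
            }
            messages = append(messages, msg)
        }
    }
    return messages, errs
}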

    if j.maxMessageBytes <= 0 {
        return nil, fmt.Errorf("maxMessageBytes must be positive, got %d", j.maxMessageBytes)
    }

Review comment (@yurishkuro, Member, Jan 15, 2025): this can be checked in Config.Validate method once
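A hedged sketch of that suggestion. The field path cfg.Producer.MaxMessageBytes mirrors its use later in this diff; whether the exporter's Config already defines Validate, and how this check would merge into it, is not shown here.

// Sketch only: validate the cap once at config time so Marshal can assume
// a positive value. Assumes cfg.Producer.MaxMessageBytes as used in this PR.
func (cfg *Config) Validate() error {
    if cfg.Producer.MaxMessageBytes <= 0 {
        return fmt.Errorf("producer.max_message_bytes must be positive, got %d", cfg.Producer.MaxMessageBytes)
    }
    return nil
}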

    batches := jaeger.ProtoFromTraces(traces)
    if len(batches) == 0 {
        return []*ProducerMessageChunks{}, nil
    }

    var messages []*sarama.ProducerMessage
    var messageChunks []*ProducerMessageChunks
    var packetSize int
    var errs error

    for _, batch := range batches {
        for _, span := range batch.Spans {
            span.Process = batch.Process

            bts, err := j.marshaler.marshal(span)
            // continue to process spans that can be serialized
            if err != nil {
                errs = multierr.Append(errs, fmt.Errorf("failed to marshal span %s: %w", span.SpanID.String(), err))
                continue
            }

            key := []byte(span.TraceID.String())
            msg := &sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.ByteEncoder(bts),
                Key:   sarama.ByteEncoder(key),
            }

            // Deriving version from sarama library
            // https://github.com/IBM/sarama/blob/main/async_producer.go#L454
            currentMsgSize := msg.ByteSize(2)

            // Check if message itself exceeds size limit
            if currentMsgSize > j.maxMessageBytes {
                errs = multierr.Append(errs, fmt.Errorf("span %s exceeds maximum message size: %d > %d",
                    span.SpanID.String(), currentMsgSize, j.maxMessageBytes))
                continue
            }

            // Check if adding this message would exceed the chunk size limit
            if (packetSize + currentMsgSize) <= j.maxMessageBytes {
                packetSize += currentMsgSize
                messages = append(messages, msg)
            } else {
                // Current chunk is full, create new chunk
                if len(messages) > 0 {
                    messageChunks = append(messageChunks, &ProducerMessageChunks{messages})
                    messages = make([]*sarama.ProducerMessage, 0, len(messages)) // Preallocate with previous size
                }
                messages = append(messages, msg)
                packetSize = currentMsgSize
            }
        }
    }

    // Add final chunk if there are remaining messages
    if len(messages) > 0 {
        messageChunks = append(messageChunks, &ProducerMessageChunks{messages})
    }

    return messageChunks, errs
}
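The ProducerMessageChunks type itself does not appear in this part of the diff; judging from &ProducerMessageChunks{messages} above and chunk.Messages in the exporter below, it is presumably a thin wrapper along these lines:

// Presumed shape, inferred from usage in this diff; the actual definition
// lives elsewhere in the PR.
type ProducerMessageChunks struct {
    Messages []*sarama.ProducerMessage
}

As a worked example of the chunking loop: with maxMessageBytes = 1000 and three spans that each serialize to roughly 400-byte messages, the first two fill one chunk (about 800 bytes), the third would overflow it and starts a new chunk, and the final flush yields two chunks.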

func (j jaegerMarshaler) Encoding() string {
41 changes: 33 additions & 8 deletions exporter/kafkaexporter/jaeger_marshaler_test.go
@@ -38,32 +38,57 @@ func TestJaegerMarshaler(t *testing.T) {
    require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer, batches[0].Spans[0]))

    tests := []struct {
        name        string
        unmarshaler jaegerMarshaler
        encoding    string
        messages    []*ProducerMessageChunks
    }{
        {
            name: "proto_marshaler",
            unmarshaler: jaegerMarshaler{
                marshaler:       jaegerProtoSpanMarshaler{},
                maxMessageBytes: defaultProducerMaxMessageBytes,
            },
            encoding: "jaeger_proto",
            messages: []*ProducerMessageChunks{
                {
                    Messages: []*sarama.ProducerMessage{
                        {
                            Topic: "topic",
                            Value: sarama.ByteEncoder(jaegerProtoBytes),
                            Key:   sarama.ByteEncoder(messageKey),
                        },
                    },
                },
            },
        },
        {
            name: "json_marshaler",
            unmarshaler: jaegerMarshaler{
                marshaler: jaegerJSONSpanMarshaler{
                    pbMarshaler: &jsonpb.Marshaler{},
                },
                maxMessageBytes: defaultProducerMaxMessageBytes,
            },
            encoding: "jaeger_json",
            messages: []*ProducerMessageChunks{
                {
                    Messages: []*sarama.ProducerMessage{
                        {
                            Topic: "topic",
                            Value: sarama.ByteEncoder(jsonByteBuffer.Bytes()),
                            Key:   sarama.ByteEncoder(messageKey),
                        },
                    },
                },
            },
        },
    }
    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            msg, err := test.unmarshaler.Marshal(td, "topic")
            require.NoError(t, err)
            assert.Equal(t, test.messages, msg)
            assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
        })
    }
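The updated cases only exercise the single-chunk path. A hypothetical test of the splitting behavior could probe the size of one produced message and then cap chunks at exactly that size, so two identical spans cannot share a chunk; the test name and span values below are illustrative, not part of this PR.

// Hypothetical test sketch, not part of this PR.
func TestJaegerMarshaler_SplitsChunks(t *testing.T) {
    td := ptrace.NewTraces()
    spans := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
    for i := 0; i < 2; i++ {
        span := spans.AppendEmpty()
        span.SetName("foo")
        span.SetTraceID([16]byte{0x01, 0x02, 0x03, 0x04})
    }

    m := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}, maxMessageBytes: defaultProducerMaxMessageBytes}

    // First pass: both spans fit in one chunk; measure one message's size.
    chunks, err := m.Marshal(td, "topic")
    require.NoError(t, err)
    oneMsgSize := chunks[0].Messages[0].ByteSize(2)

    // Second pass: cap at one message's size; each span gets its own chunk.
    m.maxMessageBytes = oneMsgSize
    chunks, err = m.Marshal(td, "topic")
    require.NoError(t, err)
    assert.Len(t, chunks, 2)
}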
53 changes: 42 additions & 11 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/IBM/sarama"
"go.opentelemetry.io/collector/component"
@@ -42,20 +43,48 @@ func (ke kafkaErrors) Error() string {
}

func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error {
    messageChunks, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans()))
    if err != nil {
        return consumererror.NewPermanent(
            fmt.Errorf("failed to marshal trace data: %w", err),
        )
    }

    var allErrors []string

    for i, chunk := range messageChunks {
        sendErr := e.producer.SendMessages(chunk.Messages)
        if sendErr == nil {
            continue
        }

        var prodErrs sarama.ProducerErrors
        if errors.As(sendErr, &prodErrs) {
            for _, pErr := range prodErrs {
                allErrors = append(allErrors,
                    fmt.Sprintf(
                        "chunk[%d] partition=%d offset=%d error=%v",
                        i,
                        pErr.Msg.Partition,
                        pErr.Msg.Offset,
                        pErr.Err,
                    ),
                )
            }
        } else {
            allErrors = append(allErrors,
                fmt.Sprintf("chunk[%d] error=%v", i, sendErr),
            )
        }
    }

    if len(allErrors) > 0 {
        return fmt.Errorf("encountered %d errors sending Kafka messages: %s",
            len(allErrors),
            strings.Join(allErrors, "; "),
        )
    }

    return nil
}

@@ -73,8 +102,10 @@ func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) er
        e.cfg.Encoding,
    ); errExt == nil {
        e.marshaler = &tracesEncodingMarshaler{
            marshaler:            *marshaler,
            encoding:             e.cfg.Encoding,
            partitionedByTraceID: e.cfg.PartitionTracesByID,
            maxMessageBytes:      e.cfg.Producer.MaxMessageBytes,
        }
    }
    if marshaler, errInt := createTracesMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
10 changes: 5 additions & 5 deletions exporter/kafkaexporter/kafka_exporter_test.go
@@ -174,7 +174,7 @@ func TestTracesPusher(t *testing.T) {

    p := kafkaTracesProducer{
        producer:  producer,
        marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
    }
    t.Cleanup(func() {
        require.NoError(t, p.Close(context.Background()))
@@ -193,7 +193,7 @@ func TestTracesPusher_attr(t *testing.T) {
            TopicFromAttribute: "kafka_topic",
        },
        producer:  producer,
        marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
    }
    t.Cleanup(func() {
        require.NoError(t, p.Close(context.Background()))
@@ -209,7 +209,7 @@ func TestTracesPusher_ctx(t *testing.T) {

    p := kafkaTracesProducer{
        producer:  producer,
        marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
    }
    t.Cleanup(func() {
        require.NoError(t, p.Close(context.Background()))
@@ -226,7 +226,7 @@ func TestTracesPusher_err(t *testing.T) {

    p := kafkaTracesProducer{
        producer:  producer,
        marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
        logger:    zap.NewNop(),
    }
    t.Cleanup(func() {
@@ -432,7 +432,7 @@ func (e metricsErrorMarshaler) Encoding() string {

var _ TracesMarshaler = (*tracesErrorMarshaler)(nil)

func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*ProducerMessageChunks, error) {
    return nil, e.err
}
