Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/kafka] Enable Trace batch chunking before exporting to kafka #37176

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

shivanshuraj1333
Copy link
Member

Related to: #36982

Currently kafka exporter is not exporting messaging by adhering to the limits set by MaxMessageBytes, which can cause trace packets to drop when exceeding the limit.

This PR aims to fix that by effectively dividing the spans into chunks which have size under the limit set by MaxMessageBytes, and then individually send these chunks.

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
…haler

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
@shivanshuraj1333
Copy link
Member Author

This is almost done, would open it up for review after manually testing it with kafka.

}

var _ TracesMarshaler = (*jaegerMarshaler)(nil)

func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) {
if j.maxMessageBytes <= 0 {
Copy link
Member

@yurishkuro yurishkuro Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be checked in Config.Validate method once

return nil
}

bts, err := p.marshaler.MarshalTraces(trace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still trying to marshal one large trace as a single message, it's not going to solve the problem in the issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used to prepare the msg at line 151, at 159 I'm checking the packet size, and then doing the splitting.
AFAIU, marshaling can happen at bigger trace, but the export has to be chunked, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you cannot break up already marshaled bytes, it creates invalid payload that cannot be parsed by the consumer. This would break the data contract for the topic.

Copy link
Member Author

@shivanshuraj1333 shivanshuraj1333 Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's some confusion, if I am not wrong.

So, the logic that I am trying to implement is,

  • for Jaeger, I'm adding spans to the chunk while checking the size (easy). (since the spans already have resource info)

For the pdata_marshaler.go and marshaler.go I intend to do the following.

  • get the bytes by marshaling the whole trace in a given batch
    • if the size is under limits, append that to a chunk (*sarama.ProducerMessage)
    • if the size exceeds, split the spans into two halves (with the same resource) for the current batch
      • do this binary search till the size of new split spans after marshaling comes out to be under limits
    • repeat this for all the batches

I have updated the Marshaller here which contains []*sarama.ProducerMessage

Here I'm marshaling the first half and here marshaling the second half, which are the two halves of the original trace.

The above logic ensures that every slice is under the limits. []*ProducerMessageChunks contains -> []*sarama.ProducerMessage

and each value in []*sarama.ProducerMessage is a valid split of initial trace, and then the marshaled value, not the splitting of bytes.

Kindly correct me if I am missing something. (maybe I need to fix some things in the code, but do you agree the above logic works?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro when you get time, can you review my approach?

tldr: I am not appending bytes randomly, rather I am ensuring that the marshled trace when converted to *sarama.ProducerMessage follows the size, if it doesn't, I split the trace into two traces and then marshal again.

}

var _ TracesMarshaler = (*jaegerMarshaler)(nil)

func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this type need to change? The exporter is getting an array of messages and calls SendMessages(msgs []*ProducerMessage) on sarama.Producer. At that point, is the max message size applied to the whole batch or to each individual message? Intuitively it's the latter (I don't see evidence to the contrary), and since each message contains exactly one span per the previous logic, there's really nothing else to do here.

And if my assumption is correct, I don't think the introduction of ProducerMessageChunks struct is necessary - just make sure that each message is not larger than max.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants