[chore] Reuse the JSON writer to reduce memory allocations #27806

Status: Closed · wants to merge 4 commits
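In short, the change drops the per-event `marshalEvent` helper (which produced a fresh `[]byte` for every event) in favor of a pooled `jsoniter.Stream` that is reset against the destination buffer and flushed into it directly. Below is a minimal, self-contained sketch of that pattern; the `encodeTo` name, pool buffer size, and `bytes.Buffer` destination are illustrative assumptions, not the exporter's actual types:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"

	jsoniter "github.com/json-iterator/go"
)

// A pool of reusable JSON streams, mirroring the jsonStreamPool in the diff.
var jsonStreamPool = sync.Pool{
	New: func() any {
		return jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)
	},
}

// encodeTo serializes v as JSON straight into w, reusing a pooled stream so
// no intermediate []byte is allocated per event.
func encodeTo(w *bytes.Buffer, v any) error {
	stream := jsonStreamPool.Get().(*jsoniter.Stream)
	defer jsonStreamPool.Put(stream)

	stream.Reset(w) // retarget the stream and truncate its internal buffer
	stream.Error = nil
	stream.WriteVal(v) // encode into the stream's internal buffer
	if stream.Error != nil {
		return stream.Error
	}
	return stream.Flush() // copy the buffered bytes into w
}

func main() {
	var buf bytes.Buffer
	if err := encodeTo(&buf, map[string]string{"event": "hello"}); err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(buf.String())
}
```

Because `Stream.Reset` truncates the stream's internal buffer and retargets its writer, the same stream (and its backing byte slice) is reused across events; that reuse is where the allocation savings come from.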
104 changes: 58 additions & 46 deletions exporter/splunkhecexporter/client.go
@@ -4,7 +4,6 @@
package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"

import (
"bytes"
"context"
"errors"
"fmt"
@@ -190,7 +189,6 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers

// fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed.
func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterState, []error) {
var b []byte
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)
@@ -204,34 +202,44 @@ func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterS
is.record = 0 // Reset record index for next library.
logRecord := sl.LogRecords().At(k)

var err error
var l int
if c.config.ExportRaw {
b = []byte(logRecord.Body().AsString() + "\n")
_, err = buf.Write([]byte(logRecord.Body().AsString() + "\n"))
} else {
// Parsing log record to Splunk event.
event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config)

// JSON encoding event and writing to buffer.
var err error
b, err = marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
jsonStream.Reset(buf)
jsonStream.Error = nil
jsonStream.WriteVal(event)
if jsonStream.Error != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"dropped log event: %v, error: %w", event, err)))
"dropped log event: %v, error: %w", event, jsonStream.Error)))
continue
}
if uint(jsonStream.Buffered()) > c.config.MaxEventSize {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"dropped log event: %v, error: %w", event, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize))))
continue
}
l = jsonStream.Buffered()
err = jsonStream.Flush()
if err == nil {
continue
}
}

// Continue adding events to buffer up to capacity.
_, err := buf.Write(b)
if err == nil {
continue
}

if errors.Is(err, errOverCapacity) {
if !buf.Empty() {
return iterState{i, j, k, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthLogs)))
" content length %d bytes", l, c.config.MaxContentLengthLogs)))
return iterState{i, j, k + 1, false}, permanentErrors
}
permanentErrors = append(permanentErrors,
@@ -247,8 +255,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
jsonStream.Reset(buf)
for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
for j := is.library; j < rm.ScopeMetrics().Len(); j++ {
@@ -260,20 +267,24 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS

// Parsing metric record to Splunk event.
events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
tempBuf.Reset()

for _, event := range events {
// JSON encoding event and writing to buffer.
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
jsonStream.Error = nil
jsonStream.WriteVal(event)
if jsonStream.Error != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonStream.Error)))
continue
}
tempBuf.Write(b)
if uint(jsonStream.Buffered()) > c.config.MaxEventSize {
permanentErrors = append(permanentErrors, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize))
continue
}
Member commented:
I don't think this approach works for metrics. We are not flushing or resetting the stream once this condition is triggered. This means we will be dropping all the events after we have buffered more than c.config.MaxEventSize. This is incorrect because MaxEventSize is supposed to be applied to each individual event.

One way forward I can think of is introducing another field to iterState for "datapoints" and using it in this loop. Having that, we don't need to remove marshalEvent (or whatever we call it) and copy-paste the code. Later we can use it in mapMetricToSplunkEvent as well to avoid redoing translations.
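To make this concrete: once `Buffered()` exceeds the limit and the stream is neither flushed nor reset, the rejected bytes stay in the stream's internal buffer, so every later event appears oversized as well. Here is a small standalone demo of the failure mode and the reset that avoids it (the limit and payloads are made up for illustration; this is not the exporter's code):

```go
package main

import (
	"bytes"
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

const maxEventSize = 16 // deliberately tiny limit for the demo

func main() {
	var out bytes.Buffer
	stream := jsoniter.NewStream(jsoniter.ConfigDefault, &out, 512)

	for _, e := range []string{"tiny", "an event body that is far too large", "small"} {
		stream.Error = nil
		stream.WriteVal(e)
		if uint(stream.Buffered()) > maxEventSize {
			// The oversized bytes are still sitting in the stream's buffer.
			// Resetting discards them; without this call, Buffered() would
			// keep counting the stale bytes and "small" would be dropped too.
			stream.Reset(&out)
			fmt.Println("dropped oversized event")
			continue
		}
		if err := stream.Flush(); err != nil {
			fmt.Println("flush failed:", err)
			return
		}
	}
	fmt.Printf("buffer contents: %s\n", out.String()) // "tiny""small"
}
```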

Contributor (author) replied:
OK, let me drop that code path for now; we can revisit when we have exporterhelper and a way to keep track of event consumption.

Contributor (author) replied:
OK, changes made. The optimization is much less significant now (we only get 3%), but hopefully it helps in other cases.

Member replied:
Looks like the number of allocations is the same. In some cases we use less memory and CPU overall, but not by much. At the same time, the code becomes less readable. I'm not 100% sure this change is a good improvement, considering a hypothetical error budget.


}

// Continue adding events to buffer up to capacity.
b := tempBuf.Bytes()
_, err := buf.Write(b)
err := jsonStream.Flush()
if err == nil {
continue
}
@@ -283,7 +294,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
" content length %d bytes", jsonStream.Buffered(), c.config.MaxContentLengthMetrics)))
return iterState{i, j, k + 1, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
@@ -299,16 +310,24 @@ func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffe
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)
jsonStream.Reset(buf)

for i := is.record; i < len(events); i++ {
event := events[i]
// JSON encoding event and writing to buffer.
b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream)
if jsonErr != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr)))

jsonStream.Error = nil
jsonStream.WriteVal(event)
if jsonStream.Error != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonStream.Error)))
continue
}
_, err := buf.Write(b)
if uint(jsonStream.Buffered()) > c.config.MaxEventSize {
permanentErrors = append(permanentErrors, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize))
continue
}
l := jsonStream.Buffered()
err := jsonStream.Flush()
if errors.Is(err, errOverCapacity) {
if !buf.Empty() {
return iterState{
Expand All @@ -318,7 +337,7 @@ func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffe
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
" content length %d bytes", l, c.config.MaxContentLengthMetrics)))
return iterState{
record: i + 1,
done: i+1 != len(events),
@@ -350,14 +369,21 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState
event := mapSpanToSplunkEvent(rs.Resource(), span, c.config)

// JSON encoding event and writing to buffer.
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err)))
jsonStream.Reset(buf)
jsonStream.Error = nil
jsonStream.WriteVal(event)
if jsonStream.Error != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, jsonStream.Error)))
continue
}
if uint(jsonStream.Buffered()) > c.config.MaxEventSize {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize))))
continue
}

// Continue adding events to buffer up to capacity.
_, err = buf.Write(b)
l := jsonStream.Buffered()
err := jsonStream.Flush()
if err == nil {
continue
}
@@ -367,7 +393,7 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthTraces)))
" content length %d bytes", l, c.config.MaxContentLengthTraces)))
return iterState{i, j, k + 1, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
@@ -682,17 +708,3 @@ func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]
"__splunk_app_version": config.SplunkAppVersion,
}
}

// marshalEvent marshals an event to JSON using a reusable jsoniter stream.
func marshalEvent(event *splunk.Event, sizeLimit uint, stream *jsoniter.Stream) ([]byte, error) {
stream.Reset(nil)
stream.Error = nil
stream.WriteVal(event)
if stream.Error != nil {
return nil, stream.Error
}
if uint(stream.Buffered()) > sizeLimit {
return nil, fmt.Errorf("event size %d exceeds limit %d", stream.Buffered(), sizeLimit)
}
return stream.Buffer(), nil
}
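For anyone who wants to reproduce the allocation comparison discussed in the review thread, here is a minimal benchmark sketch contrasting per-event marshalling with the pooled-stream approach (hypothetical names and payloads, not the PR's benchmarks; run with `go test -bench . -benchmem`):

```go
package splunkhecexporter_test

import (
	"bytes"
	"sync"
	"testing"

	jsoniter "github.com/json-iterator/go"
)

var jsonStreamPool = sync.Pool{
	New: func() any { return jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512) },
}

// encodeTo reuses a pooled stream to write JSON directly into w.
func encodeTo(w *bytes.Buffer, v any) error {
	stream := jsonStreamPool.Get().(*jsoniter.Stream)
	defer jsonStreamPool.Put(stream)
	stream.Reset(w)
	stream.Error = nil
	stream.WriteVal(v)
	if stream.Error != nil {
		return stream.Error
	}
	return stream.Flush()
}

// Baseline: marshal each event to a fresh []byte, then copy it into the buffer.
func BenchmarkMarshalPerEvent(b *testing.B) {
	b.ReportAllocs()
	var buf bytes.Buffer
	ev := map[string]string{"event": "hello", "sourcetype": "demo"}
	for i := 0; i < b.N; i++ {
		buf.Reset()
		out, err := jsoniter.Marshal(ev)
		if err != nil {
			b.Fatal(err)
		}
		buf.Write(out)
	}
}

// Candidate: encode through a pooled, reused stream.
func BenchmarkPooledStream(b *testing.B) {
	b.ReportAllocs()
	var buf bytes.Buffer
	ev := map[string]string{"event": "hello", "sourcetype": "demo"}
	for i := 0; i < b.N; i++ {
		buf.Reset()
		if err := encodeTo(&buf, ev); err != nil {
			b.Fatal(err)
		}
	}
}
```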