diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 57c8f5e2fd74..122ccb2d913e 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -4,6 +4,7 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" import ( + "bytes" "context" "errors" "fmt" @@ -259,7 +260,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS var permanentErrors []error jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) - jsonStream.Reset(buf) + tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) for j := is.library; j < rm.ScopeMetrics().Len(); j++ { @@ -271,24 +272,20 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS // Parsing metric record to Splunk event. events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger) - + tempBuf.Reset() for _, event := range events { // JSON encoding event and writing to buffer. - jsonStream.Error = nil - jsonStream.WriteVal(event) - if jsonStream.Error != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonStream.Error))) - continue - } - if uint(jsonStream.Buffered()) > c.config.MaxEventSize { - permanentErrors = append(permanentErrors, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize)) + b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) continue } - + tempBuf.Write(b) } // Continue adding events to buffer up to capacity. - err := jsonStream.Flush() + b := tempBuf.Bytes() + _, err := buf.Write(b) if err == nil { continue } @@ -298,11 +295,9 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ - " content length %d bytes", jsonStream.Buffered(), c.config.MaxContentLengthMetrics))) + " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{i, j, k + 1, false}, permanentErrors } - // flush exited before resetting the buffer of the stream. - jsonStream.Reset(buf) permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) } @@ -718,3 +713,17 @@ func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string] "__splunk_app_version": config.SplunkAppVersion, } } + +// marshalEvent marshals an event to JSON using a reusable jsoniter stream. +func marshalEvent(event *splunk.Event, sizeLimit uint, stream *jsoniter.Stream) ([]byte, error) { + stream.Reset(nil) + stream.Error = nil + stream.WriteVal(event) + if stream.Error != nil { + return nil, stream.Error + } + if uint(stream.Buffered()) > sizeLimit { + return nil, fmt.Errorf("event size %d exceeds limit %d", stream.Buffered(), sizeLimit) + } + return stream.Buffer(), nil +}