Skip to content

Commit

Permalink
revert the changes for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Oct 19, 2023
1 parent dc447ea commit 193ccd6
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -259,7 +260,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)
jsonStream.Reset(buf)
tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
for j := is.library; j < rm.ScopeMetrics().Len(); j++ {
Expand All @@ -271,24 +272,20 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS

// Parsing metric record to Splunk event.
events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)

tempBuf.Reset()
for _, event := range events {
// JSON encoding event and writing to buffer.
jsonStream.Error = nil
jsonStream.WriteVal(event)
if jsonStream.Error != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonStream.Error)))
continue
}
if uint(jsonStream.Buffered()) > c.config.MaxEventSize {
permanentErrors = append(permanentErrors, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize))
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
continue
}

tempBuf.Write(b)
}

// Continue adding events to buffer up to capacity.
err := jsonStream.Flush()
b := tempBuf.Bytes()
_, err := buf.Write(b)
if err == nil {
continue
}
Expand All @@ -298,11 +295,9 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
" content length %d bytes", jsonStream.Buffered(), c.config.MaxContentLengthMetrics)))
" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
return iterState{i, j, k + 1, false}, permanentErrors
}
// flush exited before resetting the buffer of the stream.
jsonStream.Reset(buf)
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"error writing the event: %w", err)))
}
Expand Down Expand Up @@ -718,3 +713,17 @@ func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]
"__splunk_app_version": config.SplunkAppVersion,
}
}

// marshalEvent marshals an event to JSON using a reusable jsoniter stream.
func marshalEvent(event *splunk.Event, sizeLimit uint, stream *jsoniter.Stream) ([]byte, error) {
stream.Reset(nil)
stream.Error = nil
stream.WriteVal(event)
if stream.Error != nil {
return nil, stream.Error
}
if uint(stream.Buffered()) > sizeLimit {
return nil, fmt.Errorf("event size %d exceeds limit %d", stream.Buffered(), sizeLimit)
}
return stream.Buffer(), nil
}

0 comments on commit 193ccd6

Please sign in to comment.