diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 55271cfd57e9..122ccb2d913e 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -190,7 +190,6 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers // fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed. func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterState, []error) { - var b []byte var permanentErrors []error jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) @@ -204,36 +203,50 @@ func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterS is.record = 0 // Reset record index for next library. logRecord := sl.LogRecords().At(k) + var err error + var l int if c.config.ExportRaw { - b = []byte(logRecord.Body().AsString() + "\n") + b := []byte(logRecord.Body().AsString() + "\n") + l = len(b) + _, err = buf.Write(b) } else { // Parsing log record to Splunk event. event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config) // JSON encoding event and writing to buffer. 
- var err error - b, err = marshalEvent(event, c.config.MaxEventSize, jsonStream) - if err != nil { + jsonStream.Reset(buf) + jsonStream.Error = nil + jsonStream.WriteVal(event) + if jsonStream.Error != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "dropped log event: %v, error: %w", event, err))) + "dropped log event: %v, error: %w", event, jsonStream.Error))) + continue + } + if uint(jsonStream.Buffered()) > c.config.MaxEventSize { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "dropped log event: %v, error: %w", event, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize)))) + continue + } + l = jsonStream.Buffered() + err = jsonStream.Flush() } + // Event written successfully (raw or JSON path): move to the next record. + if err == nil { + continue + } // Continue adding events to buffer up to capacity. - _, err := buf.Write(b) - if err == nil { - continue - } + if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{i, j, k, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+ - " content length %d bytes", l, c.config.MaxContentLengthLogs))) + " content length %d bytes", l, c.config.MaxContentLengthLogs))) return iterState{i, j, k + 1, false}, permanentErrors } + // flush exited before resetting the buffer of the stream. 
+ jsonStream.Reset(buf) permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", err))) } @@ -247,7 +260,6 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS var permanentErrors []error jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) - tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) @@ -299,16 +311,24 @@ func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffe var permanentErrors []error jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) + jsonStream.Reset(buf) for i := is.record; i < len(events); i++ { event := events[i] // JSON encoding event and writing to buffer. - b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream) - if jsonErr != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr))) + + jsonStream.Error = nil + jsonStream.WriteVal(event) + if jsonStream.Error != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonStream.Error))) + // Discard any partially encoded bytes so they don't leak into the next event. + jsonStream.Reset(buf) continue } - _, err := buf.Write(b) + if uint(jsonStream.Buffered()) > c.config.MaxEventSize { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: event size %d exceeds limit %d", event, jsonStream.Buffered(), c.config.MaxEventSize))) + // Discard the oversize encoded event so it isn't flushed with the next one. + jsonStream.Reset(buf) + continue + } + l := jsonStream.Buffered() + err := jsonStream.Flush() if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{ @@ -318,12 +338,14 @@ func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffe } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped metric event: error: event size %d bytes larger than 
configured max"+ - " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) + " content length %d bytes", l, c.config.MaxContentLengthMetrics))) return iterState{ record: i + 1, done: i+1 != len(events), }, permanentErrors } else if err != nil { + // flush exited before resetting the buffer of the stream. + jsonStream.Reset(buf) permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) } @@ -350,14 +372,21 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState event := mapSpanToSplunkEvent(rs.Resource(), span, c.config) // JSON encoding event and writing to buffer. - b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err))) + jsonStream.Reset(buf) + jsonStream.Error = nil + jsonStream.WriteVal(event) + if jsonStream.Error != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, jsonStream.Error))) + continue + } + if uint(jsonStream.Buffered()) > c.config.MaxEventSize { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, fmt.Errorf("event size %d exceeds limit %d", jsonStream.Buffered(), c.config.MaxEventSize)))) continue } // Continue adding events to buffer up to capacity. 
- _, err = buf.Write(b) + l := jsonStream.Buffered() + err := jsonStream.Flush() if err == nil { continue } @@ -367,9 +396,11 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+ - " content length %d bytes", len(b), c.config.MaxContentLengthTraces))) + " content length %d bytes", l, c.config.MaxContentLengthTraces))) return iterState{i, j, k + 1, false}, permanentErrors } + // flush exited before resetting the buffer of the stream. + jsonStream.Reset(buf) permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) }