Skip to content

Commit 795b943

Browse files
otelconsumer: remove temporary handling of entity too large (#41911) (#41971)
This code was initially added in #41523 because of a limitation from the elasticsearch exporter. The elasticsearch exporter has been updated to enforce flush::max_bytes for the batcher extension and will automatically split the batch if it exceeds the limit. This error is now fixed in the collector v0.115.0. See open-telemetry/opentelemetry-collector-contrib#36396. (cherry picked from commit dbeb9cd) Co-authored-by: Mauri de Souza Meneguzzo <mauri870@gmail.com>
1 parent 57966ef commit 795b943

File tree

2 files changed

+0
-52
lines changed

2 files changed

+0
-52
lines changed

libbeat/outputs/otelconsumer/otelconsumer.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package otelconsumer
2020
import (
2121
"context"
2222
"fmt"
23-
"strings"
2423
"time"
2524

2625
"github.com/elastic/beats/v7/libbeat/beat"
@@ -103,30 +102,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
103102

104103
err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
105104
if err != nil {
106-
// If the batch is too large, the elasticsearchexporter will
107-
// return a 413 error.
108-
//
109-
// At the moment, the exporter does not support batch splitting
110-
// on error so we do it here.
111-
//
112-
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163.
113-
if strings.Contains(err.Error(), "Request Entity Too Large") {
114-
// Try and split the batch into smaller batches and retry
115-
if batch.SplitRetry() {
116-
st.BatchSplit()
117-
st.RetryableErrors(len(events))
118-
} else {
119-
// If the batch could not be split, there is no option left but
120-
// to drop it and log the error state.
121-
batch.Drop()
122-
st.PermanentErrors(len(events))
123-
out.log.Errorf("the batch is too large to be sent: %v", err)
124-
}
125-
126-
// Don't propagate the error, the batch was split and retried.
127-
return nil
128-
}
129-
130105
// Permanent errors shouldn't be retried. This tipically means
131106
// the data cannot be serialized by the exporter that is attached
132107
// to the pipeline or when the destination refuses the data because

libbeat/outputs/otelconsumer/otelconsumer_test.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -90,33 +90,6 @@ func TestPublish(t *testing.T) {
9090
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
9191
})
9292

93-
t.Run("split batch on entity too large error", func(t *testing.T) {
94-
batch := outest.NewBatch(event1, event2, event3)
95-
96-
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
97-
return errors.New("Request Entity Too Large")
98-
})
99-
100-
err := otelConsumer.Publish(ctx, batch)
101-
assert.NoError(t, err)
102-
assert.Len(t, batch.Signals, 1)
103-
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
104-
})
105-
106-
t.Run("drop batch if can't split on entity too large error", func(t *testing.T) {
107-
batch := outest.NewBatch(event1)
108-
109-
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
110-
return errors.New("Request Entity Too Large")
111-
})
112-
113-
err := otelConsumer.Publish(ctx, batch)
114-
assert.NoError(t, err)
115-
assert.Len(t, batch.Signals, 2)
116-
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
117-
assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag)
118-
})
119-
12093
t.Run("drop batch on permanent consumer error", func(t *testing.T) {
12194
batch := outest.NewBatch(event1, event2, event3)
12295

0 commit comments

Comments
 (0)