From 4fab4382a2b237066dfd339a7e4cb2f6736a08e6 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Wed, 12 Jun 2024 02:17:19 +0200 Subject: [PATCH] feat: add require datastreams to bulk index requests (#13398) --- internal/beater/beater.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 0d7fe4c8c9d..758eac2c089 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -691,13 +691,14 @@ func (s *Runner) newFinalBatchProcessor( scalingCfg.Disabled = !*enabled } opts := docappender.Config{ - CompressionLevel: esConfig.CompressionLevel, - FlushBytes: flushBytes, - FlushInterval: esConfig.FlushInterval, - Tracer: tracer, - MaxRequests: esConfig.MaxRequests, - Scaling: scalingCfg, - Logger: zap.New(s.logger.Core(), zap.WithCaller(true)), + CompressionLevel: esConfig.CompressionLevel, + FlushBytes: flushBytes, + FlushInterval: esConfig.FlushInterval, + Tracer: tracer, + MaxRequests: esConfig.MaxRequests, + Scaling: scalingCfg, + Logger: zap.New(s.logger.Core(), zap.WithCaller(true)), + RequireDataStream: true, } opts = docappenderConfig(opts, memLimit, s.logger) appender, err := docappender.New(client, opts)