From b1219376bfd70b41e56c4f98dc27f6c6d01816d7 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 29 Oct 2024 18:20:02 +0100 Subject: [PATCH] Close gzip readers after use Signed-off-by: Arve Knudsen --- pkg/continuoustest/client_test.go | 3 +++ pkg/distributor/otel.go | 15 +++++++++------ .../indexheader/stream_binary_reader.go | 3 ++- pkg/util/http.go | 7 +++++-- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/continuoustest/client_test.go b/pkg/continuoustest/client_test.go index 20071ef5835..322aecdf84d 100644 --- a/pkg/continuoustest/client_test.go +++ b/pkg/continuoustest/client_test.go @@ -37,6 +37,9 @@ func TestOTLPHttpClient_WriteSeries(t *testing.T) { // Handle compression reader, err := gzip.NewReader(request.Body) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, reader.Close()) + }) // Then Unmarshal body, err := io.ReadAll(reader) diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index cfdc7ed17c7..a4012ef0aa8 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/middleware" + "github.com/grafana/dskit/runutil" "github.com/grafana/dskit/tenant" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -78,10 +79,10 @@ func OTLPHandler( return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported compression: %s. Only \"gzip\" or no compression supported", contentEncoding) } - var decoderFunc func(io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) + var decoderFunc func(io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) switch contentType { case pbContentType: - decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { + decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { exportReq := pmetricotlp.NewExportRequest() unmarshaler := otlpProtoUnmarshaler{ request: &exportReq, @@ -98,7 +99,7 @@ func OTLPHandler( } case jsonContentType: - decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { + decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) { exportReq := pmetricotlp.NewExportRequest() sz := int(r.ContentLength) if sz > 0 { @@ -107,14 +108,16 @@ func OTLPHandler( } buf := buffers.Get(sz) if compression == util.Gzip { - var err error - reader, err = gzip.NewReader(reader) + gzReader, err := gzip.NewReader(reader) if err != nil { return exportReq, 0, errors.Wrap(err, "create gzip reader") } + + defer runutil.CloseWithLogOnErr(logger, gzReader, "close gzip reader") + reader = gzReader } - reader = http.MaxBytesReader(nil, reader, int64(maxRecvMsgSize)) + reader = http.MaxBytesReader(nil, io.NopCloser(reader), int64(maxRecvMsgSize)) if _, err := buf.ReadFrom(reader); err != nil { if util.IsRequestBodyTooLarge(err) { return exportReq, 0, httpgrpc.Error(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{ diff --git a/pkg/storegateway/indexheader/stream_binary_reader.go b/pkg/storegateway/indexheader/stream_binary_reader.go index 8a605b736ab..88d9ab1657b 100644 --- a/pkg/storegateway/indexheader/stream_binary_reader.go +++ b/pkg/storegateway/indexheader/stream_binary_reader.go @@ -197,8 +197,9 @@ func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLo gzipped := bytes.NewReader(sparseData) gzipReader, err := gzip.NewReader(gzipped) if err != nil { - return fmt.Errorf("failed to create sparse index-header reader: %w", err) + return fmt.Errorf("failed to create sparse index-header gzip reader: %w", err) } + defer runutil.CloseWithLogOnErr(logger, gzipReader, "close sparse index-header gzip reader") sparseData, err = io.ReadAll(gzipReader) if err != nil { diff --git a/pkg/util/http.go b/pkg/util/http.go index 6b6b85f5289..1c839cf92d7 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -23,6 +23,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/runutil" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" @@ -226,11 +227,13 @@ func decompressRequest(buffers *RequestBuffers, reader io.Reader, expectedSize, } if compression == Gzip { - var err error - reader, err = gzip.NewReader(reader) + gzReader, err := gzip.NewReader(reader) if err != nil { return nil, errors.Wrap(err, "create gzip reader") } + + defer runutil.CloseWithLogOnErr(logger, gzReader, "close gzip reader") + reader = gzReader } // Limit at maxSize+1 so we can tell when the size is exceeded