Skip to content

Commit

Permalink
Close gzip readers after use
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Oct 29, 2024
1 parent 41a1cc0 commit b121937
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
3 changes: 3 additions & 0 deletions pkg/continuoustest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func TestOTLPHttpClient_WriteSeries(t *testing.T) {
// Handle compression
reader, err := gzip.NewReader(request.Body)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, reader.Close())
})

// Then Unmarshal
body, err := io.ReadAll(reader)
Expand Down
15 changes: 9 additions & 6 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/runutil"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -78,10 +79,10 @@ func OTLPHandler(
return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported compression: %s. Only \"gzip\" or no compression supported", contentEncoding)
}

var decoderFunc func(io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error)
var decoderFunc func(io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error)
switch contentType {
case pbContentType:
decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
exportReq := pmetricotlp.NewExportRequest()
unmarshaler := otlpProtoUnmarshaler{
request: &exportReq,
Expand All @@ -98,7 +99,7 @@ func OTLPHandler(
}

case jsonContentType:
decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
exportReq := pmetricotlp.NewExportRequest()
sz := int(r.ContentLength)
if sz > 0 {
Expand All @@ -107,14 +108,16 @@ func OTLPHandler(
}
buf := buffers.Get(sz)
if compression == util.Gzip {
var err error
reader, err = gzip.NewReader(reader)
gzReader, err := gzip.NewReader(reader)
if err != nil {
return exportReq, 0, errors.Wrap(err, "create gzip reader")
}

defer runutil.CloseWithLogOnErr(logger, gzReader, "close gzip reader")
reader = gzReader
}

reader = http.MaxBytesReader(nil, reader, int64(maxRecvMsgSize))
reader = http.MaxBytesReader(nil, io.NopCloser(reader), int64(maxRecvMsgSize))
if _, err := buf.ReadFrom(reader); err != nil {
if util.IsRequestBodyTooLarge(err) {
return exportReq, 0, httpgrpc.Error(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/indexheader/stream_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLo
gzipped := bytes.NewReader(sparseData)
gzipReader, err := gzip.NewReader(gzipped)
if err != nil {
return fmt.Errorf("failed to create sparse index-header reader: %w", err)
return fmt.Errorf("failed to create sparse index-header gzip reader: %w", err)
}
defer runutil.CloseWithLogOnErr(logger, gzipReader, "close sparse index-header gzip reader")

sparseData, err = io.ReadAll(gzipReader)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/runutil"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -226,11 +227,13 @@ func decompressRequest(buffers *RequestBuffers, reader io.Reader, expectedSize,
}

if compression == Gzip {
var err error
reader, err = gzip.NewReader(reader)
gzReader, err := gzip.NewReader(reader)
if err != nil {
return nil, errors.Wrap(err, "create gzip reader")
}

defer runutil.CloseWithLogOnErr(logger, gzReader, "close gzip reader")
reader = gzReader
}

// Limit at maxSize+1 so we can tell when the size is exceeded
Expand Down

0 comments on commit b121937

Please sign in to comment.