Skip to content

Commit

Permalink
Close gzip readers after use
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Jan 1, 2025
1 parent fc8af05 commit 35f63a4
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
3 changes: 3 additions & 0 deletions pkg/continuoustest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func TestOTLPHttpClient_WriteSeries(t *testing.T) {
// Handle compression
reader, err := gzip.NewReader(request.Body)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, reader.Close())
})

// Then Unmarshal
body, err := io.ReadAll(reader)
Expand Down
14 changes: 8 additions & 6 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/runutil"
"github.com/grafana/dskit/tenant"
"github.com/pierrec/lz4/v4"
"github.com/pkg/errors"
Expand Down Expand Up @@ -84,10 +85,10 @@ func OTLPHandler(
return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported compression: %s. Only \"gzip\", \"lz4\", or no compression supported", contentEncoding)
}

var decoderFunc func(io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error)
var decoderFunc func(io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error)
switch contentType {
case pbContentType:
decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
exportReq := pmetricotlp.NewExportRequest()
unmarshaler := otlpProtoUnmarshaler{
request: &exportReq,
Expand All @@ -104,7 +105,7 @@ func OTLPHandler(
}

case jsonContentType:
decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
decoderFunc = func(reader io.Reader) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
exportReq := pmetricotlp.NewExportRequest()
sz := int(r.ContentLength)
if sz > 0 {
Expand All @@ -114,16 +115,17 @@ func OTLPHandler(
buf := buffers.Get(sz)
switch compression {
case util.Gzip:
var err error
reader, err = gzip.NewReader(reader)
gzReader, err := gzip.NewReader(reader)
if err != nil {
return exportReq, 0, errors.Wrap(err, "create gzip reader")
}
defer runutil.CloseWithLogOnErr(logger, gzReader, "close gzip reader")
reader = gzReader
case util.Lz4:
reader = io.NopCloser(lz4.NewReader(reader))
}

reader = http.MaxBytesReader(nil, reader, int64(maxRecvMsgSize))
reader = http.MaxBytesReader(nil, io.NopCloser(reader), int64(maxRecvMsgSize))
if _, err := buf.ReadFrom(reader); err != nil {
if util.IsRequestBodyTooLarge(err) {
return exportReq, 0, httpgrpc.Error(http.StatusRequestEntityTooLarge, distributorMaxOTLPRequestSizeErr{
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/indexheader/stream_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLo
gzipped := bytes.NewReader(sparseData)
gzipReader, err := gzip.NewReader(gzipped)
if err != nil {
return fmt.Errorf("failed to create sparse index-header reader: %w", err)
return fmt.Errorf("failed to create sparse index-header gzip reader: %w", err)
}
defer runutil.CloseWithLogOnErr(logger, gzipReader, "close sparse index-header gzip reader")

sparseData, err = io.ReadAll(gzipReader)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,21 @@ func decompressRequest(buffers *RequestBuffers, reader io.Reader, expectedSize,
return decompressSnappyFromBuffer(buffers, buf, maxSize, sp)
}
case Gzip:
var err error
reader, err = gzip.NewReader(reader)
gzReader, err := gzip.NewReader(reader)
if err != nil {
return nil, errors.Wrap(err, "create gzip reader")
}

defer func() {
_ = gzReader.Close()
}()
reader = gzReader
case Lz4:
reader = lz4.NewReader(reader)
default:
return nil, fmt.Errorf("unrecognized compression type %v", compression)
}

if sp != nil {
sp.LogFields(otlog.Event("util.ParseProtoReader[decompress]"), otlog.Int("expectedSize", expectedSize))
}
Expand Down

0 comments on commit 35f63a4

Please sign in to comment.