diff --git a/elastictransport/elastictransport.go b/elastictransport/elastictransport.go index 4934d29..d9b2f85 100644 --- a/elastictransport/elastictransport.go +++ b/elastictransport/elastictransport.go @@ -88,6 +88,8 @@ type Config struct { CompressRequestBody bool CompressRequestBodyLevel int + // If PoolCompressor is true, a sync.Pool based gzip writer is used. Should be enabled with CompressRequestBody. + PoolCompressor bool EnableMetrics bool EnableDebugLogger bool @@ -131,6 +133,7 @@ type Client struct { compressRequestBody bool compressRequestBodyLevel int + gzipCompressor gzipCompressor instrumentation Instrumentation @@ -269,6 +272,12 @@ func New(cfg Config) (*Client, error) { client.compressRequestBodyLevel = gzip.DefaultCompression } + if cfg.PoolCompressor { + client.gzipCompressor = newPooledGzipCompressor(client.compressRequestBodyLevel) + } else { + client.gzipCompressor = newSimpleGzipCompressor(client.compressRequestBodyLevel) + } + return &client, nil } @@ -292,22 +301,14 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { if req.Body != nil && req.Body != http.NoBody { if c.compressRequestBody { - var buf bytes.Buffer - zw, err := gzip.NewWriterLevel(&buf, c.compressRequestBodyLevel) + buf, err := c.gzipCompressor.compress(req.Body) if err != nil { - fmt.Errorf("failed setting up up compress request body (level %d): %s", - c.compressRequestBodyLevel, err) - } - if _, err = io.Copy(zw, req.Body); err != nil { - return nil, fmt.Errorf("failed to compress request body: %s", err) - } - if err = zw.Close(); err != nil { - return nil, fmt.Errorf("failed to compress request body (during close): %s", err) + return nil, err } + defer c.gzipCompressor.collectBuffer(buf) req.GetBody = func() (io.ReadCloser, error) { - r := buf - return ioutil.NopCloser(&r), nil + return ioutil.NopCloser(buf), nil } req.Body, _ = req.GetBody() diff --git a/elastictransport/elastictransport_benchmark_test.go 
b/elastictransport/elastictransport_benchmark_test.go index a5110fb..26e2f47 100644 --- a/elastictransport/elastictransport_benchmark_test.go +++ b/elastictransport/elastictransport_benchmark_test.go @@ -86,4 +86,41 @@ func BenchmarkTransport(b *testing.B) { } } }) + + b.Run("Compress body (pool: false)", func(b *testing.B) { + tp, _ := elastictransport.New(elastictransport.Config{ + URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, + Transport: newFakeTransport(b), + CompressRequestBody: true, + }) + + for i := 0; i < b.N; i++ { + body := strings.NewReader(`{"query":{"match_all":{}}}`) + + req, _ := http.NewRequest("GET", "/abc", body) + _, err := tp.Perform(req) + if err != nil { + b.Fatalf("Unexpected error: %s", err) + } + } + }) + + b.Run("Compress body (pool: true)", func(b *testing.B) { + tp, _ := elastictransport.New(elastictransport.Config{ + URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, + Transport: newFakeTransport(b), + CompressRequestBody: true, + PoolCompressor: true, + }) + + for i := 0; i < b.N; i++ { + body := strings.NewReader(`{"query":{"match_all":{}}}`) + + req, _ := http.NewRequest("GET", "/abc", body) + _, err := tp.Perform(req) + if err != nil { + b.Fatalf("Unexpected error: %s", err) + } + } + }) } diff --git a/elastictransport/elastictransport_internal_test.go b/elastictransport/elastictransport_internal_test.go index f669e2a..00efdce 100644 --- a/elastictransport/elastictransport_internal_test.go +++ b/elastictransport/elastictransport_internal_test.go @@ -1013,6 +1013,7 @@ func TestRequestCompression(t *testing.T) { name string compressionFlag bool compressionLevel int + poolCompressor bool inputBody string }{ { @@ -1031,6 +1032,12 @@ compressionLevel: gzip.BestSpeed, inputBody: "elasticsearch", }, + { + name: "CompressedDefaultPooled", + compressionFlag: true, + poolCompressor: true, + inputBody: "elasticsearch", + }, } for _, test := range tests { @@ -1039,6 +1046,7 @@ func
TestRequestCompression(t *testing.T) { URLs: []*url.URL{{}}, CompressRequestBody: test.compressionFlag, CompressRequestBodyLevel: test.compressionLevel, + PoolCompressor: test.poolCompressor, Transport: &mockTransp{ RoundTripFunc: func(req *http.Request) (*http.Response, error) { if req.Body == nil || req.Body == http.NoBody { diff --git a/elastictransport/gzip.go b/elastictransport/gzip.go new file mode 100644 index 0000000..8433678 --- /dev/null +++ b/elastictransport/gzip.go @@ -0,0 +1,127 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package elastictransport + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "sync" +) + +type gzipCompressor interface { + // compress compresses the given io.ReadCloser and returns the gzip compressed data as a bytes.Buffer. + compress(io.ReadCloser) (*bytes.Buffer, error) + // collectBuffer collects the given bytes.Buffer for reuse. + collectBuffer(*bytes.Buffer) +} + +// simpleGzipCompressor is a simple implementation of gzipCompressor that creates a new gzip.Writer for each call. 
+type simpleGzipCompressor struct { + compressionLevel int +} + +func newSimpleGzipCompressor(compressionLevel int) gzipCompressor { + return &simpleGzipCompressor{ + compressionLevel: compressionLevel, + } +} + +func (sg *simpleGzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) { + var buf bytes.Buffer + zw, err := gzip.NewWriterLevel(&buf, sg.compressionLevel) + if err != nil { + return nil, fmt.Errorf("failed setting up compress request body (level %d): %s", + sg.compressionLevel, err) + } + + if _, err = io.Copy(zw, rc); err != nil { + return nil, fmt.Errorf("failed to compress request body: %s", err) + } + if err := zw.Close(); err != nil { + return nil, fmt.Errorf("failed to compress request body (during close): %s", err) + } + return &buf, nil +} + +func (sg *simpleGzipCompressor) collectBuffer(buf *bytes.Buffer) { + // no-op +} + +type pooledGzipCompressor struct { + gzipWriterPool *sync.Pool + bufferPool *sync.Pool + compressionLevel int +} + +type gzipWriter struct { + writer *gzip.Writer + err error +} + +// newPooledGzipCompressor returns a new pooledGzipCompressor that uses a sync.Pool to reuse gzip.Writers.
+func newPooledGzipCompressor(compressionLevel int) gzipCompressor { + gzipWriterPool := sync.Pool{ + New: func() any { + writer, err := gzip.NewWriterLevel(io.Discard, compressionLevel) + return &gzipWriter{ + writer: writer, + err: err, + } + }, + } + + bufferPool := sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, + } + + return &pooledGzipCompressor{ + gzipWriterPool: &gzipWriterPool, + bufferPool: &bufferPool, + compressionLevel: compressionLevel, + } +} + +func (pg *pooledGzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) { + writer := pg.gzipWriterPool.Get().(*gzipWriter) + defer pg.gzipWriterPool.Put(writer) + + if writer.err != nil { + return nil, fmt.Errorf("failed setting up compress request body (level %d): %s", + pg.compressionLevel, writer.err) + } + + buf := pg.bufferPool.Get().(*bytes.Buffer) + buf.Reset() + writer.writer.Reset(buf) + + if _, err := io.Copy(writer.writer, rc); err != nil { + return nil, fmt.Errorf("failed to compress request body: %s", err) + } + if err := writer.writer.Close(); err != nil { + return nil, fmt.Errorf("failed to compress request body (during close): %s", err) + } + return buf, nil +} + +func (pg *pooledGzipCompressor) collectBuffer(buf *bytes.Buffer) { + pg.bufferPool.Put(buf) +}