[Performance Improvement] Pool gzip.Writer and bytes.Buffer (#19)
* add benchmark for request body compression

* use sync.Pool to pool gzip.Writer and bytes.Buffer for better performance

* propagate error from pool.Get

* separate gzip and call compress from transport

* add header to gzip.go

* config: add comment for PoolCompressor boolean

---------

Co-authored-by: Laurent Saint-Félix <laurent.saintfelix@elastic.co>
pakio and Anaethelion authored Mar 14, 2024
1 parent 903383c commit f7f6e87
Showing 4 changed files with 185 additions and 12 deletions.
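For orientation, enabling the new option from client code looks roughly like the minimal sketch below. The import path and example URL are assumptions for illustration; the Config fields (CompressRequestBody, PoolCompressor) and the New constructor match the diff that follows.

package main

import (
    "net/url"

    // Import path assumed for illustration; adjust to the module actually in use.
    "github.com/elastic/elastic-transport-go/v8/elastictransport"
)

func main() {
    // The pooled gzip writer only matters when request-body compression is on,
    // so both flags are enabled together.
    tp, err := elastictransport.New(elastictransport.Config{
        URLs:                []*url.URL{{Scheme: "http", Host: "localhost:9200"}},
        CompressRequestBody: true,
        PoolCompressor:      true,
    })
    if err != nil {
        panic(err)
    }
    _ = tp // tp.Perform(req) is then used as before.
}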
25 changes: 13 additions & 12 deletions elastictransport/elastictransport.go
@@ -88,6 +88,8 @@ type Config struct {

    CompressRequestBody      bool
    CompressRequestBodyLevel int
    // If PoolCompressor is true, a sync.Pool-based gzip writer is used. It should be enabled together with CompressRequestBody.
    PoolCompressor bool

    EnableMetrics     bool
    EnableDebugLogger bool
@@ -131,6 +133,7 @@ type Client struct {

    compressRequestBody      bool
    compressRequestBodyLevel int
    gzipCompressor           gzipCompressor

    instrumentation Instrumentation

@@ -269,6 +272,12 @@ func New(cfg Config) (*Client, error) {
        client.compressRequestBodyLevel = gzip.DefaultCompression
    }

    if cfg.PoolCompressor {
        client.gzipCompressor = newPooledGzipCompressor(client.compressRequestBodyLevel)
    } else {
        client.gzipCompressor = newSimpleGzipCompressor(client.compressRequestBodyLevel)
    }

    return &client, nil
}

@@ -292,22 +301,14 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) {

    if req.Body != nil && req.Body != http.NoBody {
        if c.compressRequestBody {
-           var buf bytes.Buffer
-           zw, err := gzip.NewWriterLevel(&buf, c.compressRequestBodyLevel)
+           buf, err := c.gzipCompressor.compress(req.Body)
            if err != nil {
-               fmt.Errorf("failed setting up up compress request body (level %d): %s",
-                   c.compressRequestBodyLevel, err)
-           }
-           if _, err = io.Copy(zw, req.Body); err != nil {
-               return nil, fmt.Errorf("failed to compress request body: %s", err)
-           }
-           if err = zw.Close(); err != nil {
-               return nil, fmt.Errorf("failed to compress request body (during close): %s", err)
+               return nil, err
            }
+           defer c.gzipCompressor.collectBuffer(buf)

            req.GetBody = func() (io.ReadCloser, error) {
-               r := buf
-               return ioutil.NopCloser(&r), nil
+               return ioutil.NopCloser(buf), nil
            }
            req.Body, _ = req.GetBody()
37 changes: 37 additions & 0 deletions elastictransport/elastictransport_benchmark_test.go
@@ -86,4 +86,41 @@ func BenchmarkTransport(b *testing.B) {
            }
        }
    })

    b.Run("Compress body (pool: false)", func(b *testing.B) {
        tp, _ := elastictransport.New(elastictransport.Config{
            URLs:                []*url.URL{{Scheme: "http", Host: "foo"}},
            Transport:           newFakeTransport(b),
            CompressRequestBody: true,
        })

        for i := 0; i < b.N; i++ {
            body := strings.NewReader(`{"query":{"match_all":{}}}`)

            req, _ := http.NewRequest("GET", "/abc", body)
            _, err := tp.Perform(req)
            if err != nil {
                b.Fatalf("Unexpected error: %s", err)
            }
        }
    })

    b.Run("Compress body (pool: true)", func(b *testing.B) {
        tp, _ := elastictransport.New(elastictransport.Config{
            URLs:                []*url.URL{{Scheme: "http", Host: "foo"}},
            Transport:           newFakeTransport(b),
            CompressRequestBody: true,
            PoolCompressor:      true,
        })

        for i := 0; i < b.N; i++ {
            body := strings.NewReader(`{"query":{"match_all":{}}}`)

            req, _ := http.NewRequest("GET", "/abc", body)
            _, err := tp.Perform(req)
            if err != nil {
                b.Fatalf("Unexpected error: %s", err)
            }
        }
    })
}
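The benchmarks use a newFakeTransport helper that is not part of this diff. Purely to show the idea, a hypothetical stand-in is sketched below: an http.RoundTripper that drains the compressed request body and returns a canned response, so the benchmark measures compression rather than network I/O. The actual helper in the repository may look different.

package elastictransport_test

import (
    "io"
    "net/http"
    "strings"
    "testing"
)

// fakeTransport is a hypothetical stand-in for the repository's newFakeTransport helper.
type fakeTransport struct{}

func (t *fakeTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    // Drain and close the (compressed) request body so its full cost is exercised.
    if req.Body != nil {
        io.Copy(io.Discard, req.Body)
        req.Body.Close()
    }
    return &http.Response{
        StatusCode: http.StatusOK,
        Header:     http.Header{"Content-Type": []string{"application/json"}},
        Body:       io.NopCloser(strings.NewReader("{}")),
    }, nil
}

// newFakeTransportSketch mirrors the shape of the helper used above.
func newFakeTransportSketch(b *testing.B) *fakeTransport {
    b.Helper()
    return &fakeTransport{}
}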
8 changes: 8 additions & 0 deletions elastictransport/elastictransport_internal_test.go
@@ -1013,6 +1013,7 @@ func TestRequestCompression(t *testing.T) {
        name             string
        compressionFlag  bool
        compressionLevel int
        poolCompressor   bool
        inputBody        string
    }{
        {
@@ -1031,6 +1032,12 @@
            compressionLevel: gzip.BestSpeed,
            inputBody:        "elasticsearch",
        },
        {
            name:            "CompressedDefault",
            compressionFlag: true,
            poolCompressor:  true,
            inputBody:       "elasticsearch",
        },
    }

    for _, test := range tests {
@@ -1039,6 +1046,7 @@
                URLs:                     []*url.URL{{}},
                CompressRequestBody:      test.compressionFlag,
                CompressRequestBodyLevel: test.compressionLevel,
                PoolCompressor:           test.poolCompressor,
                Transport: &mockTransp{
                    RoundTripFunc: func(req *http.Request) (*http.Response, error) {
                        if req.Body == nil || req.Body == http.NoBody {
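The mock transport's RoundTripFunc (truncated above) is where the compressed request body gets checked. As a rough sketch of that kind of verification, and not the exact body of this test, a helper could gunzip the request and compare it against inputBody:

package elastictransport

import (
    "compress/gzip"
    "io"
    "net/http"
    "testing"
)

// verifyGzippedBody is a hypothetical helper illustrating the kind of check a
// RoundTripFunc can perform when compression is enabled: decompress the request
// body and compare it with the original input.
func verifyGzippedBody(t *testing.T, req *http.Request, want string) {
    t.Helper()

    zr, err := gzip.NewReader(req.Body)
    if err != nil {
        t.Fatalf("failed to open gzip reader: %s", err)
    }
    defer zr.Close()

    got, err := io.ReadAll(zr)
    if err != nil {
        t.Fatalf("failed to read decompressed body: %s", err)
    }
    if string(got) != want {
        t.Errorf("unexpected body: got %q, want %q", got, want)
    }
}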
127 changes: 127 additions & 0 deletions elastictransport/gzip.go
@@ -0,0 +1,127 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elastictransport

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
    "sync"
)

type gzipCompressor interface {
    // compress compresses the given io.ReadCloser and returns the gzip compressed data as a bytes.Buffer.
    compress(io.ReadCloser) (*bytes.Buffer, error)
    // collectBuffer collects the given bytes.Buffer for reuse.
    collectBuffer(*bytes.Buffer)
}

// simpleGzipCompressor is a simple implementation of gzipCompressor that creates a new gzip.Writer for each call.
type simpleGzipCompressor struct {
    compressionLevel int
}

func newSimpleGzipCompressor(compressionLevel int) gzipCompressor {
    return &simpleGzipCompressor{
        compressionLevel: compressionLevel,
    }
}

func (sg *simpleGzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) {
    var buf bytes.Buffer
    zw, err := gzip.NewWriterLevel(&buf, sg.compressionLevel)
    if err != nil {
        return nil, fmt.Errorf("failed setting up compress request body (level %d): %s",
            sg.compressionLevel, err)
    }

    if _, err = io.Copy(zw, rc); err != nil {
        return nil, fmt.Errorf("failed to compress request body: %s", err)
    }
    if err := zw.Close(); err != nil {
        return nil, fmt.Errorf("failed to compress request body (during close): %s", err)
    }
    return &buf, nil
}

func (sg *simpleGzipCompressor) collectBuffer(buf *bytes.Buffer) {
    // no-op
}

type pooledGzipCompressor struct {
    gzipWriterPool   *sync.Pool
    bufferPool       *sync.Pool
    compressionLevel int
}

type gzipWriter struct {
    writer *gzip.Writer
    err    error
}

// newPooledGzipCompressor returns a new pooledGzipCompressor that uses sync.Pools to reuse gzip.Writers and bytes.Buffers.
func newPooledGzipCompressor(compressionLevel int) gzipCompressor {
    gzipWriterPool := sync.Pool{
        New: func() any {
            writer, err := gzip.NewWriterLevel(io.Discard, compressionLevel)
            return &gzipWriter{
                writer: writer,
                err:    err,
            }
        },
    }

    bufferPool := sync.Pool{
        New: func() any {
            return new(bytes.Buffer)
        },
    }

    return &pooledGzipCompressor{
        gzipWriterPool:   &gzipWriterPool,
        bufferPool:       &bufferPool,
        compressionLevel: compressionLevel,
    }
}

func (pg *pooledGzipCompressor) compress(rc io.ReadCloser) (*bytes.Buffer, error) {
    writer := pg.gzipWriterPool.Get().(*gzipWriter)
    defer pg.gzipWriterPool.Put(writer)

    if writer.err != nil {
        return nil, fmt.Errorf("failed setting up compress request body (level %d): %s",
            pg.compressionLevel, writer.err)
    }

    buf := pg.bufferPool.Get().(*bytes.Buffer)
    buf.Reset()
    writer.writer.Reset(buf)

    if _, err := io.Copy(writer.writer, rc); err != nil {
        return nil, fmt.Errorf("failed to compress request body: %s", err)
    }
    if err := writer.writer.Close(); err != nil {
        return nil, fmt.Errorf("failed to compress request body (during close): %s", err)
    }
    return buf, nil
}

func (pg *pooledGzipCompressor) collectBuffer(buf *bytes.Buffer) {
    pg.bufferPool.Put(buf)
}
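Within the package, the interface is meant to be used the way Perform does: compress the body, use the returned buffer, and hand it back to the pool once the compressed bytes are no longer needed. The function below is a hypothetical in-package sketch of that pairing; newPooledGzipCompressor, compress, and collectBuffer come from the file above, the rest is illustration only.

package elastictransport

import (
    "compress/gzip"
    "io"
    "strings"
)

// exampleCompressOnce mirrors how Perform pairs compress with collectBuffer;
// it is a usage sketch, not additional code from this commit.
func exampleCompressOnce(payload string) error {
    gz := newPooledGzipCompressor(gzip.DefaultCompression)

    body := io.NopCloser(strings.NewReader(payload))
    buf, err := gz.compress(body)
    if err != nil {
        return err // errors from pool.Get (writer construction) are propagated here
    }
    // Return the buffer to the pool only after its contents are no longer referenced.
    defer gz.collectBuffer(buf)

    _ = buf.Bytes() // the compressed bytes would be sent as the request body here
    return nil
}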
