From 9e31fd335d4d0f1e559015a8d705eb5353293db1 Mon Sep 17 00:00:00 2001 From: Rob Ellison Date: Wed, 4 Dec 2024 15:36:59 +0000 Subject: [PATCH] Improvements to Connection Pooling in 2.3.0 --- pkg/target/http.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/pkg/target/http.go b/pkg/target/http.go index 30c293e8..baf541b8 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -11,6 +11,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "net/url" "time" @@ -214,17 +215,29 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu failed = append(failed, msg) continue } - defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { - sent = append(sent, msg) - if msg.AckFunc != nil { // Ack successful messages - msg.AckFunc() + + func() { + defer resp.Body.Close() // Ensure the body is always closed + + // Drain and discard the response body + _, err := io.Copy(io.Discard, resp.Body) + if err != nil { + errResult = multierror.Append(errResult, fmt.Errorf("Error discarding response body: %v", err)) + failed = append(failed, msg) + return } - } else { - errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status)) - failed = append(failed, msg) - continue - } + + if resp.StatusCode == http.StatusOK { + sent = append(sent, msg) + if msg.AckFunc != nil { // Ack successful messages + msg.AckFunc() + } + } else { + errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status)) + failed = append(failed, msg) + return + } + }() } if errResult != nil { errResult = errors.Wrap(errResult, "Error sending http requests")