Skip to content

Commit

Permalink
Improvements to Connection Pooling in 2.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
robellison-je committed Dec 4, 2024
1 parent e4c848a commit 9e31fd3
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -214,17 +215,29 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
failed = append(failed, msg)
continue
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
sent = append(sent, msg)
if msg.AckFunc != nil { // Ack successful messages
msg.AckFunc()

func() {
defer resp.Body.Close() // Ensure the body is always closed

// Drain and discard the response body
_, err := io.Copy(io.Discard, resp.Body)
if err != nil {
errResult = multierror.Append(errResult, fmt.Errorf("Error discarding response body: %v", err))
failed = append(failed, msg)
return
}
} else {
errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status))
failed = append(failed, msg)
continue
}

if resp.StatusCode == http.StatusOK {
sent = append(sent, msg)
if msg.AckFunc != nil { // Ack successful messages
msg.AckFunc()
}
} else {
errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status))
failed = append(failed, msg)
return
}
}()
}
if errResult != nil {
errResult = errors.Wrap(errResult, "Error sending http requests")
Expand Down

0 comments on commit 9e31fd3

Please sign in to comment.