Skip to content

Commit

Permalink
fix: http/2 stream leaks (#5775)
Browse files Browse the repository at this point in the history
* fix: http/2 stream leaks

* fix: kubepug integration test

* fix: unpack tarball close in http handler
  • Loading branch information
ed382 authored Feb 25, 2025
1 parent c4cff26 commit 3b5001e
Show file tree
Hide file tree
Showing 21 changed files with 63 additions and 21 deletions.
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func NewInitCmdDemo() *cobra.Command {
if export {
valuesResp, err := http.Get(demoValuesUrl)
ui.ExitOnError("cannot fetch values", err)
defer valuesResp.Body.Close()
valuesBytes, err := io.ReadAll(valuesResp.Body)
ui.ExitOnError("cannot fetch values", err)
values := string(valuesBytes)
Expand Down
1 change: 1 addition & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (DebugTransport) RoundTrip(r *http.Request) (*http.Response, error) {
func proxyPass(res http.ResponseWriter, req *http.Request) {
fmt.Printf("\n-------------\n")
body, _ := io.ReadAll(req.Body)
req.Body.Close()
fmt.Printf("%s\n", body)

prefix := fmt.Sprintf("/api/v1/namespaces/%s/services/testkube-api-server:%d/proxy", *namespace, *apiPort)
Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/artifacts/cloud_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (d *cloudUploader) putObject(url string, path string, file io.Reader, size
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
b, _ := io.ReadAll(res.Body)
return errors.Errorf("failed saving file: status code: %d / message: %s", res.StatusCode, string(b))
Expand Down
8 changes: 7 additions & 1 deletion cmd/testworkflow-toolkit/commands/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewTarballCmd() *cobra.Command {
os.Exit(0)
}

for _, pair := range pairs {
processPair := func(pair string) {
dirPath, url, found := strings.Cut(pair, "=")
if !found {
ui.Fail(fmt.Errorf("invalid tarball pair: %s", pair))
Expand All @@ -44,11 +44,13 @@ func NewTarballCmd() *cobra.Command {
if err == nil {
// Process the files
err = common.UnpackTarball(dirPath, resp.Body)
resp.Body.Close()
if err == nil {
break
}
fmt.Printf("failed to unpack the tarball: %s\n", err.Error())
} else {
resp.Body.Close()
fmt.Printf("failed to download the tarball: %s\n", err.Error())
}

Expand All @@ -60,6 +62,10 @@ func NewTarballCmd() *cobra.Command {
fmt.Printf("retrying - attempt %d/%d.\n", attempt, TarballRetryMaxAttempts)
}
}

for _, pair := range pairs {
processPair(pair)
}
},
}

Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/commands/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewTransferCmd() *cobra.Command {
}()
resp, err := http.Post(url, "application/tar+gzip", reader)
ui.ExitOnError("send the tarball request", err)
_ = resp.Body.Close()

if resp.StatusCode != http.StatusNoContent {
ui.Fail(fmt.Errorf("failed to send the tarball: status code %d", resp.StatusCode))
Expand Down
4 changes: 1 addition & 3 deletions cmd/testworkflow-toolkit/common/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func WriteTarball(stream io.Writer, dirPath string, files []string) error {
return err
}

func UnpackTarball(dirPath string, stream io.ReadCloser) error {
defer stream.Close()

func UnpackTarball(dirPath string, stream io.Reader) error {
// Process the files
uncompressedStream, err := gzip.NewReader(stream)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/transfer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (t *server) handler() http.Handler {
}

err := common.UnpackTarball(dirPath, request.Body)
defer request.Body.Close()
if err != nil {
fmt.Printf("Warning: '%s' error while unpacking tarball to: %s\n", dirPath, err.Error())
writer.WriteHeader(http.StatusInternalServerError)
Expand Down
22 changes: 22 additions & 0 deletions contrib/executor/kubepug/pkg/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/kubeshop/testkube/pkg/utils/test"
)

const validateAgainstKubernetesVersion string = "v1.28.3"

func TestRunString_Integration(t *testing.T) {
test.IntegrationTest(t)
t.Parallel()
Expand All @@ -40,6 +42,8 @@ func TestRunString_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -85,6 +89,8 @@ metadata:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -173,6 +179,8 @@ spec:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -208,6 +216,8 @@ func TestRunFileURI_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -244,6 +254,8 @@ func TestRunFileURI_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -287,6 +299,8 @@ func TestRunGitFile_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -323,6 +337,8 @@ func TestRunGitFile_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -370,6 +386,8 @@ func TestRunGitDirectory_Integration(t *testing.T) {
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -478,6 +496,8 @@ spec:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
}
Expand Down Expand Up @@ -566,6 +586,8 @@ spec:
execution.Command = []string{"kubepug"}
execution.Args = []string{
"--format=json",
"--k8s-version",
validateAgainstKubernetesVersion,
"--input-file",
"<runPath>",
"--k8s-version=v1.7.0", // last version apps/v1beta2/Deployment was valid
Expand Down
4 changes: 2 additions & 2 deletions internal/app/api/deprecatedv1/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (s *DeprecatedTestkubeAPI) GetArtifactHandler() fiber.Handler {
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: db could not get execution result: %w", errPrefix, err))
}

var file io.Reader
var file io.ReadCloser
var bucket string
artifactsStorage := s.ArtifactsStorage
folder := execution.Id
Expand All @@ -452,8 +452,8 @@ func (s *DeprecatedTestkubeAPI) GetArtifactHandler() fiber.Handler {
if err != nil {
return s.Error(c, http.StatusInternalServerError, fmt.Errorf("%s: could not download file: %w", errPrefix, err))
}
defer file.Close()

// SendStream promises to close file using io.Close() method
return c.SendStream(file)
}
}
Expand Down
6 changes: 4 additions & 2 deletions internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,13 +390,14 @@ func (s *TestkubeAPI) GetTestWorkflowExecutionLogsHandler() fiber.Handler {
return s.ClientError(c, "get execution", err)
}

reader, err := s.TestWorkflowOutput.ReadLog(ctx, executionID, execution.Workflow.Name)
rc, err := s.TestWorkflowOutput.ReadLog(ctx, executionID, execution.Workflow.Name)
if err != nil {
return s.InternalError(c, "can't get log", executionID, err)
}
defer rc.Close()

c.Context().SetContentType(mediaTypePlainText)
_, err = io.Copy(c.Response().BodyWriter(), reader)
_, err = io.Copy(c.Response().BodyWriter(), rc)
return err
}
}
Expand Down Expand Up @@ -589,6 +590,7 @@ func (s *TestkubeAPI) GetTestWorkflowArtifactHandler() fiber.Handler {
if err != nil {
return s.InternalError(c, errPrefix, "could not download file", err)
}
defer file.Close()

return c.SendStream(file)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1/client/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (c CopyFileDirectClient) UploadFile(parentName string, parentType TestingTy
if err != nil {
return err
}
defer resp.Body.Close()

if err = httpResponseError(resp); err != nil {
return fmt.Errorf("api %s returned error: %w", uri, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/cloud/client/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (c RESTClient[I, O]) List() ([]O, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
d, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -96,6 +97,7 @@ func (c RESTClient[I, O]) Get(id string) (e O, err error) {
if err != nil {
return e, err
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
d, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -131,6 +133,7 @@ func (c RESTClient[I, O]) Create(entity I, overridePath ...string) (e O, err err
if err != nil {
return e, err
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
d, err := io.ReadAll(resp.Body)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloud/data/artifact/artifacts_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *CloudArtifactsStorage) ListFiles(ctx context.Context, executionID, test
return commandResponse.Artifacts, nil
}

func (c *CloudArtifactsStorage) DownloadFile(ctx context.Context, file, executionID, testName, testSuiteName, testWorkflowName string) (io.Reader, error) {
func (c *CloudArtifactsStorage) DownloadFile(ctx context.Context, file, executionID, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) {
req := DownloadFileRequest{
File: file,
ExecutionID: executionID,
Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *CloudArtifactsStorage) DownloadArchive(ctx context.Context, executionID
return data, nil
}

func (c *CloudArtifactsStorage) getObject(ctx context.Context, url string) (io.Reader, error) {
func (c *CloudArtifactsStorage) getObject(ctx context.Context, url string) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/data/artifact/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (u *CloudUploader) putObject(ctx context.Context, url string, object *scrap
if err != nil {
return errors.Wrap(err, "failed to send file to cloud")
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
return errors.Errorf("error getting file from presigned url: expected 200 OK response code, got %d", rsp.StatusCode)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/cloud/data/testworkflow/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ func (r *CloudOutputRepository) SaveLog(ctx context.Context, id, workflowName st
if err != nil {
return errors.Wrap(err, "failed to save file in cloud storage")
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return errors.Errorf("error saving file with presigned url: expected 200 OK response code, got %d", res.StatusCode)
}
return nil
}

// ReadLog streams the output from Cloud
func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) {
// ReadLog streams the output from Cloud.
// The caller is responsible for closing the stream.
func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) {
url, err := r.PresignReadLog(ctx, id, workflowName)
if err != nil {
return nil, err
Expand All @@ -100,8 +102,10 @@ func (r *CloudOutputRepository) ReadLog(ctx context.Context, id, workflowName st
return nil, errors.Wrap(err, "failed to get file from cloud storage")
}
if res.StatusCode != http.StatusOK {
_ = res.Body.Close()
return nil, errors.Errorf("error getting file from presigned url: expected 200 OK response code, got %d", res.StatusCode)
}
// Caller must close this stream.
return res.Body, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/testworkflow/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type OutputRepository interface {
// SaveLog streams the output from the workflow to Minio
SaveLog(ctx context.Context, id, workflowName string, reader io.Reader) error
// ReadLog streams the output from Minio
ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error)
ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error)
// HasLog checks if there is an output in Minio
HasLog(ctx context.Context, id, workflowName string) (bool, error)

Expand Down
4 changes: 2 additions & 2 deletions pkg/repository/testworkflow/minio_output_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (m *MinioRepository) SaveLog(ctx context.Context, id, workflowName string,
return m.storage.UploadFileToBucket(ctx, m.bucket, bucketFolder, id, buffer, int64(buffer.Len()))
}

func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.Reader, error) {
func (m *MinioRepository) ReadLog(ctx context.Context, id, workflowName string) (io.ReadCloser, error) {
file, _, err := m.storage.DownloadFileFromBucket(ctx, m.bucket, bucketFolder, id)
if err != nil {
return nil, err
}
return file, nil
return io.NopCloser(file), nil
}

func (m *MinioRepository) HasLog(ctx context.Context, id, workflowName string) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/repository/testworkflow/mock_output_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/storage/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ArtifactsStorage interface {
// ListFiles lists available files in the configured bucket
ListFiles(ctx context.Context, executionId, testName, testSuiteName, testWorkflowName string) ([]testkube.Artifact, error)
// DownloadFile downloads file from configured
DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.Reader, error)
DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error)
// DownloadArchive downloads archive from configured
DownloadArchive(ctx context.Context, executionId string, testName, testSuiteName, testWorkflowName string, masks []string) (io.Reader, error)
// UploadFile uploads file to configured bucket
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/artifacts_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/storage/minio/artifacts_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *ArtifactClient) ListFiles(ctx context.Context, executionId, testName, t
}

// DownloadFile downloads file from bucket from the config
func (c *ArtifactClient) DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.Reader, error) {
func (c *ArtifactClient) DownloadFile(ctx context.Context, file, executionId, testName, testSuiteName, testWorkflowName string) (io.ReadCloser, error) {
return c.client.DownloadFile(ctx, executionId, file)
}

Expand Down

0 comments on commit 3b5001e

Please sign in to comment.