Skip to content

Commit

Permalink
skip aws logging on successful upload
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Aug 26, 2024
1 parent 0533b5d commit 29c84e3
Showing 1 changed file with 40 additions and 5 deletions.
45 changes: 40 additions & 5 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"net/url"
"os"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
Expand All @@ -40,28 +41,57 @@ const (
getBucketLocationRegion = "us-east-1"
)

// CustomRetryer wraps the SDK's built in DefaultRetryer adding additional
// S3Retryer wraps the SDK's built in DefaultRetryer adding additional
// custom features. Namely, to always retry.
type CustomRetryer struct {
type S3Retryer struct {
client.DefaultRetryer
}

// ShouldRetry overrides the SDK's built in DefaultRetryer because the PUTs for segments/playlists are always idempotent
func (r CustomRetryer) ShouldRetry(_ *request.Request) bool {
func (r S3Retryer) ShouldRetry(_ *request.Request) bool {
return true
}

// S3Logger only logs aws messages on upload failure
type S3Logger struct {
mu sync.Mutex
msgs []string
}

func (l *S3Logger) Log(msg string) {
l.mu.Lock()
l.msgs = append(l.msgs, msg)
l.mu.Unlock()
}

func (l *S3Logger) Clear() {
l.mu.Lock()
l.msgs = nil
l.mu.Unlock()
}

func (l *S3Logger) PrintLogs() {
l.mu.Lock()
for _, msg := range l.msgs {
logger.Debugw(msg)
}
l.msgs = nil
l.mu.Unlock()
}

type S3Uploader struct {
awsConfig *aws.Config
logger *S3Logger
bucket *string
metadata map[string]*string
tagging *string
contentDisposition *string
}

func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
l := &S3Logger{}
awsConfig := &aws.Config{
Retryer: &CustomRetryer{
Retryer: &S3Retryer{
DefaultRetryer: client.DefaultRetryer{
NumMaxRetries: conf.MaxRetries,
MaxRetryDelay: conf.MaxRetryDelay,
Expand All @@ -77,7 +107,7 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
for range len(args) {
msg += " %v"
}
logger.Debugw(fmt.Sprintf(msg, args...))
l.Log(fmt.Sprintf(msg, args...))
}),
}

Expand Down Expand Up @@ -108,7 +138,9 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
}

logger.Debugw("retrieved bucket location", "bucket", u.bucket, "location", region)

u.awsConfig.Region = aws.String(region)
u.logger.Clear()
}

if conf.Proxy != nil {
Expand Down Expand Up @@ -203,7 +235,10 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty
ContentDisposition: u.contentDisposition,
})
if err != nil {
u.logger.PrintLogs()
return "", 0, errors.ErrUploadFailed("S3", err)
} else {
u.logger.Clear()
}

endpoint := "s3.amazonaws.com"
Expand Down

0 comments on commit 29c84e3

Please sign in to comment.