Skip to content

Commit

Permalink
clean up upload logic (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Jul 4, 2023
1 parent bff9c6e commit 875eef1
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 57 deletions.
12 changes: 3 additions & 9 deletions pkg/pipeline/sink/uploader/alioss.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,13 @@ import (
)

type AliOSSUploader struct {
*baseUploader

conf *livekit.AliOSSUpload
}

func newAliOSSUploader(conf *livekit.AliOSSUpload, backup string) (Uploader, error) {
u := &AliOSSUploader{
func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) {
return &AliOSSUploader{
conf: conf,
}

u.baseUploader = newBaseUploader(backup, u.upload)

return u, nil
}, nil
}

func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) {
Expand Down
12 changes: 3 additions & 9 deletions pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@ import (
)

type AzureUploader struct {
*baseUploader

conf *livekit.AzureBlobUpload
container string
}

func newAzureUploader(conf *livekit.AzureBlobUpload, backup string) (Uploader, error) {
u := &AzureUploader{
func newAzureUploader(conf *livekit.AzureBlobUpload) (uploader, error) {
return &AzureUploader{
conf: conf,
container: fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.AccountName, conf.ContainerName),
}

u.baseUploader = newBaseUploader(backup, u.upload)

return u, nil
}, nil
}

func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) {
Expand Down
12 changes: 3 additions & 9 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@ import (
)

type GCPUploader struct {
*baseUploader

conf *livekit.GCPUpload
}

func newGCPUploader(conf *livekit.GCPUpload, backup string) (Uploader, error) {
u := &GCPUploader{
func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
return &GCPUploader{
conf: conf,
}

u.baseUploader = newBaseUploader(backup, u.upload)

return u, nil
}, nil
}

func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ const (
)

type S3Uploader struct {
*baseUploader

awsConfig *aws.Config
bucket *string
metadata map[string]*string
tagging *string
}

func newS3Uploader(conf *livekit.S3Upload, backup string) (Uploader, error) {
func newS3Uploader(conf *livekit.S3Upload) (uploader, error) {
awsConfig := &aws.Config{
MaxRetries: aws.Int(maxRetries), // Switching to v2 of the aws Go SDK would allow to set a maxDelay as well.
S3ForcePathStyle: aws.Bool(conf.ForcePathStyle),
Expand All @@ -49,8 +47,6 @@ func newS3Uploader(conf *livekit.S3Upload, backup string) (Uploader, error) {
bucket: aws.String(conf.Bucket),
}

u.baseUploader = newBaseUploader(backup, u.upload)

if u.awsConfig.Region == nil {
region, err := u.getBucketLocation()
if err != nil {
Expand Down
47 changes: 22 additions & 25 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,47 @@ type Uploader interface {
Upload(string, string, types.OutputType, bool) (string, int64, error)
}

func New(conf config.UploadConfig, backup string) (Uploader, error) {
var u Uploader
type uploader interface {
upload(string, string, types.OutputType) (string, int64, error)
}

func New(conf config.UploadConfig, backup string) (Uploader, error) {
var u uploader
var err error

switch c := conf.(type) {
case *livekit.S3Upload:
u, err = newS3Uploader(c, backup)
u, err = newS3Uploader(c)
case *livekit.GCPUpload:
u, err = newGCPUploader(c, backup)
u, err = newGCPUploader(c)
case *livekit.AzureBlobUpload:
u, err = newAzureUploader(c, backup)
u, err = newAzureUploader(c)
case *livekit.AliOSSUpload:
u, err = newAliOSSUploader(c, backup)
u, err = newAliOSSUploader(c)
default:
u = &noOpUploader{}
return &localUploader{}, nil
}
if err != nil {
return nil, err
}

return u, nil
return &remoteUploader{
uploader: u,
backup: backup,
}, nil
}

type baseUploader struct {
backup string
upload func(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error)
}
type remoteUploader struct {
uploader

func newBaseUploader(backup string, upload func(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error)) *baseUploader {
return &baseUploader{
backup: backup,
upload: upload,
}
backup string
}

func (u *baseUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) {
func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) {
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
if err == nil {
if deleteAfterUpload {
os.Remove(localFilepath)
_ = os.Remove(localFilepath)
}

return location, size, nil
Expand All @@ -85,9 +86,9 @@ func (u *baseUploader) Upload(localFilepath, storageFilepath string, outputType
return "", 0, err
}

type noOpUploader struct{}
type localUploader struct{}

func (u *noOpUploader) Upload(localFilepath, _ string, _ types.OutputType, deleteAfterUpload bool) (string, int64, error) {
func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool) (string, int64, error) {
stat, err := os.Stat(localFilepath)
if err != nil {
return "", 0, err
Expand All @@ -96,10 +97,6 @@ func (u *noOpUploader) Upload(localFilepath, _ string, _ types.OutputType, delet
return localFilepath, stat.Size(), nil
}

func (u *noOpUploader) cleanupFile(localFilepath string) error {
return nil
}

func wrap(name string, err error) error {
return errors.Wrap(err, fmt.Sprintf("%s upload failed", name))
}

0 comments on commit 875eef1

Please sign in to comment.