From da06d99764123e70680603375d028c8f9bd22dd5 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 29 Jun 2023 11:12:38 -0700 Subject: [PATCH 1/3] WiP --- pkg/pipeline/sink/segments.go | 2 ++ pkg/pipeline/sink/uploader/alioss.go | 2 ++ pkg/pipeline/sink/uploader/azure.go | 2 ++ pkg/pipeline/sink/uploader/gcp.go | 2 ++ pkg/pipeline/sink/uploader/s3.go | 2 ++ pkg/pipeline/sink/uploader/uploader.go | 20 ++++++++++++++++++++ 6 files changed, 30 insertions(+) diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 098107fd..cd1a4980 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -92,6 +92,8 @@ func (s *SegmentSink) Start() error { return } + s.CleanupFile(segmentLocalPath) + playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType) diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 210f9ea0..7c7eb7f6 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -11,6 +11,8 @@ import ( ) type AliOSSUploader struct { + baseUploader + conf *livekit.AliOSSUpload } diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 41ac260e..3a93e88e 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -13,6 +13,8 @@ import ( ) type AzureUploader struct { + baseUploader + conf *livekit.AzureBlobUpload container string } diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 781c5f48..8fb958f5 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -17,6 +17,8 @@ import ( ) type GCPUploader struct { + baseUploader + conf *livekit.GCPUpload } diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 8de64e73..0499a2a9 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -21,6 +21,8 @@ const ( ) type S3Uploader struct { + baseUploader + awsConfig *aws.Config bucket *string metadata map[string]*string diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index b35e5acc..2f916760 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -26,6 +26,7 @@ type Uploader struct { type uploader interface { upload(string, string, types.OutputType) (string, int64, error) + cleanupFile(localFilepath string) error } func New(conf config.UploadConfig, backup string) (*Uploader, error) { @@ -78,6 +79,14 @@ func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType type return "", 0, err } +func (u *Uploader) CleanupFile(localFilepath string) error { + if u.backup != "" { + return nil + } + + return u.cleanupFile(localFilepath) +} + type noOpUploader struct{} func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (string, int64, error) { @@ -89,6 +98,17 @@ func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (stri return localFilepath, stat.Size(), nil } +func (u *noOpUploader) cleanupFile(localFilepath string) error { + return nil +} + func wrap(name string, err error) error { return errors.Wrap(err, fmt.Sprintf("%s upload failed", name)) } + +type baseUploader struct { +} + +func (u *baseUploader) cleanupFile(localFilepath string) error { + return os.Remove(localFilepath) +} From c1e92d86236caca5ac1e7e6ab058115a104f57b9 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 29 Jun 2023 16:58:59 -0700 Subject: [PATCH 2/3] Delete in Upload --- pkg/pipeline/debug.go | 2 +- pkg/pipeline/sink/file.go | 2 +- pkg/pipeline/sink/manifest.go | 2 +- pkg/pipeline/sink/segments.go | 8 +++----- pkg/pipeline/sink/uploader/uploader.go | 22 +++++++++------------- 5 files changed, 15 insertions(+), 21 deletions(-) diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index da9cf1ab..abcb39b5 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -84,7 +84,7 @@ func (p *Pipeline) uploadDebugFile(u *uploader.Uploader, data []byte, fileExtens return } - _, _, err = u.Upload(filepath, filename, types.OutputTypeBlob) + _, _, err = u.Upload(filepath, filename, types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload dotfile", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index d03ec196..42057c14 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -30,7 +30,7 @@ func (s *FileSink) Start() error { } func (s *FileSink) Finalize() error { - location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType) + location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) if err != nil { return err } diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index 0146d063..90ed3114 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -41,7 +41,7 @@ func uploadManifest(p *config.PipelineConfig, u *uploader.Uploader, localFilepat return err } - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON) + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) return err } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index cd1a4980..334bad18 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -79,7 +79,7 @@ func (s *SegmentSink) Start() error { segmentLocalPath := path.Join(s.LocalDir, update.filename) segmentStoragePath := path.Join(s.StorageDir, update.filename) - _, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType()) + _, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType(), true) if err != nil { return } @@ -92,11 +92,9 @@ func (s *SegmentSink) Start() error { return } - s.CleanupFile(segmentLocalPath) - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) if err != nil { return } @@ -204,7 +202,7 @@ func (s *SegmentSink) Finalize() error { // upload the finalized playlist playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType) + s.SegmentsInfo.PlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) if !s.DisableManifest { manifestLocalPath := fmt.Sprintf("%s.json", playlistLocalPath) diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 2f916760..ac892689 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -56,9 +56,13 @@ func New(conf config.UploadConfig, backup string) (*Uploader, error) { return u, nil } -func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { +func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { location, size, err := u.upload(localFilepath, storageFilepath, outputType) if err == nil { + if deleteAfterUpload { + u.cleanupFile(localFilepath) + } + return location, size, nil } @@ -79,14 +83,6 @@ func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType type return "", 0, err } -func (u *Uploader) CleanupFile(localFilepath string) error { - if u.backup != "" { - return nil - } - - return u.cleanupFile(localFilepath) -} - type noOpUploader struct{} func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (string, int64, error) { @@ -102,13 +98,13 @@ func (u *noOpUploader) cleanupFile(localFilepath string) error { return nil } -func wrap(name string, err error) error { - return errors.Wrap(err, fmt.Sprintf("%s upload failed", name)) -} - type baseUploader struct { } func (u *baseUploader) cleanupFile(localFilepath string) error { return os.Remove(localFilepath) } + +func wrap(name string, err error) error { + return errors.Wrap(err, fmt.Sprintf("%s upload failed", name)) +} From 009f32e9e05fdc8cf8ec6439306c410f0b0d9768 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 30 Jun 2023 13:44:17 -0700 Subject: [PATCH 3/3] Refactor uploader --- pkg/pipeline/debug.go | 6 +-- pkg/pipeline/sink/file.go | 4 +- pkg/pipeline/sink/manifest.go | 2 +- pkg/pipeline/sink/segments.go | 4 +- pkg/pipeline/sink/uploader/alioss.go | 12 ++++-- pkg/pipeline/sink/uploader/azure.go | 12 ++++-- pkg/pipeline/sink/uploader/gcp.go | 12 ++++-- pkg/pipeline/sink/uploader/s3.go | 6 ++- pkg/pipeline/sink/uploader/uploader.go | 53 ++++++++++++-------------- 9 files changed, 60 insertions(+), 51 deletions(-) diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index abcb39b5..8ae93bea 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -53,12 +53,12 @@ func (p *Pipeline) uploadDebugFiles() { } } -func (p *Pipeline) uploadDotFile(u *uploader.Uploader) { +func (p *Pipeline) uploadDotFile(u uploader.Uploader) { dot := p.GetGstPipelineDebugDot() p.uploadDebugFile(u, []byte(dot), ".dot") } -func (p *Pipeline) uploadPProf(u *uploader.Uploader) { +func (p *Pipeline) uploadPProf(u uploader.Uploader) { b, err := pprof.GetProfileData(context.Background(), "heap", 0, 0) if err != nil { logger.Errorw("failed to get profile data", err) @@ -67,7 +67,7 @@ func (p *Pipeline) uploadPProf(u *uploader.Uploader) { p.uploadDebugFile(u, b, ".prof") } -func (p *Pipeline) uploadDebugFile(u *uploader.Uploader, data []byte, fileExtension string) { +func (p *Pipeline) uploadDebugFile(u uploader.Uploader, data []byte, fileExtension string) { filename := fmt.Sprintf("%s%s", p.Info.EgressId, fileExtension) filepath := path.Join(p.LocalOutputDirectory, filename) diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index 42057c14..fbc48ba5 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -11,13 +11,13 @@ import ( ) type FileSink struct { - *uploader.Uploader + uploader.Uploader conf *config.PipelineConfig *config.FileConfig } -func newFileSink(u *uploader.Uploader, conf *config.PipelineConfig, o *config.FileConfig) *FileSink { +func newFileSink(u uploader.Uploader, conf *config.PipelineConfig, o *config.FileConfig) *FileSink { return &FileSink{ Uploader: u, conf: conf, diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index 90ed3114..aa50e12b 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -25,7 +25,7 @@ type Manifest struct { SegmentCount int64 `json:"segment_count,omitempty"` } -func uploadManifest(p *config.PipelineConfig, u *uploader.Uploader, localFilepath, storageFilepath string) error { +func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath, storageFilepath string) error { manifest, err := os.Create(localFilepath) if err != nil { return err diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 334bad18..267a5f01 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -21,7 +21,7 @@ import ( const maxPendingUploads = 100 type SegmentSink struct { - *uploader.Uploader + uploader.Uploader conf *config.PipelineConfig *config.SegmentConfig @@ -44,7 +44,7 @@ type SegmentUpdate struct { filename string } -func newSegmentSink(u *uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig) (*SegmentSink, error) { +func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig) (*SegmentSink, error) { playlistName := path.Join(o.LocalDir, o.PlaylistFilename) playlist, err := m3u8.NewPlaylistWriter(playlistName, o.SegmentDuration) if err != nil { diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 7c7eb7f6..930d23d2 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -11,15 +11,19 @@ import ( ) type AliOSSUploader struct { - baseUploader + *baseUploader conf *livekit.AliOSSUpload } -func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) { - return &AliOSSUploader{ +func newAliOSSUploader(conf *livekit.AliOSSUpload, backup string) (Uploader, error) { + u := &AliOSSUploader{ conf: conf, - }, nil + } + + u.baseUploader = newBaseUploader(backup, u.upload) + + return u, nil } func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) { diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 3a93e88e..afcc77f6 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -13,17 +13,21 @@ import ( ) type AzureUploader struct { - baseUploader + *baseUploader conf *livekit.AzureBlobUpload container string } -func newAzureUploader(conf *livekit.AzureBlobUpload) (uploader, error) { - return &AzureUploader{ +func newAzureUploader(conf *livekit.AzureBlobUpload, backup string) (Uploader, error) { + u := &AzureUploader{ conf: conf, container: fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.AccountName, conf.ContainerName), - }, nil + } + + u.baseUploader = newBaseUploader(backup, u.upload) + + return u, nil } func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 8fb958f5..8003976a 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -17,15 +17,19 @@ import ( ) type GCPUploader struct { - baseUploader + *baseUploader conf *livekit.GCPUpload } -func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { - return &GCPUploader{ +func newGCPUploader(conf *livekit.GCPUpload, backup string) (Uploader, error) { + u := &GCPUploader{ conf: conf, - }, nil + } + + u.baseUploader = newBaseUploader(backup, u.upload) + + return u, nil } func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 0499a2a9..50665c5d 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -21,7 +21,7 @@ const ( ) type S3Uploader struct { - baseUploader + *baseUploader awsConfig *aws.Config bucket *string @@ -29,7 +29,7 @@ type S3Uploader struct { tagging *string } -func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { +func newS3Uploader(conf *livekit.S3Upload, backup string) (Uploader, error) { awsConfig := &aws.Config{ MaxRetries: aws.Int(maxRetries), // Switching to v2 of the aws Go SDK would allow to set a maxDelay as well. S3ForcePathStyle: aws.Bool(conf.ForcePathStyle), @@ -49,6 +49,8 @@ func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { bucket: aws.String(conf.Bucket), } + u.baseUploader = newBaseUploader(backup, u.upload) + if u.awsConfig.Region == nil { region, err := u.getBucketLocation() if err != nil { diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index ac892689..8a72362d 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -19,48 +19,50 @@ const ( maxDelay = time.Second * 5 ) -type Uploader struct { - uploader - backup string -} - -type uploader interface { - upload(string, string, types.OutputType) (string, int64, error) - cleanupFile(localFilepath string) error +type Uploader interface { + Upload(string, string, types.OutputType, bool) (string, int64, error) } -func New(conf config.UploadConfig, backup string) (*Uploader, error) { - u := &Uploader{ - backup: backup, - } +func New(conf config.UploadConfig, backup string) (Uploader, error) { + var u Uploader - var i uploader var err error switch c := conf.(type) { case *livekit.S3Upload: - i, err = newS3Uploader(c) + u, err = newS3Uploader(c, backup) case *livekit.GCPUpload: - i, err = newGCPUploader(c) + u, err = newGCPUploader(c, backup) case *livekit.AzureBlobUpload: - i, err = newAzureUploader(c) + u, err = newAzureUploader(c, backup) case *livekit.AliOSSUpload: - i, err = newAliOSSUploader(c) + u, err = newAliOSSUploader(c, backup) default: - i = &noOpUploader{} + u = &noOpUploader{} } if err != nil { return nil, err } - u.uploader = i return u, nil } -func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { +type baseUploader struct { + backup string + upload func(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) +} + +func newBaseUploader(backup string, upload func(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error)) *baseUploader { + return &baseUploader{ + backup: backup, + upload: upload, + } +} + +func (u *baseUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { location, size, err := u.upload(localFilepath, storageFilepath, outputType) if err == nil { if deleteAfterUpload { - u.cleanupFile(localFilepath) + os.Remove(localFilepath) } return location, size, nil @@ -85,7 +87,7 @@ func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType type type noOpUploader struct{} -func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (string, int64, error) { +func (u *noOpUploader) Upload(localFilepath, _ string, _ types.OutputType, deleteAfterUpload bool) (string, int64, error) { stat, err := os.Stat(localFilepath) if err != nil { return "", 0, err @@ -98,13 +100,6 @@ func (u *noOpUploader) cleanupFile(localFilepath string) error { return nil } -type baseUploader struct { -} - -func (u *baseUploader) cleanupFile(localFilepath string) error { - return os.Remove(localFilepath) -} - func wrap(name string, err error) error { return errors.Wrap(err, fmt.Sprintf("%s upload failed", name)) }