diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index da9cf1ab..8ae93bea 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -53,12 +53,12 @@ func (p *Pipeline) uploadDebugFiles() { } } -func (p *Pipeline) uploadDotFile(u *uploader.Uploader) { +func (p *Pipeline) uploadDotFile(u uploader.Uploader) { dot := p.GetGstPipelineDebugDot() p.uploadDebugFile(u, []byte(dot), ".dot") } -func (p *Pipeline) uploadPProf(u *uploader.Uploader) { +func (p *Pipeline) uploadPProf(u uploader.Uploader) { b, err := pprof.GetProfileData(context.Background(), "heap", 0, 0) if err != nil { logger.Errorw("failed to get profile data", err) @@ -67,7 +67,7 @@ func (p *Pipeline) uploadPProf(u *uploader.Uploader) { p.uploadDebugFile(u, b, ".prof") } -func (p *Pipeline) uploadDebugFile(u *uploader.Uploader, data []byte, fileExtension string) { +func (p *Pipeline) uploadDebugFile(u uploader.Uploader, data []byte, fileExtension string) { filename := fmt.Sprintf("%s%s", p.Info.EgressId, fileExtension) filepath := path.Join(p.LocalOutputDirectory, filename) @@ -84,7 +84,7 @@ func (p *Pipeline) uploadDebugFile(u *uploader.Uploader, data []byte, fileExtens return } - _, _, err = u.Upload(filepath, filename, types.OutputTypeBlob) + _, _, err = u.Upload(filepath, filename, types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload dotfile", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index d03ec196..fbc48ba5 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -11,13 +11,13 @@ import ( ) type FileSink struct { - *uploader.Uploader + uploader.Uploader conf *config.PipelineConfig *config.FileConfig } -func newFileSink(u *uploader.Uploader, conf *config.PipelineConfig, o *config.FileConfig) *FileSink { +func newFileSink(u uploader.Uploader, conf *config.PipelineConfig, o *config.FileConfig) *FileSink { return &FileSink{ Uploader: u, conf: conf, @@ -30,7 +30,7 @@ func (s *FileSink) Start() error { } func (s *FileSink) Finalize() error { - location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType) + location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) if err != nil { return err } diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index 0146d063..aa50e12b 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -25,7 +25,7 @@ type Manifest struct { SegmentCount int64 `json:"segment_count,omitempty"` } -func uploadManifest(p *config.PipelineConfig, u *uploader.Uploader, localFilepath, storageFilepath string) error { +func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath, storageFilepath string) error { manifest, err := os.Create(localFilepath) if err != nil { return err @@ -41,7 +41,7 @@ func uploadManifest(p *config.PipelineConfig, u *uploader.Uploader, localFilepat return err } - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON) + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) return err } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 098107fd..267a5f01 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -21,7 +21,7 @@ import ( const maxPendingUploads = 100 type SegmentSink struct { - *uploader.Uploader + uploader.Uploader conf *config.PipelineConfig *config.SegmentConfig @@ -44,7 +44,7 @@ type SegmentUpdate struct { filename string } -func newSegmentSink(u *uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig) (*SegmentSink, error) { +func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig) (*SegmentSink, error) { playlistName := path.Join(o.LocalDir, o.PlaylistFilename) playlist, err := m3u8.NewPlaylistWriter(playlistName, o.SegmentDuration) if err != nil { @@ -79,7 +79,7 @@ func (s *SegmentSink) Start() error { segmentLocalPath := path.Join(s.LocalDir, update.filename) segmentStoragePath := path.Join(s.StorageDir, update.filename) - _, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType()) + _, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType(), true) if err != nil { return } @@ -94,7 +94,7 @@ func (s *SegmentSink) Start() error { playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) if err != nil { return } @@ -202,7 +202,7 @@ func (s *SegmentSink) Finalize() error { // upload the finalized playlist playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType) + s.SegmentsInfo.PlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) if !s.DisableManifest { manifestLocalPath := fmt.Sprintf("%s.json", playlistLocalPath) diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 210f9ea0..930d23d2 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -11,13 +11,19 @@ import ( ) type AliOSSUploader struct { + *baseUploader + conf *livekit.AliOSSUpload } -func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) { - return &AliOSSUploader{ +func newAliOSSUploader(conf *livekit.AliOSSUpload, backup string) (Uploader, error) { + u := &AliOSSUploader{ conf: conf, - }, nil + } + + u.baseUploader = newBaseUploader(backup, u.upload) + + return u, nil } func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) { diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 41ac260e..afcc77f6 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -13,15 +13,21 @@ import ( ) type AzureUploader struct { + *baseUploader + conf *livekit.AzureBlobUpload container string } -func newAzureUploader(conf *livekit.AzureBlobUpload) (uploader, error) { - return &AzureUploader{ +func newAzureUploader(conf *livekit.AzureBlobUpload, backup string) (Uploader, error) { + u := &AzureUploader{ conf: conf, container: fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.AccountName, conf.ContainerName), - }, nil + } + + u.baseUploader = newBaseUploader(backup, u.upload) + + return u, nil } func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 781c5f48..8003976a 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -17,13 +17,19 @@ import ( ) type GCPUploader struct { + *baseUploader + conf *livekit.GCPUpload } -func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { - return &GCPUploader{ +func newGCPUploader(conf *livekit.GCPUpload, backup string) (Uploader, error) { + u := &GCPUploader{ conf: conf, - }, nil + } + + u.baseUploader = newBaseUploader(backup, u.upload) + + return u, nil } func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 8de64e73..50665c5d 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -21,13 +21,15 @@ const ( ) type S3Uploader struct { + *baseUploader + awsConfig *aws.Config bucket *string metadata map[string]*string tagging *string } -func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { +func newS3Uploader(conf *livekit.S3Upload, backup string) (Uploader, error) { awsConfig := &aws.Config{ MaxRetries: aws.Int(maxRetries), // Switching to v2 of the aws Go SDK would allow to set a maxDelay as well. S3ForcePathStyle: aws.Bool(conf.ForcePathStyle), @@ -47,6 +49,8 @@ func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { bucket: aws.String(conf.Bucket), } + u.baseUploader = newBaseUploader(backup, u.upload) + if u.awsConfig.Region == nil { region, err := u.getBucketLocation() if err != nil { diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index b35e5acc..8a72362d 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -19,45 +19,52 @@ const ( maxDelay = time.Second * 5 ) -type Uploader struct { - uploader - backup string -} - -type uploader interface { - upload(string, string, types.OutputType) (string, int64, error) +type Uploader interface { + Upload(string, string, types.OutputType, bool) (string, int64, error) } -func New(conf config.UploadConfig, backup string) (*Uploader, error) { - u := &Uploader{ - backup: backup, - } +func New(conf config.UploadConfig, backup string) (Uploader, error) { + var u Uploader - var i uploader var err error switch c := conf.(type) { case *livekit.S3Upload: - i, err = newS3Uploader(c) + u, err = newS3Uploader(c, backup) case *livekit.GCPUpload: - i, err = newGCPUploader(c) + u, err = newGCPUploader(c, backup) case *livekit.AzureBlobUpload: - i, err = newAzureUploader(c) + u, err = newAzureUploader(c, backup) case *livekit.AliOSSUpload: - i, err = newAliOSSUploader(c) + u, err = newAliOSSUploader(c, backup) default: - i = &noOpUploader{} + u = &noOpUploader{} } if err != nil { return nil, err } - u.uploader = i return u, nil } -func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { +type baseUploader struct { + backup string + upload func(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) +} + +func newBaseUploader(backup string, upload func(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error)) *baseUploader { + return &baseUploader{ + backup: backup, + upload: upload, + } +} + +func (u *baseUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { location, size, err := u.upload(localFilepath, storageFilepath, outputType) if err == nil { + if deleteAfterUpload { + os.Remove(localFilepath) + } + return location, size, nil } @@ -80,7 +87,7 @@ func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType type type noOpUploader struct{} -func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (string, int64, error) { +func (u *noOpUploader) Upload(localFilepath, _ string, _ types.OutputType, deleteAfterUpload bool) (string, int64, error) { stat, err := os.Stat(localFilepath) if err != nil { return "", 0, err @@ -89,6 +96,10 @@ func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (stri return localFilepath, stat.Size(), nil } +func (u *noOpUploader) cleanupFile(localFilepath string) error { + return nil +} + func wrap(name string, err error) error { return errors.Wrap(err, fmt.Sprintf("%s upload failed", name)) }