Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben committed Jun 29, 2023
1 parent 014f21c commit da06d99
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (s *SegmentSink) Start() error {
return
}

s.CleanupFile(segmentLocalPath)

playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType)
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/sink/uploader/alioss.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

type AliOSSUploader struct {
baseUploader

conf *livekit.AliOSSUpload
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
)

type AzureUploader struct {
baseUploader

conf *livekit.AzureBlobUpload
container string
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
)

type GCPUploader struct {
baseUploader

conf *livekit.GCPUpload
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
)

type S3Uploader struct {
baseUploader

awsConfig *aws.Config
bucket *string
metadata map[string]*string
Expand Down
20 changes: 20 additions & 0 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Uploader struct {

type uploader interface {
upload(string, string, types.OutputType) (string, int64, error)
cleanupFile(localFilepath string) error
}

func New(conf config.UploadConfig, backup string) (*Uploader, error) {
Expand Down Expand Up @@ -78,6 +79,14 @@ func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType type
return "", 0, err
}

func (u *Uploader) CleanupFile(localFilepath string) error {
if u.backup != "" {
return nil
}

return u.cleanupFile(localFilepath)
}

type noOpUploader struct{}

func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (string, int64, error) {
Expand All @@ -89,6 +98,17 @@ func (u *noOpUploader) upload(localFilepath, _ string, _ types.OutputType) (stri
return localFilepath, stat.Size(), nil
}

func (u *noOpUploader) cleanupFile(localFilepath string) error {
return nil
}

func wrap(name string, err error) error {
return errors.Wrap(err, fmt.Sprintf("%s upload failed", name))
}

type baseUploader struct {
}

func (u *baseUploader) cleanupFile(localFilepath string) error {
return os.Remove(localFilepath)
}

0 comments on commit da06d99

Please sign in to comment.