Skip to content

Commit d2e7e4a

Browse files
committed
change to single location field
1 parent e4fd0e0 commit d2e7e4a

File tree

16 files changed

+93
-107
lines changed

16 files changed

+93
-107
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ require (
2323
github.com/gorilla/websocket v1.5.3
2424
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
2525
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
26-
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292
26+
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f
2727
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
2828
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
2929
github.com/pion/rtp v1.8.9
@@ -137,7 +137,7 @@ require (
137137
go.opentelemetry.io/otel/metric v1.29.0 // indirect
138138
go.opentelemetry.io/otel/trace v1.29.0 // indirect
139139
go.uber.org/multierr v1.11.0 // indirect
140-
go.uber.org/zap/exp v0.2.0 // indirect
140+
go.uber.org/zap/exp v0.3.0 // indirect
141141
golang.org/x/crypto v0.28.0 // indirect
142142
golang.org/x/net v0.30.0 // indirect
143143
golang.org/x/sync v0.8.0 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
218218
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
219219
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
220220
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
221-
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4=
222-
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
221+
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw=
222+
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
223223
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
224224
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
225225
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
@@ -357,8 +357,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
357357
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
358358
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
359359
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
360-
go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=
361-
go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ=
360+
go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U=
361+
go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ=
362362
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
363363
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
364364
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

pkg/config/manifest.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -42,29 +42,25 @@ type Manifest struct {
4242
}
4343

4444
type File struct {
45-
Filename string `json:"filename,omitempty"`
46-
Location string `json:"location,omitempty"`
47-
PresignedUrl string `json:"presigned_url,omitempty"`
45+
Filename string `json:"filename,omitempty"`
46+
Location string `json:"location,omitempty"`
4847
}
4948

5049
type Playlist struct {
51-
mu sync.Mutex
52-
Location string `json:"location,omitempty"`
53-
PresignedUrl string `json:"presigned_url,omitempty"`
54-
Segments []*Segment `json:"segments,omitempty"`
50+
mu sync.Mutex
51+
Location string `json:"location,omitempty"`
52+
Segments []*Segment `json:"segments,omitempty"`
5553
}
5654

5755
type Segment struct {
58-
Filename string `json:"filename,omitempty"`
59-
Location string `json:"location,omitempty"`
60-
PresignedUrl string `json:"presigned_url,omitempty"`
56+
Filename string `json:"filename,omitempty"`
57+
Location string `json:"location,omitempty"`
6158
}
6259

6360
type Image struct {
64-
Filename string `json:"filename,omitempty"`
65-
Timestamp time.Time `json:"timestamp,omitempty"`
66-
Location string `json:"location,omitempty"`
67-
PresignedUrl string `json:"presigned_url,omitempty"`
61+
Filename string `json:"filename,omitempty"`
62+
Timestamp time.Time `json:"timestamp,omitempty"`
63+
Location string `json:"location,omitempty"`
6864
}
6965

7066
func (p *PipelineConfig) initManifest() {
@@ -103,12 +99,11 @@ func (p *PipelineConfig) shouldCreateManifest() bool {
10399
return false
104100
}
105101

106-
func (m *Manifest) AddFile(filename, location, presignedUrl string) {
102+
func (m *Manifest) AddFile(filename, location string) {
107103
m.mu.Lock()
108104
m.Files = append(m.Files, &File{
109-
Filename: filename,
110-
Location: location,
111-
PresignedUrl: presignedUrl,
105+
Filename: filename,
106+
Location: location,
112107
})
113108
m.mu.Unlock()
114109
}
@@ -123,30 +118,27 @@ func (m *Manifest) AddPlaylist() *Playlist {
123118
return p
124119
}
125120

126-
func (p *Playlist) UpdateLocation(location, presignedUrl string) {
121+
func (p *Playlist) UpdateLocation(location string) {
127122
p.mu.Lock()
128123
p.Location = location
129-
p.PresignedUrl = presignedUrl
130124
p.mu.Unlock()
131125
}
132126

133-
func (p *Playlist) AddSegment(filename, location, presignedUrl string) {
127+
func (p *Playlist) AddSegment(filename, location string) {
134128
p.mu.Lock()
135129
p.Segments = append(p.Segments, &Segment{
136-
Filename: filename,
137-
Location: location,
138-
PresignedUrl: presignedUrl,
130+
Filename: filename,
131+
Location: location,
139132
})
140133
p.mu.Unlock()
141134
}
142135

143-
func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUrl string) {
136+
func (m *Manifest) AddImage(filename string, ts time.Time, location string) {
144137
m.mu.Lock()
145138
m.Images = append(m.Images, &Image{
146-
Filename: filename,
147-
Timestamp: ts,
148-
Location: location,
149-
PresignedUrl: presignedUrl,
139+
Filename: filename,
140+
Timestamp: ts,
141+
Location: location,
150142
})
151143
m.mu.Unlock()
152144
}

pkg/pipeline/controller.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -653,15 +653,14 @@ func (c *Controller) uploadManifest() {
653653
infoUpdated := false
654654
for _, si := range c.sinks {
655655
for _, s := range si {
656-
location, presignedUrl, uploaded, err := s.UploadManifest(manifestPath)
656+
location, uploaded, err := s.UploadManifest(manifestPath)
657657
if err != nil {
658658
logger.Errorw("failed to upload manifest", err)
659659
continue
660660
}
661661

662662
if !infoUpdated && uploaded {
663663
c.Info.ManifestLocation = location
664-
c.Info.ManifestPresignedUrl = presignedUrl
665664
infoUpdated = true
666665
}
667666
}

pkg/pipeline/sink/file.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (s *FileSink) Start() error {
4242
}
4343

4444
func (s *FileSink) Close() error {
45-
location, size, presignedUrl, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
45+
location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
4646
if err != nil {
4747
return err
4848
}
@@ -51,22 +51,22 @@ func (s *FileSink) Close() error {
5151
s.FileInfo.Size = size
5252

5353
if s.conf.Manifest != nil {
54-
s.conf.Manifest.AddFile(s.StorageFilepath, location, presignedUrl)
54+
s.conf.Manifest.AddFile(s.StorageFilepath, location)
5555
}
5656

5757
return nil
5858
}
5959

60-
func (s *FileSink) UploadManifest(filepath string) (string, string, bool, error) {
60+
func (s *FileSink) UploadManifest(filepath string) (string, bool, error) {
6161
if s.DisableManifest && !s.ManifestRequired() {
62-
return "", "", false, nil
62+
return "", false, nil
6363
}
6464

6565
storagePath := path.Join(path.Dir(s.StorageFilepath), path.Base(filepath))
66-
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
66+
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
6767
if err != nil {
68-
return "", "", false, err
68+
return "", false, err
6969
}
7070

71-
return location, presignedUrl, true, nil
71+
return location, true, nil
7272
}

pkg/pipeline/sink/image.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,13 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error {
106106

107107
imageStoragePath := path.Join(s.StorageDir, filename)
108108

109-
location, _, presignedUrl, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true)
109+
location, _, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true)
110110
if err != nil {
111111
return err
112112
}
113113

114114
if s.conf.Manifest != nil {
115-
s.conf.Manifest.AddImage(imageStoragePath, ts, location, presignedUrl)
115+
s.conf.Manifest.AddImage(imageStoragePath, ts, location)
116116
}
117117

118118
return nil

pkg/pipeline/sink/segments.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
154154
go func() {
155155
defer close(update.uploadComplete)
156156

157-
location, size, presignedUrl, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
157+
location, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
158158
if err != nil {
159159
s.callbacks.OnError(err)
160160
return
@@ -165,7 +165,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
165165
s.SegmentsInfo.SegmentCount++
166166
s.SegmentsInfo.Size += size
167167
if s.manifestPlaylist != nil {
168-
s.manifestPlaylist.AddSegment(segmentStoragePath, location, presignedUrl)
168+
s.manifestPlaylist.AddSegment(segmentStoragePath, location)
169169
}
170170
s.infoLock.Unlock()
171171
}()
@@ -222,12 +222,11 @@ func (s *SegmentSink) shouldUploadPlaylist() bool {
222222
func (s *SegmentSink) uploadPlaylist() error {
223223
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
224224
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
225-
playlistLocation, _, presignedUrl, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
225+
playlistLocation, _, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
226226
if err == nil {
227227
s.SegmentsInfo.PlaylistLocation = playlistLocation
228228
if s.manifestPlaylist != nil {
229229
s.manifestPlaylist.Location = playlistLocation
230-
s.manifestPlaylist.PresignedUrl = presignedUrl
231230
}
232231
}
233232
return err
@@ -236,7 +235,7 @@ func (s *SegmentSink) uploadPlaylist() error {
236235
func (s *SegmentSink) uploadLivePlaylist() error {
237236
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
238237
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
239-
livePlaylistLocation, _, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
238+
livePlaylistLocation, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
240239
if err == nil {
241240
s.SegmentsInfo.LivePlaylistLocation = livePlaylistLocation
242241
}
@@ -322,16 +321,16 @@ func (s *SegmentSink) Close() error {
322321
return nil
323322
}
324323

325-
func (s *SegmentSink) UploadManifest(filepath string) (string, string, bool, error) {
324+
func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) {
326325
if s.DisableManifest && !s.ManifestRequired() {
327-
return "", "", false, nil
326+
return "", false, nil
328327
}
329328

330329
storagePath := path.Join(s.StorageDir, path.Base(filepath))
331-
location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
330+
location, _, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false)
332331
if err != nil {
333-
return "", "", false, err
332+
return "", false, err
334333
}
335334

336-
return location, presignedUrl, true, nil
335+
return location, true, nil
337336
}

pkg/pipeline/sink/sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
type Sink interface {
2626
Start() error
2727
Close() error
28-
UploadManifest(string) (string, string, bool, error)
28+
UploadManifest(string) (string, bool, error)
2929
}
3030

3131
func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) {

pkg/pipeline/sink/uploader/alioss.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,28 +45,28 @@ func newAliOSSUploader(c *config.StorageConfig) (uploader, error) {
4545
}, nil
4646
}
4747

48-
func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) {
48+
func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
4949
storageFilepath = path.Join(u.prefix, storageFilepath)
5050

5151
stat, err := os.Stat(localFilepath)
5252
if err != nil {
53-
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
53+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
5454
}
5555

5656
client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret)
5757
if err != nil {
58-
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
58+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
5959
}
6060

6161
bucket, err := client.Bucket(u.conf.Bucket)
6262
if err != nil {
63-
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
63+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
6464
}
6565

6666
err = bucket.PutObjectFromFile(storageFilepath, localFilepath)
6767
if err != nil {
68-
return "", 0, "", errors.ErrUploadFailed("AliOSS", err)
68+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
6969
}
7070

71-
return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), "", nil
71+
return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), nil
7272
}

pkg/pipeline/sink/uploader/azure.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ func newAzureUploader(c *config.StorageConfig) (uploader, error) {
4949
}, nil
5050
}
5151

52-
func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, string, error) {
52+
func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) {
5353
storageFilepath = path.Join(u.prefix, storageFilepath)
5454

5555
credential, err := azblob.NewSharedKeyCredential(
5656
u.conf.AccountName,
5757
u.conf.AccountKey,
5858
)
5959
if err != nil {
60-
return "", 0, "", errors.ErrUploadFailed("Azure", err)
60+
return "", 0, errors.ErrUploadFailed("Azure", err)
6161
}
6262

6363
azUrl, err := url.Parse(u.container)
6464
if err != nil {
65-
return "", 0, "", errors.ErrUploadFailed("Azure", err)
65+
return "", 0, errors.ErrUploadFailed("Azure", err)
6666
}
6767

6868
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
@@ -78,15 +78,15 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
7878

7979
file, err := os.Open(localFilepath)
8080
if err != nil {
81-
return "", 0, "", errors.ErrUploadFailed("Azure", err)
81+
return "", 0, errors.ErrUploadFailed("Azure", err)
8282
}
8383
defer func() {
8484
_ = file.Close()
8585
}()
8686

8787
stat, err := file.Stat()
8888
if err != nil {
89-
return "", 0, "", errors.ErrUploadFailed("Azure", err)
89+
return "", 0, errors.ErrUploadFailed("Azure", err)
9090
}
9191

9292
// upload blocks in parallel for optimal performance
@@ -97,8 +97,8 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
9797
Parallelism: 16,
9898
})
9999
if err != nil {
100-
return "", 0, "", errors.ErrUploadFailed("Azure", err)
100+
return "", 0, errors.ErrUploadFailed("Azure", err)
101101
}
102102

103-
return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), "", nil
103+
return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil
104104
}

0 commit comments

Comments
 (0)