Skip to content

Commit

Permalink
reuse gcp client (#461)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Aug 11, 2023
1 parent 3c73f22 commit f3132e2
Showing 1 changed file with 21 additions and 23 deletions.
44 changes: 21 additions & 23 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,29 @@ import (
)

type GCPUploader struct {
conf *livekit.GCPUpload
conf *livekit.GCPUpload
client *storage.Client
}

func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
return &GCPUploader{
u := &GCPUploader{
conf: conf,
}, nil
}

var err error
if conf.Credentials != "" {
u.client, err = storage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(u.conf.Credentials)))
} else {
u.client, err = storage.NewClient(context.Background())
}
if err != nil {
return nil, err
}

return u, nil
}

func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
ctx := context.Background()

file, err := os.Open(localFilepath)
if err != nil {
return "", 0, wrap("GCP", err)
Expand All @@ -56,39 +67,26 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp
return "", 0, wrap("GCP", err)
}

var client *storage.Client
if u.conf.Credentials != "" {
client, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(u.conf.Credentials)))
} else {
client, err = storage.NewClient(ctx)
}
if err != nil {
return "", 0, wrap("GCP", err)
}
defer func() {
_ = client.Close()
}()

// In case where the total amount of data to upload is larger than googleapi.DefaultUploadChunkSize, each upload request will have a timeout of
// ChunkRetryDeadline, which is 32s by default. If the request payload is smaller than googleapi.DefaultUploadChunkSize, use a context deadline
// to apply the same timeout
var wctx context.Context
var ctx context.Context
if stat.Size() <= googleapi.DefaultUploadChunkSize {
var cancel context.CancelFunc
wctx, cancel = context.WithTimeout(ctx, time.Second*32)
ctx, cancel = context.WithTimeout(context.Background(), time.Second*32)
defer cancel()
} else {
wctx = ctx
ctx = context.Background()
}

wc := client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer(
wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer(
storage.WithBackoff(gax.Backoff{
Initial: minDelay,
Max: maxDelay,
Multiplier: 2,
}),
storage.WithPolicy(storage.RetryAlways),
).NewWriter(wctx)
).NewWriter(ctx)

if _, err = io.Copy(wc, file); err != nil {
return "", 0, wrap("GCP", err)
Expand Down

0 comments on commit f3132e2

Please sign in to comment.