From f3132e2ed2f287af126987f47a7d76e1460dd317 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 11 Aug 2023 16:48:14 -0700 Subject: [PATCH] reuse gcp client (#461) --- pkg/pipeline/sink/uploader/gcp.go | 44 +++++++++++++++---------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index e85c0267..ed767b5a 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -31,18 +31,29 @@ import ( ) type GCPUploader struct { - conf *livekit.GCPUpload + conf *livekit.GCPUpload + client *storage.Client } func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { - return &GCPUploader{ + u := &GCPUploader{ conf: conf, - }, nil + } + + var err error + if conf.Credentials != "" { + u.client, err = storage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(u.conf.Credentials))) + } else { + u.client, err = storage.NewClient(context.Background()) + } + if err != nil { + return nil, err + } + + return u, nil } func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { - ctx := context.Background() - file, err := os.Open(localFilepath) if err != nil { return "", 0, wrap("GCP", err) @@ -56,39 +67,26 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp return "", 0, wrap("GCP", err) } - var client *storage.Client - if u.conf.Credentials != "" { - client, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(u.conf.Credentials))) - } else { - client, err = storage.NewClient(ctx) - } - if err != nil { - return "", 0, wrap("GCP", err) - } - defer func() { - _ = client.Close() - }() - // In case where the total amount of data to upload is larger than googleapi.DefaultUploadChunkSize, each upload request will have a timeout of // ChunkRetryDeadline, which is 32s by default. If the request payload is smaller than googleapi.DefaultUploadChunkSize, use a context deadline // to apply the same timeout - var wctx context.Context + var ctx context.Context if stat.Size() <= googleapi.DefaultUploadChunkSize { var cancel context.CancelFunc - wctx, cancel = context.WithTimeout(ctx, time.Second*32) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*32) defer cancel() } else { - wctx = ctx + ctx = context.Background() } - wc := client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer( + wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer( storage.WithBackoff(gax.Backoff{ Initial: minDelay, Max: maxDelay, Multiplier: 2, }), storage.WithPolicy(storage.RetryAlways), - ).NewWriter(wctx) + ).NewWriter(ctx) if _, err = io.Copy(wc, file); err != nil { return "", 0, wrap("GCP", err)