Skip to content

Commit

Permalink
Initializing new storage control client (#1865)
Browse files Browse the repository at this point in the history
* adding parent struct

* adding new control client

* creating new storage control client on flag value true

* adding unit tests

* unit tests using new assertion

* remove unnecerry changes

* triggering e2e tests

* review changes

* lint fix

* lint fix

* lint fix

* removing unnecessary changes for go.mod

* review comment

* review comment

* rebasing

* local changes

* adding retry option for control client

* Update go.mod

* linux tests

* formating in control client file

* formating in control client file

* adding unit test

* lint fix

* lint fix

* linux tests

* linux tests

* linux tests

* adding comment for clientOpts

* review comment

* review comment

* review comment

* Update storage_handle.go
  • Loading branch information
Tulsishah authored May 8, 2024
1 parent 744605e commit 4620898
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
cloud.google.com/go/auth v0.3.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/longrunning v0.5.5 // indirect
cloud.google.com/go/monitoring v1.18.0 // indirect
cloud.google.com/go/pubsub v1.37.0 // indirect
cloud.google.com/go/trace v1.10.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM=
cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA=
cloud.google.com/go/kms v1.15.7 h1:7caV9K3yIxvlQPAcaFffhlT7d1qpxjB1wHBtjWa13SM=
cloud.google.com/go/kms v1.15.7/go.mod h1:ub54lbsa6tDkUwnu4W7Yt1aAIFLnspgh0kPGToDukeI=
cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg=
cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s=
cloud.google.com/go/monitoring v1.18.0 h1:NfkDLQDG2UR3WYZVQE8kwSbUIEyIqJUPl+aOQdFH1T4=
cloud.google.com/go/monitoring v1.18.0/go.mod h1:c92vVBCeq/OB4Ioyo+NbN2U7tlg5ZH41PZcdvfc+Lcg=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
Expand Down
57 changes: 43 additions & 14 deletions internal/storage/storage_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"

"cloud.google.com/go/storage"
control "cloud.google.com/go/storage/control/apiv2"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
mountpkg "github.com/googlecloudplatform/gcsfuse/v2/internal/mount"
Expand All @@ -44,21 +45,12 @@ type StorageHandle interface {
}

type storageClient struct {
client *storage.Client
client *storage.Client
storageControlClient *control.StorageControlClient
}

// Followed https://pkg.go.dev/cloud.google.com/go/storage#hdr-Experimental_gRPC_API to create the gRPC client.
func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig) (sc *storage.Client, err error) {
if clientConfig.ClientProtocol != mountpkg.GRPC {
return nil, fmt.Errorf("client-protocol requested is not GRPC: %s", clientConfig.ClientProtocol)
}

if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil {
logger.Fatal("error setting direct path env var: %v", err)
}

var clientOpts []option.ClientOption

// Return clientOpts for both gRPC client and control client.
func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConfig) (clientOpts []option.ClientOption, err error) {
// Add Custom endpoint option.
if clientConfig.CustomEndpoint != nil {
if clientConfig.AnonymousAccess {
Expand Down Expand Up @@ -87,7 +79,29 @@ func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.Stora
clientOpts = append(clientOpts, option.WithGRPCConnectionPool(clientConfig.GrpcConnPoolSize))
clientOpts = append(clientOpts, option.WithUserAgent(clientConfig.UserAgent))

return
}

// Followed https://pkg.go.dev/cloud.google.com/go/storage#hdr-Experimental_gRPC_API to create the gRPC client.
func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig) (sc *storage.Client, err error) {
if clientConfig.ClientProtocol != mountpkg.GRPC {
return nil, fmt.Errorf("client-protocol requested is not GRPC: %s", clientConfig.ClientProtocol)
}

if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil {
logger.Fatal("error setting direct path env var: %v", err)
}

var clientOpts []option.ClientOption
clientOpts, err = createClientOptionForGRPCClient(clientConfig)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}

sc, err = storage.NewGRPCClient(ctx, clientOpts...)
if err != nil {
err = fmt.Errorf("NewGRPCClient: %w", err)
}

// Unset the environment variable, since it's used only while creation of grpc client.
if err := os.Unsetenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"); err != nil {
Expand Down Expand Up @@ -137,6 +151,9 @@ func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.Stora
// http and gRPC client.
func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClientConfig) (sh StorageHandle, err error) {
var sc *storage.Client
// The default protocol for the Go Storage control client's folders API is gRPC.
// gcsfuse will initially mirror this behavior due to the client's lack of HTTP support.
var controlClient *control.StorageControlClient
if clientConfig.ClientProtocol == mountpkg.GRPC {
sc, err = createGRPCClientHandle(ctx, &clientConfig)
} else if clientConfig.ClientProtocol == mountpkg.HTTP1 || clientConfig.ClientProtocol == mountpkg.HTTP2 {
Expand All @@ -150,6 +167,18 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien
return
}

// TODO: We will implement an additional check for the HTTP control client protocol once the Go SDK supports HTTP.
if clientConfig.EnableHNS {
clientOpts, err := createClientOptionForGRPCClient(&clientConfig)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}
controlClient, err = storageutil.CreateGRPCControlClient(ctx, clientOpts, &clientConfig)
if err != nil {
return nil, fmt.Errorf("could not create StorageControl Client: %w", err)
}
}

// ShouldRetry function checks if an operation should be retried based on the
// response of operation (error.Code).
// RetryAlways causes all operations to be checked for retries using
Expand All @@ -165,7 +194,7 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien
storage.WithPolicy(storage.RetryAlways),
storage.WithErrorFunc(storageutil.ShouldRetry))

sh = &storageClient{client: sc}
sh = &storageClient{client: sc, storageControlClient: controlClient}
return
}

Expand Down
19 changes: 19 additions & 0 deletions internal/storage/storage_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,22 @@ func (testSuite *StorageHandleTest) TestNewStorageHandleWithGRPCClientWithCustom
assert.Contains(testSuite.T(), err.Error(), "GRPC client doesn't support auth for custom-endpoint. Please set anonymous-access: true via config-file.")
assert.Nil(testSuite.T(), handleCreated)
}

func (testSuite *StorageHandleTest) TestCreateStorageHandleWithEnableHNSTrue() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableHNS = true

sh, err := NewStorageHandle(context.Background(), sc)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), sh)
}

func (testSuite *StorageHandleTest) TestCreateClientOptionForGRPCClient() {
sc := storageutil.GetDefaultStorageClientConfig()

clientOption, err := createClientOptionForGRPCClient(&sc)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), clientOption)
}
4 changes: 4 additions & 0 deletions internal/storage/storageutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/auth"
"github.com/googlecloudplatform/gcsfuse/v2/internal/config"
mountpkg "github.com/googlecloudplatform/gcsfuse/v2/internal/mount"
"golang.org/x/net/context"
"golang.org/x/oauth2"
Expand Down Expand Up @@ -52,6 +53,9 @@ type StorageClientConfig struct {

/** Grpc client parameters. */
GrpcConnPoolSize int

// Enabling new API flow for HNS bucket.
EnableHNS config.EnableHNS
}

func CreateHttpClient(storageClientConfig *StorageClientConfig) (httpClient *http.Client, err error) {
Expand Down
64 changes: 64 additions & 0 deletions internal/storage/storageutil/control_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storageutil

import (
"context"
"fmt"
"os"

control "cloud.google.com/go/storage/control/apiv2"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"google.golang.org/api/option"
)

func storageControlClientRetryOptions(clientConfig *StorageClientConfig) []gax.CallOption {
return []gax.CallOption{gax.WithRetry(func() gax.Retryer {
return gax.OnErrorFunc(gax.Backoff{
Max: clientConfig.MaxRetrySleep,
Multiplier: clientConfig.RetryMultiplier,
}, ShouldRetry)
})}
}

func setRetryConfigForFolderAPIs(sc *control.StorageControlClient, clientConfig *StorageClientConfig) {
sc.CallOptions.CreateFolder = storageControlClientRetryOptions(clientConfig)
sc.CallOptions.DeleteFolder = storageControlClientRetryOptions(clientConfig)
sc.CallOptions.RenameFolder = storageControlClientRetryOptions(clientConfig)
sc.CallOptions.GetFolder = storageControlClientRetryOptions(clientConfig)
sc.CallOptions.GetStorageLayout = storageControlClientRetryOptions(clientConfig)
}

func CreateGRPCControlClient(ctx context.Context, clientOpts []option.ClientOption, clientConfig *StorageClientConfig) (controlClient *control.StorageControlClient, err error) {
if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil {
logger.Fatal("error setting direct path env var: %v", err)
}

controlClient, err = control.NewStorageControlClient(ctx, clientOpts...)
if err != nil {
return nil, fmt.Errorf("NewStorageControlClient: %w", err)
}

// Set retries for control client.
setRetryConfigForFolderAPIs(controlClient, clientConfig)

// Unset the environment variable, since it's used only while creation of grpc client.
if err := os.Unsetenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"); err != nil {
logger.Fatal("error while unsetting direct path env var: %v", err)
}

return controlClient, err
}
57 changes: 57 additions & 0 deletions internal/storage/storageutil/control_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storageutil

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"google.golang.org/api/option"
)

type ControlClientTest struct {
suite.Suite
}

func TestControlClientTestSuite(t *testing.T) {
suite.Run(t, new(ControlClientTest))
}

func (testSuite *ControlClientTest) SetupTest() {
}

func (testSuite *ControlClientTest) TearDownTest() {
}

func (testSuite *ControlClientTest) TestStorageControlClientRetryOptions() {
clientConfig := GetDefaultStorageClientConfig()

gaxOpts := storageControlClientRetryOptions(&clientConfig)

assert.NotNil(testSuite.T(), gaxOpts)
}

func (testSuite *ControlClientTest) TestStorageControlClient() {
var clientOpts []option.ClientOption
clientOpts = append(clientOpts, option.WithoutAuthentication())
clientConfig := GetDefaultStorageClientConfig()

controlClient, err := CreateGRPCControlClient(context.Background(), clientOpts, &clientConfig)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), controlClient)
}
1 change: 1 addition & 0 deletions internal/storage/storageutil/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ func GetDefaultStorageClientConfig() (clientConfig StorageClientConfig) {
ReuseTokenFromUrl: true,
ExperimentalEnableJsonRead: false,
AnonymousAccess: true,
EnableHNS: false,
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func createStorageHandle(flags *flagStorage, mountConfig *config.MountConfig, us
ReuseTokenFromUrl: flags.ReuseTokenFromUrl,
ExperimentalEnableJsonRead: flags.ExperimentalEnableJsonRead,
GrpcConnPoolSize: mountConfig.GrpcClientConfig.ConnPoolSize,
EnableHNS: mountConfig.EnableHNS,
}
logger.Infof("UserAgent = %s\n", storageClientConfig.UserAgent)
storageHandle, err = storage.NewStorageHandle(context.Background(), storageClientConfig)
Expand Down

0 comments on commit 4620898

Please sign in to comment.