Skip to content

Commit

Permalink
Add warning log for direct path connectivity (#2691)
Browse files Browse the repository at this point in the history
* adding warning log for direct connectivity

* changing buckethandle signature

* enabling log only for grpc

* refactors

* nits and refactors

* shared context for test suite
  • Loading branch information
anushka567 authored Nov 25, 2024
1 parent 833a760 commit 4e33b30
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 44 deletions.
4 changes: 2 additions & 2 deletions internal/cache/file/cache_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ func (cht *cacheHandleTest) verifyContentRead(readStartOffset int64, expectedCon
func (cht *cacheHandleTest) SetupTest() {
locker.EnableInvariantsCheck()
cht.cacheDir = path.Join(os.Getenv("HOME"), "cache/dir")
ctx := context.Background()

// Create bucket in fake storage.
cht.fakeStorage = storage.NewFakeStorage()
storageHandle := cht.fakeStorage.CreateStorageHandle()
cht.bucket = storageHandle.BucketHandle(storage.TestBucketName, "")
cht.bucket = storageHandle.BucketHandle(ctx, storage.TestBucketName, "")

// Create test object in the bucket.
ctx := context.Background()
testObjectContent := make([]byte, TestObjectSize)
n, err := rand.Read(testObjectContent)
assert.Equal(cht.T(), TestObjectSize, n)
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/file/cache_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func initializeCacheHandlerTestArgs(t *testing.T, fileCacheConfig *cfg.FileCache
fakeStorage.ShutDown()
})
storageHandle := fakeStorage.CreateStorageHandle()
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
ctx := context.Background()
bucket := storageHandle.BucketHandle(ctx, storage.TestBucketName, "")

// Create test object in the bucket.
testObjectContent := make([]byte, TestObjectSize)
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/file/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (dt *downloaderTest) setupHelper() {
// Create bucket in fake storage.
dt.fakeStorage = storage.NewFakeStorage()
storageHandle := dt.fakeStorage.CreateStorageHandle()
dt.bucket = storageHandle.BucketHandle(storage.TestBucketName, "")
ctx := context.Background()
dt.bucket = storageHandle.BucketHandle(ctx, storage.TestBucketName, "")

dt.initJobTest(DefaultObjectName, []byte("taco"), DefaultSequentialReadSizeMb, CacheMaxSize, func() {})
dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig)
Expand Down
7 changes: 4 additions & 3 deletions internal/cache/file/downloader/jm_parallel_downloads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func TestParallelDownloads(t *testing.T) {
t.Parallel()
cache, cacheDir := configureCache(t, 2*tc.objectSize)
storageHandle := configureFakeStorage(t)
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
ctx := context.Background()
bucket := storageHandle.BucketHandle(ctx, storage.TestBucketName, "")
minObj, content := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", tc.objectSize)
fileCacheConfig := &cfg.FileCacheConfig{
EnableParallelDownloads: true,
Expand Down Expand Up @@ -178,7 +179,8 @@ func TestMultipleConcurrentDownloads(t *testing.T) {
t.Parallel()
storageHandle := configureFakeStorage(t)
cache, cacheDir := configureCache(t, 30*util.MiB)
bucket := storageHandle.BucketHandle(storage.TestBucketName, "")
ctx := context.Background()
bucket := storageHandle.BucketHandle(ctx, storage.TestBucketName, "")
minObj1, content1 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/foo.txt", 10*util.MiB)
minObj2, content2 := createObjectInStoreAndInitCache(t, cache, bucket, "path/in/gcs/bar.txt", 5*util.MiB)
fileCacheConfig := &cfg.FileCacheConfig{
Expand All @@ -194,7 +196,6 @@ func TestMultipleConcurrentDownloads(t *testing.T) {
job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
s1 := job1.subscribe(10 * util.MiB)
s2 := job2.subscribe(5 * util.MiB)
ctx := context.Background()

_, err1 := job1.Download(ctx, 10*util.MiB, false)
_, err2 := job2.Download(ctx, 5*util.MiB, false)
Expand Down
2 changes: 1 addition & 1 deletion internal/gcsx/bucket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (bm *bucketManager) SetUpBucket(
if name == canned.FakeBucketName {
b = canned.MakeFakeBucket(ctx)
} else {
b = bm.storageHandle.BucketHandle(name, bm.config.BillingProject)
b = bm.storageHandle.BucketHandle(ctx, name, bm.config.BillingProject)
}

// Enable monitoring.
Expand Down
3 changes: 2 additions & 1 deletion internal/gcsx/bucket_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func init() { RegisterTestSuite(&BucketManagerTest{}) }
func (t *BucketManagerTest) SetUp(_ *TestInfo) {
t.fakeStorage = storage.NewFakeStorage()
t.storageHandle = t.fakeStorage.CreateStorageHandle()
t.bucket = t.storageHandle.BucketHandle(TestBucketName, "")
ctx := context.Background()
t.bucket = t.storageHandle.BucketHandle(ctx, TestBucketName, "")

AssertNe(nil, t.bucket)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/bucket_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func TestBucketHandleTestSuite(testSuite *testing.T) {
func (testSuite *BucketHandleTest) SetupTest() {
testSuite.fakeStorage = NewFakeStorage()
testSuite.storageHandle = testSuite.fakeStorage.CreateStorageHandle()
testSuite.bucketHandle = testSuite.storageHandle.BucketHandle(TestBucketName, "")
ctx := context.Background()
testSuite.bucketHandle = testSuite.storageHandle.BucketHandle(ctx, TestBucketName, "")
testSuite.mockClient = new(MockStorageControlClient)
testSuite.bucketHandle.controlClient = testSuite.mockClient

Expand Down
31 changes: 27 additions & 4 deletions internal/storage/storage_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,24 @@ type StorageHandle interface {
// to that project rather than to the bucket's owning project.
//
// A user-project is required for all operations on Requester Pays buckets.
BucketHandle(bucketName string, billingProject string) (bh *bucketHandle)
BucketHandle(ctx context.Context, bucketName string, billingProject string) (bh *bucketHandle)
}

type storageClient struct {
client *storage.Client
storageControlClient *control.StorageControlClient
directPathDetector *gRPCDirectPathDetector
}

type gRPCDirectPathDetector struct {
clientOptions []option.ClientOption
}

// isDirectPathPossible checks if gRPC direct connectivity is available for a specific bucket
// from the environment where the client is running. A `nil` error represents Direct Connectivity was
// detected.
func (pd *gRPCDirectPathDetector) isDirectPathPossible(ctx context.Context, bucketName string) error {
return storage.CheckDirectConnectivitySupported(ctx, bucketName, pd.clientOptions...)
}

// Return clientOpts for both gRPC client and control client.
Expand Down Expand Up @@ -188,8 +200,14 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien
// The default protocol for the Go Storage control client's folders API is gRPC.
// gcsfuse will initially mirror this behavior due to the client's lack of HTTP support.
var controlClient *control.StorageControlClient
var clientOpts []option.ClientOption
var directPathDetector *gRPCDirectPathDetector
if clientConfig.ClientProtocol == cfg.GRPC {
sc, err = createGRPCClientHandle(ctx, &clientConfig)
if err == nil {
clientOpts, err = createClientOptionForGRPCClient(&clientConfig)
directPathDetector = &gRPCDirectPathDetector{clientOptions: clientOpts}
}
} else if clientConfig.ClientProtocol == cfg.HTTP1 || clientConfig.ClientProtocol == cfg.HTTP2 {
sc, err = createHTTPClientHandle(ctx, &clientConfig)
} else {
Expand All @@ -204,7 +222,7 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien
// TODO: We will implement an additional check for the HTTP control client protocol once the Go SDK supports HTTP.
// TODO: Custom endpoints do not currently support gRPC. Remove this additional check once TPC(custom-endpoint) supports gRPC.
if clientConfig.EnableHNS && clientConfig.CustomEndpoint == "" {
clientOpts, err := createClientOptionForGRPCClient(&clientConfig)
clientOpts, err = createClientOptionForGRPCClient(&clientConfig)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}
Expand Down Expand Up @@ -234,11 +252,11 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien
sc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts))
}

sh = &storageClient{client: sc, storageControlClient: controlClient}
sh = &storageClient{client: sc, storageControlClient: controlClient, directPathDetector: directPathDetector}
return
}

func (sh *storageClient) BucketHandle(bucketName string, billingProject string) (bh *bucketHandle) {
func (sh *storageClient) BucketHandle(ctx context.Context, bucketName string, billingProject string) (bh *bucketHandle) {
storageBucketHandle := sh.client.Bucket(bucketName)

if billingProject != "" {
Expand All @@ -250,5 +268,10 @@ func (sh *storageClient) BucketHandle(bucketName string, billingProject string)
bucketName: bucketName,
controlClient: sh.storageControlClient,
}
if sh.directPathDetector != nil {
if err := sh.directPathDetector.isDirectPathPossible(ctx, bucketName); err != nil {
logger.Warnf("Direct path connectivity unavailable for %s, reason: %v", bucketName, err)
}
}
return
}
Loading

0 comments on commit 4e33b30

Please sign in to comment.