Merge pull request #2074 from GoogleCloudPlatform/master
Merge master into CLI_config_parity_release
kislaykishore authored Jun 28, 2024
2 parents eb4f8e5 + 3995bfb commit 01d07d4
Showing 17 changed files with 187 additions and 92 deletions.
6 changes: 3 additions & 3 deletions cfg/config.go
@@ -338,14 +338,14 @@ func BindFlags(flagSet *pflag.FlagSet) error {
return err
}

flagSet.IntP("download-chunk-size-mb", "", 25, "Size of chunks in MiB that each concurrent request downloads.")
flagSet.IntP("download-chunk-size-mb", "", 50, "Size of chunks in MiB that each concurrent request downloads.")

err = viper.BindPFlag("file-cache.download-chunk-size-mb", flagSet.Lookup("download-chunk-size-mb"))
if err != nil {
return err
}

flagSet.BoolP("enable-crc", "", true, "Performs CRC to ensure that file is correctly downloaded into cache.")
flagSet.BoolP("enable-crc", "", false, "Performs CRC to ensure that file is correctly downloaded into cache.")

err = viper.BindPFlag("file-cache.enable-crc", flagSet.Lookup("enable-crc"))
if err != nil {
@@ -603,7 +603,7 @@ func BindFlags(flagSet *pflag.FlagSet) error {
return err
}

flagSet.IntP("parallel-downloads-per-file", "", 10, "Number of concurrent download requests per file.")
flagSet.IntP("parallel-downloads-per-file", "", 16, "Number of concurrent download requests per file.")

err = viper.BindPFlag("file-cache.parallel-downloads-per-file", flagSet.Lookup("parallel-downloads-per-file"))
if err != nil {
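The hunks above only change pflag defaults (download-chunk-size-mb 25 to 50, enable-crc true to false, parallel-downloads-per-file 10 to 16); an explicit flag or config value still takes precedence. A minimal, self-contained sketch of that precedence, reusing the binding pattern from BindFlags; the flag-set name and the parsed value are illustrative:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	flagSet := pflag.NewFlagSet("gcsfuse", pflag.ContinueOnError)
	// Same binding pattern as BindFlags above, with the new 50 MiB default.
	flagSet.IntP("download-chunk-size-mb", "", 50, "Size of chunks in MiB that each concurrent request downloads.")
	_ = viper.BindPFlag("file-cache.download-chunk-size-mb", flagSet.Lookup("download-chunk-size-mb"))

	// With no explicit flag, viper reports the new default.
	fmt.Println(viper.GetInt("file-cache.download-chunk-size-mb")) // 50

	// Passing the flag explicitly restores the previous value.
	_ = flagSet.Parse([]string{"--download-chunk-size-mb=25"})
	fmt.Println(viper.GetInt("file-cache.download-chunk-size-mb")) // 25
}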
17 changes: 3 additions & 14 deletions internal/cache/file/cache_handler.go
@@ -87,24 +87,13 @@ func (chr *CacheHandler) cleanUpEvictedFile(fileInfo *data.FileInfo) error {
chr.jobManager.InvalidateAndRemoveJob(key.ObjectName, key.BucketName)

localFilePath := util.GetDownloadPath(chr.cacheDir, util.GetObjectPath(key.BucketName, key.ObjectName))
// Truncate the file to 0 size, so that even if there are open file handles
// and linux doesn't delete the file, the file will not take space.
err = os.Truncate(localFilePath, 0)
err = util.TruncateAndRemoveFile(localFilePath)
if err != nil {
if os.IsNotExist(err) {
logger.Warnf("cleanUpEvictedFile: file was not present at the time of truncating: %v", err)
logger.Warnf("cleanUpEvictedFile: file was not present at the time of clean up: %v", err)
return nil
} else {
return fmt.Errorf("cleanUpEvictedFile: while truncating file: %s, error: %w", localFilePath, err)
}
}
err = os.Remove(localFilePath)
if err != nil {
if os.IsNotExist(err) {
logger.Warnf("cleanUpEvictedFile: file was not present at the time of deleting: %v", err)
} else {
return fmt.Errorf("cleanUpEvictedFile: while deleting file: %s, error: %w", localFilePath, err)
}
return fmt.Errorf("cleanUpEvictedFile: error while cleaning up file: %s, error: %w", localFilePath, err)
}

return nil
4 changes: 2 additions & 2 deletions internal/cache/file/downloader/job.go
@@ -507,8 +507,8 @@ func (job *Job) validateCRC() (err error) {
}

job.fileInfoCache.Erase(fileInfoKeyName)
removeErr := os.Remove(job.fileSpec.Path)
if removeErr != nil {
removeErr := cacheutil.TruncateAndRemoveFile(job.fileSpec.Path)
if removeErr != nil && !os.IsNotExist(removeErr) {
err = errors.Join(err, removeErr)
}

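The new guard keeps a benign not-exist failure from the cache cleanup out of the joined error while still surfacing the CRC mismatch. A self-contained sketch of that behavior, with os.Remove standing in for cacheutil.TruncateAndRemoveFile and an illustrative path:

package main

import (
	"errors"
	"fmt"
	"os"
)

// cleanupAfterCRCMismatch mirrors the guard in validateCRC: a not-exist error
// from the cache cleanup is ignored, anything else is joined onto err.
func cleanupAfterCRCMismatch(path string, err error) error {
	removeErr := os.Remove(path) // stand-in for cacheutil.TruncateAndRemoveFile
	if removeErr != nil && !os.IsNotExist(removeErr) {
		err = errors.Join(err, removeErr)
	}
	return err
}

func main() {
	crcErr := errors.New("checksum mismatch")
	// The cache file is already gone, so only the CRC error is returned.
	fmt.Println(cleanupAfterCRCMismatch("/tmp/no-such-cache-file", crcErr))
}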
16 changes: 16 additions & 0 deletions internal/cache/util/util.go
@@ -165,3 +165,19 @@ func CalculateFileCRC32(ctx context.Context, filePath string) (uint32, error) {

return calculateCRC32(ctx, file)
}

// TruncateAndRemoveFile first truncates the file to 0 bytes and then removes
// (deletes) the file at the given path.
func TruncateAndRemoveFile(filePath string) error {
// Truncate the file to 0 size so that, even if open file handles keep Linux
// from deleting the file, it will not take up space.
err := os.Truncate(filePath, 0)
if err != nil {
return err
}
err = os.Remove(filePath)
if err != nil {
return err
}
return nil
}
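Both call sites in this commit (cleanUpEvictedFile and validateCRC) treat a not-exist error from this helper as benign. A minimal usage sketch along those lines; the helper body is copied locally only so the example runs standalone, and the path is illustrative:

package main

import (
	"fmt"
	"log"
	"os"
)

// truncateAndRemoveFile is a local copy of the helper above, included only to
// keep this sketch self-contained.
func truncateAndRemoveFile(filePath string) error {
	if err := os.Truncate(filePath, 0); err != nil {
		return err
	}
	return os.Remove(filePath)
}

func main() {
	err := truncateAndRemoveFile("/tmp/gcsfuse-cache-example")
	if os.IsNotExist(err) {
		// Mirrors the callers above: a missing file means there is nothing
		// left to clean up.
		log.Printf("file was not present at the time of clean up: %v", err)
		return
	}
	if err != nil {
		log.Fatalf("error while cleaning up file: %v", err)
	}
	fmt.Println("cleaned up")
}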
58 changes: 58 additions & 0 deletions internal/cache/util/util_test.go
@@ -278,6 +278,64 @@ func (ut *utilTest) Test_CalculateFileCRC32_ShouldReturnErrorWhenContextIsCancel
ExpectEq(0, crc)
}

func (ut *utilTest) Test_TruncateAndRemoveFile_FileExists() {
// Create a file to be deleted.
fileName := "temp.txt"
file, err := os.Create(fileName)
AssertEq(nil, err)
_, err = file.WriteString("Writing some data")
AssertEq(nil, err)
err = file.Close()
AssertEq(nil, err)

err = TruncateAndRemoveFile(fileName)

ExpectEq(nil, err)
// Check the file is deleted.
_, err = os.Stat(fileName)
ExpectTrue(os.IsNotExist(err), fmt.Sprintf("expected not exist error but got error: %v", err))
}

func (ut *utilTest) Test_TruncateAndRemoveFile_FileDoesNotExist() {
// Use a file path that was never created.
fileName := "temp.txt"

err := TruncateAndRemoveFile(fileName)

ExpectTrue(os.IsNotExist(err), fmt.Sprintf("expected not exist error but got error: %v", err))
}

func (ut *utilTest) Test_TruncateAndRemoveFile_OpenedFileDeleted() {
// Create a file to be deleted.
fileName := "temp.txt"
file, err := os.Create(fileName)
AssertEq(nil, err)
fileString := "Writing some data"
_, err = file.WriteString(fileString)
AssertEq(nil, err)
// Close the file to get the contents synced.
err = file.Close()
AssertEq(nil, err)
// Open the file again
file, err = os.Open(fileName)
defer func() {
_ = file.Close()
}()
AssertEq(nil, err)
fileInfo, err := file.Stat()
AssertEq(nil, err)
AssertEq(len(fileString), fileInfo.Size())

// The file is still open; call TruncateAndRemoveFile.
err = TruncateAndRemoveFile(fileName)

ExpectEq(nil, err)
// The size of the open file should be 0.
fileInfo, err = file.Stat()
ExpectEq(nil, err)
ExpectEq(0, fileInfo.Size())
}

func Test_CreateCacheDirectoryIfNotPresentAt_ShouldNotReturnAnyErrorWhenDirectoryExists(t *testing.T) {
base := path.Join("./", string(testutil.GenerateRandomBytes(4)))
dirPath := path.Join(base, "/", "path/cachedir")
6 changes: 3 additions & 3 deletions internal/config/mount_config.go
@@ -59,10 +59,10 @@ const (

DefaultKernelListCacheTtlSeconds int64 = 0

DefaultEnableCRC = true
DefaultEnableCRC = false
DefaultEnableParallelDownloads = false
DefaultDownloadChunkSizeMB = 25
DefaultParallelDownloadsPerFile = 10
DefaultDownloadChunkSizeMB = 50
DefaultParallelDownloadsPerFile = 16
)

type WriteConfig struct {
5 changes: 5 additions & 0 deletions internal/config/yaml_parser.go
@@ -184,5 +184,10 @@ func ParseConfigFile(fileName string) (mountConfig *MountConfig, err error) {
return mountConfig, fmt.Errorf("error parsing list config: %w", err)
}

// The EnableEmptyManagedFolders flag must be set to true to enforce folder prefixes for Hierarchical buckets.
if mountConfig.EnableHNS {
mountConfig.ListConfig.EnableEmptyManagedFolders = true
}

return
}
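A self-contained sketch of the override added above, with the config types trimmed to the two fields involved (the real MountConfig and ListConfig carry many more fields):

package main

import "fmt"

// Trimmed stand-ins for the real config types.
type ListConfig struct {
	EnableEmptyManagedFolders bool
}

type MountConfig struct {
	EnableHNS  bool
	ListConfig ListConfig
}

// applyHNSOverride mirrors the step added to ParseConfigFile: enabling HNS
// forces empty managed folders to be listed.
func applyHNSOverride(mc *MountConfig) {
	if mc.EnableHNS {
		mc.ListConfig.EnableEmptyManagedFolders = true
	}
}

func main() {
	mc := &MountConfig{EnableHNS: true}
	applyHNSOverride(mc)
	fmt.Println(mc.ListConfig.EnableEmptyManagedFolders) // true
}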
6 changes: 3 additions & 3 deletions internal/config/yaml_parser_test.go
@@ -43,10 +43,10 @@ func validateDefaultConfig(t *testing.T, mountConfig *MountConfig) {
assert.Equal(t, int64(-1), mountConfig.FileCacheConfig.MaxSizeMB)
assert.False(t, mountConfig.FileCacheConfig.CacheFileForRangeRead)
assert.False(t, mountConfig.FileCacheConfig.EnableParallelDownloads)
assert.Equal(t, 10, mountConfig.FileCacheConfig.ParallelDownloadsPerFile)
assert.Equal(t, 16, mountConfig.FileCacheConfig.ParallelDownloadsPerFile)
assert.GreaterOrEqual(t, mountConfig.FileCacheConfig.MaxParallelDownloads, 16)
assert.Equal(t, 25, mountConfig.FileCacheConfig.DownloadChunkSizeMB)
assert.True(t, mountConfig.FileCacheConfig.EnableCRC)
assert.Equal(t, 50, mountConfig.FileCacheConfig.DownloadChunkSizeMB)
assert.False(t, mountConfig.FileCacheConfig.EnableCRC)
assert.Equal(t, 1, mountConfig.GCSConnection.GRPCConnPoolSize)
assert.False(t, mountConfig.GCSAuth.AnonymousAccess)
assert.False(t, bool(mountConfig.EnableHNS))
34 changes: 18 additions & 16 deletions internal/fs/inode/dir.go
@@ -831,42 +831,44 @@ func (d *dirInode) DeleteChildFile(
func (d *dirInode) DeleteChildDir(
ctx context.Context,
name string,
isImplicitDir bool) (err error) {
isImplicitDir bool) error {
d.cache.Erase(name)

// if the directory is an implicit directory, then no backing object
// exists in the gcs bucket, so returning from here.
if isImplicitDir {
return
// Hierarchical buckets don't have implicit dirs.
if isImplicitDir && d.bucket.BucketType() != gcs.Hierarchical {
return nil
}

childName := NewDirName(d.Name(), name)

// Delete the backing object. Unfortunately we have no way to precondition
// this on the directory being empty.
err = d.bucket.DeleteObject(
err := d.bucket.DeleteObject(
ctx,
&gcs.DeleteObjectRequest{
Name: childName.GcsObjectName(),
Generation: 0, // Delete the latest version of object named after dir.
})

if err != nil {
err = fmt.Errorf("DeleteObject: %w", err)
return
}

if d.bucket.BucketType() == gcs.Hierarchical {
// Delete Folder deletes folder (in case of Hierarchical Bucket).
err = d.bucket.DeleteFolder(ctx, childName.GcsObjectName())
if d.bucket.BucketType() != gcs.Hierarchical {
if err != nil {
return fmt.Errorf("DeleteObject: %w", err)
}
d.cache.Erase(name)
return nil
}

if err != nil {
return
// Ignore the DeleteObject error here: with HNS there is no way of knowing
// whether the underlying placeholder object exists in a Hierarchical bucket.
// The DeleteFolder operation handles removing empty folders.
if err = d.bucket.DeleteFolder(ctx, childName.GcsObjectName()); err != nil {
return fmt.Errorf("DeleteFolder: %w", err)
}

d.cache.Erase(name)

return
return nil
}

// LOCKS_REQUIRED(fs)
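A self-contained summary of the DeleteChildDir decision logic after this change, using a stand-in bucket-type enum instead of the real gcs interfaces; the strings in the returned slice are descriptive labels, not API calls:

package main

import "fmt"

type bucketType int

const (
	flat bucketType = iota
	hierarchical
)

// deleteChildDirCalls lists which bucket operations the new flow issues.
func deleteChildDirCalls(isImplicitDir bool, bt bucketType) []string {
	// Implicit dirs have no backing object, and hierarchical buckets don't
	// have implicit dirs, so the early return applies only to flat buckets.
	if isImplicitDir && bt != hierarchical {
		return nil
	}
	if bt != hierarchical {
		// Flat bucket: delete the placeholder object and surface any error.
		return []string{"DeleteObject"}
	}
	// Hierarchical bucket: the placeholder object may not exist, so the
	// DeleteObject error is ignored and DeleteFolder removes the folder.
	return []string{"DeleteObject (error ignored)", "DeleteFolder"}
}

func main() {
	fmt.Println(deleteChildDirCalls(true, flat))         // []
	fmt.Println(deleteChildDirCalls(false, flat))        // [DeleteObject]
	fmt.Println(deleteChildDirCalls(true, hierarchical)) // both calls
}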
31 changes: 31 additions & 0 deletions internal/gcsx/prefix_bucket_test.go
@@ -23,6 +23,8 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"

"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
@@ -418,3 +420,32 @@ func (t *PrefixBucketTest) GetFolder_Prefix() {
AssertEq("projects/_/buckets/some_bucket/folders/foo_something", result.GetName())

}

func TestDeleteFolder(t *testing.T) {
prefix := "foo_"
wrapped := fake.NewFakeBucket(timeutil.RealClock(), "some_bucket")
bucket, err := gcsx.NewPrefixBucket(prefix, wrapped)
require.Nil(t, err)
objectName := "taco"
name := "foo_" + objectName
// TODO: Replace the use of CreateObject with CreateFolder once the CreateFolder API has been successfully implemented.
// Create an object through the back door.
ctx := context.Background()
_, err = storageutil.CreateObject(ctx, wrapped, name, []byte("foobar"))
require.Nil(t, err)

err = bucket.DeleteFolder(
ctx,
objectName)

if assert.Nil(t, err) {
// TODO: Replace the use of StatObject with GetFolder once the GetFolder API has been successfully implemented.
_, _, err = wrapped.StatObject(
ctx,
&gcs.StatObjectRequest{
Name: name,
})
var notFoundErr *gcs.NotFoundError
assert.ErrorAs(t, err, &notFoundErr)
}
}
19 changes: 0 additions & 19 deletions internal/storage/bucket_handle.go
@@ -468,31 +468,12 @@ func (b *bucketHandle) ComposeObjects(ctx context.Context, req *gcs.ComposeObjec
}

func (b *bucketHandle) DeleteFolder(ctx context.Context, folderName string) (err error) {
var notfound *gcs.NotFoundError
var callOptions []gax.CallOption

err = b.DeleteObject(
ctx,
&gcs.DeleteObjectRequest{
Name: folderName,
Generation: 0, // Delete the latest version of object named after dir.
})

if err != nil {
// Ignore err if it is object not found as in HNS bucket object may exist as a folder.
if !errors.As(err, &notfound) {
return err
}
}

err = b.controlClient.DeleteFolder(ctx, &controlpb.DeleteFolderRequest{
Name: "projects/_/buckets/" + b.bucketName + "/folders/" + folderName,
}, callOptions...)

if err != nil {
err = fmt.Errorf("DeleteFolder: %w", err)
}

return err
}

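With the preliminary DeleteObject call removed, DeleteFolder issues a single control-client request. A small self-contained sketch of the folder resource name that request carries, following the format in the surviving line above; the bucket and folder names are illustrative:

package main

import "fmt"

// folderResourceName builds the fully qualified folder name passed to
// controlClient.DeleteFolder.
func folderResourceName(bucketName, folderName string) string {
	return "projects/_/buckets/" + bucketName + "/folders/" + folderName
}

func main() {
	fmt.Println(folderResourceName("some_bucket", "dir/subdir/"))
	// projects/_/buckets/some_bucket/folders/dir/subdir/
}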
16 changes: 1 addition & 15 deletions internal/storage/bucket_handle_test.go
@@ -1266,20 +1266,6 @@ func (testSuite *BucketHandleTest) TestDeleteFolderWhenFolderExitForHierarchical
assert.Nil(testSuite.T(), err)
}

func (testSuite *BucketHandleTest) TestDeleteFolderWhenFolderExistButSameObjectNotExistInHierarchicalBucket() {
ctx := context.Background()
mockClient := new(MockStorageControlClient)
mockClient.On("DeleteFolder", ctx, &controlpb.DeleteFolderRequest{Name: "projects/_/buckets/" + TestBucketName + "/folders/" + missingObjectName}, mock.Anything).
Return(nil)
testSuite.bucketHandle.controlClient = mockClient
testSuite.bucketHandle.bucketType = gcs.Hierarchical

err := testSuite.bucketHandle.DeleteFolder(ctx, missingObjectName)

mockClient.AssertExpectations(testSuite.T())
assert.Nil(testSuite.T(), err)
}

func (testSuite *BucketHandleTest) TestDeleteFolderWhenFolderNotExistForHierarchicalBucket() {
ctx := context.Background()
mockClient := new(MockStorageControlClient)
@@ -1291,7 +1277,7 @@ func (testSuite *BucketHandleTest) TestDeleteFolderWhenFolderNotExistForHierarch
err := testSuite.bucketHandle.DeleteFolder(ctx, missingObjectName)

mockClient.AssertExpectations(testSuite.T())
assert.Equal(testSuite.T(), "DeleteFolder: mock error", err.Error())
assert.NotNil(testSuite.T(), err)
}

func (testSuite *BucketHandleTest) TestGetFolderWhenFolderExistsForHierarchicalBucket() {
Expand Down
6 changes: 3 additions & 3 deletions tools/config-gen/params.yaml
@@ -433,7 +433,7 @@
config-path: "file-cache.enable-crc"
type: "bool"
usage: "Performs CRC to ensure that file is correctly downloaded into cache."
default: true
default: false

- flag-name: "enable-parallel-downloads"
config-path: "file-cache.enable-parallel-downloads"
@@ -445,7 +445,7 @@
config-path: "file-cache.parallel-downloads-per-file"
type: "int"
usage: "Number of concurrent download requests per file."
default: "10"
default: "16"

- flag-name: "max-parallel-downloads"
config-path: "file-cache.max-parallel-downloads"
@@ -457,7 +457,7 @@
config-path: "file-cache.download-chunk-size-mb"
type: "int"
usage: "Size of chunks in MiB that each concurrent request downloads."
default: "25"
default: "50"

- flag-name: "cache-dir"
config-path: "cache-dir"
5 changes: 5 additions & 0 deletions tools/integration_tests/implicit_dir/implicit_dir_test.go
@@ -57,6 +57,11 @@ func TestMain(m *testing.M) {

flagsSet := [][]string{{"--implicit-dirs"}}

if hnsFlagSet, err := setup.AddHNSFlagForHierarchicalBucket(ctx, storageClient); err == nil {
hnsFlagSet = append(hnsFlagSet, "--implicit-dirs")
flagsSet = append(flagsSet, hnsFlagSet)
}

if !testing.Short() {
flagsSet = append(flagsSet, []string{"--client-protocol=grpc", "--implicit-dirs=true"})
}