Skip to content

Commit

Permalink
Create a new inode for a folder if it is deleted/rename and then recr…
Browse files Browse the repository at this point in the history
…eated (#2346)

* add unlink method

* put default value false

* check on e2e tests

* add check while creating inode

* undo changes of e2e test

* unit test

* remove print statements

* remove log file

* lint fix

* lint fix

* lint fix

* add comment

* e2e test fix

* e2e test fix

* small fix

* base dir implementation return false

* rebase

* rebase

* update comment

* update comment

* small fix

* rename method

* add unit test

* update name

* remove TODO

* refactoring

* remove check

* update comment

* linux test fix

* linux test fix

* rename test

* review comments

* small fixed

* unit test fix

* unit test fix

* lint fix

* lint fix

* add comment
  • Loading branch information
Tulsishah authored Aug 21, 2024
1 parent e892414 commit 6ede9a6
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 29 deletions.
13 changes: 6 additions & 7 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,18 +817,17 @@ func (fs *fileSystem) mintInode(ic inode.Core) (in inode.Inode) {
// UNLOCK_FUNCTION(fs.mu)
// LOCK_FUNCTION(in)
func (fs *fileSystem) createDirInode(ic inode.Core, inodes map[inode.Name]inode.DirInode) inode.Inode {
var in inode.Inode
var ok bool
if !ic.FullName.IsDir() {
panic(fmt.Sprintf("Unexpected name for a directory: %q", ic.FullName))
}

var maxTriesToCreateInode = 3

for n := 0; n < maxTriesToCreateInode; n++ {
in, ok = (inodes)[ic.FullName]
if !ok {
in = fs.mintInode(ic)
in, ok := (inodes)[ic.FullName]
// Create a new inode when a folder is created first time, or when a folder is deleted and then recreated with the same name.
if !ok || in.IsUnlinked() {
in := fs.mintInode(ic)
(inodes)[in.Name()] = in.(inode.DirInode)
in.Lock()
return in
Expand Down Expand Up @@ -1916,7 +1915,7 @@ func (fs *fileSystem) RmDir(
_, isImplicitDir := fs.implicitDirInodes[child.Name()]
fs.mu.Unlock()
parent.Lock()
err = parent.DeleteChildDir(ctx, op.Name, isImplicitDir)
err = parent.DeleteChildDir(ctx, op.Name, isImplicitDir, childDir)
parent.Unlock()

if err != nil {
Expand Down Expand Up @@ -2215,7 +2214,7 @@ func (fs *fileSystem) renameNonHierarchicalDir(
_, isImplicitDir := fs.implicitDirInodes[oldDir.Name()]
fs.mu.Unlock()
oldParent.Lock()
err = oldParent.DeleteChildDir(ctx, oldName, isImplicitDir)
err = oldParent.DeleteChildDir(ctx, oldName, isImplicitDir, oldDir)
oldParent.Unlock()
if err != nil {
return fmt.Errorf("DeleteChildDir: %w", err)
Expand Down
95 changes: 94 additions & 1 deletion internal/fs/hns_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ var expectedFooDirEntries = []dirEntry{
func TestHNSBucketTests(t *testing.T) { suite.Run(t, new(HNSBucketTests)) }

func (t *HNSBucketTests) SetupSuite() {
t.serverCfg.NewConfig = &cfg.Config{EnableHns: true}
t.serverCfg.ImplicitDirectories = false
t.serverCfg.NewConfig = &cfg.Config{
EnableHns: true,
}
bucketType = gcs.Hierarchical
t.fsTest.SetUpTestSuite()
}
Expand Down Expand Up @@ -105,6 +107,17 @@ func (t *HNSBucketTests) TestDeleteFolder() {
assert.True(t.T(), strings.Contains(err.Error(), "no such file or directory"))
}

func (t *HNSBucketTests) TestDeleteImplicitDir() {
dirPath := path.Join(mntDir, "foo", "implicit_dir")

err = os.RemoveAll(dirPath)

assert.NoError(t.T(), err)
_, err = os.Stat(dirPath)
assert.Error(t.T(), err)
assert.True(t.T(), strings.Contains(err.Error(), "no such file or directory"))
}

func (t *HNSBucketTests) TestRenameFolderWithSrcDirectoryDoesNotExist() {
oldDirPath := path.Join(mntDir, "foo_not_exist")
newDirPath := path.Join(mntDir, "foo_rename")
Expand Down Expand Up @@ -253,3 +266,83 @@ func (t *HNSBucketTests) TestRenameFolderWithOpenGCSFile() {
assert.Equal(t.T(), "file1.txt", dirEntries[0].Name())
assert.False(t.T(), dirEntries[0].IsDir())
}

// Create directory foo.
// Stat the directory foo.
// Rename directory foo --> foo_rename
// Stat the old directory.
// Stat the new directory.
// Read new directory and validate.
// Create old directory again with same name - foo
// Stat the directory - foo
// Read directory again and validate it is empty.
func (t *HNSBucketTests) TestCreateDirectoryWithSameNameAfterRename() {
oldDirPath := path.Join(mntDir, "foo")
_, err = os.Stat(oldDirPath)
require.NoError(t.T(), err)
newDirPath := path.Join(mntDir, "foo_rename")
// Rename directory foo --> foo_rename
err = os.Rename(oldDirPath, newDirPath)
require.NoError(t.T(), err)
// Stat old directory.
_, err = os.Stat(oldDirPath)
require.Error(t.T(), err)
require.True(t.T(), strings.Contains(err.Error(), "no such file or directory"))
// Stat new directory.
_, err = os.Stat(newDirPath)
require.NoError(t.T(), err)
// Read new directory and validate.
dirEntries, err := os.ReadDir(newDirPath)
require.NoError(t.T(), err)
require.Equal(t.T(), 5, len(dirEntries))
actualDirEntries := []dirEntry{}
for _, d := range dirEntries {
actualDirEntries = append(actualDirEntries, dirEntry{
name: d.Name(),
isDir: d.IsDir(),
})
}
require.ElementsMatch(t.T(), actualDirEntries, expectedFooDirEntries)

// Create old directory again.
err = os.Mkdir(oldDirPath, dirPerms)

assert.NoError(t.T(), err)
_, err = os.Stat(oldDirPath)
assert.NoError(t.T(), err)
dirEntries, err = os.ReadDir(oldDirPath)
assert.NoError(t.T(), err)
assert.Equal(t.T(), 0, len(dirEntries))
}

// Create directory - foo/test2
// Create local file in directory - foo/test2/test.txt
// Stat the local file - foo/test2/test.txt
// Delete directory - rm -r foo/test2
// Create directory again - foo/test2
// Create local file with the same name in directory - foo/test2/test.txt
// Stat the local file - foo/test2/test.txt
func (t *HNSBucketTests) TestCreateLocalFileInSamePathAfterDeletingParentDirectory() {
dirPath := path.Join(mntDir, "foo", "test2")
filePath := path.Join(dirPath, "test.txt")
// Create local file in side it.
f1, err := os.Create(filePath)
defer require.NoError(t.T(), f1.Close())
require.NoError(t.T(), err)
_, err = os.Stat(filePath)
require.NoError(t.T(), err)
// Delete directory rm -r foo/test2
err = os.RemoveAll(dirPath)
assert.NoError(t.T(), err)
// Create directory again foo/test2
err = os.Mkdir(dirPath, dirPerms)
assert.NoError(t.T(), err)

// Create local file again.
f2, err := os.Create(filePath)
defer require.NoError(t.T(), f2.Close())

assert.NoError(t.T(), err)
_, err = os.Stat(filePath)
assert.NoError(t.T(), err)
}
11 changes: 10 additions & 1 deletion internal/fs/inode/base_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (d *baseDirInode) DeleteChildFile(
func (d *baseDirInode) DeleteChildDir(
ctx context.Context,
name string,
isImplicitDir bool) (err error) {
isImplicitDir bool,
dirInode DirInode) (err error) {
err = fuse.ENOSYS
return
}
Expand All @@ -250,3 +251,11 @@ func (d *baseDirInode) RenameFolder(ctx context.Context, folderName string, dest
err = fuse.ENOSYS
return
}

// This operation is not supported on base_dir.
func (d *baseDirInode) IsUnlinked() bool {
return false
}

func (d *baseDirInode) Unlink() {
}
33 changes: 28 additions & 5 deletions internal/fs/inode/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ type DirInode interface {
DeleteChildDir(
ctx context.Context,
name string,
isImplicitDir bool) (err error)
isImplicitDir bool,
dirInode DirInode) (err error)

// LocalFileEntries lists the local files present in the directory.
// Local means that the file is not yet present on GCS.
Expand All @@ -156,6 +157,10 @@ type DirInode interface {

// RUnlock readonly unlock.
RUnlock()

IsUnlinked() bool

Unlink()
}

// An inode that represents a directory from a GCS bucket.
Expand Down Expand Up @@ -210,6 +215,10 @@ type dirInode struct {
// enabled.
prevDirListingTimeStamp time.Time
isHNSEnabled bool

// Represents if folder has been unlinked in hierarchical bucket. This is not getting used in
// non-hierarchical bucket.
unlinked bool
}

var _ DirInode = &dirInode{}
Expand Down Expand Up @@ -264,6 +273,7 @@ func NewDirInode(
attrs: attrs,
cache: metadata.NewTypeCache(typeCacheMaxSizeMB, typeCacheTTL),
isHNSEnabled: isHNSEnabled,
unlinked: false,
}

typed.lc.Init(id)
Expand Down Expand Up @@ -593,6 +603,14 @@ func (d *dirInode) LookUpChild(ctx context.Context, name string) (*Core, error)
return result, nil
}

func (d *dirInode) IsUnlinked() bool {
return d.unlinked
}

func (d *dirInode) Unlink() {
d.unlinked = true
}

// LOCKS_REQUIRED(d)
func (d *dirInode) ReadDescendants(ctx context.Context, limit int) (map[Name]*Core, error) {
var tok string
Expand Down Expand Up @@ -914,13 +932,14 @@ func (d *dirInode) DeleteChildFile(
func (d *dirInode) DeleteChildDir(
ctx context.Context,
name string,
isImplicitDir bool) error {
isImplicitDir bool,
dirInode DirInode) error {
d.cache.Erase(name)

// if the directory is an implicit directory, then no backing object
// If the directory is an implicit directory, then no backing object
// exists in the gcs bucket, so returning from here.
// Hierarchical buckets don't have implicit dirs.
if isImplicitDir && !d.isBucketHierarchical() {
// Hierarchical buckets don't have implicit dirs so this will be always false in hierarchical bucket case.
if isImplicitDir {
return nil
}

Expand Down Expand Up @@ -950,6 +969,10 @@ func (d *dirInode) DeleteChildDir(
return fmt.Errorf("DeleteFolder: %w", err)
}

if d.isBucketHierarchical() {
dirInode.Unlink()
}

d.cache.Erase(name)
return nil
}
Expand Down
33 changes: 30 additions & 3 deletions internal/fs/inode/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ func (t *DirTest) resetInodeWithTypeCacheConfigs(implicitDirs, enableNonexistent
t.in.Lock()
}

func (t *DirTest) createDirInode(dirInodeName string) DirInode {
return NewDirInode(
5,
NewDirName(NewRootName(""), dirInodeName),
fuseops.InodeAttributes{
Uid: uid,
Gid: gid,
Mode: dirMode,
},
false,
false,
true,
typeCacheTTL,
&t.bucket,
&t.clock,
&t.clock,
config.DefaultTypeCacheMaxSizeMB,
false,
)
}

func (t *DirTest) getTypeFromCache(name string) metadata.Type {
return t.tc.Get(t.in.(*dirInode).cacheClock.Now(), name)
}
Expand Down Expand Up @@ -1378,7 +1399,7 @@ func (t *DirTest) DeleteChildFile_TypeCaching() {
func (t *DirTest) DeleteChildDir_DoesntExist() {
const name = "qux"

err := t.in.DeleteChildDir(t.ctx, name, false)
err := t.in.DeleteChildDir(t.ctx, name, false, nil)
ExpectEq(nil, err)
}

Expand All @@ -1392,21 +1413,27 @@ func (t *DirTest) DeleteChildDir_Exists() {
_, err = storageutil.CreateObject(t.ctx, t.bucket, objName, []byte("taco"))
AssertEq(nil, err)

dirIn := t.createDirInode(objName)
// Call the inode.
err = t.in.DeleteChildDir(t.ctx, name, false)
err = t.in.DeleteChildDir(t.ctx, name, false, dirIn)
AssertEq(nil, err)

// Check the bucket.
_, err = storageutil.ReadObject(t.ctx, t.bucket, objName)
var notFoundErr *gcs.NotFoundError
ExpectTrue(errors.As(err, &notFoundErr))
ExpectFalse(dirIn.IsUnlinked())
}

func (t *DirTest) DeleteChildDir_ImplicitDirTrue() {
const name = "qux"
objName := path.Join(dirInodeName, name) + "/"

dirIn := t.createDirInode(objName)
err := t.in.DeleteChildDir(t.ctx, name, true, dirIn)

err := t.in.DeleteChildDir(t.ctx, name, true)
ExpectEq(nil, err)
ExpectFalse(dirIn.IsUnlinked())
}

func (t *DirTest) CreateLocalChildFile_ShouldnotCreateObjectInGCS() {
Expand Down
Loading

0 comments on commit 6ede9a6

Please sign in to comment.