Skip to content

Commit

Permalink
Add lock for erasing/inserting entries for folder (#2446)
Browse files Browse the repository at this point in the history
* add lock before erasing

* add package

* lint fix

* add comment

* remove comment

* insert method with lock

* lint fix
  • Loading branch information
Tulsishah authored Sep 5, 2024
1 parent 47b45fe commit 0f73917
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
23 changes: 19 additions & 4 deletions internal/storage/caching/fast_stat_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (b *fastStatBucket) insertMultiple(objs []*gcs.Object) {
}
}

// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) eraseEntriesWithGivenPrefix(folderName string) {
b.mu.Lock()
defer b.mu.Unlock()

b.cache.EraseEntriesWithGivenPrefix(folderName)
}

// insertHierarchicalListing saves the objects in cache excluding zero byte objects corresponding to folders
// by iterating objects present in listing and saves prefixes as folders (all prefixes are folders in hns) by
// iterating collapsedRuns of listing.
Expand Down Expand Up @@ -120,6 +128,14 @@ func (b *fastStatBucket) insert(o *gcs.Object) {
b.insertMultiple([]*gcs.Object{o})
}

// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertFolder(f *gcs.Folder) {
b.mu.Lock()
defer b.mu.Unlock()

b.cache.InsertFolder(f, b.clock.Now().Add(b.ttl))
}

// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) addNegativeEntry(name string) {
b.mu.Lock()
Expand Down Expand Up @@ -378,22 +394,21 @@ func (b *fastStatBucket) GetFolder(
}

// Record the new folder.
b.cache.InsertFolder(folder, b.clock.Now().Add(b.ttl))
b.insertFolder(folder)
return folder, nil
}

func (b *fastStatBucket) CreateFolder(ctx context.Context, folderName string) (f *gcs.Folder, err error) {
// Throw away any existing record for this folder.
b.invalidate(folderName)

expiration := b.clock.Now().Add(b.ttl)
f, err = b.wrapped.CreateFolder(ctx, folderName)
if err != nil {
return
}

// Record the new folder.
b.cache.InsertFolder(f, expiration)
b.insertFolder(f)

return
}
Expand All @@ -402,7 +417,7 @@ func (b *fastStatBucket) RenameFolder(ctx context.Context, folderName string, de
o, err = b.wrapped.RenameFolder(ctx, folderName, destinationFolderId)

// Invalidate cache for old directory.
b.cache.EraseEntriesWithGivenPrefix(folderName)
b.eraseEntriesWithGivenPrefix(folderName)

return o, err
}
9 changes: 1 addition & 8 deletions tools/integration_tests/run_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ TEST_DIR_HNS_PARALLEL_GROUP=(
"list_large_dir"
"mounting"
"kernel_list_cache"
"concurrent_operations"
)

TEST_DIR_HNS_NON_PARALLEL=(
Expand Down Expand Up @@ -287,14 +288,6 @@ function run_e2e_tests_for_hns_bucket(){
wait $non_parallel_tests_hns_group_pid
non_parallel_tests_hns_group_exit_code=$?

# TODO: The tests are currently flaky. Please unblock this after the issue is fixed.
# The concurrent_operations package, which experiences intermittent failures on presubmit tests, primarily due to parallel execution on the FLAT bucket, has been executed.
# Added it serially after all the tests are completed to avoid failures.
# local log_file="/tmp/concurrent_operation_${hns_bucket_name_parallel_group}.log"
# echo $log_file >> $TEST_LOGS_FILE
# GODEBUG=asyncpreemptoff=1 go test ./tools/integration_tests/concurrent_operations $GO_TEST_SHORT_FLAG $PRESUBMIT_RUN_FLAG -p 1 --integrationTest -v --testbucket=$hns_bucket_name_non_parallel_group --testInstalledPackage=$RUN_E2E_TESTS_ON_PACKAGE -timeout $INTEGRATION_TEST_TIMEOUT > "$log_file" 2>&1
# non_parallel_tests_hns_group_exit_code_2=$?

hns_buckets=("$hns_bucket_name_parallel_group" "$hns_bucket_name_non_parallel_group")
clean_up hns_buckets

Expand Down

0 comments on commit 0f73917

Please sign in to comment.