From 0f7391784f8191b704267c85bb76f59d407ef6e7 Mon Sep 17 00:00:00 2001 From: Tulsi Shah <46474643+Tulsishah@users.noreply.github.com> Date: Thu, 5 Sep 2024 08:35:47 +0000 Subject: [PATCH] Add lock for erasing/inserting entries for folder (#2446) * add lock before erasing * add package * lint fix * add comment * remove comment * insert method with lock * lint fix --- internal/storage/caching/fast_stat_bucket.go | 23 ++++++++++++++++---- tools/integration_tests/run_e2e_tests.sh | 9 +------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/internal/storage/caching/fast_stat_bucket.go b/internal/storage/caching/fast_stat_bucket.go index e47a666e22..226598d5e7 100644 --- a/internal/storage/caching/fast_stat_bucket.go +++ b/internal/storage/caching/fast_stat_bucket.go @@ -85,6 +85,14 @@ func (b *fastStatBucket) insertMultiple(objs []*gcs.Object) { } } +// LOCKS_EXCLUDED(b.mu) +func (b *fastStatBucket) eraseEntriesWithGivenPrefix(folderName string) { + b.mu.Lock() + defer b.mu.Unlock() + + b.cache.EraseEntriesWithGivenPrefix(folderName) +} + // insertHierarchicalListing saves the objects in cache excluding zero byte objects corresponding to folders // by iterating objects present in listing and saves prefixes as folders (all prefixes are folders in hns) by // iterating collapsedRuns of listing. @@ -120,6 +128,14 @@ func (b *fastStatBucket) insert(o *gcs.Object) { b.insertMultiple([]*gcs.Object{o}) } +// LOCKS_EXCLUDED(b.mu) +func (b *fastStatBucket) insertFolder(f *gcs.Folder) { + b.mu.Lock() + defer b.mu.Unlock() + + b.cache.InsertFolder(f, b.clock.Now().Add(b.ttl)) +} + // LOCKS_EXCLUDED(b.mu) func (b *fastStatBucket) addNegativeEntry(name string) { b.mu.Lock() @@ -378,7 +394,7 @@ func (b *fastStatBucket) GetFolder( } // Record the new folder. - b.cache.InsertFolder(folder, b.clock.Now().Add(b.ttl)) + b.insertFolder(folder) return folder, nil } @@ -386,14 +402,13 @@ func (b *fastStatBucket) CreateFolder(ctx context.Context, folderName string) (f // Throw away any existing record for this folder. b.invalidate(folderName) - expiration := b.clock.Now().Add(b.ttl) f, err = b.wrapped.CreateFolder(ctx, folderName) if err != nil { return } // Record the new folder. - b.cache.InsertFolder(f, expiration) + b.insertFolder(f) return } @@ -402,7 +417,7 @@ func (b *fastStatBucket) RenameFolder(ctx context.Context, folderName string, de o, err = b.wrapped.RenameFolder(ctx, folderName, destinationFolderId) // Invalidate cache for old directory. - b.cache.EraseEntriesWithGivenPrefix(folderName) + b.eraseEntriesWithGivenPrefix(folderName) return o, err } diff --git a/tools/integration_tests/run_e2e_tests.sh b/tools/integration_tests/run_e2e_tests.sh index 703afc9fdd..97a2af278d 100755 --- a/tools/integration_tests/run_e2e_tests.sh +++ b/tools/integration_tests/run_e2e_tests.sh @@ -104,6 +104,7 @@ TEST_DIR_HNS_PARALLEL_GROUP=( "list_large_dir" "mounting" "kernel_list_cache" + "concurrent_operations" ) TEST_DIR_HNS_NON_PARALLEL=( @@ -287,14 +288,6 @@ function run_e2e_tests_for_hns_bucket(){ wait $non_parallel_tests_hns_group_pid non_parallel_tests_hns_group_exit_code=$? - # TODO: The tests are currently flaky. Please unblock this after the issue is fixed. - # The concurrent_operations package, which experiences intermittent failures on presubmit tests, primarily due to parallel execution on the FLAT bucket, has been executed. - # Added it serially after all the tests are completed to avoid failures. - # local log_file="/tmp/concurrent_operation_${hns_bucket_name_parallel_group}.log" - # echo $log_file >> $TEST_LOGS_FILE - # GODEBUG=asyncpreemptoff=1 go test ./tools/integration_tests/concurrent_operations $GO_TEST_SHORT_FLAG $PRESUBMIT_RUN_FLAG -p 1 --integrationTest -v --testbucket=$hns_bucket_name_non_parallel_group --testInstalledPackage=$RUN_E2E_TESTS_ON_PACKAGE -timeout $INTEGRATION_TEST_TIMEOUT > "$log_file" 2>&1 - # non_parallel_tests_hns_group_exit_code_2=$? - hns_buckets=("$hns_bucket_name_parallel_group" "$hns_bucket_name_non_parallel_group") clean_up hns_buckets