From fb980aa3adf1d0e698ad7a66459c40b2c93fce82 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Mon, 23 Sep 2024 21:27:51 -0700 Subject: [PATCH] More info added to CompactionServiceJobInfo --- db/compaction/compaction_service_job.cc | 7 ++- db/compaction/compaction_service_test.cc | 77 +++++++++++++++++++++++- include/rocksdb/options.h | 17 +++++- 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 8a8db33627b..6ad1c351487 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -68,8 +68,11 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", compaction->column_family_data()->GetName().c_str(), job_id_, compaction_input.output_level, input_files_oss.str().c_str()); - CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, - GetCompactionId(sub_compact), thread_pri_); + CompactionServiceJobInfo info( + dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact), + thread_pri_, compaction->compaction_reason(), + compaction->is_full_compaction(), compaction->is_manual_compaction(), + compaction->bottommost_level()); CompactionServiceScheduleResponse response = db_options_.compaction_service->Schedule(info, compaction_input_binary); switch (response.status) { diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 8aacf2b6d2e..66d9419c4dd 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -21,8 +21,10 @@ class MyTestCompactionService : public CompactionService { : db_path_(std::move(db_path)), options_(options), statistics_(statistics), - start_info_("na", "na", "na", 0, Env::TOTAL), - wait_info_("na", "na", "na", 0, Env::TOTAL), + start_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown, + false, false, false), + wait_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown, + false, false, false), listeners_(listeners), table_properties_collector_factories_( std::move(table_properties_collector_factories)) {} @@ -601,11 +603,20 @@ TEST_F(CompactionServiceTest, CompactionInfo) { {file.db_path + "/" + file.name}, 2)); info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(Env::USER, info.priority); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::USER, info.priority); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); // Test priority BOTTOM env_->SetBackgroundThreads(1, Env::BOTTOM); + // This will set bottommost_level = true but is_full_compaction = false options.num_levels = 2; ReopenWithCompactionService(&options); my_cs = @@ -628,9 +639,71 @@ TEST_F(CompactionServiceTest, CompactionInfo) { } ASSERT_OK(dbfull()->TEST_WaitForCompact()); info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); ASSERT_EQ(Env::BOTTOM, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::BOTTOM, info.priority); + ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); + + // Test Non-Bottommost Level + options.num_levels = 4; + ReopenWithCompactionService(&options); + my_cs = + static_cast_with_check(GetCompactionService()); + + for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id))); + } + ASSERT_OK(Flush()); + } + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(false, info.bottommost_level); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(false, info.is_manual_compaction); + ASSERT_EQ(false, info.is_full_compaction); + ASSERT_EQ(false, info.bottommost_level); + + // Test Full Compaction + Bottommost Level + options.num_levels = 6; + ReopenWithCompactionService(&options); + my_cs = + static_cast_with_check(GetCompactionService()); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id))); + } + ASSERT_OK(Flush()); + } + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(true, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(true, info.is_manual_compaction); + ASSERT_EQ(true, info.is_full_compaction); + ASSERT_EQ(true, info.bottommost_level); + ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason); } TEST_F(CompactionServiceTest, FallbackLocalAuto) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e272e3a69ad..9700e25af55 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -466,14 +466,27 @@ struct CompactionServiceJobInfo { Env::Priority priority; + // Additional Compaction Details that can be useful in the CompactionService + CompactionReason compaction_reason; + bool is_full_compaction; + bool is_manual_compaction; + bool bottommost_level; + CompactionServiceJobInfo(std::string db_name_, std::string db_id_, std::string db_session_id_, uint64_t job_id_, - Env::Priority priority_) + Env::Priority priority_, + CompactionReason compaction_reason_, + bool is_full_compaction_, bool is_manual_compaction_, + bool bottommost_level_) : db_name(std::move(db_name_)), db_id(std::move(db_id_)), db_session_id(std::move(db_session_id_)), job_id(job_id_), - priority(priority_) {} + priority(priority_), + compaction_reason(compaction_reason_), + is_full_compaction(is_full_compaction_), + is_manual_compaction(is_manual_compaction_), + bottommost_level(bottommost_level_) {} }; struct CompactionServiceScheduleResponse {