Skip to content

Commit

Permalink
More info added to CompactionServiceJobInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Sep 24, 2024
1 parent fbbb087 commit fb980aa
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 6 deletions.
7 changes: 5 additions & 2 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobInfo info(
dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact),
thread_pri_, compaction->compaction_reason(),
compaction->is_full_compaction(), compaction->is_manual_compaction(),
compaction->bottommost_level());
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
Expand Down
77 changes: 75 additions & 2 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ class MyTestCompactionService : public CompactionService {
: db_path_(std::move(db_path)),
options_(options),
statistics_(statistics),
start_info_("na", "na", "na", 0, Env::TOTAL),
wait_info_("na", "na", "na", 0, Env::TOTAL),
start_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown,
false, false, false),
wait_info_("na", "na", "na", 0, Env::TOTAL, CompactionReason::kUnknown,
false, false, false),
listeners_(listeners),
table_properties_collector_factories_(
std::move(table_properties_collector_factories)) {}
Expand Down Expand Up @@ -601,11 +603,20 @@ TEST_F(CompactionServiceTest, CompactionInfo) {
{file.db_path + "/" + file.name}, 2));
info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(Env::USER, info.priority);
ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
ASSERT_EQ(true, info.is_manual_compaction);
ASSERT_EQ(false, info.is_full_compaction);
ASSERT_EQ(true, info.bottommost_level);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(Env::USER, info.priority);
ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
ASSERT_EQ(true, info.is_manual_compaction);
ASSERT_EQ(false, info.is_full_compaction);
ASSERT_EQ(true, info.bottommost_level);

// Test priority BOTTOM
env_->SetBackgroundThreads(1, Env::BOTTOM);
// This will set bottommost_level = true but is_full_compaction = false
options.num_levels = 2;
ReopenWithCompactionService(&options);
my_cs =
Expand All @@ -628,9 +639,71 @@ TEST_F(CompactionServiceTest, CompactionInfo) {
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason);
ASSERT_EQ(false, info.is_manual_compaction);
ASSERT_EQ(false, info.is_full_compaction);
ASSERT_EQ(true, info.bottommost_level);
ASSERT_EQ(Env::BOTTOM, info.priority);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(Env::BOTTOM, info.priority);
ASSERT_EQ(CompactionReason::kLevelL0FilesNum, info.compaction_reason);
ASSERT_EQ(false, info.is_manual_compaction);
ASSERT_EQ(false, info.is_full_compaction);
ASSERT_EQ(true, info.bottommost_level);

// Test Non-Bottommost Level
options.num_levels = 4;
ReopenWithCompactionService(&options);
my_cs =
static_cast_with_check<MyTestCompactionService>(GetCompactionService());

for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}

ASSERT_OK(dbfull()->TEST_WaitForCompact());
info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(false, info.is_manual_compaction);
ASSERT_EQ(false, info.is_full_compaction);
ASSERT_EQ(false, info.bottommost_level);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(false, info.is_manual_compaction);
ASSERT_EQ(false, info.is_full_compaction);
ASSERT_EQ(false, info.bottommost_level);

// Test Full Compaction + Bottommost Level
options.num_levels = 6;
ReopenWithCompactionService(&options);
my_cs =
static_cast_with_check<MyTestCompactionService>(GetCompactionService());

for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value_new_new" + std::to_string(key_id)));
}
ASSERT_OK(Flush());
}

CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

ASSERT_OK(dbfull()->TEST_WaitForCompact());
info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(true, info.is_manual_compaction);
ASSERT_EQ(true, info.is_full_compaction);
ASSERT_EQ(true, info.bottommost_level);
ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(true, info.is_manual_compaction);
ASSERT_EQ(true, info.is_full_compaction);
ASSERT_EQ(true, info.bottommost_level);
ASSERT_EQ(CompactionReason::kManualCompaction, info.compaction_reason);
}

TEST_F(CompactionServiceTest, FallbackLocalAuto) {
Expand Down
17 changes: 15 additions & 2 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,27 @@ struct CompactionServiceJobInfo {

Env::Priority priority;

// Additional Compaction Details that can be useful in the CompactionService
CompactionReason compaction_reason;
bool is_full_compaction;
bool is_manual_compaction;
bool bottommost_level;

CompactionServiceJobInfo(std::string db_name_, std::string db_id_,
std::string db_session_id_, uint64_t job_id_,
Env::Priority priority_)
Env::Priority priority_,
CompactionReason compaction_reason_,
bool is_full_compaction_, bool is_manual_compaction_,
bool bottommost_level_)
: db_name(std::move(db_name_)),
db_id(std::move(db_id_)),
db_session_id(std::move(db_session_id_)),
job_id(job_id_),
priority(priority_) {}
priority(priority_),
compaction_reason(compaction_reason_),
is_full_compaction(is_full_compaction_),
is_manual_compaction(is_manual_compaction_),
bottommost_level(bottommost_level_) {}
};

struct CompactionServiceScheduleResponse {
Expand Down

0 comments on commit fb980aa

Please sign in to comment.