Skip to content

Commit

Permalink
Conditionally exclude some l0 files in size amp compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Aug 29, 2023
1 parent f36394f commit 3224d21
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 1 deletion.
54 changes: 54 additions & 0 deletions db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,54 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace5) {
ASSERT_EQ(13, compaction->num_input_files(1));
}

// Verifies that a universal size-amp compaction conditionally excludes the
// newest L0 input files so the compaction does not lock so many L0 files that
// a few more flushes would hit `level0_stop_writes_trigger`.
TEST_F(CompactionPickerTest,
       PartiallyExcludeL0ToReduceWriteStopForSizeAmpCompaction) {
  const uint64_t kFileSize = 100000;
  const uint64_t kL0FileCount = 30;
  const uint64_t kLastLevelFileCount = 1;
  const uint64_t kNumLevels = 5;

  // `test_no_exclusion == true` configures options so that no L0 file should
  // be excluded; `false` exercises the default exclusion path.
  for (const bool test_no_exclusion : {false, true}) {
    // With exclusion enabled, expect 1/10 of the L0 input files (newest
    // first) to be dropped from the compaction input.
    const uint64_t kExpectedNumExcludedL0 =
        test_no_exclusion ? 0 : kL0FileCount * 1 / 10;

    mutable_cf_options_.level0_stop_writes_trigger = 36;
    mutable_cf_options_.compaction_options_universal
        .max_size_amplification_percent = 1;
    // A small `max_merge_width` (fewer than the extra L0 files allowed before
    // write stop) disables the exclusion logic entirely.
    mutable_cf_options_.compaction_options_universal.max_merge_width =
        test_no_exclusion
            ? mutable_cf_options_.level0_stop_writes_trigger - kL0FileCount
            : UINT_MAX;

    UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
    NewVersionStorage(kNumLevels, kCompactionStyleUniversal);

    // Populate kL0FileCount files in L0 and kLastLevelFileCount files in the
    // last level, with non-overlapping keys per file.
    for (uint64_t i = 1; i <= kL0FileCount + kLastLevelFileCount; ++i) {
      Add(i <= kL0FileCount ? 0 : kNumLevels - 1, static_cast<uint32_t>(i),
          std::to_string((i + 100) * 1000).c_str(),
          std::to_string((i + 100) * 1000 + 999).c_str(), kFileSize, 0, i * 100,
          i * 100 + 99);
    }

    UpdateVersionStorageInfo();

    ASSERT_TRUE(universal_compaction_picker.NeedsCompaction(vstorage_.get()));
    std::unique_ptr<Compaction> compaction(
        universal_compaction_picker.PickCompaction(
            cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
            &log_buffer_));
    ASSERT_TRUE(compaction.get() != nullptr);
    ASSERT_EQ(compaction->compaction_reason(),
              CompactionReason::kUniversalSizeAmplification);
    // The newest kExpectedNumExcludedL0 L0 files are left out of the input;
    // the last-level file is always included.
    ASSERT_EQ(compaction->num_input_files(0),
              kL0FileCount - kExpectedNumExcludedL0);
    ASSERT_EQ(compaction->num_input_files(kNumLevels - 1), kLastLevelFileCount);
    for (uint64_t level = 1; level <= kNumLevels - 2; level++) {
      ASSERT_EQ(compaction->num_input_files(level), 0);
    }
  }
}

TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
NewVersionStorage(1, kCompactionStyleFIFO);
const int kFileCount =
Expand Down Expand Up @@ -3380,6 +3428,9 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionNonLastLevel) {
ioptions_.preclude_last_level_data_seconds = 1000;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 200;
// Avoid any L0 file exclusion in size amp compaction (such exclusion is
// intended to reduce write stops)
mutable_cf_options_.compaction_options_universal.max_merge_width = 2;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);

NewVersionStorage(kNumLevels, kCompactionStyleUniversal);
Expand Down Expand Up @@ -3453,6 +3504,9 @@ TEST_F(CompactionPickerTest, UniversalSizeAmpTierCompactionNotSuport) {
ioptions_.preclude_last_level_data_seconds = 1000;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 200;
// Avoid any L0 file exclusion in size amp compaction (such exclusion is
// intended to reduce write stops)
mutable_cf_options_.compaction_options_universal.max_merge_width = 2;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);

NewVersionStorage(kNumLevels, kCompactionStyleUniversal);
Expand Down
102 changes: 101 additions & 1 deletion db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class UniversalCompactionBuilder {
// because some files are being compacted.
Compaction* PickPeriodicCompaction();

bool ShouldSkipLastSortedRunForSizeAmpCompaction() {
bool ShouldSkipLastSortedRunForSizeAmpCompaction() const {
assert(!sorted_runs_.empty());
return ioptions_.preclude_last_level_data_seconds > 0 &&
ioptions_.num_levels > 2 &&
Expand All @@ -129,6 +129,94 @@ class UniversalCompactionBuilder {

uint64_t GetMaxOverlappingBytes() const;

  // To conditionally exclude some L0 input files (newest first)
  // from a size amp compaction. This is to prevent a large number of L0
  // files from being locked by a size amp compaction, potentially leading to
  // write stop with a few more flushes.
  //
  // Such exclusion is based on `num_l0_input_pre_exclusion`,
  // `level0_stop_writes_trigger`, `max/min_merge_width` and the pre-exclusion
  // compaction score. Noted that it will not make the size amp compaction of
  // interest invalid from running as a size amp compaction as long as its
  // pre-exclusion compaction score satisfies the condition to run.
  //
  // @param `num_l0_input_pre_exclusion` Number of L0 input files prior to
  // exclusion
  // @param `end_index` Index of the last sorted run selected as compaction
  // input. Will not be affected by this exclusion.
  // @param `start_index` Index of the first input sorted run prior to
  // exclusion. Will be modified as output based on the exclusion.
  // @param `candidate_size` Total size of all except for the last input sorted
  // runs prior to exclusion. Will be modified as output based on the exclusion.
  //
  // @return Number of L0 files to exclude. `start_index` and
  // `candidate_size` will be modified accordingly
  std::size_t MightExcludeNewL0sToReduceWriteStop(
      std::size_t num_l0_input_pre_exclusion, std::size_t end_index,
      std::size_t& start_index, std::size_t& candidate_size) const {
    // Nothing to exclude if no L0 files are in the input.
    if (num_l0_input_pre_exclusion == 0) {
      return 0;
    }

    assert(mutable_cf_options_.level0_stop_writes_trigger > 0);
    const std::size_t level0_stop_writes_trigger = static_cast<std::size_t>(
        mutable_cf_options_.level0_stop_writes_trigger);
    const std::size_t max_merge_width = static_cast<std::size_t>(
        mutable_cf_options_.compaction_options_universal.max_merge_width);
    const std::size_t min_merge_width = static_cast<std::size_t>(
        mutable_cf_options_.compaction_options_universal.min_merge_width);
    const std::size_t max_size_amplification_percent = static_cast<std::size_t>(
        mutable_cf_options_.compaction_options_universal
            .max_size_amplification_percent);
    // Size of the last input sorted run; the denominator of the size amp
    // ratio check below.
    const size_t base_sr_size = sorted_runs_[end_index].size;

    // Leave at least 1 L0 file and 2 input sorted runs after exclusion
    const std::size_t max_num_l0_to_exclude =
        std::min(num_l0_input_pre_exclusion - 1, end_index - start_index - 1);
    // How many more L0 files can be flushed before hitting
    // `level0_stop_writes_trigger` (0 if already at/over the trigger).
    const std::size_t num_extra_l0_before_write_stop =
        level0_stop_writes_trigger -
        std::min(level0_stop_writes_trigger, num_l0_input_pre_exclusion);
    // Upper/lower bound on exclusion derived from `max/min_merge_width`: the
    // fewer flushes remain before write stop, the more L0 files we may
    // exclude, clamped to `max_num_l0_to_exclude`.
    const std::size_t num_l0_to_exclude_for_max_merge_width =
        std::min(max_merge_width -
                     std::min(max_merge_width, num_extra_l0_before_write_stop),
                 max_num_l0_to_exclude);
    const std::size_t num_l0_to_exclude_for_min_merge_width =
        std::min(min_merge_width -
                     std::min(min_merge_width, num_extra_l0_before_write_stop),
                 max_num_l0_to_exclude);

    // Greedily grow the exclusion count from the min-width-derived bound to
    // the max-width-derived bound, committing each step only if the
    // post-exclusion input still qualifies (checks below).
    std::size_t num_l0_to_exclude = 0;
    std::size_t candidate_size_post_exclusion = candidate_size;

    for (std::size_t possible_num_l0_to_exclude =
             num_l0_to_exclude_for_min_merge_width;
         possible_num_l0_to_exclude <= num_l0_to_exclude_for_max_merge_width;
         ++possible_num_l0_to_exclude) {
      // Incrementally subtract only the files newly excluded since the last
      // committed count (newest L0 files sit at `start_index` onward).
      std::size_t current_candidate_size = candidate_size_post_exclusion;
      for (std::size_t j = num_l0_to_exclude; j < possible_num_l0_to_exclude;
           ++j) {
        current_candidate_size -=
            sorted_runs_.at(start_index + j).compensated_file_size;
      }

      // To ensure the compaction score before and after exclusion is similar
      // so this exclusion will not make the size amp compaction of
      // interest invalid from running as a size amp compaction as long as its
      // pre-exclusion compaction score satisfies the condition to run.
      // (Also caps the total excluded size at 10% of the pre-exclusion
      // candidate size.)
      if (current_candidate_size * 100 <
              max_size_amplification_percent * base_sr_size ||
          current_candidate_size < candidate_size * 9 / 10) {
        break;
      }
      num_l0_to_exclude = possible_num_l0_to_exclude;
      candidate_size_post_exclusion = current_candidate_size;
    }

    // Publish results through the in/out parameters.
    start_index += num_l0_to_exclude;
    candidate_size = candidate_size_post_exclusion;
    return num_l0_to_exclude;
  }

const ImmutableOptions& ioptions_;
const InternalKeyComparator* icmp_;
double score_;
Expand Down Expand Up @@ -800,6 +888,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
const size_t base_sr_size = sorted_runs_[end_index].size;
size_t start_index = end_index;
size_t candidate_size = 0;
size_t l0_candidate_count = 0;

// Get longest span (i.e, [start_index, end_index]) of available sorted runs
while (start_index > 0) {
Expand All @@ -815,13 +904,24 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
break;
}
candidate_size += sr->compensated_file_size;
l0_candidate_count += sr->level == 0 ? 1 : 0;
--start_index;
}

if (start_index == end_index) {
return nullptr;
}

{
const size_t num_l0_to_exclude = MightExcludeNewL0sToReduceWriteStop(
l0_candidate_count, end_index, start_index, candidate_size);
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: Excluding %" ROCKSDB_PRIszt
" latest L0 files to reduce potential write stop "
"triggered by `level0_stop_writes_trigger`",
cf_name_.c_str(), num_l0_to_exclude);
}

{
char file_num_buf[kFormatFileNumberBufSize];
sorted_runs_[start_index].Dump(file_num_buf, sizeof(file_num_buf), true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Universal size amp compaction will conditionally exclude some L0 files (newest first) when selecting input, with a small negative impact on size amplification. This is to prevent a large number of L0 files from being locked by a size amp compaction, which could otherwise lead to a write stop after just a few more flushes.

0 comments on commit 3224d21

Please sign in to comment.