From e8a5d5791a7f1734e181584c34ef5b66c7763088 Mon Sep 17 00:00:00 2001
From: Huachao Huang
Date: Fri, 8 Nov 2019 22:09:02 +0800
Subject: [PATCH] compaction: skip output level files with no data overlap

The idea is to skip output level files that do not overlap with the
data of the start level files during a compaction. By an output level
file overlapping with the data of a start level file, I mean that
there is at least one key in the start level file that falls inside
the range of the output level file. For example, suppose an output
level file *O* has range ["e", "f"] and keys "e" and "f", and a start
level file *S* has range ["a", "z"] and keys "a" and "z". Although the
range of file O overlaps with the range of file S, file O does not
overlap with the data of file S.

So when is this idea useful? We know that when we do sequential
writes, the generated SST files don't overlap with each other and all
compactions are just trivial moves, which is perfect. However, if we
do concurrent sequential writes in multiple ranges, things get harder.

Take a relational database as an example. A common construction of
record keys is a table ID prefix concatenated with an auto-increment
record ID (e.g. "1_1" means table 1, record 1). Now let's see what
happens if we insert records into two tables (table 1 and table 2) in
this order:

"1_1", "2_1", "1_2", "2_2", "1_3", "2_3", "1_4", "2_4" ...

Assume that RocksDB uses level compaction and each memtable and SST
file contains at most two keys. After putting eight keys, we get four
level 0 files:

L0: ["1_1", "2_1"], ["1_2", "2_2"], ["1_3", "2_3"], ["1_4", "2_4"]
L1:

Then a level 0 compaction is triggered and we get this:

L0:
L1: ["1_1", "1_2"], ["1_3", "1_4"], ["2_1", "2_2"], ["2_3", "2_4"]

Then, after putting eight more keys:

L0: ["1_5", "2_5"], ["1_6", "2_6"], ["1_7", "2_7"], ["1_8", "2_8"]
L1: ["1_1", "1_2"], ["1_3", "1_4"], ["2_1", "2_2"], ["2_3", "2_4"]

Now if a level 0 compaction is triggered, according to the current
implementation, the start level inputs will be all files in level 0,
which cover the range ["1_5", "2_8"], and the output level inputs will
be ["2_1", "2_2"] and ["2_3", "2_4"], because these two files overlap
with the range of the start level. However, files ["2_1", "2_2"] and
["2_3", "2_4"] don't overlap with the data of the start level inputs
at all.

So can we compact the start level inputs without rewriting these two
output level files? The answer is yes, as long as we ensure that newly
generated files don't overlap with existing files in the output level.
We can use the ranges of the skipped output level files as split
points for the compaction output files. For this compaction, "2_1"
will be a split point, which prevents the compaction from generating a
file like ["1_8", "2_5"]. With this optimization, we save two file
reads and two file writes, which is 1/3 of the IO in this compaction.

While the above example may seem a bit artificial, I have also
experimented with this idea on a real-world database. A simple
sysbench insert benchmark on TiDB shows more than 30% compaction IO
reduction in some cases. I think other databases with similar key
layouts can benefit from this optimization too.

Note that the current change is rough, so please consider it a
proof-of-concept implementation for now.
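To make the data-overlap test concrete, here is a minimal standalone
sketch (not part of the patch): it uses plain std::string keys and a
std::set in place of RocksDB's internal key comparator and file
iterators, and the helper name CanSkipOutputFile is made up for
illustration. The actual change implements this check with
Version::SkipNonOverlappingOutputLevelFiles and OverlapWithIterator in
the diff below.

  #include <set>
  #include <string>

  // Returns true if no key that actually exists in the start level inputs
  // falls inside [smallest, largest], i.e. the output level file overlaps
  // the start level only by range, not by data, and can be skipped.
  bool CanSkipOutputFile(const std::set<std::string>& start_level_keys,
                         const std::string& smallest,
                         const std::string& largest) {
    // First start level key that is >= smallest; if it is also <= largest,
    // the output level file contains real data from the start level and
    // must be rewritten by the compaction.
    auto it = start_level_keys.lower_bound(smallest);
    return it == start_level_keys.end() || *it > largest;
  }

  // Example from the message above: with start level keys
  // {"1_5", "1_6", "1_7", "1_8", "2_5", "2_6", "2_7", "2_8"},
  // CanSkipOutputFile(keys, "2_1", "2_2") and
  // CanSkipOutputFile(keys, "2_3", "2_4") both return true, while
  // CanSkipOutputFile(keys, "2_4", "2_6") returns false because "2_5"
  // lies inside that range.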
---
 db/column_family.cc                      |  2 +-
 db/compaction/compaction.cc              |  4 +-
 db/compaction/compaction.h               |  9 +++-
 db/compaction/compaction_job.cc          | 28 ++++++++++-
 db/compaction/compaction_picker.cc       |  9 ++++
 db/compaction/compaction_picker.h        |  5 ++
 db/compaction/compaction_picker_level.cc | 31 +++++++++---
 db/compaction/compaction_picker_level.h  |  4 ++
 db/db_compaction_test.cc                 | 53 ++++++++++++++++++++
 db/version_set.cc                        | 62 ++++++++++++++++++++++++
 db/version_set.h                         |  5 ++
 11 files changed, 201 insertions(+), 11 deletions(-)

diff --git a/db/column_family.cc b/db/column_family.cc
index f6a012d8faa..7f06dba0b07 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -955,7 +955,7 @@ bool ColumnFamilyData::NeedsCompaction() const {
 Compaction* ColumnFamilyData::PickCompaction(
     const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
   auto* result = compaction_picker_->PickCompaction(
-      GetName(), mutable_options, current_->storage_info(), log_buffer);
+      GetName(), mutable_options, current_, log_buffer);
   if (result != nullptr) {
     result->SetInputVersion(current_);
   }
diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index 6d7a3561660..e57a00e3d7a 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -214,7 +214,8 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
                        std::vector<FileMetaData*> _grandparents,
                        bool _manual_compaction, double _score,
                        bool _deletion_compaction,
-                       CompactionReason _compaction_reason)
+                       CompactionReason _compaction_reason,
+                       std::vector<FileMetaData*> _skipped_output_level_files)
     : input_vstorage_(vstorage),
       start_level_(_inputs[0].level),
       output_level_(_output_level),
@@ -233,6 +234,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
       inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
       grandparents_(std::move(_grandparents)),
       score_(_score),
+      skipped_output_level_files_(_skipped_output_level_files),
       bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
       is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
       is_manual_compaction_(_manual_compaction),
diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h
index 598b08e7c65..5d0d3e09a69 100644
--- a/db/compaction/compaction.h
+++ b/db/compaction/compaction.h
@@ -78,7 +78,8 @@ class Compaction {
              std::vector<FileMetaData*> grandparents,
             bool manual_compaction = false, double score = -1,
             bool deletion_compaction = false,
-             CompactionReason compaction_reason = CompactionReason::kUnknown);
+             CompactionReason compaction_reason = CompactionReason::kUnknown,
+             std::vector<FileMetaData*> skipped_output_level_files = {});
 
   // No copying allowed
   Compaction(const Compaction&) = delete;
@@ -153,6 +154,10 @@ class Compaction {
     return &input_levels_[compaction_input_level];
   }
 
+  const std::vector<FileMetaData*>& skipped_output_level_files() const {
+    return skipped_output_level_files_;
+  }
+
   // Maximum size of files to build during this compaction.
   uint64_t max_output_file_size() const { return max_output_file_size_; }
 
@@ -350,6 +355,8 @@ class Compaction {
   std::vector<FileMetaData*> grandparents_;
   const double score_;  // score that was used to pick this compaction.
 
+  const std::vector<FileMetaData*> skipped_output_level_files_;
+
   // Is this compaction creating a file in the bottom most level?
   const bool bottommost_level_;
   // Does this compaction include all sst files?
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 4204c402c09..b4319448bb3 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -156,6 +156,8 @@ struct CompactionJob::SubcompactionState {
   // A flag determine whether the key has been seen in ShouldStopBefore()
   bool seen_key = false;
 
+  size_t current_skipped_file_index = 0;
+
   SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
                      uint64_t size = 0)
       : compaction(c),
@@ -204,6 +206,15 @@ struct CompactionJob::SubcompactionState {
   // Returns true iff we should stop building the current output
   // before processing "internal_key".
   bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
+    if (CheckOverlappedBytes(internal_key, curr_file_size) ||
+        CheckSkippedFilesRange(internal_key)) {
+      overlapped_bytes = 0;
+      return true;
+    }
+    return false;
+  }
+
+  bool CheckOverlappedBytes(const Slice& internal_key, uint64_t curr_file_size) {
     const InternalKeyComparator* icmp =
         &compaction->column_family_data()->internal_comparator();
     const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
@@ -227,12 +238,27 @@ struct CompactionJob::SubcompactionState {
     if (overlapped_bytes + curr_file_size >
         compaction->max_compaction_bytes()) {
       // Too much overlap for current output; start new output
-      overlapped_bytes = 0;
       return true;
     }
 
     return false;
   }
+
+  bool CheckSkippedFilesRange(const Slice& internal_key) {
+    auto skipped_files = compaction->skipped_output_level_files();
+    auto icmp = compaction->column_family_data()->internal_comparator();
+
+    bool should_stop = false;
+    while (current_skipped_file_index < skipped_files.size()) {
+      auto file = skipped_files[current_skipped_file_index];
+      if (icmp.Compare(internal_key, file->smallest.Encode()) < 0) {
+        break;
+      }
+      should_stop = true;
+      current_skipped_file_index++;
+    }
+    return should_stop;
+  }
 };
 
 // Maintains state for the entire compaction
diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc
index 3357e06319d..f978c523006 100644
--- a/db/compaction/compaction_picker.cc
+++ b/db/compaction/compaction_picker.cc
@@ -16,6 +16,7 @@
 #include 
 #include 
 #include "db/column_family.h"
+#include "db/version_set.h"
 #include "file/filename.h"
 #include "logging/log_buffer.h"
 #include "monitoring/statistics.h"
@@ -1107,4 +1108,12 @@ bool CompactionPicker::GetOverlappingL0Files(
   return true;
 }
 
+Compaction* CompactionPicker::PickCompaction(
+    const std::string& cf_name,
+    const MutableCFOptions& mutable_cf_options,
+    Version* version, LogBuffer* log_buffer) {
+  return PickCompaction(cf_name, mutable_cf_options,
+                        version->storage_info(), log_buffer);
+}
+
 }  // namespace rocksdb
diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h
index 53477014cf6..0b0a3307685 100644
--- a/db/compaction/compaction_picker.h
+++ b/db/compaction/compaction_picker.h
@@ -59,6 +59,11 @@ class CompactionPicker {
                                      VersionStorageInfo* vstorage,
                                      LogBuffer* log_buffer) = 0;
 
+  virtual Compaction* PickCompaction(const std::string& cf_name,
+                                     const MutableCFOptions& mutable_cf_options,
+                                     Version* version,
+                                     LogBuffer* log_buffer);
+
   // Return a compaction object for compacting the range [begin,end] in
   // the specified level.  Returns nullptr if there is nothing in that
   // level that overlaps the specified range.  Caller should delete
diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc
index cc0f19b8171..5c6ab454c06 100644
--- a/db/compaction/compaction_picker_level.cc
+++ b/db/compaction/compaction_picker_level.cc
@@ -57,7 +57,7 @@ class LevelCompactionBuilder {
         ioptions_(ioptions) {}
 
   // Pick and return a compaction.
-  Compaction* PickCompaction();
+  Compaction* PickCompaction(Version* version);
 
   // Pick the initial files to compact to the next level. (or together
   // in Intra-L0 compactions)
@@ -69,7 +69,7 @@ class LevelCompactionBuilder {
 
   // Based on initial files, setup other files need to be compacted
   // in this compaction, accordingly.
-  bool SetupOtherInputsIfNeeded();
+  bool SetupOtherInputsIfNeeded(Version* version);
 
   Compaction* GetCompaction();
 
@@ -108,6 +108,7 @@ class LevelCompactionBuilder {
   CompactionInputFiles start_level_inputs_;
   std::vector<CompactionInputFiles> compaction_inputs_;
   CompactionInputFiles output_level_inputs_;
+  std::vector<FileMetaData*> skipped_output_level_files_;
   std::vector<FileMetaData*> grandparents_;
   CompactionReason compaction_reason_ = CompactionReason::kUnknown;
 
@@ -304,7 +305,7 @@ bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
   return true;
 }
 
-bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
+bool LevelCompactionBuilder::SetupOtherInputsIfNeeded(Version* version) {
   // Setup input files from output level. For output to L0, we only compact
   // spans of files that do not interact with any pending compactions, so don't
   // need to consider other levels.
@@ -316,6 +317,12 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
       return false;
     }
 
+    if (version && !output_level_inputs_.empty()) {
+      version->SkipNonOverlappingOutputLevelFiles(
+          start_level_inputs_.level, start_level_inputs_.files,
+          &output_level_inputs_.files, &skipped_output_level_files_);
+    }
+
     compaction_inputs_.push_back(start_level_inputs_);
     if (!output_level_inputs_.empty()) {
       compaction_inputs_.push_back(output_level_inputs_);
@@ -341,7 +348,7 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
   return true;
 }
 
-Compaction* LevelCompactionBuilder::PickCompaction() {
+Compaction* LevelCompactionBuilder::PickCompaction(Version* version) {
   // Pick up the first file to start compaction. It may have been extended
   // to a clean cut.
   SetupInitialFiles();
@@ -358,7 +365,7 @@ Compaction* LevelCompactionBuilder::PickCompaction() {
 
   // Pick files in the output level and expand more files in the start level
   // if needed.
-  if (!SetupOtherInputsIfNeeded()) {
+  if (!SetupOtherInputsIfNeeded(version)) {
     return nullptr;
   }
 
@@ -383,7 +390,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
                          output_level_, vstorage_->base_level()),
       GetCompressionOptions(ioptions_, vstorage_, output_level_),
       /* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
-      start_level_score_, false /* deletion_compaction */, compaction_reason_);
+      start_level_score_, false /* deletion_compaction */, compaction_reason_,
+      skipped_output_level_files_);
 
   // If it's level 0 compaction, make sure we don't execute any other level 0
   // compactions in parallel
@@ -548,6 +556,15 @@ Compaction* LevelCompactionPicker::PickCompaction(
     VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
   LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer,
                                  mutable_cf_options, ioptions_);
-  return builder.PickCompaction();
+  return builder.PickCompaction(nullptr);
 }
+
+Compaction* LevelCompactionPicker::PickCompaction(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    Version* version, LogBuffer* log_buffer) {
+  LevelCompactionBuilder builder(cf_name, version->storage_info(), this,
+                                 log_buffer, mutable_cf_options, ioptions_);
+  return builder.PickCompaction(version);
+}
+
 }  // namespace rocksdb
diff --git a/db/compaction/compaction_picker_level.h b/db/compaction/compaction_picker_level.h
index 9fc196698a1..fb76de208aa 100644
--- a/db/compaction/compaction_picker_level.h
+++ b/db/compaction/compaction_picker_level.h
@@ -24,6 +24,10 @@ class LevelCompactionPicker : public CompactionPicker {
                                      const MutableCFOptions& mutable_cf_options,
                                      VersionStorageInfo* vstorage,
                                      LogBuffer* log_buffer) override;
+  virtual Compaction* PickCompaction(const std::string& cf_name,
+                                     const MutableCFOptions& mutable_cf_options,
+                                     Version* version,
+                                     LogBuffer* log_buffer) override;
 
   virtual bool NeedsCompaction(
       const VersionStorageInfo* vstorage) const override;
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index bf301d9834a..824c02110f1 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -4768,6 +4768,59 @@ TEST_F(DBCompactionTest, ConsistencyFailTest) {
   ASSERT_NOK(Put("foo", "bar"));
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
+
+TEST_F(DBCompactionTest, SkipNonOverlappingOutputLevelFiles) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 2;
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  // L0: [5, 5]
+  ASSERT_OK(Put(Key(5), Key(5)));
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+
+  CompactRangeOptions cro;
+  cro.change_level = true;
+  cro.target_level = 1;
+  db_->CompactRange(cro, nullptr, nullptr);
+  dbfull()->TEST_WaitForCompact();
+  // L1: [5, 5]
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
+
+  // L0: [2, 8], [3, 7]
+  ASSERT_OK(Put(Key(2), Key(2)));
+  ASSERT_OK(Put(Key(8), Key(8)));
+  Flush();
+  ASSERT_OK(Put(Key(3), Key(3)));
+  ASSERT_OK(Put(Key(7), Key(7)));
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  // L1: [2, 3], [5, 5], [7, 8]
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 3);
+
+  // L0: [1, 4, 6], [0, 2, 9]
+  ASSERT_OK(Put(Key(1), Key(1)));
+  ASSERT_OK(Put(Key(4), Key(4)));
+  ASSERT_OK(Put(Key(6), Key(6)));
+  Flush();
+  ASSERT_OK(Put(Key(0), Key(0)));
+  ASSERT_OK(Put(Key(2), Key(2)));
+  ASSERT_OK(Put(Key(9), Key(9)));
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  // L1: [0, 4], [5, 5], [6, 6], [7, 8], [9, 9]
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 5);
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(Get(Key(i)), Key(i));
+  }
+}
+
 #endif  // !defined(ROCKSDB_LITE)
 
 }  // namespace rocksdb
diff --git a/db/version_set.cc b/db/version_set.cc
index a73806b8199..6b45affc367 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1671,6 +1671,68 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
   return status;
 }
 
+void Version::SkipNonOverlappingOutputLevelFiles(
+    int start_level, const std::vector<FileMetaData*>& start_level_files,
+    std::vector<FileMetaData*>* output_level_files,
+    std::vector<FileMetaData*>* skipped_output_level_files) {
+  auto icmp = cfd_->internal_comparator();
+  auto ucmp = icmp.user_comparator();
+
+  ReadOptions read_options;
+  read_options.fill_cache = false;
+  read_options.total_order_seek = true;
+
+  EnvOptions env_options_for_compaction =
+      env_->OptimizeForCompactionTableRead(env_options_, *vset_->db_options());
+
+  Arena arena;
+  LevelFilesBrief start_level_brief;
+  ScopedArenaIterator start_level_iter;
+  if (start_level == 0) {
+    MergeIteratorBuilder builder(&icmp, &arena);
+    for (auto file : start_level_files) {
+      builder.AddIterator(cfd_->table_cache()->NewIterator(
+          read_options, env_options_for_compaction, icmp, *file,
+          /*range_del_agg=*/nullptr,
+          GetMutableCFOptions().prefix_extractor.get(),
+          /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
+          TableReaderCaller::kCompaction, &arena,
+          /*skip_filters=*/false, start_level,
+          /*smallest_compaction_key=*/nullptr,
+          /*largest_compaction_key=*/nullptr));
+    }
+    start_level_iter.set(builder.Finish());
+  } else {
+    DoGenerateLevelFilesBrief(&start_level_brief, start_level_files, &arena);
+    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
+    start_level_iter.set(new (mem) LevelIterator(
+        cfd_->table_cache(), read_options, env_options_for_compaction,
+        icmp, &start_level_brief,
+        GetMutableCFOptions().prefix_extractor.get(),
+        /*should_sample=*/false,
+        /*no per level latency histogram=*/nullptr,
+        TableReaderCaller::kCompaction, /*skip_filters=*/false,
+        start_level, /*range_del_agg=*/nullptr));
+  }
+
+  Status s;
+  bool overlap = false;
+  Slice largest_user_key;
+  std::vector<FileMetaData*> origin_files(std::move(*output_level_files));
+  for (auto file : origin_files) {
+    s = OverlapWithIterator(ucmp,
+                            file->smallest.user_key(), file->largest.user_key(),
+                            start_level_iter.get(), &overlap);
+    if (s.ok() && !overlap &&
+        ucmp->Compare(file->smallest.user_key(), largest_user_key) > 0) {
+      skipped_output_level_files->push_back(file);
+    } else {
+      output_level_files->push_back(file);
+      largest_user_key = file->largest.user_key();
+    }
+  }
+}
+
 VersionStorageInfo::VersionStorageInfo(
     const InternalKeyComparator* internal_comparator,
     const Comparator* user_comparator, int levels,
diff --git a/db/version_set.h b/db/version_set.h
index 758bd5e5d32..89a8e46482f 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -572,6 +572,11 @@ class Version {
                                  const Slice& largest_user_key,
                                  int level, bool* overlap);
 
+  void SkipNonOverlappingOutputLevelFiles(
+      int start_level, const std::vector<FileMetaData*>& start_level_files,
+      std::vector<FileMetaData*>* output_level_files,
+      std::vector<FileMetaData*>* skipped_output_level_files);
+
   // Lookup the value for key or get all merge operands for key.
   // If do_merge = true (default) then lookup value for key.
   // Behavior if do_merge = true: