Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compaction: skip output level files with no data overlap #6021

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ bool ColumnFamilyData::NeedsCompaction() const {
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer);
GetName(), mutable_options, current_, log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
}
Expand Down
4 changes: 3 additions & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, double _score,
bool _deletion_compaction,
CompactionReason _compaction_reason)
CompactionReason _compaction_reason,
std::vector<FileMetaData*> _skipped_output_level_files)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand All @@ -233,6 +234,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
grandparents_(std::move(_grandparents)),
score_(_score),
skipped_output_level_files_(_skipped_output_level_files),
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class Compaction {
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false,
CompactionReason compaction_reason = CompactionReason::kUnknown);
CompactionReason compaction_reason = CompactionReason::kUnknown,
std::vector<FileMetaData*> skipped_output_level_files = {});

// No copying allowed
Compaction(const Compaction&) = delete;
Expand Down Expand Up @@ -153,6 +154,10 @@ class Compaction {
return &input_levels_[compaction_input_level];
}

const std::vector<FileMetaData*>& skipped_output_level_files() const {
return skipped_output_level_files_;
}

// Maximum size of files to build during this compaction.
uint64_t max_output_file_size() const { return max_output_file_size_; }

Expand Down Expand Up @@ -350,6 +355,8 @@ class Compaction {
std::vector<FileMetaData*> grandparents_;
const double score_; // score that was used to pick this compaction.

const std::vector<FileMetaData*> skipped_output_level_files_;

// Is this compaction creating a file in the bottom most level?
const bool bottommost_level_;
// Does this compaction include all sst files?
Expand Down
28 changes: 27 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ struct CompactionJob::SubcompactionState {
// A flag determine whether the key has been seen in ShouldStopBefore()
bool seen_key = false;

size_t current_skipped_file_index = 0;

SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
: compaction(c),
Expand Down Expand Up @@ -204,6 +206,15 @@ struct CompactionJob::SubcompactionState {
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
if (CheckOverlappedBytes(internal_key, curr_file_size) ||
CheckSkippedFilesRange(internal_key)) {
overlapped_bytes = 0;
return true;
}
return false;
}

bool CheckOverlappedBytes(const Slice& internal_key, uint64_t curr_file_size) {
const InternalKeyComparator* icmp =
&compaction->column_family_data()->internal_comparator();
const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
Expand All @@ -227,12 +238,27 @@ struct CompactionJob::SubcompactionState {
if (overlapped_bytes + curr_file_size >
compaction->max_compaction_bytes()) {
// Too much overlap for current output; start new output
overlapped_bytes = 0;
return true;
}

return false;
}

bool CheckSkippedFilesRange(const Slice& internal_key) {
auto skipped_files = compaction->skipped_output_level_files();
auto icmp = compaction->column_family_data()->internal_comparator();

bool should_stop = false;
while (current_skipped_file_index < skipped_files.size()) {
auto file = skipped_files[current_skipped_file_index];
if (icmp.Compare(internal_key, file->smallest.Encode()) < 0) {
break;
}
should_stop = true;
current_skipped_file_index++;
}
return should_stop;
}
};

// Maintains state for the entire compaction
Expand Down
9 changes: 9 additions & 0 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <utility>
#include <vector>
#include "db/column_family.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "logging/log_buffer.h"
#include "monitoring/statistics.h"
Expand Down Expand Up @@ -1107,4 +1108,12 @@ bool CompactionPicker::GetOverlappingL0Files(
return true;
}

Compaction* CompactionPicker::PickCompaction(
const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
return PickCompaction(cf_name, mutable_cf_options,
version->storage_info(), log_buffer);
}

} // namespace rocksdb
5 changes: 5 additions & 0 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class CompactionPicker {
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) = 0;

virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
Version* version,
LogBuffer* log_buffer);

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
Expand Down
31 changes: 24 additions & 7 deletions db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class LevelCompactionBuilder {
ioptions_(ioptions) {}

// Pick and return a compaction.
Compaction* PickCompaction();
Compaction* PickCompaction(Version* version);

// Pick the initial files to compact to the next level. (or together
// in Intra-L0 compactions)
Expand All @@ -69,7 +69,7 @@ class LevelCompactionBuilder {

// Based on initial files, setup other files need to be compacted
// in this compaction, accordingly.
bool SetupOtherInputsIfNeeded();
bool SetupOtherInputsIfNeeded(Version* version);

Compaction* GetCompaction();

Expand Down Expand Up @@ -108,6 +108,7 @@ class LevelCompactionBuilder {
CompactionInputFiles start_level_inputs_;
std::vector<CompactionInputFiles> compaction_inputs_;
CompactionInputFiles output_level_inputs_;
std::vector<FileMetaData*> skipped_output_level_files_;
std::vector<FileMetaData*> grandparents_;
CompactionReason compaction_reason_ = CompactionReason::kUnknown;

Expand Down Expand Up @@ -304,7 +305,7 @@ bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
return true;
}

bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
bool LevelCompactionBuilder::SetupOtherInputsIfNeeded(Version* version) {
// Setup input files from output level. For output to L0, we only compact
// spans of files that do not interact with any pending compactions, so don't
// need to consider other levels.
Expand All @@ -316,6 +317,12 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
return false;
}

if (version && !output_level_inputs_.empty()) {
version->SkipNonOverlappingOutputLevelFiles(
start_level_inputs_.level, start_level_inputs_.files,
&output_level_inputs_.files, &skipped_output_level_files_);
}

compaction_inputs_.push_back(start_level_inputs_);
if (!output_level_inputs_.empty()) {
compaction_inputs_.push_back(output_level_inputs_);
Expand All @@ -341,7 +348,7 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
return true;
}

Compaction* LevelCompactionBuilder::PickCompaction() {
Compaction* LevelCompactionBuilder::PickCompaction(Version* version) {
// Pick up the first file to start compaction. It may have been extended
// to a clean cut.
SetupInitialFiles();
Expand All @@ -358,7 +365,7 @@ Compaction* LevelCompactionBuilder::PickCompaction() {

// Pick files in the output level and expand more files in the start level
// if needed.
if (!SetupOtherInputsIfNeeded()) {
if (!SetupOtherInputsIfNeeded(version)) {
return nullptr;
}

Expand All @@ -383,7 +390,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
output_level_, vstorage_->base_level()),
GetCompressionOptions(ioptions_, vstorage_, output_level_),
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);
start_level_score_, false /* deletion_compaction */, compaction_reason_,
skipped_output_level_files_);

// If it's level 0 compaction, make sure we don't execute any other level 0
// compactions in parallel
Expand Down Expand Up @@ -548,6 +556,15 @@ Compaction* LevelCompactionPicker::PickCompaction(
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer,
mutable_cf_options, ioptions_);
return builder.PickCompaction();
return builder.PickCompaction(nullptr);
}

Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
LevelCompactionBuilder builder(cf_name, version->storage_info(), this,
log_buffer, mutable_cf_options, ioptions_);
return builder.PickCompaction(version);
}

} // namespace rocksdb
4 changes: 4 additions & 0 deletions db/compaction/compaction_picker_level.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class LevelCompactionPicker : public CompactionPicker {
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
Version* version,
LogBuffer* log_buffer) override;

virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
Expand Down
53 changes: 53 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4768,6 +4768,59 @@ TEST_F(DBCompactionTest, ConsistencyFailTest) {
ASSERT_NOK(Put("foo", "bar"));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBCompactionTest, SkipNonOverlappingOutputLevelFiles) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
DestroyAndReopen(options);
Random rnd(301);

// L0: [5, 5]
ASSERT_OK(Put(Key(5), Key(5)));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0), 1);

CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 1;
db_->CompactRange(cro, nullptr, nullptr);
dbfull()->TEST_WaitForCompact();
// L1: [5, 5]
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 1);

// L0: [2, 8], [3, 7]
ASSERT_OK(Put(Key(2), Key(2)));
ASSERT_OK(Put(Key(8), Key(8)));
Flush();
ASSERT_OK(Put(Key(3), Key(3)));
ASSERT_OK(Put(Key(7), Key(7)));
Flush();
dbfull()->TEST_WaitForCompact();
// L1: [2, 3], [5, 5], [7, 8]
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 3);

// L0: [1, 4, 6], [0, 2, 9]
ASSERT_OK(Put(Key(1), Key(1)));
ASSERT_OK(Put(Key(4), Key(4)));
ASSERT_OK(Put(Key(6), Key(6)));
Flush();
ASSERT_OK(Put(Key(0), Key(0)));
ASSERT_OK(Put(Key(2), Key(2)));
ASSERT_OK(Put(Key(9), Key(9)));
Flush();
dbfull()->TEST_WaitForCompact();
// L1: [0, 4], [5, 5], [6, 6], [7, 8], [9, 9]
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 5);

for (int i = 0; i < 10; i++) {
ASSERT_EQ(Get(Key(i)), Key(i));
}
}

#endif // !defined(ROCKSDB_LITE)
} // namespace rocksdb

Expand Down
62 changes: 62 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,68 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
return status;
}

void Version::SkipNonOverlappingOutputLevelFiles(
int start_level, const std::vector<FileMetaData*>& start_level_files,
std::vector<FileMetaData*>* output_level_files,
std::vector<FileMetaData*>* skipped_output_level_files) {
auto icmp = cfd_->internal_comparator();
auto ucmp = icmp.user_comparator();

ReadOptions read_options;
read_options.fill_cache = false;
read_options.total_order_seek = true;

EnvOptions env_options_for_compaction =
env_->OptimizeForCompactionTableRead(env_options_, *vset_->db_options());

Arena arena;
LevelFilesBrief start_level_brief;
ScopedArenaIterator start_level_iter;
if (start_level == 0) {
MergeIteratorBuilder builder(&icmp, &arena);
for (auto file : start_level_files) {
builder.AddIterator(cfd_->table_cache()->NewIterator(
read_options, env_options_for_compaction, icmp, *file,
/*range_del_agg=*/nullptr,
GetMutableCFOptions().prefix_extractor.get(),
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kCompaction, &arena,
/*skip_filters=*/false, start_level,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr));
}
start_level_iter.set(builder.Finish());
} else {
DoGenerateLevelFilesBrief(&start_level_brief, start_level_files, &arena);
auto mem = arena.AllocateAligned(sizeof(LevelIterator));
start_level_iter.set(new (mem) LevelIterator(
cfd_->table_cache(), read_options, env_options_for_compaction,
icmp, &start_level_brief,
GetMutableCFOptions().prefix_extractor.get(),
/*should_sample=*/false,
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
start_level, /*range_del_agg=*/nullptr));
}

Status s;
bool overlap = false;
Slice largest_user_key;
std::vector<FileMetaData*> origin_files(std::move(*output_level_files));
for (auto file : origin_files) {
s = OverlapWithIterator(ucmp,
file->smallest.user_key(), file->largest.user_key(),
start_level_iter.get(), &overlap);
if (s.ok() && !overlap &&
ucmp->Compare(file->smallest.user_key(), largest_user_key) > 0) {
skipped_output_level_files->push_back(file);
} else {
output_level_files->push_back(file);
largest_user_key = file->largest.user_key();
}
}
}

VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels,
Expand Down
5 changes: 5 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,11 @@ class Version {
const Slice& largest_user_key,
int level, bool* overlap);

void SkipNonOverlappingOutputLevelFiles(
int start_level, const std::vector<FileMetaData*>& start_level_files,
std::vector<FileMetaData*>* output_level_files,
std::vector<FileMetaData*>* skipped_output_level_files);

// Lookup the value for key or get all merge operands for key.
// If do_merge = true (default) then lookup value for key.
// Behavior if do_merge = true:
Expand Down