Skip to content

Commit

Permalink
Allow CFOptions to be set by createFromString in remote compaction in…
Browse files Browse the repository at this point in the history
…stead of by CompactionServiceOptionsOverride
  • Loading branch information
jaykorean committed Aug 28, 2024
1 parent 92ad4a8 commit 5b37daf
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 42 deletions.
56 changes: 29 additions & 27 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,30 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).


#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/unique_id_impl.h"

namespace ROCKSDB_NAMESPACE {

class PartialDeleteCompactionFilter : public CompactionFilter {
public:
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& key, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
int i = std::stoi(key.ToString().substr(3));
if (i > 5 && i <= 105) {
return CompactionFilter::Decision::kRemove;
}
return CompactionFilter::Decision::kKeep;
}

static const char* kClassName() { return "PartialDeleteCompactionFilter"; }
const char* Name() const override { return kClassName(); }
};

class MyTestCompactionService : public CompactionService {
public:
MyTestCompactionService(
Expand All @@ -25,7 +42,17 @@ class MyTestCompactionService : public CompactionService {
wait_info_("na", "na", "na", 0, Env::TOTAL),
listeners_(listeners),
table_properties_collector_factories_(
std::move(table_properties_collector_factories)) {}
std::move(table_properties_collector_factories)) {
// Register Compaction Filter
const auto& library = ObjectLibrary::Default();
library->AddFactory<CompactionFilter>(
PartialDeleteCompactionFilter::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<rocksdb::CompactionFilter>*,
std::string* /* errmsg */) {
return new PartialDeleteCompactionFilter();
});
}

static const char* kClassName() { return "MyTestCompactionService"; }

Expand Down Expand Up @@ -73,19 +100,10 @@ class MyTestCompactionService : public CompactionService {
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
options_.file_checksum_gen_factory;
options_override.comparator = options_.comparator;
options_override.merge_operator = options_.merge_operator;
options_override.compaction_filter = options_.compaction_filter;
options_override.compaction_filter_factory =
options_.compaction_filter_factory;
options_override.prefix_extractor = options_.prefix_extractor;
options_override.table_factory = options_.table_factory;
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
options_override.statistics = statistics_;
if (!listeners_.empty()) {
options_override.listeners = listeners_;
}

if (!table_properties_collector_factories_.empty()) {
options_override.table_properties_collector_factories =
table_properties_collector_factories_;
Expand Down Expand Up @@ -477,22 +495,6 @@ TEST_F(CompactionServiceTest, SubCompaction) {
ASSERT_GE(compaction_num, 2);
}

class PartialDeleteCompactionFilter : public CompactionFilter {
public:
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& key, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
int i = std::stoi(key.ToString().substr(3));
if (i > 5 && i <= 105) {
return CompactionFilter::Decision::kRemove;
}
return CompactionFilter::Decision::kKeep;
}

const char* Name() const override { return "PartialDeleteCompactionFilter"; }
};

TEST_F(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
std::unique_ptr<CompactionFilter> delete_comp_filter(
Expand Down
17 changes: 2 additions & 15 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -956,23 +956,10 @@ Status DB::OpenAndCompact(
compaction_input.db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
compaction_input.db_options.statistics = override_options.statistics;
compaction_input.column_family.options.comparator =
override_options.comparator;
compaction_input.column_family.options.merge_operator =
override_options.merge_operator;
compaction_input.column_family.options.compaction_filter =
override_options.compaction_filter;
compaction_input.column_family.options.compaction_filter_factory =
override_options.compaction_filter_factory;
compaction_input.column_family.options.prefix_extractor =
override_options.prefix_extractor;
compaction_input.column_family.options.table_factory =
override_options.table_factory;
compaction_input.column_family.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
compaction_input.db_options.listeners = override_options.listeners;

compaction_input.column_family.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
compaction_input.db_options.listeners = override_options.listeners;

std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(compaction_input.column_family);
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2289,13 +2289,17 @@ struct CompactionServiceOptionsOverride {
Env* env = Env::Default();
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;

// DEPRECATED - CFOption pointer configurations will be passed as static name
// These will need to be registered in the remote worker's ObjectRegistry,
// then created by Type::CreateFromString() when needed.
const Comparator* comparator = BytewiseComparator();
std::shared_ptr<MergeOperator> merge_operator = nullptr;
const CompactionFilter* compaction_filter = nullptr;
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory = nullptr;
std::shared_ptr<const SliceTransform> prefix_extractor = nullptr;
std::shared_ptr<TableFactory> table_factory;
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;
// END of deprecation marker

// Only subsets of events are triggered in remote compaction worker, like:
// `OnTableFileCreated`, `OnTableFileCreationStarted`,
Expand Down

0 comments on commit 5b37daf

Please sign in to comment.