From 9a726efb987596ed5f6e5fbd6d8950847f45d017 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Thu, 19 Sep 2024 20:36:55 -0700 Subject: [PATCH 1/5] Parse OPTIONS file instead of including options in the compaction input --- db/compaction/compaction_job.h | 8 +- db/compaction/compaction_job_test.cc | 17 +--- db/compaction/compaction_service_job.cc | 81 +++++++----------- db/db_impl/db_impl_secondary.cc | 107 +++++++++++++++--------- 4 files changed, 104 insertions(+), 109 deletions(-) diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 224f4e46f34..7bf8d683de3 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -377,9 +377,8 @@ class CompactionJob { // doesn't contain the LSM tree information, which is passed though MANIFEST // file. struct CompactionServiceInput { - ColumnFamilyDescriptor column_family; - - DBOptions db_options; + std::string options_file; + std::string cf_name; std::vector snapshots; @@ -402,8 +401,7 @@ struct CompactionServiceInput { static Status Read(const std::string& data_str, CompactionServiceInput* obj); Status Write(std::string* output); - // Initialize a dummy ColumnFamilyDescriptor - CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {} + CompactionServiceInput() {} #ifndef NDEBUG bool TEST_Equals(CompactionServiceInput* other); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 8e85a9f96f2..868872547ad 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1568,17 +1568,8 @@ TEST_F(CompactionJobTest, InputSerialization) { const int kStrMaxLen = 1000; Random rnd(static_cast(time(nullptr))); Random64 rnd64(time(nullptr)); - input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen)); - input.column_family.options.comparator = ReverseBytewiseComparator(); - input.column_family.options.max_bytes_for_level_base = - rnd64.Uniform(UINT64_MAX); - 
input.column_family.options.disable_auto_compactions = rnd.OneIn(2); - input.column_family.options.compression = kZSTD; - input.column_family.options.compression_opts.level = 4; - input.db_options.max_background_flushes = 10; - input.db_options.paranoid_checks = rnd.OneIn(2); - input.db_options.statistics = CreateDBStatistics(); - input.db_options.env = env_; + input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen)); + input.options_file = "OPTIONS-007"; while (!rnd.OneIn(10)) { input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX)); } @@ -1606,10 +1597,10 @@ TEST_F(CompactionJobTest, InputSerialization) { ASSERT_TRUE(deserialized1.TEST_Equals(&input)); // Test mismatch - deserialized1.db_options.max_background_flushes += 10; + deserialized1.output_level += 10; std::string mismatch; ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch)); - ASSERT_EQ(mismatch, "db_options.max_background_flushes"); + ASSERT_EQ(mismatch, "output_level"); // Test unknown field CompactionServiceInput deserialized2; diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index a923e4fcc49..01de45e2039 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -15,6 +15,7 @@ #include "monitoring/thread_status_util.h" #include "options/options_helper.h" #include "rocksdb/utilities/options_type.h" +#include "rocksdb/utilities/options_util.h" namespace ROCKSDB_NAMESPACE { class SubcompactionState; @@ -28,6 +29,13 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( const Compaction* compaction = sub_compact->compaction; CompactionServiceInput compaction_input; + Status s = GetLatestOptionsFileName(dbname_, db_options_.env, + &compaction_input.options_file); + if (!s.ok()) { + sub_compact->status = s; + return CompactionServiceJobStatus::kFailure; + } + compaction_input.output_level = compaction->output_level(); compaction_input.db_id = db_id_; @@ -39,12 +47,8 @@ 
CompactionJob::ProcessKeyValueCompactionWithCompactionService( MakeTableFileName(file->fd.GetNumber())); } } - compaction_input.column_family.name = - compaction->column_family_data()->GetName(); - compaction_input.column_family.options = - compaction->column_family_data()->GetLatestCFOptions(); - compaction_input.db_options = - BuildDBOptions(db_options_, mutable_db_options_copy_); + + compaction_input.cf_name = compaction->column_family_data()->GetName(); compaction_input.snapshots = existing_snapshots_; compaction_input.has_begin = sub_compact->start.has_value(); compaction_input.begin = @@ -54,7 +58,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.has_end ? sub_compact->end->ToString() : ""; std::string compaction_input_binary; - Status s = compaction_input.Write(&compaction_input_binary); + s = compaction_input.Write(&compaction_input_binary); if (!s.ok()) { sub_compact->status = s; return CompactionServiceJobStatus::kFailure; @@ -70,7 +74,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( ROCKS_LOG_INFO( db_options_.info_log, "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", - compaction_input.column_family.name.c_str(), job_id_, + compaction->column_family_data()->GetName().c_str(), job_id_, compaction_input.output_level, input_files_oss.str().c_str()); CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact), thread_pri_); @@ -84,13 +88,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "CompactionService failed to schedule a remote compaction job."); ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Remote compaction failed to start.", - compaction_input.column_family.name.c_str(), job_id_); + compaction->column_family_data()->GetName().c_str(), + job_id_); return response.status; case CompactionServiceJobStatus::kUseLocal: ROCKS_LOG_INFO( db_options_.info_log, "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)", - 
compaction_input.column_family.name.c_str(), job_id_); + compaction->column_family_data()->GetName().c_str(), job_id_); return response.status; default: assert(false); // unknown status @@ -99,7 +104,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Waiting for remote compaction...", - compaction_input.column_family.name.c_str(), job_id_); + compaction->column_family_data()->GetName().c_str(), job_id_); std::string compaction_result_binary; CompactionServiceJobStatus compaction_status = db_options_.compaction_service->Wait(response.scheduled_job_id, @@ -109,7 +114,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( ROCKS_LOG_INFO( db_options_.info_log, "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)", - compaction_input.column_family.name.c_str(), job_id_); + compaction->column_family_data()->GetName().c_str(), job_id_); return compaction_status; } @@ -134,9 +139,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "result is returned)."); compaction_result.status.PermitUncheckedError(); } - ROCKS_LOG_WARN(db_options_.info_log, - "[%s] [JOB %d] Remote compaction failed.", - compaction_input.column_family.name.c_str(), job_id_); + ROCKS_LOG_WARN( + db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.", + compaction->column_family_data()->GetName().c_str(), job_id_); return compaction_status; } @@ -162,7 +167,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( db_options_.info_log, "[%s] [JOB %d] Received remote compaction result, output path: " "%s, files: %s", - compaction_input.column_family.name.c_str(), job_id_, + compaction->column_family_data()->GetName().c_str(), job_id_, compaction_result.output_path.c_str(), output_files_oss.str().c_str()); // Installation Starts @@ -264,8 +269,8 @@ Status CompactionServiceCompactionJob::Run() { const VersionStorageInfo* storage_info = c->input_version()->storage_info(); 
assert(storage_info); assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); - write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level()); + bottommost_level_ = c->bottommost_level(); Slice begin = compaction_input_.begin; @@ -404,42 +409,12 @@ static std::unordered_map cfd_type_info = { }; static std::unordered_map cs_input_type_info = { - {"column_family", - OptionTypeInfo::Struct( - "column_family", &cfd_type_info, - offsetof(struct CompactionServiceInput, column_family), - OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, - {"db_options", - {offsetof(struct CompactionServiceInput, db_options), - OptionType::kConfigurable, OptionVerificationType::kNormal, - OptionTypeFlags::kNone, - [](const ConfigOptions& opts, const std::string& /*name*/, - const std::string& value, void* addr) { - auto options = static_cast(addr); - return GetDBOptionsFromString(opts, DBOptions(), value, options); - }, - [](const ConfigOptions& opts, const std::string& /*name*/, - const void* addr, std::string* value) { - const auto options = static_cast(addr); - std::string result; - auto status = GetStringFromDBOptions(opts, *options, &result); - *value = "{" + result + "}"; - return status; - }, - [](const ConfigOptions& opts, const std::string& name, const void* addr1, - const void* addr2, std::string* mismatch) { - const auto this_one = static_cast(addr1); - const auto that_one = static_cast(addr2); - auto this_conf = DBOptionsAsConfigurable(*this_one); - auto that_conf = DBOptionsAsConfigurable(*that_one); - std::string mismatch_opt; - bool result = - this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt); - if (!result) { - *mismatch = name + "." 
+ mismatch_opt; - } - return result; - }}}, + {"options_file", + {offsetof(struct CompactionServiceInput, options_file), + OptionType::kEncodedString}}, + {"cf_name", + {offsetof(struct CompactionServiceInput, cf_name), + OptionType::kEncodedString}}, {"snapshots", OptionTypeInfo::Vector( offsetof(struct CompactionServiceInput, snapshots), OptionVerificationType::kNormal, OptionTypeFlags::kNone, diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 92944d11818..d04c920b74b 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -13,6 +13,8 @@ #include "logging/logging.h" #include "monitoring/perf_context_imp.h" #include "rocksdb/configurable.h" +#include "rocksdb/convenience.h" +#include "rocksdb/utilities/options_util.h" #include "util/cast_util.h" #include "util/write_batch_util.h" @@ -938,69 +940,98 @@ Status DB::OpenAndCompact( const std::string& output_directory, const std::string& input, std::string* output, const CompactionServiceOptionsOverride& override_options) { + // Check for cancellation if (options.canceled && options.canceled->load(std::memory_order_acquire)) { return Status::Incomplete(Status::SubCode::kManualCompactionPaused); } + + // 1. Deserialize Compaction Input CompactionServiceInput compaction_input; Status s = CompactionServiceInput::Read(input, &compaction_input); if (!s.ok()) { return s; } - compaction_input.db_options.max_open_files = -1; - compaction_input.db_options.compaction_service = nullptr; - if (compaction_input.db_options.statistics) { - compaction_input.db_options.statistics.reset(); + // 2. 
Parse Base DBOptions from OPTIONS File + DBOptions db_options; + ConfigOptions config_options; + config_options.env = override_options.env; + std::vector all_column_families; + s = LoadOptionsFromFile(config_options, + name + "/" + compaction_input.options_file, + &db_options, &all_column_families); + if (!s.ok()) { + return s; } - compaction_input.db_options.env = override_options.env; - compaction_input.db_options.file_checksum_gen_factory = - override_options.file_checksum_gen_factory; - compaction_input.db_options.statistics = override_options.statistics; - compaction_input.column_family.options.comparator = - override_options.comparator; - compaction_input.column_family.options.merge_operator = - override_options.merge_operator; - compaction_input.column_family.options.compaction_filter = - override_options.compaction_filter; - compaction_input.column_family.options.compaction_filter_factory = - override_options.compaction_filter_factory; - compaction_input.column_family.options.prefix_extractor = - override_options.prefix_extractor; - compaction_input.column_family.options.table_factory = - override_options.table_factory; - compaction_input.column_family.options.sst_partitioner_factory = - override_options.sst_partitioner_factory; - compaction_input.column_family.options.table_properties_collector_factories = - override_options.table_properties_collector_factories; - compaction_input.db_options.listeners = override_options.listeners; + // 3. Override pointer configurations in DBOptions with + // CompactionServiceOptionsOverride + db_options.env = override_options.env; + db_options.file_checksum_gen_factory = + override_options.file_checksum_gen_factory; + db_options.statistics = override_options.statistics; + db_options.listeners = override_options.listeners; + db_options.compaction_service = nullptr; + // We will close the DB after the compaction anyway. + // Open as many files as needed for the compaction. + db_options.max_open_files = -1; + + // 4. 
Filter CFs that are needed for OpenAndCompact() + // We do not need to open all column families for the remote compaction. + // Only open default CF + target CF. If target CF == default CF, we will open + // just the default CF (Due to current limitation, DB cannot open without the + // default CF) std::vector column_families; - column_families.push_back(compaction_input.column_family); - // TODO: we have to open default CF, because of an implementation limitation, - // currently we just use the same CF option from input, which is not collect - // and open may fail. - if (compaction_input.column_family.name != kDefaultColumnFamilyName) { - column_families.emplace_back(kDefaultColumnFamilyName, - compaction_input.column_family.options); + for (auto& cf : all_column_families) { + if (cf.name == compaction_input.cf_name) { + cf.options.comparator = override_options.comparator; + cf.options.merge_operator = override_options.merge_operator; + cf.options.compaction_filter = override_options.compaction_filter; + cf.options.compaction_filter_factory = + override_options.compaction_filter_factory; + cf.options.prefix_extractor = override_options.prefix_extractor; + cf.options.table_factory = override_options.table_factory; + cf.options.sst_partitioner_factory = + override_options.sst_partitioner_factory; + cf.options.table_properties_collector_factories = + override_options.table_properties_collector_factories; + column_families.emplace_back(cf); + } else if (cf.name == kDefaultColumnFamilyName) { + column_families.emplace_back(cf); + } } + // 5. Open db As Secondary DB* db; std::vector handles; - - s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory, - column_families, &handles, &db); + s = DB::OpenAsSecondary(db_options, name, output_directory, column_families, + &handles, &db); if (!s.ok()) { return s; } + assert(db); + + // 6. 
Find the handle of the Column Family that this will compact + ColumnFamilyHandle* cfh; + for (auto* handle : handles) { + if (compaction_input.cf_name == handle->GetName()) { + cfh = handle; + break; + } + } + assert(cfh); + // 7. Run the compaction without installation. + // Output will be stored in the directory specified by output_directory CompactionServiceResult compaction_result; DBImplSecondary* db_secondary = static_cast_with_check(db); - assert(handles.size() > 0); - s = db_secondary->CompactWithoutInstallation( - options, handles[0], compaction_input, &compaction_result); + s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input, + &compaction_result); + // 8. Serialize the result Status serialization_status = compaction_result.Write(output); + // 9. Close the db and return for (auto& handle : handles) { delete handle; } From 764598aa75ece539960c4a6fb973b146c95b4996 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Thu, 19 Sep 2024 20:51:20 -0700 Subject: [PATCH 2/5] Fix test --- db/db_impl/db_impl_secondary.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index d04c920b74b..9215e77f509 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -1012,7 +1012,7 @@ Status DB::OpenAndCompact( assert(db); // 6. 
Find the handle of the Column Family that this will compact - ColumnFamilyHandle* cfh; + ColumnFamilyHandle* cfh = nullptr; for (auto* handle : handles) { if (compaction_input.cf_name == handle->GetName()) { cfh = handle; From 4515e33efe7e4df0f97e104c2530e6d7faa886ff Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 20 Sep 2024 07:08:18 -0700 Subject: [PATCH 3/5] linter fix and minor cleanup --- db/compaction/compaction_job.h | 2 -- db/db_impl/db_impl_secondary.cc | 1 - include/rocksdb/options.h | 3 --- 3 files changed, 6 deletions(-) diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 7bf8d683de3..294b107143a 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -401,8 +401,6 @@ struct CompactionServiceInput { static Status Read(const std::string& data_str, CompactionServiceInput* obj); Status Write(std::string* output); - CompactionServiceInput() {} - #ifndef NDEBUG bool TEST_Equals(CompactionServiceInput* other); bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 9215e77f509..e41c94abca6 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -12,7 +12,6 @@ #include "logging/auto_roll_logger.h" #include "logging/logging.h" #include "monitoring/perf_context_imp.h" -#include "rocksdb/configurable.h" #include "rocksdb/convenience.h" #include "rocksdb/utilities/options_util.h" #include "util/cast_util.h" diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 507f9bab80a..27feadb804e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2295,9 +2295,6 @@ struct SizeApproximationOptions { }; struct CompactionServiceOptionsOverride { - // Currently pointer configurations are not passed to compaction service - // compaction so the user needs to set it. It will be removed once pointer - // configuration passing is supported. 
Env* env = Env::Default(); std::shared_ptr file_checksum_gen_factory = nullptr; From 5a4ae3bc9c73a367806b475f87b6105f25ed515a Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 20 Sep 2024 11:13:18 -0700 Subject: [PATCH 4/5] Remove options file name from payload completely --- db/compaction/compaction_job.h | 1 - db/compaction/compaction_job_test.cc | 1 - db/compaction/compaction_service_job.cc | 11 +---------- db/db_impl/db_impl_secondary.cc | 15 ++++++++++----- 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 294b107143a..dd3b5373737 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -377,7 +377,6 @@ class CompactionJob { // doesn't contain the LSM tree information, which is passed though MANIFEST // file. struct CompactionServiceInput { - std::string options_file; std::string cf_name; std::vector snapshots; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 868872547ad..e286817e6fe 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -1569,7 +1569,6 @@ TEST_F(CompactionJobTest, InputSerialization) { Random rnd(static_cast(time(nullptr))); Random64 rnd64(time(nullptr)); input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen)); - input.options_file = "OPTIONS-007"; while (!rnd.OneIn(10)) { input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX)); } diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 01de45e2039..0bb994caea3 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -29,12 +29,6 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( const Compaction* compaction = sub_compact->compaction; CompactionServiceInput compaction_input; - Status s = GetLatestOptionsFileName(dbname_, db_options_.env, - &compaction_input.options_file); - if (!s.ok()) { -
sub_compact->status = s; - return CompactionServiceJobStatus::kFailure; - } compaction_input.output_level = compaction->output_level(); compaction_input.db_id = db_id_; @@ -58,7 +52,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.has_end ? sub_compact->end->ToString() : ""; std::string compaction_input_binary; - s = compaction_input.Write(&compaction_input_binary); + Status s = compaction_input.Write(&compaction_input_binary); if (!s.ok()) { sub_compact->status = s; return CompactionServiceJobStatus::kFailure; @@ -409,9 +403,6 @@ static std::unordered_map cfd_type_info = { }; static std::unordered_map cs_input_type_info = { - {"options_file", - {offsetof(struct CompactionServiceInput, options_file), - OptionType::kEncodedString}}, {"cf_name", {offsetof(struct CompactionServiceInput, cf_name), OptionType::kEncodedString}}, diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index e41c94abca6..fb7ea1110e2 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -951,16 +951,21 @@ Status DB::OpenAndCompact( return s; } - // 2. Parse Base DBOptions from OPTIONS File + // 2. Load the options from latest OPTIONS file DBOptions db_options; ConfigOptions config_options; config_options.env = override_options.env; std::vector all_column_families; - s = LoadOptionsFromFile(config_options, - name + "/" + compaction_input.options_file, - &db_options, &all_column_families); + s = LoadLatestOptions(config_options, name, &db_options, + &all_column_families); + // In a very rare scenario, loading options may fail if the options changed by + // the primary host at the same time. Just retry once for now. if (!s.ok()) { - return s; + s = LoadLatestOptions(config_options, name, &db_options, + &all_column_families); + if (!s.ok()) { + return s; + } } // 3. 
Override pointer configurations in DBOptions with From 5a76e5bde4cecc425ab6178825681190a79c139c Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 20 Sep 2024 11:36:29 -0700 Subject: [PATCH 5/5] fix linter --- db/compaction/compaction_service_job.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 0bb994caea3..8a8db33627b 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -15,7 +15,6 @@ #include "monitoring/thread_status_util.h" #include "options/options_helper.h" #include "rocksdb/utilities/options_type.h" -#include "rocksdb/utilities/options_util.h" namespace ROCKSDB_NAMESPACE { class SubcompactionState; @@ -29,7 +28,6 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( const Compaction* compaction = sub_compact->compaction; CompactionServiceInput compaction_input; - compaction_input.output_level = compaction->output_level(); compaction_input.db_id = db_id_;