diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index 224f4e46f34..7bf8d683de3 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -377,9 +377,8 @@ class CompactionJob {
 // doesn't contain the LSM tree information, which is passed though MANIFEST
 // file.
 struct CompactionServiceInput {
-  ColumnFamilyDescriptor column_family;
-
-  DBOptions db_options;
+  std::string options_file;
+  std::string cf_name;
 
   std::vector<SequenceNumber> snapshots;
 
@@ -402,8 +401,7 @@ struct CompactionServiceInput {
   static Status Read(const std::string& data_str, CompactionServiceInput* obj);
   Status Write(std::string* output);
 
-  // Initialize a dummy ColumnFamilyDescriptor
-  CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
+  CompactionServiceInput() {}
 
 #ifndef NDEBUG
   bool TEST_Equals(CompactionServiceInput* other);
diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
index 8e85a9f96f2..868872547ad 100644
--- a/db/compaction/compaction_job_test.cc
+++ b/db/compaction/compaction_job_test.cc
@@ -1568,17 +1568,8 @@ TEST_F(CompactionJobTest, InputSerialization) {
   const int kStrMaxLen = 1000;
   Random rnd(static_cast<uint32_t>(time(nullptr)));
   Random64 rnd64(time(nullptr));
-  input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
-  input.column_family.options.comparator = ReverseBytewiseComparator();
-  input.column_family.options.max_bytes_for_level_base =
-      rnd64.Uniform(UINT64_MAX);
-  input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
-  input.column_family.options.compression = kZSTD;
-  input.column_family.options.compression_opts.level = 4;
-  input.db_options.max_background_flushes = 10;
-  input.db_options.paranoid_checks = rnd.OneIn(2);
-  input.db_options.statistics = CreateDBStatistics();
-  input.db_options.env = env_;
+  input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
+  input.options_file = "OPTIONS-007";
   while (!rnd.OneIn(10)) {
     input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
   }
@@ -1606,10 +1597,10 @@ TEST_F(CompactionJobTest, InputSerialization) {
   ASSERT_TRUE(deserialized1.TEST_Equals(&input));
 
   // Test mismatch
-  deserialized1.db_options.max_background_flushes += 10;
+  deserialized1.output_level += 10;
   std::string mismatch;
   ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
-  ASSERT_EQ(mismatch, "db_options.max_background_flushes");
+  ASSERT_EQ(mismatch, "output_level");
 
   // Test unknown field
   CompactionServiceInput deserialized2;
diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index a923e4fcc49..01de45e2039 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -15,6 +15,7 @@
 #include "monitoring/thread_status_util.h"
 #include "options/options_helper.h"
 #include "rocksdb/utilities/options_type.h"
+#include "rocksdb/utilities/options_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 class SubcompactionState;
@@ -28,6 +29,13 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   const Compaction* compaction = sub_compact->compaction;
 
   CompactionServiceInput compaction_input;
+  Status s = GetLatestOptionsFileName(dbname_, db_options_.env,
+                                      &compaction_input.options_file);
+  if (!s.ok()) {
+    sub_compact->status = s;
+    return CompactionServiceJobStatus::kFailure;
+  }
+
   compaction_input.output_level = compaction->output_level();
   compaction_input.db_id = db_id_;
 
@@ -39,12 +47,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
           MakeTableFileName(file->fd.GetNumber()));
     }
   }
-  compaction_input.column_family.name =
-      compaction->column_family_data()->GetName();
-  compaction_input.column_family.options =
-      compaction->column_family_data()->GetLatestCFOptions();
-  compaction_input.db_options =
-      BuildDBOptions(db_options_, mutable_db_options_copy_);
+
+  compaction_input.cf_name = compaction->column_family_data()->GetName();
   compaction_input.snapshots = existing_snapshots_;
   compaction_input.has_begin = sub_compact->start.has_value();
   compaction_input.begin =
@@ -54,7 +58,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       compaction_input.has_end ? sub_compact->end->ToString() : "";
 
   std::string compaction_input_binary;
-  Status s = compaction_input.Write(&compaction_input_binary);
+  s = compaction_input.Write(&compaction_input_binary);
   if (!s.ok()) {
     sub_compact->status = s;
     return CompactionServiceJobStatus::kFailure;
   }
@@ -70,7 +74,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   ROCKS_LOG_INFO(
       db_options_.info_log,
       "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
-      compaction_input.column_family.name.c_str(), job_id_,
+      compaction->column_family_data()->GetName().c_str(), job_id_,
      compaction_input.output_level, input_files_oss.str().c_str());
   CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
                                 GetCompactionId(sub_compact), thread_pri_);
@@ -84,13 +88,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
           "CompactionService failed to schedule a remote compaction job.");
       ROCKS_LOG_WARN(db_options_.info_log,
                      "[%s] [JOB %d] Remote compaction failed to start.",
-                     compaction_input.column_family.name.c_str(), job_id_);
+                     compaction->column_family_data()->GetName().c_str(),
+                     job_id_);
       return response.status;
     case CompactionServiceJobStatus::kUseLocal:
       ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
-          compaction_input.column_family.name.c_str(), job_id_);
+          compaction->column_family_data()->GetName().c_str(), job_id_);
       return response.status;
     default:
       assert(false);  // unknown status
@@ -99,7 +104,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
 
   ROCKS_LOG_INFO(db_options_.info_log,
                  "[%s] [JOB %d] Waiting for remote compaction...",
-                 compaction_input.column_family.name.c_str(), job_id_);
+                 compaction->column_family_data()->GetName().c_str(), job_id_);
   std::string compaction_result_binary;
   CompactionServiceJobStatus compaction_status =
       db_options_.compaction_service->Wait(response.scheduled_job_id,
@@ -109,7 +114,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
     ROCKS_LOG_INFO(
         db_options_.info_log,
         "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
-        compaction_input.column_family.name.c_str(), job_id_);
+        compaction->column_family_data()->GetName().c_str(), job_id_);
     return compaction_status;
   }
 
@@ -134,9 +139,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
           "result is returned).");
       compaction_result.status.PermitUncheckedError();
     }
-    ROCKS_LOG_WARN(db_options_.info_log,
-                   "[%s] [JOB %d] Remote compaction failed.",
-                   compaction_input.column_family.name.c_str(), job_id_);
+    ROCKS_LOG_WARN(
+        db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
+        compaction->column_family_data()->GetName().c_str(), job_id_);
     return compaction_status;
   }
 
@@ -162,7 +167,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       db_options_.info_log,
       "[%s] [JOB %d] Received remote compaction result, output path: "
      "%s, files: %s",
-      compaction_input.column_family.name.c_str(), job_id_,
+      compaction->column_family_data()->GetName().c_str(), job_id_,
       compaction_result.output_path.c_str(), output_files_oss.str().c_str());
 
   // Installation Starts
@@ -264,8 +269,8 @@ Status CompactionServiceCompactionJob::Run() {
   const VersionStorageInfo* storage_info = c->input_version()->storage_info();
   assert(storage_info);
   assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
-  write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
+
   bottommost_level_ = c->bottommost_level();
 
   Slice begin = compaction_input_.begin;
@@ -404,42 +409,12 @@ static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
 };
 
 static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
-    {"column_family",
-     OptionTypeInfo::Struct(
-         "column_family", &cfd_type_info,
-         offsetof(struct CompactionServiceInput, column_family),
-         OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
-    {"db_options",
-     {offsetof(struct CompactionServiceInput, db_options),
-      OptionType::kConfigurable, OptionVerificationType::kNormal,
-      OptionTypeFlags::kNone,
-      [](const ConfigOptions& opts, const std::string& /*name*/,
-         const std::string& value, void* addr) {
-        auto options = static_cast<DBOptions*>(addr);
-        return GetDBOptionsFromString(opts, DBOptions(), value, options);
-      },
-      [](const ConfigOptions& opts, const std::string& /*name*/,
-         const void* addr, std::string* value) {
-        const auto options = static_cast<const DBOptions*>(addr);
-        std::string result;
-        auto status = GetStringFromDBOptions(opts, *options, &result);
-        *value = "{" + result + "}";
-        return status;
-      },
-      [](const ConfigOptions& opts, const std::string& name, const void* addr1,
-         const void* addr2, std::string* mismatch) {
-        const auto this_one = static_cast<const DBOptions*>(addr1);
-        const auto that_one = static_cast<const DBOptions*>(addr2);
-        auto this_conf = DBOptionsAsConfigurable(*this_one);
-        auto that_conf = DBOptionsAsConfigurable(*that_one);
-        std::string mismatch_opt;
-        bool result =
-            this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
-        if (!result) {
-          *mismatch = name + "." + mismatch_opt;
-        }
-        return result;
-      }}},
+    {"options_file",
+     {offsetof(struct CompactionServiceInput, options_file),
+      OptionType::kEncodedString}},
+    {"cf_name",
+     {offsetof(struct CompactionServiceInput, cf_name),
+      OptionType::kEncodedString}},
     {"snapshots", OptionTypeInfo::Vector<uint64_t>(
                       offsetof(struct CompactionServiceInput, snapshots),
                       OptionVerificationType::kNormal, OptionTypeFlags::kNone,
diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
index 92944d11818..d04c920b74b 100644
--- a/db/db_impl/db_impl_secondary.cc
+++ b/db/db_impl/db_impl_secondary.cc
@@ -13,6 +13,8 @@
 #include "logging/logging.h"
 #include "monitoring/perf_context_imp.h"
 #include "rocksdb/configurable.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/utilities/options_util.h"
 #include "util/cast_util.h"
 #include "util/write_batch_util.h"
 
@@ -938,69 +940,98 @@ Status DB::OpenAndCompact(
     const std::string& output_directory, const std::string& input,
     std::string* output,
     const CompactionServiceOptionsOverride& override_options) {
+  // Check for cancellation
   if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
+
+  // 1. Deserialize the compaction input
   CompactionServiceInput compaction_input;
   Status s = CompactionServiceInput::Read(input, &compaction_input);
   if (!s.ok()) {
     return s;
   }
 
-  compaction_input.db_options.max_open_files = -1;
-  compaction_input.db_options.compaction_service = nullptr;
-  if (compaction_input.db_options.statistics) {
-    compaction_input.db_options.statistics.reset();
+  // 2. Load the base DBOptions from the OPTIONS file
+  DBOptions db_options;
+  ConfigOptions config_options;
+  config_options.env = override_options.env;
+  std::vector<ColumnFamilyDescriptor> all_column_families;
+  s = LoadOptionsFromFile(config_options,
+                          name + "/" + compaction_input.options_file,
+                          &db_options, &all_column_families);
+  if (!s.ok()) {
+    return s;
   }
-  compaction_input.db_options.env = override_options.env;
-  compaction_input.db_options.file_checksum_gen_factory =
-      override_options.file_checksum_gen_factory;
-  compaction_input.db_options.statistics = override_options.statistics;
-  compaction_input.column_family.options.comparator =
-      override_options.comparator;
-  compaction_input.column_family.options.merge_operator =
-      override_options.merge_operator;
-  compaction_input.column_family.options.compaction_filter =
-      override_options.compaction_filter;
-  compaction_input.column_family.options.compaction_filter_factory =
-      override_options.compaction_filter_factory;
-  compaction_input.column_family.options.prefix_extractor =
-      override_options.prefix_extractor;
-  compaction_input.column_family.options.table_factory =
-      override_options.table_factory;
-  compaction_input.column_family.options.sst_partitioner_factory =
-      override_options.sst_partitioner_factory;
-  compaction_input.column_family.options.table_properties_collector_factories =
-      override_options.table_properties_collector_factories;
-  compaction_input.db_options.listeners = override_options.listeners;
+  // 3. Override the pointer-typed DBOptions fields with the values from
+  // CompactionServiceOptionsOverride
+  db_options.env = override_options.env;
+  db_options.file_checksum_gen_factory =
+      override_options.file_checksum_gen_factory;
+  db_options.statistics = override_options.statistics;
+  db_options.listeners = override_options.listeners;
+  db_options.compaction_service = nullptr;
+  // We will close the DB after the compaction anyway.
+  // Open as many files as needed for the compaction.
+  db_options.max_open_files = -1;
+
+  // 4. Filter the CFs needed for OpenAndCompact()
+  // We do not need to open all column families for the remote compaction.
+  // Only open the default CF and the target CF. If the target CF is the
+  // default CF, open just the default CF (due to a current limitation, the
+  // DB cannot be opened without the default CF).
   std::vector<ColumnFamilyDescriptor> column_families;
-  column_families.push_back(compaction_input.column_family);
-  // TODO: we have to open default CF, because of an implementation limitation,
-  // currently we just use the same CF option from input, which is not collect
-  // and open may fail.
-  if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
-    column_families.emplace_back(kDefaultColumnFamilyName,
-                                 compaction_input.column_family.options);
+  for (auto& cf : all_column_families) {
+    if (cf.name == compaction_input.cf_name) {
+      cf.options.comparator = override_options.comparator;
+      cf.options.merge_operator = override_options.merge_operator;
+      cf.options.compaction_filter = override_options.compaction_filter;
+      cf.options.compaction_filter_factory =
+          override_options.compaction_filter_factory;
+      cf.options.prefix_extractor = override_options.prefix_extractor;
+      cf.options.table_factory = override_options.table_factory;
+      cf.options.sst_partitioner_factory =
+          override_options.sst_partitioner_factory;
+      cf.options.table_properties_collector_factories =
+          override_options.table_properties_collector_factories;
+      column_families.emplace_back(cf);
+    } else if (cf.name == kDefaultColumnFamilyName) {
+      column_families.emplace_back(cf);
+    }
   }
 
+  // 5. Open the DB as a secondary instance
   DB* db;
   std::vector<ColumnFamilyHandle*> handles;
-
-  s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
-                          column_families, &handles, &db);
+  s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
+                          &handles, &db);
   if (!s.ok()) {
     return s;
   }
+  assert(db);
+
+  // 6. Find the handle of the column family to compact
+  ColumnFamilyHandle* cfh = nullptr;
+  for (auto* handle : handles) {
+    if (compaction_input.cf_name == handle->GetName()) {
+      cfh = handle;
+      break;
+    }
+  }
+  assert(cfh);
 
+  // 7. Run the compaction without installation.
+  // Output will be stored in the directory specified by output_directory.
   CompactionServiceResult compaction_result;
   DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
-  assert(handles.size() > 0);
-  s = db_secondary->CompactWithoutInstallation(
-      options, handles[0], compaction_input, &compaction_result);
+  s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
+                                               &compaction_result);
 
+  // 8. Serialize the result
   Status serialization_status = compaction_result.Write(output);
 
+  // 9. Close the DB and return
   for (auto& handle : handles) {
     delete handle;
   }
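
For context, here is a minimal sketch of the worker-side call path that the new `options_file`/`cf_name` scheme feeds into, assuming a worker that already receives the serialized input handed to `CompactionService::Schedule()` on the primary. The function name, paths, and the specific plugin objects chosen below are illustrative placeholders, not part of this change:

```cpp
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical worker entry point. `serialized_input` is the payload the
// primary passed to CompactionService::Schedule(); `db_name` and `output_dir`
// are paths the service already knows about.
Status RunRemoteCompaction(const std::string& db_name,
                           const std::string& output_dir,
                           const std::string& serialized_input,
                           std::string* serialized_result) {
  // Pointer-typed options (custom comparators, factories, listeners, ...)
  // cannot be reconstructed from the OPTIONS file alone, so the worker still
  // supplies them through the override struct.
  CompactionServiceOptionsOverride override_options;
  override_options.env = Env::Default();
  override_options.statistics = CreateDBStatistics();
  override_options.comparator = BytewiseComparator();
  override_options.table_factory.reset(NewBlockBasedTableFactory());

  // Everything else (the DBOptions and the options of `cf_name`) is now
  // loaded inside OpenAndCompact() from the OPTIONS file named in the input.
  OpenAndCompactOptions options;
  return DB::OpenAndCompact(options, db_name, output_dir, serialized_input,
                            serialized_result, override_options);
}
```

With this change the worker no longer receives a full `DBOptions`/`ColumnFamilyDescriptor` in the payload; it only needs read access to the primary's OPTIONS file under `db_name`, which `OpenAndCompact()` resolves as `name + "/" + compaction_input.options_file`.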
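The updated InputSerialization test reduces to the round trip below. This is a condensed sketch that uses the internal `db/compaction/compaction_job.h` header directly; the field values are arbitrary:

```cpp
#include <cassert>
#include <string>

#include "db/compaction/compaction_job.h"  // internal header, not public API

using namespace ROCKSDB_NAMESPACE;

void CompactionServiceInputRoundTrip() {
  CompactionServiceInput input;
  input.options_file = "OPTIONS-007";  // same placeholder the test uses
  input.cf_name = "my_cf";             // arbitrary column family name
  input.output_level = 3;

  // Serialize on the primary...
  std::string serialized;
  Status s = input.Write(&serialized);
  assert(s.ok());

  // ...and deserialize on the worker.
  CompactionServiceInput deserialized;
  s = CompactionServiceInput::Read(serialized, &deserialized);
  assert(s.ok());
  assert(deserialized.options_file == input.options_file);
  assert(deserialized.cf_name == input.cf_name);
  (void)s;  // silence unused warnings in NDEBUG builds
}
```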