Skip to content

Commit

Permalink
Parse OPTIONS file instead of including options in the compaction input
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Sep 20, 2024
1 parent 71e38db commit 9a726ef
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 109 deletions.
8 changes: 3 additions & 5 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,8 @@ class CompactionJob {
// doesn't contain the LSM tree information, which is passed though MANIFEST
// file.
struct CompactionServiceInput {
ColumnFamilyDescriptor column_family;

DBOptions db_options;
std::string options_file;
std::string cf_name;

std::vector<SequenceNumber> snapshots;

Expand All @@ -402,8 +401,7 @@ struct CompactionServiceInput {
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);

// Initialize a dummy ColumnFamilyDescriptor
CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
CompactionServiceInput() {}

#ifndef NDEBUG
bool TEST_Equals(CompactionServiceInput* other);
Expand Down
17 changes: 4 additions & 13 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1568,17 +1568,8 @@ TEST_F(CompactionJobTest, InputSerialization) {
const int kStrMaxLen = 1000;
Random rnd(static_cast<uint32_t>(time(nullptr)));
Random64 rnd64(time(nullptr));
input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
input.column_family.options.comparator = ReverseBytewiseComparator();
input.column_family.options.max_bytes_for_level_base =
rnd64.Uniform(UINT64_MAX);
input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
input.column_family.options.compression = kZSTD;
input.column_family.options.compression_opts.level = 4;
input.db_options.max_background_flushes = 10;
input.db_options.paranoid_checks = rnd.OneIn(2);
input.db_options.statistics = CreateDBStatistics();
input.db_options.env = env_;
input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
input.options_file = "OPTIONS-007";
while (!rnd.OneIn(10)) {
input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
}
Expand Down Expand Up @@ -1606,10 +1597,10 @@ TEST_F(CompactionJobTest, InputSerialization) {
ASSERT_TRUE(deserialized1.TEST_Equals(&input));

// Test mismatch
deserialized1.db_options.max_background_flushes += 10;
deserialized1.output_level += 10;
std::string mismatch;
ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
ASSERT_EQ(mismatch, "db_options.max_background_flushes");
ASSERT_EQ(mismatch, "output_level");

// Test unknown field
CompactionServiceInput deserialized2;
Expand Down
81 changes: 28 additions & 53 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/utilities/options_util.h"

namespace ROCKSDB_NAMESPACE {
class SubcompactionState;
Expand All @@ -28,6 +29,13 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(

const Compaction* compaction = sub_compact->compaction;
CompactionServiceInput compaction_input;
Status s = GetLatestOptionsFileName(dbname_, db_options_.env,
&compaction_input.options_file);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}

compaction_input.output_level = compaction->output_level();
compaction_input.db_id = db_id_;

Expand All @@ -39,12 +47,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
MakeTableFileName(file->fd.GetNumber()));
}
}
compaction_input.column_family.name =
compaction->column_family_data()->GetName();
compaction_input.column_family.options =
compaction->column_family_data()->GetLatestCFOptions();
compaction_input.db_options =
BuildDBOptions(db_options_, mutable_db_options_copy_);

compaction_input.cf_name = compaction->column_family_data()->GetName();
compaction_input.snapshots = existing_snapshots_;
compaction_input.has_begin = sub_compact->start.has_value();
compaction_input.begin =
Expand All @@ -54,7 +58,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.has_end ? sub_compact->end->ToString() : "";

std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
Expand All @@ -70,7 +74,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
Expand All @@ -84,13 +88,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(),
job_id_);
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status;
default:
assert(false); // unknown status
Expand All @@ -99,7 +104,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(), job_id_);
std::string compaction_result_binary;
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
Expand All @@ -109,7 +114,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}

Expand All @@ -134,9 +139,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed.",
compaction_input.column_family.name.c_str(), job_id_);
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}

Expand All @@ -162,7 +167,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str());

// Installation Starts
Expand Down Expand Up @@ -264,8 +269,8 @@ Status CompactionServiceCompactionJob::Run() {
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);

write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());

bottommost_level_ = c->bottommost_level();

Slice begin = compaction_input_.begin;
Expand Down Expand Up @@ -404,42 +409,12 @@ static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
};

static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"column_family",
OptionTypeInfo::Struct(
"column_family", &cfd_type_info,
offsetof(struct CompactionServiceInput, column_family),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"db_options",
{offsetof(struct CompactionServiceInput, db_options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto options = static_cast<DBOptions*>(addr);
return GetDBOptionsFromString(opts, DBOptions(), value, options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto options = static_cast<const DBOptions*>(addr);
std::string result;
auto status = GetStringFromDBOptions(opts, *options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const DBOptions*>(addr1);
const auto that_one = static_cast<const DBOptions*>(addr2);
auto this_conf = DBOptionsAsConfigurable(*this_one);
auto that_conf = DBOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
{"options_file",
{offsetof(struct CompactionServiceInput, options_file),
OptionType::kEncodedString}},
{"cf_name",
{offsetof(struct CompactionServiceInput, cf_name),
OptionType::kEncodedString}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
Expand Down
107 changes: 69 additions & 38 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/options_util.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"

Expand Down Expand Up @@ -938,69 +940,98 @@ Status DB::OpenAndCompact(
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options) {
// Check for cancellation
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}

// 1. Deserialize Compaction Input
CompactionServiceInput compaction_input;
Status s = CompactionServiceInput::Read(input, &compaction_input);
if (!s.ok()) {
return s;
}

compaction_input.db_options.max_open_files = -1;
compaction_input.db_options.compaction_service = nullptr;
if (compaction_input.db_options.statistics) {
compaction_input.db_options.statistics.reset();
// 2. Parse Base DBOptions from OPTIONS File
DBOptions db_options;
ConfigOptions config_options;
config_options.env = override_options.env;
std::vector<ColumnFamilyDescriptor> all_column_families;
s = LoadOptionsFromFile(config_options,
name + "/" + compaction_input.options_file,
&db_options, &all_column_families);
if (!s.ok()) {
return s;
}
compaction_input.db_options.env = override_options.env;
compaction_input.db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
compaction_input.db_options.statistics = override_options.statistics;
compaction_input.column_family.options.comparator =
override_options.comparator;
compaction_input.column_family.options.merge_operator =
override_options.merge_operator;
compaction_input.column_family.options.compaction_filter =
override_options.compaction_filter;
compaction_input.column_family.options.compaction_filter_factory =
override_options.compaction_filter_factory;
compaction_input.column_family.options.prefix_extractor =
override_options.prefix_extractor;
compaction_input.column_family.options.table_factory =
override_options.table_factory;
compaction_input.column_family.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
compaction_input.column_family.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
compaction_input.db_options.listeners = override_options.listeners;

// 3. Override pointer configurations in DBOptions with
// CompactionServiceOptionsOverride
db_options.env = override_options.env;
db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
db_options.statistics = override_options.statistics;
db_options.listeners = override_options.listeners;
db_options.compaction_service = nullptr;
// We will close the DB after the compaction anyway.
// Open as many files as needed for the compaction.
db_options.max_open_files = -1;

// 4. Filter CFs that are needed for OpenAndCompact()
// We do not need to open all column families for the remote compaction.
// Only open default CF + target CF. If target CF == default CF, we will open
// just the default CF (Due to current limitation, DB cannot open without the
// default CF)
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(compaction_input.column_family);
// TODO: we have to open default CF, because of an implementation limitation,
// currently we just use the same CF option from input, which is not collect
// and open may fail.
if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
column_families.emplace_back(kDefaultColumnFamilyName,
compaction_input.column_family.options);
for (auto& cf : all_column_families) {
if (cf.name == compaction_input.cf_name) {
cf.options.comparator = override_options.comparator;
cf.options.merge_operator = override_options.merge_operator;
cf.options.compaction_filter = override_options.compaction_filter;
cf.options.compaction_filter_factory =
override_options.compaction_filter_factory;
cf.options.prefix_extractor = override_options.prefix_extractor;
cf.options.table_factory = override_options.table_factory;
cf.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
cf.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
column_families.emplace_back(cf);
} else if (cf.name == kDefaultColumnFamilyName) {
column_families.emplace_back(cf);
}
}

// 5. Open db As Secondary
DB* db;
std::vector<ColumnFamilyHandle*> handles;

s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
column_families, &handles, &db);
s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
&handles, &db);
if (!s.ok()) {
return s;
}
assert(db);

// 6. Find the handle of the Column Family that this will compact
ColumnFamilyHandle* cfh;
for (auto* handle : handles) {
if (compaction_input.cf_name == handle->GetName()) {
cfh = handle;
break;
}
}
assert(cfh);

// 7. Run the compaction without installation.
// Output will be stored in the directory specified by output_directory
CompactionServiceResult compaction_result;
DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
assert(handles.size() > 0);
s = db_secondary->CompactWithoutInstallation(
options, handles[0], compaction_input, &compaction_result);
s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
&compaction_result);

// 8. Serialize the result
Status serialization_status = compaction_result.Write(output);

// 9. Close the db and return
for (auto& handle : handles) {
delete handle;
}
Expand Down

0 comments on commit 9a726ef

Please sign in to comment.