Skip to content

Commit

Permalink
Some further cleanup in WriteBatchWithIndex::MultiGetFromBatchAndDB (…
Browse files Browse the repository at this point in the history
…#12143)

Summary:
Pull Request resolved: facebook/rocksdb#12143

facebook/rocksdb#11982 changed `WriteBatchWithIndex::MultiGetFromBatchDB` to preallocate space in the `autovector`s `key_contexts` and `merges` in order to prevent any reallocations, both as an optimization and in order to prevent pointers into the container from being invalidated during subsequent insertions. On second thought, this preallocation can actually be a pessimization in cases when only a small subset of keys require querying the underlying database. To prevent any memory regressions, the PR reverts this preallocation. In addition, it makes some small code hygiene improvements like incorporating the `PinnableWideColumns` object into `MergeTuple`.

Reviewed By: jaykorean

Differential Revision: D52136513

fbshipit-source-id: 21aa835084433feab27b501d9d1fc5434acea609
  • Loading branch information
ltamasi authored and facebook-github-bot committed Dec 14, 2023
1 parent c74531b commit cd21e4e
Showing 1 changed file with 46 additions and 28 deletions.
74 changes: 46 additions & 28 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -669,27 +669,41 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
return;
}

autovector<PinnableWideColumns, MultiGetContext::MAX_BATCH_SIZE> existing;
existing.reserve(num_keys);
struct MergeTuple {
MergeTuple(const Slice& _key, Status* _s, MergeContext&& _merge_context,
PinnableSlice* _value)
: key(_key),
s(_s),
merge_context(std::move(_merge_context)),
value(_value) {
assert(s);
assert(value);
}

autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_contexts;
key_contexts.reserve(num_keys);
Slice key;
Status* s;
PinnableWideColumns existing;
MergeContext merge_context;
PinnableSlice* value;
};

using MergeTuple = std::tuple<KeyContext*, MergeContext, PinnableSlice*>;
autovector<MergeTuple, MultiGetContext::MAX_BATCH_SIZE> merges;
merges.reserve(num_keys);

autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_contexts;

// Since the lifetime of the WriteBatch is the same as that of the transaction
// we cannot pin it as otherwise the returned value will not be available
// after the transaction finishes.
for (size_t i = 0; i < num_keys; ++i) {
const Slice& key = keys[i];
MergeContext merge_context;
std::string batch_value;
Status* s = &statuses[i];
PinnableSlice* pinnable_val = &values[i];
pinnable_val->Reset();
Status* const s = &statuses[i];
auto result = WriteBatchWithIndexInternal::GetFromBatch(
this, column_family, keys[i], &merge_context, &batch_value, s);
this, column_family, key, &merge_context, &batch_value, s);

PinnableSlice* const pinnable_val = &values[i];
pinnable_val->Reset();

if (result == WBWIIteratorImpl::kFound) {
*pinnable_val->GetSelf() = std::move(batch_value);
Expand All @@ -708,27 +722,35 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(

// Note: we have to retrieve all columns if we have to merge KVs from the
// batch and the DB; otherwise, the default column is sufficient.
// The columns field will be populated by the loop below to prevent issues
// with dangling pointers.
if (result == WBWIIteratorImpl::kMergeInProgress) {
existing.emplace_back();
key_contexts.emplace_back(column_family, keys[i], /* value */ nullptr,
&existing.back(), /* timestamp */ nullptr,
&statuses[i]);
merges.emplace_back(&key_contexts.back(), std::move(merge_context),
pinnable_val);
merges.emplace_back(key, s, std::move(merge_context), pinnable_val);
key_contexts.emplace_back(column_family, key, /* value */ nullptr,
/* columns */ nullptr, /* timestamp */ nullptr,
s);
continue;
}

assert(result == WBWIIteratorImpl::kNotFound);
key_contexts.emplace_back(column_family, keys[i], pinnable_val,
key_contexts.emplace_back(column_family, key, pinnable_val,
/* columns */ nullptr,
/* timestamp */ nullptr, &statuses[i]);
/* timestamp */ nullptr, s);
}

autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.reserve(key_contexts.size());

for (KeyContext& key : key_contexts) {
sorted_keys.emplace_back(&key);
size_t merges_idx = 0;
for (KeyContext& key_context : key_contexts) {
if (!key_context.value) {
assert(*key_context.key == merges[merges_idx].key);

key_context.columns = &merges[merges_idx].existing;
++merges_idx;
}

sorted_keys.emplace_back(&key_context);
}

// Did not find key in batch OR could not resolve Merges. Try DB.
Expand All @@ -738,14 +760,10 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
->MultiGetWithCallback(read_options, column_family, callback,
&sorted_keys);

for (auto iter = merges.begin(); iter != merges.end(); ++iter) {
auto& [key_context, merge_context, value] = *iter;

if (key_context->s->ok() ||
key_context->s->IsNotFound()) { // DB lookup succeeded
MergeAcrossBatchAndDB(column_family, *key_context->key,
*key_context->columns, merge_context, value,
key_context->s);
for (const auto& merge : merges) {
if (merge.s->ok() || merge.s->IsNotFound()) { // DB lookup succeeded
MergeAcrossBatchAndDB(column_family, merge.key, merge.existing,
merge.merge_context, merge.value, merge.s);
}
}
}
Expand Down

0 comments on commit cd21e4e

Please sign in to comment.