Aggregate memory_units_t in fetch to reduce cross-shard calls #12092

Merged · 1 commit · Jul 14, 2023
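For orientation before the diff: this change lets read_result::memory_units_t fold the semaphore units of every per-partition read result into one holder, so freeing them costs a single cross-shard call instead of one per partition. What follows is a minimal, hypothetical sketch of that pattern, not the Redpanda sources; the type and function names (memory_units, release_on_origin_shard, units_t) are invented for illustration, and only the Seastar calls (consume_units, semaphore_units::adopt and count, smp::submit_to) mirror what the diff itself relies on.

// Illustrative sketch only, not Redpanda code.
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/smp.hh>

#include <utility>

namespace ss = seastar;

// Spell the units type via the helper that creates it, so the sketch does not
// depend on the exact template signature of seastar's semaphore_units.
using units_t = decltype(ss::consume_units(std::declval<ss::semaphore&>(), 0));

struct memory_units {
    units_t kafka;
    units_t fetch;
    ss::shard_id shard = ss::this_shard_id(); // shard that owns the semaphores

    memory_units(ss::semaphore& kafka_sem, ss::semaphore& fetch_sem) noexcept
      : kafka(ss::consume_units(kafka_sem, 0))   // zero units, but bound to
      , fetch(ss::consume_units(fetch_sem, 0)) {} // real semaphores

    bool has_units() const { return kafka.count() > 0 || fetch.count() > 0; }

    // Fold another holder into this one; semaphore_units::adopt() asserts
    // that both sides refer to the same semaphore.
    void adopt(memory_units&& o) {
        kafka.adopt(std::move(o.kafka));
        fetch.adopt(std::move(o.fetch));
    }
};

// One submit_to() for the aggregated units instead of one per partition.
ss::future<> release_on_origin_shard(memory_units units) {
    if (units.shard == ss::this_shard_id() || !units.has_units()) {
        return ss::make_ready_future<>(); // destructor releases locally
    }
    return ss::smp::submit_to(units.shard, [u = std::move(units)]() noexcept {
        // `u` is destroyed on its home shard; its units go back to the
        // semaphores there with no further cross-shard traffic.
    });
}

The diff below threads the same idea through read_result::memory_units_t, do_read_from_ntp, and fill_fetch_responses: each result's units are adopted into a single total_memory_units, whose destruction then makes at most one cross-shard call back to the shard the units came from.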
34 changes: 32 additions & 2 deletions src/v/kafka/server/handlers/fetch.cc
@@ -155,8 +155,13 @@ static ss::future<read_result> read_from_partition(
std::move(aborted_transactions));
}

read_result::memory_units_t::memory_units_t(
ssx::semaphore& memory_sem, ssx::semaphore& memory_fetch_sem) noexcept
: kafka(ss::consume_units(memory_sem, 0))
, fetch(ss::consume_units(memory_fetch_sem, 0)) {}

read_result::memory_units_t::~memory_units_t() noexcept {
-    if (shard == ss::this_shard_id()) {
+    if (shard == ss::this_shard_id() || !has_units()) {
return;
}
auto f = ss::smp::submit_to(
@@ -169,6 +174,13 @@ read_result::memory_units_t::~memory_units_t() noexcept {
}
}

void read_result::memory_units_t::adopt(memory_units_t&& o) {
// adopt() asserts internally that the units are from the same semaphore,
// so there is no need to assert that they are from the same shard here.
kafka.adopt(std::move(o.kafka));
fetch.adopt(std::move(o.fetch));
}

/**
* Consume proper amounts of units from memory semaphores and return them as
* semaphore_units. Fetch semaphore units returned are the indication of
@@ -263,7 +275,7 @@ static ss::future<read_result> do_read_from_ntp(
ssx::semaphore& memory_sem,
ssx::semaphore& memory_fetch_sem) {
// control available memory
-    read_result::memory_units_t memory_units;
+    read_result::memory_units_t memory_units(memory_sem, memory_fetch_sem);
Contributor:

was this a latent bug?

Contributor Author:

Yeah, if for some reason read_from_partition reads data even though skip_read is set, a seg fault will occur when we try to adjust the semaphore units in memory_units_t. This doesn't happen today, but one never knows what the future holds.

This change avoids that potential issue by initializing the semaphore units to a reasonable default state.
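To make the failure mode concrete, here is a small hypothetical illustration (plain seastar::semaphore rather than the ssx wrappers, default_state_demo is an invented name, and it assumes, as the old code path did, that semaphore_units is default constructible): a default-constructed semaphore_units has no semaphore behind it, so a later attempt to adjust or return its units has nothing valid to act on, whereas units built with consume_units(sem, 0) are empty but still bound to a real semaphore.

// Illustrative sketch only, not Redpanda code.
#include <seastar/core/semaphore.hh>

void default_state_demo(seastar::semaphore& sem) {
    // Old default state: units associated with no semaphore at all. Anything
    // that later tries to grow or return these units has no valid semaphore
    // to touch; that is the potential seg fault described above.
    [[maybe_unused]] decltype(seastar::consume_units(sem, 0)) unbound{};

    // New default state: zero units, but bound to `sem`, so adjusting or
    // destroying them is always well defined.
    auto bound = seastar::consume_units(sem, 0);
    bound.adopt(seastar::consume_units(sem, 1)); // safe: same semaphore
}   // `bound` hands its one unit back to `sem`; `unbound` does nothing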

if (!ntp_config.cfg.skip_read) {
memory_units = reserve_memory_units(
memory_sem,
@@ -421,6 +433,10 @@ static void fill_fetch_responses(
range = boost::irange<size_t>(
0, std::min({results.size(), responses.size()}));
}

// Used to aggregate semaphore_units from results.
std::optional<read_result::memory_units_t> total_memory_units;

for (auto idx : range) {
auto& res = results[idx];
auto& resp_it = responses[idx];
@@ -454,6 +470,20 @@
resp.preferred_read_replica = *res.preferred_replica;
}

// Aggregate memory_units from all results together to avoid
// making more than one cross-shard function call to free them.
//
// Only aggregate non-empty memory_units.
if (res.memory_units.has_units()) {
if (unlikely(!total_memory_units)) {
// Move the first set of semaphore_units to get a copy of the
// semaphore pointer and the shard the results originate from.
total_memory_units = std::move(res.memory_units);
} else {
total_memory_units->adopt(std::move(res.memory_units));
}
}

/**
* According to KIP-74 we have to return first batch even if it would
* violate max_bytes fetch parameter
13 changes: 13 additions & 0 deletions src/v/kafka/server/handlers/fetch.h
@@ -222,6 +222,19 @@ struct read_result {
memory_units_t& operator=(memory_units_t&&) noexcept = default;
memory_units_t(const memory_units_t&) = delete;
memory_units_t& operator=(const memory_units_t&) = delete;
memory_units_t(
ssx::semaphore& memory_sem,
ssx::semaphore& memory_fetch_sem) noexcept;

/*
* Adopts another memory_units_t. This requires that both
* memory_units_t are from the same shard.
*/
void adopt(memory_units_t&& o);

bool has_units() const {
return fetch.count() > 0 || kafka.count() > 0;
}
};

explicit read_result(error_code e)
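As a closing illustration of what memory_units_t::adopt() and has_units() build on, here is a tiny hypothetical snippet (plain seastar::semaphore instead of ssx::semaphore, and adopt_demo is an invented name) showing how adopted semaphore_units accumulate and are returned to their semaphore in one go when the holder is destroyed:

// Illustrative sketch only, not Redpanda code.
#include <seastar/core/semaphore.hh>

#include <utility>

void adopt_demo() {
    seastar::semaphore sem{10};
    auto a = seastar::consume_units(sem, 2); // 8 units left in sem
    auto b = seastar::consume_units(sem, 3); // 5 units left in sem
    a.adopt(std::move(b)); // `a` now holds 5 units; adopt() asserts if the
                           // units came from a different semaphore
    // `a` is non-empty, so a memory_units_t holding it would report
    // has_units() == true and be worth returning cross-shard.
}   // all 5 units go back to `sem` here, released in one call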