Skip to content

Commit 5ce94ed

Browse files
committed
use lazy_emplace_batch
1 parent 0ec4bbb commit 5ce94ed

File tree

2 files changed

+14
-16
lines changed

2 files changed

+14
-16
lines changed

be/src/exec/operator/aggregation_sink_operator.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -639,11 +639,10 @@ void AggSinkLocalState::_emplace_into_hash_table_inline_count(ColumnRawPtrs& key
639639
auto creator_for_null_key = [&](auto& mapped) { mapped = nullptr; };
640640

641641
SCOPED_TIMER(_hash_table_emplace_timer);
642-
for (size_t i = 0; i < num_rows; ++i) {
643-
auto* mapped_ptr = agg_method.lazy_emplace(state, i, creator,
644-
creator_for_null_key);
645-
++reinterpret_cast<UInt64&>(*mapped_ptr);
646-
}
642+
lazy_emplace_batch(agg_method, state, num_rows, creator,
643+
creator_for_null_key, [&](uint32_t, auto& mapped) {
644+
++reinterpret_cast<UInt64&>(mapped);
645+
});
647646

648647
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
649648
}},
@@ -680,11 +679,11 @@ void AggSinkLocalState::_merge_into_hash_table_inline_count(ColumnRawPtrs& key_c
680679
auto creator_for_null_key = [&](auto& mapped) { mapped = nullptr; };
681680

682681
SCOPED_TIMER(_hash_table_emplace_timer);
683-
for (size_t i = 0; i < num_rows; ++i) {
684-
auto* mapped_ptr = agg_method.lazy_emplace(state, i, creator,
685-
creator_for_null_key);
686-
reinterpret_cast<UInt64&>(*mapped_ptr) += col_data[i].count;
687-
}
682+
lazy_emplace_batch(
683+
agg_method, state, num_rows, creator, creator_for_null_key,
684+
[&](uint32_t i, auto& mapped) {
685+
reinterpret_cast<UInt64&>(mapped) += col_data[i].count;
686+
});
688687

689688
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
690689
}},

be/src/exec/operator/streaming_aggregation_operator.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
103103
// StreamingAgg only operates in update + serialize mode: input is raw data, output is serialized intermediate state.
104104
// The serialization format of count is UInt64 itself, so it can be inlined into the hash table mapped slot.
105105
if (_aggregate_evaluators.size() == 1 &&
106-
_aggregate_evaluators[0]->function()->is_simple_count()) {
106+
_aggregate_evaluators[0]->function()->is_simple_count() && limit == -1) {
107107
_use_simple_count = true;
108108
#ifndef NDEBUG
109109
// Randomly enable/disable in debug mode to verify correctness of multi-phase agg promotion/demotion.
@@ -892,11 +892,10 @@ void StreamingAggLocalState::_emplace_into_hash_table_inline_count(ColumnRawPtrs
892892
auto creator_for_null_key = [&](auto& mapped) { mapped = nullptr; };
893893

894894
SCOPED_TIMER(_hash_table_emplace_timer);
895-
for (size_t i = 0; i < num_rows; ++i) {
896-
auto* mapped_ptr = agg_method.lazy_emplace(state, i, creator,
897-
creator_for_null_key);
898-
++reinterpret_cast<UInt64&>(*mapped_ptr);
899-
}
895+
lazy_emplace_batch(agg_method, state, num_rows, creator,
896+
creator_for_null_key, [&](uint32_t, auto& mapped) {
897+
++reinterpret_cast<UInt64&>(mapped);
898+
});
900899

901900
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
902901
}},

0 commit comments

Comments
 (0)