Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: guo-shaoge <shaoge1994@163.com>
  • Loading branch information
guo-shaoge committed Feb 4, 2025
1 parent e2bdc81 commit fc95526
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 222 deletions.
61 changes: 29 additions & 32 deletions dbms/src/Common/ColumnsHashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ struct HashMethodOneNumber
using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
using KeyHolderType = FieldType;
using BatchKeyHolderType = KeyHolderType;

static constexpr bool is_serialized_key = false;
static constexpr bool can_batch_get_key_holder = false;

const FieldType * vec;

Expand Down Expand Up @@ -110,7 +112,7 @@ class KeyStringBatchHandlerBase
const auto row = batch_row_idx + i;
const auto last_offset = offsets[row - 1];
// Remove last zero byte.
StringRef key(chars + last_offset, offsets[row] - offsets[row -1 ] - 1);
StringRef key(chars + last_offset, offsets[row] - last_offset - 1);
if constexpr (has_collator)
key = derived_collator->sortKey(key.data, key.size, sort_key_containers[i]);

Expand All @@ -121,15 +123,12 @@ class KeyStringBatchHandlerBase

// Sanity check (active only when assert() is enabled): verifies that
// sort_key_containers is non-empty and holds exactly as many entries as
// batch_rows.
void santityCheck() const
{
// Make sure init() has been called.
assert(sort_key_containers.size() == batch_rows.size() && !sort_key_containers.empty());
}

protected:
bool inited() const
{
return !sort_key_containers.empty();
}
bool inited() const { return !sort_key_containers.empty(); }

void init(size_t start_row, size_t max_batch_size)
{
Expand All @@ -149,20 +148,20 @@ class KeyStringBatchHandlerBase

if likely (collator)
{
#define M(VAR_PREFIX, COLLATOR_NAME, IMPL_TYPE, COLLATOR_ID) \
case (COLLATOR_ID): \
{ \
return prepareNextBatchType<IMPL_TYPE, true>(chars, offsets, cur_batch_size, collator); \
break; \
}
#define M(VAR_PREFIX, COLLATOR_NAME, IMPL_TYPE, COLLATOR_ID) \
case (COLLATOR_ID): \
{ \
return prepareNextBatchType<IMPL_TYPE, true>(chars, offsets, cur_batch_size, collator); \
break; \
}

switch (collator->getCollatorId())
{
APPLY_FOR_COLLATOR_TYPES(M)
default:
{
throw Exception(fmt::format("unexpected collator: {}", collator->getCollatorId()));
}
default:
{
throw Exception(fmt::format("unexpected collator: {}", collator->getCollatorId()));
}
};
#undef M
}
Expand All @@ -172,6 +171,7 @@ class KeyStringBatchHandlerBase
}
}

public:
// NOTE: i is the index within the current mini batch, not the row index of the Column.
ALWAYS_INLINE inline ArenaKeyHolder getKeyHolderBatch(size_t i, Arena * pool) const
{
Expand All @@ -195,6 +195,8 @@ struct HashMethodString
using BatchHandlerBase = KeyStringBatchHandlerBase;

static constexpr bool is_serialized_key = false;
// TODO: batch key-holder retrieval is disabled here for now; presumably to be enabled later — confirm.
static constexpr bool can_batch_get_key_holder = false;

const IColumn::Offset * offsets;
const UInt8 * chars;
Expand All @@ -213,11 +215,6 @@ struct HashMethodString
collator = collators[0];
}

bool batchGetkeyHolder() override
{
return BatchHandlerBase::inited();
}

void initBatchHandler(size_t start_row, size_t max_batch_size)
{
assert(!BatchHandlerBase::inited());
Expand Down Expand Up @@ -260,6 +257,7 @@ struct HashMethodStringBin
using BatchKeyHolderType = KeyHolderType;

static constexpr bool is_serialized_key = false;
static constexpr bool can_batch_get_key_holder = false;

const IColumn::Offset * offsets;
const UInt8 * chars;
Expand Down Expand Up @@ -461,6 +459,7 @@ struct HashMethodFastPathTwoKeysSerialized
using BatchKeyHolderType = KeyHolderType;

static constexpr bool is_serialized_key = true;
static constexpr bool can_batch_get_key_holder = false;

Key1Desc key_1_desc;
Key2Desc key_2_desc;
Expand Down Expand Up @@ -499,6 +498,7 @@ struct HashMethodFixedString
using BatchKeyHolderType = KeyHolderType;

static constexpr bool is_serialized_key = false;
static constexpr bool can_batch_get_key_holder = false;

size_t n;
const ColumnFixedString::Chars_t * chars;
Expand Down Expand Up @@ -548,6 +548,7 @@ struct HashMethodKeysFixed
using BatchKeyHolderType = KeyHolderType;

static constexpr bool is_serialized_key = false;
static constexpr bool can_batch_get_key_holder = false;
static constexpr bool has_nullable_keys = has_nullable_keys_;

Sizes key_sizes;
Expand Down Expand Up @@ -713,10 +714,7 @@ class KeySerializedBatchHandlerBase
}

protected:
bool inited() const
{
return !byte_size.empty();
}
bool inited() const { return !byte_size.empty(); }

void init(size_t start_row, const ColumnRawPtrs & key_columns, const TiDB::TiDBCollators & collators)
{
Expand Down Expand Up @@ -756,7 +754,7 @@ class KeySerializedBatchHandlerBase
pos,
batch_row_idx,
cur_batch_size,
false,
nullptr,
collators.empty() ? nullptr : collators[i],
&sort_key_container);

Expand All @@ -768,6 +766,7 @@ class KeySerializedBatchHandlerBase
return mem_size;
}

public:
// NOTE: i is the index within the current mini batch, not the row index of the Column.
ALWAYS_INLINE inline ArenaKeyHolder getKeyHolderBatch(size_t i, Arena * pool) const
{
Expand All @@ -790,10 +789,12 @@ struct HashMethodSerialized
using Self = HashMethodSerialized<Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
using BatchHandlerBase = KeySerializedBatchHandlerBase;
static constexpr bool is_serialized_key = true;
using KeyHolderType = SerializedKeyHolder;
using BatchKeyHolderType = ArenaKeyHolder;

static constexpr bool is_serialized_key = true;
static constexpr bool can_batch_get_key_holder = true;

ColumnRawPtrs key_columns;
size_t keys_size;
TiDB::TiDBCollators collators;
Expand All @@ -807,11 +808,6 @@ struct HashMethodSerialized
, collators(collators_)
{}

bool batchGetkeyHolder() override
{
return BatchHandlerBase::inited();
}

void initBatchHandler(size_t start_row, size_t /* max_batch_size */)
{
assert(!BatchHandlerBase::inited());
Expand Down Expand Up @@ -849,6 +845,7 @@ struct HashMethodHashed
using BatchKeyHolderType = KeyHolderType;

static constexpr bool is_serialized_key = false;
static constexpr bool can_batch_get_key_holder = false;

ColumnRawPtrs key_columns;
TiDB::TiDBCollators collators;
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Common/ColumnsHashingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ class HashMethodBase
using Cache = LastElementCache<Value, consecutive_keys_optimization>;
using Derived = TDerived;

bool batchGetKeyHolder() const override
{
return false;
}

template <typename Data>
ALWAYS_INLINE inline EmplaceResult emplaceKey(
Data & data,
Expand Down
24 changes: 15 additions & 9 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,21 +853,25 @@ try

context.addMockTable("test_db", "agg_table_with_special_key", table_column_infos, table_column_data);

std::vector<size_t> max_block_sizes{1, 8, DEFAULT_BLOCK_SIZE};
std::vector<size_t> concurrences{1, 8};
// std::vector<size_t> max_block_sizes{1, 8, DEFAULT_BLOCK_SIZE};
std::vector<size_t> max_block_sizes{1};
// std::vector<size_t> concurrences{1, 8};
std::vector<size_t> concurrences{1};
// 0: use one level
// 1: use two level
std::vector<UInt64> two_level_thresholds{0, 1};
std::vector<Int64> collators{TiDB::ITiDBCollator::UTF8MB4_BIN, TiDB::ITiDBCollator::UTF8MB4_GENERAL_CI};
// std::vector<UInt64> two_level_thresholds{0, 1};
std::vector<UInt64> two_level_thresholds{0};
// std::vector<Int64> collators{TiDB::ITiDBCollator::UTF8MB4_BIN, TiDB::ITiDBCollator::UTF8MB4_GENERAL_CI};
std::vector<Int64> collators{TiDB::ITiDBCollator::UTF8MB4_BIN};
std::vector<std::vector<String>> group_by_keys{
/// fast path with one int and one string
{"key_64", "key_string_1"},
/// fast path with two string
{"key_string_1", "key_string_2"},
/// fast path with one string
{"key_string_1"},
/// keys need to be shuffled
{"key_8", "key_16", "key_32", "key_64"},
// {"key_string_1", "key_string_2"},
// /// fast path with one string
// {"key_string_1"},
// /// keys need to be shuffled
// {"key_8", "key_16", "key_32", "key_64"},
};
for (auto collator_id : collators)
{
Expand All @@ -888,6 +892,7 @@ try
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
LOG_DEBUG(Logger::get(), "gjt debug 1");
auto reference = executeStreams(request);
if (current_collator->isCI())
{
Expand Down Expand Up @@ -922,6 +927,7 @@ try
Field(static_cast<UInt64>(two_level_threshold)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(block_size)));
WRAP_FOR_AGG_FAILPOINTS_START
LOG_DEBUG(Logger::get(), "gjt debug 2");
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency);
for (auto & block : blocks)
{
Expand Down
Loading

0 comments on commit fc95526

Please sign in to comment.