Skip to content

Commit

Permalink
combinator function
Browse files Browse the repository at this point in the history
  • Loading branch information
Jasmine-ge committed Dec 9, 2024
1 parent fe8395f commit 22733ff
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 308 deletions.
2 changes: 1 addition & 1 deletion src/AggregateFunctions/AggregateFunctionAvgWeighted.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class AggregateFunctionAvgWeighted final :

using Numerator = typename Base::Numerator;
using Denominator = typename Base::Denominator;
using Fraction = typename Base::Fraction;
using Fraction = typename Base::Fraction;

void NO_SANITIZE_UNDEFINED add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
Expand Down
6 changes: 4 additions & 2 deletions src/AggregateFunctions/AggregateFunctionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
if (combinator_name == "_time_weighted" && nested_name == "avg")
nested_name = "avg_weighted";
else if (combinator_name == "_time_weighted" && nested_name == "median")
nested_name = "median_timing";

nested_name = "median_timing_weighted";
else
throw Exception(ErrorCodes::ILLEGAL_AGGREGATION, "Combinator '{}' with {} is not supported", combinator_name, nested_name);

/// Nested identical combinators (i.e. uniqCombinedIfIf) is not
/// supported (since they don't work -- silently).
///
Expand Down
38 changes: 6 additions & 32 deletions src/AggregateFunctions/AggregateFunctionTimeWeighted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,7 @@ class AggregateFunctionCombinatorTimeWeighted final : public IAggregateFunctionC
throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

DataTypes nested_arguments;
nested_arguments.push_back(arguments[0]);
//delete
if (isDate(*arguments.begin()))
return nested_arguments;
if (isDate(arguments.back()))
nested_arguments.push_back(std::make_shared<DataTypeUInt16>());
else if(isDate32(arguments.back()))
nested_arguments.push_back(std::make_shared<DataTypeInt32>());
else if(isDateTime(arguments.back()))
nested_arguments.push_back(std::make_shared<DataTypeUInt32>());
else if(isDateTime64(arguments.back()))
nested_arguments.push_back(std::make_shared<DataTypeFloat64>());

return nested_arguments;
return {arguments[0], std::make_shared<DataTypeUInt64>()};
}

/// Decimal128 and Decimal256 aren't supported
Expand Down Expand Up @@ -97,29 +83,17 @@ class AggregateFunctionCombinatorTimeWeighted final : public IAggregateFunctionC

const auto data_type = static_cast<const DataTypePtr>(arguments[0]);
const auto data_type_time_weight = static_cast<const DataTypePtr>(arguments[1]);
const WhichDataType dt(data_type), t_dt(data_type_time_weight);
const WhichDataType t_dt(data_type_time_weight);

if ((dt.isInt() || dt.isUInt() || dt.isFloat() || dt.isDecimal()) && (t_dt.isDateOrDate32() || t_dt.isDateTime()|| t_dt.isDateTime64()))
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Types {} and {} are non-conforming as arguments for aggregate function {}", data_type->getName(), data_type_time_weight->getName(), this->getName());
if (!t_dt.isDateOrDate32() && !t_dt.isDateTime() && !t_dt.isDateTime64())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Types {} are non-conforming as time weighted arguments for aggregate function {}", data_type_time_weight->getName(), this->getName());

if (arguments.size() == 3)
{
const auto data_type_third_arg = static_cast<const DataTypePtr>(arguments[2]);

if(data_type_third_arg != data_type_time_weight)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second and the third argument should be the same for aggregate function {}", this->getName());
// AggregateFunctionPtr ptr;

// // const bool left_decimal = isDecimal(data_type);
// // data_type_time_weight = UInt64;
// // auto data_type_uint64 = std::make_shared<DataTypeUInt64>();
// // if (left_decimal)
// // ptr.reset(create(*data_type, *data_type_time_weight, nested_function, arguments, params,
// // getDecimalScale(*data_type)));
// // else
// ptr.reset(create(*data_type, *data_type_time_weight, nested_function, arguments, params));
// return ptr;
if(data_type_third_arg->getTypeId() != data_type_time_weight->getTypeId())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second and the third argument should be the same for aggregate function {}", this->getName());
}
AggregateFunctionPtr ptr;
ptr.reset(create(*data_type, *data_type_time_weight, nested_function, arguments, params));
Expand Down
Loading

0 comments on commit 22733ff

Please sign in to comment.