diff --git a/src/AggregateFunctions/AggregateFunctionAvgWeighted.h b/src/AggregateFunctions/AggregateFunctionAvgWeighted.h index 71c1cb149a..9e05b661d0 100644 --- a/src/AggregateFunctions/AggregateFunctionAvgWeighted.h +++ b/src/AggregateFunctions/AggregateFunctionAvgWeighted.h @@ -15,7 +15,7 @@ using AvgWeightedFieldType = std::conditional_t, NearestFieldType>>; template -using MaxFieldType = std::conditional_t<(sizeof(AvgWeightedFieldType) > sizeof(AvgWeightedFieldType)), +using MaxFieldType = std::conditional_t<(sizeof(AvgWeightedFieldType) >= sizeof(AvgWeightedFieldType)), AvgWeightedFieldType, AvgWeightedFieldType>; template @@ -30,7 +30,7 @@ class AggregateFunctionAvgWeighted final : using Numerator = typename Base::Numerator; using Denominator = typename Base::Denominator; - using Fraction = typename Base::Fraction; + using Fraction = typename Base::Fraction; void NO_SANITIZE_UNDEFINED add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override { diff --git a/src/AggregateFunctions/AggregateFunctionFactory.cpp b/src/AggregateFunctions/AggregateFunctionFactory.cpp index ee6e212fe5..053d07e7aa 100644 --- a/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -179,6 +179,17 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl( query_context->addQueryFactoriesInfo(Context::QueryLogFactories::AggregateFunctionCombinator, combinator_name); String nested_name = name.substr(0, name.size() - combinator_name.size()); + + if (combinator_name == "_time_weighted") + { + if (nested_name == "avg") + nested_name = "avg_weighted"; + else if (nested_name == "median") + nested_name = "median_exact_weighted"; + else + throw Exception(ErrorCodes::ILLEGAL_AGGREGATION, "Unknown aggregate function '{}'", name); + } + /// Nested identical combinators (i.e. uniqCombinedIfIf) is not /// supported (since they don't work -- silently). 
///
diff --git a/src/AggregateFunctions/AggregateFunctionTimeWeighted.cpp b/src/AggregateFunctions/AggregateFunctionTimeWeighted.cpp
new file mode 100644
index 0000000000..deef4b90d9
--- /dev/null
+++ b/src/AggregateFunctions/AggregateFunctionTimeWeighted.cpp
@@ -0,0 +1,115 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+namespace
+{
+
+/// Combinator for the `_time_weighted` suffix: validates the (value, time[, end time]) arguments and wraps the
+/// nested weighted aggregate function in AggregateFunctionTimeWeighted.
+class AggregateFunctionCombinatorTimeWeighted final : public IAggregateFunctionCombinator
+{
+public:
+    String getName() const override { return "_time_weighted"; }
+
+    /// Checks the argument count (2 or 3) and that the time arguments are Date/Date32/DateTime/DateTime64 of the
+    /// same type, then returns the argument types handed to the nested function: the value type and a weight type.
+    DataTypes transformArguments(const DataTypes & arguments) const override
+    {
+        if (arguments.size() != 2 && arguments.size() != 3)
+            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Incorrect number of arguments for aggregate function with {} suffix", this->getName());
+
+        const auto & data_type_time_weight = arguments[1];
+        const WhichDataType t_dt(data_type_time_weight);
+
+        if (!t_dt.isDateOrDate32() && !t_dt.isDateTime() && !t_dt.isDateTime64())
+            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Types {} are non-conforming as time weighted arguments for aggregate function {}", data_type_time_weight->getName(), this->getName());
+
+        if (arguments.size() == 3)
+        {
+            const auto & data_type_third_arg = arguments[2];
+
+            /// The message names the second argument first, so the type names are passed in that order.
+            if(!data_type_third_arg->equals(*data_type_time_weight))
+                throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second and the third argument should be the same for aggregate function {}, but now it's {} and {}", this->getName(), data_type_time_weight->getName(), data_type_third_arg->getName());
+        }
+
+        return {arguments[0], std::make_shared()};
+    }
+
+    /// Decimal128 and Decimal256 aren't supported
+    #define AT_SWITCH(LINE) \
+        switch (which.idx) \
+        { \
+            LINE(Int8); LINE(Int16); LINE(Int32); LINE(Int64); LINE(Int128); LINE(Int256); \
+            LINE(UInt8); LINE(UInt16); LINE(UInt32); LINE(UInt64); LINE(UInt128); LINE(UInt256); \
+            LINE(Decimal32); LINE(Decimal64); \
+            LINE(Float32); LINE(Float64); \
+            default: return nullptr; \
+        }
+
+    // Not using helper functions because there are no templates for binary decimal/numeric function.
+    /// Dispatches on the value type; returns nullptr for unsupported value or time types.
+    template
+    IAggregateFunction * create(const IDataType & first_type, const IDataType & second_type, TArgs && ... args) const
+    {
+        const WhichDataType which(first_type);
+
+        #define LINE(Type) \
+            case TypeIndex::Type: return create(second_type, std::forward(args)...)
+        AT_SWITCH(LINE)
+        #undef LINE
+        #undef AT_SWITCH
+    }
+    /// Dispatches on the time type and allocates the wrapper; returns nullptr for unsupported time types.
+    template
+    IAggregateFunction * create(const IDataType & second_type, TArgs && ... args) const
+    {
+        const WhichDataType which(second_type);
+
+        switch (which.idx)
+        {
+            case TypeIndex::Date: return new AggregateFunctionTimeWeighted(std::forward(args)...);
+            case TypeIndex::Date32: return new AggregateFunctionTimeWeighted(std::forward(args)...);
+            case TypeIndex::DateTime: return new AggregateFunctionTimeWeighted(std::forward(args)...);
+            case TypeIndex::DateTime64: return new AggregateFunctionTimeWeighted(std::forward(args)...);
+            default: return nullptr;
+        }
+    }
+
+    /// Builds the time-weighted wrapper around nested_function; throws ILLEGAL_TYPE_OF_ARGUMENT when the
+    /// argument types are not supported.
+    AggregateFunctionPtr transformAggregateFunction(
+        const AggregateFunctionPtr & nested_function,
+        const AggregateFunctionProperties &,
+        const DataTypes & arguments,
+        const Array & params) const override
+    {
+        AggregateFunctionPtr ptr;
+        const auto & data_type = arguments[0];
+        const auto & data_type_time_weight = arguments[1];
+        ptr.reset(create(*data_type, *data_type_time_weight, nested_function, arguments, params));
+        if(!ptr)
+            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal argument types existed in {} function", this->getName());
+
+        return ptr;
+    }
+};
+}
+
+void registerAggregateFunctionCombinatorTimeWeighted(AggregateFunctionCombinatorFactory & factory)
+{
+    factory.registerCombinator(std::make_shared());
+}
+}
diff --git a/src/AggregateFunctions/AggregateFunctionTimeWeighted.h b/src/AggregateFunctions/AggregateFunctionTimeWeighted.h
new file mode 100644
index 0000000000..27d7f2ea1e
--- 
/dev/null
+++ b/src/AggregateFunctions/AggregateFunctionTimeWeighted.h
@@ -0,0 +1,439 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+    extern const int INVALID_DATA;
+}
+
+struct Settings;
+
+/// Bookkeeping kept in front of the nested aggregate state.
+/// `last` is the most recent sample whose duration is not known yet, `start_time` is the time of the first
+/// sample seen, and `end_time` is the optional third argument taken from the most recently stored row.
+template
+struct TimeWeightedData
+{
+    struct Last
+    {
+        Field last_value;
+        TimeType last_time;
+    };
+    std::optional last;
+    std::optional start_time;
+    std::optional end_time;
+
+};
+
+/// Implementation behind the `_time_weighted` combinator: every value is weighted by the time elapsed until
+/// the next sample and the resulting (value, weight) rows are fed to the nested weighted aggregate function.
+/// Out-of-order intervals are logged and skipped on the add paths and rejected with INVALID_DATA in merge().
+template
+class AggregateFunctionTimeWeighted:
+    public IAggregateFunctionDataHelper,
+        AggregateFunctionTimeWeighted>
+
+{
+protected:
+    AggregateFunctionPtr nested_func;
+    size_t prefix_size;
+    size_t arguments_num;
+    Poco::Logger * logger;
+
+    AggregateDataPtr getNestedPlace(AggregateDataPtr __restrict place) const noexcept
+    {
+        return place + prefix_size;
+    }
+
+    ConstAggregateDataPtr getNestedPlace(ConstAggregateDataPtr __restrict place) const noexcept
+    {
+        return place + prefix_size;
+    }
+public:
+    using Base = IAggregateFunctionDataHelper,
+        AggregateFunctionTimeWeighted>;
+
+    AggregateFunctionTimeWeighted(AggregateFunctionPtr nested_func_, const DataTypes & arguments, const Array & params_)
+        : Base(arguments, params_)
+        , nested_func(nested_func_)
+        , arguments_num(arguments.size())
+        , logger(&Poco::Logger::get("AggregateFunctionTimeWeighted"))
+    {
+        /// Round the prefix up to a multiple of the nested state's alignment; the nested state starts right after it.
+        size_t nested_size = nested_func->alignOfData();
+        prefix_size = (sizeof(TimeWeightedData) + nested_size - 1) / nested_size * nested_size;
+    }
+
+    /// Closes the interval opened by the stored last sample: feeds (last_value, end_time - last_time) to the nested
+    /// function. Does nothing when no sample is stored, or when end_time precedes the stored time (logged).
+    void calculateLastTime(const TimeType & end_time, AggregateDataPtr __restrict place, Arena * arena) const
+    {
+        auto & data = this->data(place);
+        /// last time calculation
+        if (data.last.has_value())
+        {
+            MutableColumnPtr value_column, weight_column;
+            value_column = this->argument_types[0]->createColumn();
+            weight_column = ColumnUInt64::create();
+            if (end_time >= data.last->last_time) [[likely]]
+            {
+                value_column->insert(data.last->last_value);
+                weight_column->insert(static_cast(end_time - data.last->last_time));
+            }
+            else
+            {
+                LOG_WARNING(logger, "Illegal time argument, should be in ascending order, {}, {}" , data.last->last_time, end_time);
+                /// Nothing was inserted above: calling the nested add() for row 0 would read past the empty columns.
+                return;
+            }
+
+            ColumnRawPtrs raw_columns{value_column.get(), weight_column.get()};
+            nested_func->add(getNestedPlace(place), raw_columns.data(), 0, arena);
+        }
+
+    }
+
+    /// Remembers row `last_row_pos` as the pending last sample and, when a third argument exists, its end time.
+    void storeLastData(size_t last_row_pos, AggregateDataPtr __restrict place, const IColumn ** columns) const
+    {
+        auto & data = this->data(place);
+        const auto & value_data = assert_cast &>(*columns[0]).getData();
+        const auto & time_data = assert_cast &>(*columns[1]).getData();
+        data.last = {
+            static_cast(value_data[last_row_pos]),
+            static_cast(time_data[last_row_pos])
+        };
+
+        /// remember current time
+        if (this->argument_types.size() == 3)
+            data.end_time = assert_cast &>(*columns[2]).getData()[last_row_pos];
+    }
+
+    /// Per-row path: closes the previous sample's interval at this row's time, then keeps this row as the pending sample.
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
+    {
+        const auto & time_data = assert_cast &>(*columns[1]).getData();
+
+        /// remember start time, only works in the first time
+        if (!this->data(place).start_time.has_value())
+            this->data(place).start_time = time_data[row_num];
+
+        calculateLastTime(time_data[row_num], place, arena);
+        storeLastData(row_num, place, columns);
+    }
+
+    /// Batch path: weights every row but the last by the gap to the following row, hands the batch to the nested
+    /// function, and keeps the last row pending until the next sample (or the end time) arrives.
+    void addBatchSinglePlace(
+        size_t row_begin,
+        size_t row_end,
+        AggregateDataPtr __restrict place,
+        const IColumn ** columns,
+        Arena * arena,
+        ssize_t if_argument_pos,
+        const IColumn * delta_col [[maybe_unused]]) const final
+    {
+        if (row_begin >= row_end)
+            return;
+
+        if (delta_col != nullptr)
+        {
+            /// NOTE(review): the meaning of delta_col is not visible in this file. The nested state lives at
+            /// getNestedPlace(place), but this branch still forwards the raw (value, time) columns - confirm/rework.
+            return nested_func->addBatchSinglePlace(row_begin, row_end, getNestedPlace(place), columns, arena, if_argument_pos, delta_col);
+        }
+
+        /// Filtered or single-row input goes through add(), which keeps the prefix bookkeeping consistent.
+        if (if_argument_pos >= 0 || row_end - row_begin == 1)
+        {
+            for (size_t i = row_begin; i < row_end; ++i)
+                if (if_argument_pos < 0 || columns[if_argument_pos]->getBool(i))
+                    add(place, columns, i, arena);
+            return;
+        }
+
+        const auto & time_data = assert_cast &>(*columns[1]).getData();
+        calculateLastTime(time_data[row_begin], place, arena);
+
+        /// remember start time, only works in the first time
+        if (!this->data(place).start_time.has_value())
+            this->data(place).start_time = time_data[row_begin];
+
+        auto last_row_pos = row_end - 1;
+        /// calculate time
+        MutableColumnPtr weight_column = ColumnUInt64::create();
+        /// The nested call below is given absolute row numbers, so every row up to last_row_pos needs a weight slot.
+        weight_column->insertManyDefaults(row_begin);
+        for (size_t i = row_begin; i < last_row_pos; i++)
+        {
+            if (time_data[i + 1] >= time_data[i]) [[likely]]
+                weight_column->insert(static_cast(time_data[i + 1] - time_data[i]));
+            else
+            {
+                LOG_WARNING(logger, "Illegal time argument, should be in ascending order, {}, {}" ,time_data[i] ,time_data[i + 1]);
+                weight_column->insertDefault();
+            }
+        }
+
+        /// prepare data
+        ColumnRawPtrs raw_columns{columns[0], weight_column.get()};
+        nested_func->addBatchSinglePlace(row_begin, last_row_pos, getNestedPlace(place), raw_columns.data(), arena, if_argument_pos, delta_col);
+
+        storeLastData(last_row_pos, place, columns);
+    }
+
+    /// Same as addBatchSinglePlace, but rows flagged in null_map are not fed to the nested function.
+    void addBatchSinglePlaceNotNull(
+        size_t row_begin,
+        size_t row_end,
+        AggregateDataPtr __restrict place,
+        const IColumn ** columns,
+        const UInt8 * null_map,
+        Arena * arena,
+        ssize_t if_argument_pos,
+        const IColumn * delta_col [[maybe_unused]])
+        const final
+    {
+        if (row_begin >= row_end)
+            return;
+
+        if (delta_col != nullptr)
+        {
+            /// NOTE(review): the meaning of delta_col is not visible in this file. The nested state lives at
+            /// getNestedPlace(place), but this branch still forwards the raw (value, time) columns - confirm/rework.
+            return nested_func->addBatchSinglePlaceNotNull(row_begin, row_end, getNestedPlace(place), columns, null_map, arena, if_argument_pos, delta_col);
+        }
+
+        /// Filtered or single-row input goes through add(), skipping rows that are null or rejected by the filter.
+        if (if_argument_pos >= 0 || row_end - row_begin == 1)
+        {
+            for (size_t i = row_begin; i < row_end; ++i)
+                if (!null_map[i] && (if_argument_pos < 0 || columns[if_argument_pos]->getBool(i)))
+                    add(place, columns, i, arena);
+            return;
+        }
+
+        const auto & time_data = assert_cast &>(*columns[1]).getData();
+        calculateLastTime(time_data[row_begin], place, arena);
+
+        /// remember start time, only works in the first time
+        if (!this->data(place).start_time.has_value())
+            this->data(place).start_time = time_data[row_begin];
+
+        auto last_row_pos = row_end - 1;
+        /// calculate time
+        MutableColumnPtr weight_column = ColumnUInt64::create();
+        /// The nested call below is given absolute row numbers, so every row up to last_row_pos needs a weight slot.
+        weight_column->insertManyDefaults(row_begin);
+        for (size_t i = row_begin; i < last_row_pos; i++)
+        {
+            if (null_map[i])
+            {
+                /// Placeholder only: it keeps the following rows aligned with their weights.
+                weight_column->insertDefault();
+            }
+            else if (time_data[i + 1] < time_data[i])
+            {
+                LOG_WARNING(logger, "Illegal time argument, should be in ascending order, {}, {}" ,time_data[i] ,time_data[i + 1]);
+                weight_column->insertDefault();
+            }
+            else
+                weight_column->insert(static_cast(time_data[i + 1] - time_data[i]));
+        }
+        ColumnRawPtrs raw_columns{columns[0], weight_column.get()};
+
+        nested_func-> addBatchSinglePlaceNotNull(row_begin, last_row_pos, getNestedPlace(place), raw_columns.data(), null_map, arena, if_argument_pos, delta_col);
+
+        /// NOTE(review): the last row is stored as the pending sample even when null_map flags it - confirm intent.
+        storeLastData(last_row_pos, place, columns);
+    }
+
+    /// Bridges the gap between this state's last sample and the first sample of rhs, adopts the trailing bookkeeping
+    /// of rhs and merges the nested states. Throws INVALID_DATA when rhs starts before this state's last sample.
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
+    {
+        /// FIXME, time disorder may happen, the outcome might not be accurate
+        auto & data = this->data(place);
+        auto & rhs_data = this->data(rhs);
+        if (data.last.has_value())
+        {
+            if (rhs_data.start_time.has_value())
+            {
+                if (rhs_data.start_time.value() < data.last->last_time)
+                    throw Exception(ErrorCodes::INVALID_DATA, "Illegal time argument, should be in ascending order, {}, {}" ,data.last->last_time ,rhs_data.start_time.value());
+
+                MutableColumnPtr value_column, weight_column;
+                value_column = this->argument_types[0]->createColumn();
+                weight_column = ColumnUInt64::create();
+
+                value_column->insert(data.last->last_value);
+                weight_column->insert(static_cast(rhs_data.start_time.value() - data.last->last_time));
+                if (rhs_data.last.has_value())
+                    data.last = rhs_data.last;
+                if (rhs_data.end_time.has_value())
+                    data.end_time = rhs_data.end_time;
+
+                ColumnRawPtrs raw_columns{value_column.get(), weight_column.get()};
+                nested_func->add(getNestedPlace(place), raw_columns.data(), 0, arena);
+            }
+        }
+        else
+        {
+            /// if current data is empty, we will directly use the rhs data.
+            data.last = rhs_data.last;
+            data.start_time = rhs_data.start_time;
+            data.end_time = rhs_data.end_time;
+        }
+
+        nested_func->merge(getNestedPlace(place), getNestedPlace(rhs), arena);
+    }
+
+    /// Layout: for each of last / start_time / end_time one presence flag followed by the payload when present, then the nested state.
+    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version) const override
+    {
+        auto & data = this->data(place);
+        auto has_last = data.last.has_value();
+        writeBinary(has_last, buf);
+        if (has_last)
+        {
+            writeFieldBinary(data.last->last_value, buf);
+            writeBinary(data.last->last_time, buf);
+        }
+
+        auto has_start_time = data.start_time.has_value();
+        writeBinary(has_start_time, buf);
+        if (has_start_time)
+        {
+            writeBinary(data.start_time.value(), buf);
+        }
+
+        auto has_end_time = data.end_time.has_value();
+        writeBinary(has_end_time, buf);
+        if (has_end_time)
+        {
+            writeBinary(data.end_time.value(), buf);
+        }
+
+        nested_func->serialize(getNestedPlace(place), buf, version);
+    }
+
+    /// Reads back exactly what serialize() writes, in the same order.
+    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional version , Arena * arena) const override
+    {
+        auto & data = this->data(place);
+        bool last_has_value;
+        readBinary(last_has_value, buf);
+        if (last_has_value)
+        {
+            /// Engage the optional first: writing through an empty optional is undefined and leaves it disengaged.
+            auto & last = data.last.emplace();
+            last.last_value = readFieldBinary(buf);
+            readBinary(last.last_time, buf);
+        }
+
+        bool start_has_value;
+        readBinary(start_has_value, buf);
+        if (start_has_value)
+            readBinary(data.start_time.emplace(), buf);
+
+        bool end_has_value;
+        readBinary(end_has_value, buf);
+        if (end_has_value)
+            readBinary(data.end_time.emplace(), buf);
+
+        nested_func->deserialize(getNestedPlace(place), buf, version, arena);
+    }
+
+    /// When an end time was supplied, closes the pending interval at that time before producing the nested result.
+    /// NOTE(review): this adds the final interval to the nested state without clearing `last`, so a second call on
+    /// the same place adds it again - confirm callers finalize each state only once.
+    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
+    {
+        auto & data = this->data(place);
+        if (data.end_time.has_value())
+            calculateLastTime(data.end_time.value(), place, arena);
+
+        nested_func->insertResultInto(getNestedPlace(place), to, arena);
+    }
+
+    size_t sizeOfData() const override
+    {
+        return prefix_size + nested_func->sizeOfData();
+    }
+
+    void create(AggregateDataPtr __restrict place) const override
+    {
+        new (place) TimeWeightedData;
+        nested_func->create(getNestedPlace(place));
+    }
+
+    void destroy(AggregateDataPtr __restrict place) const noexcept override
+    {
+        this->data(place).~TimeWeightedData();
+        nested_func->destroy(getNestedPlace(place));
+    }
+
+    bool hasTrivialDestructor() const override
+    {
+        return std::is_trivially_destructible_v> && nested_func->hasTrivialDestructor();
+    }
+
+    void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
+    {
+        this->data(place).~TimeWeightedData();
+        nested_func->destroyUpToState(getNestedPlace(place));
+    }
+
+    String getName() const override
+    {
+        return nested_func->getName() + "_time_weighted";
+    }
+
+    DataTypePtr getReturnType() const override
+    {
+        return nested_func->getReturnType();
+    }
+
+    bool allocatesMemoryInArena() const override
+    {
+        return nested_func->allocatesMemoryInArena();
+    }
+
+    bool isState() const override
+    {
+        return nested_func->isState();
+    }
+
+    bool isVersioned() const override
+    {
+        return nested_func->isVersioned();
+    }
+
+    size_t getVersionFromRevision(size_t revision) const override
+    {
+        return nested_func->getVersionFromRevision(revision);
+    }
+
+    size_t getDefaultVersion() const override
+    {
+        return nested_func->getDefaultVersion();
+    }
+
+    AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
+
+};
+}
diff --git a/src/AggregateFunctions/registerAggregateFunctions.cpp b/src/AggregateFunctions/registerAggregateFunctions.cpp
index 216162332f..ef0f708c24 100644
--- a/src/AggregateFunctions/registerAggregateFunctions.cpp
+++ b/src/AggregateFunctions/registerAggregateFunctions.cpp
@@ -87,6 +87,7 @@ void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactor
 void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
 void 
registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory &); void registerAggregateFunctionCombinatorMap(AggregateFunctionCombinatorFactory & factory); +void registerAggregateFunctionCombinatorTimeWeighted(AggregateFunctionCombinatorFactory &); void registerWindowFunctions(AggregateFunctionFactory & factory); @@ -224,7 +225,8 @@ void registerAggregateFunctions() registerAggregateFunctionCombinatorResample(factory); registerAggregateFunctionCombinatorDistinct(factory); registerAggregateFunctionCombinatorMap(factory); - + registerAggregateFunctionCombinatorTimeWeighted(factory); + /// proton: starts. Streaming::registerAggregateFunctionCombinatorDistinct(factory); Streaming::registerAggregateFunctionCombinatorDistinctRetract(factory); diff --git a/tests/queries_ported/0_stateless/99010_time_weighted.reference b/tests/queries_ported/0_stateless/99010_time_weighted.reference new file mode 100644 index 0000000000..510d6dbf6c --- /dev/null +++ b/tests/queries_ported/0_stateless/99010_time_weighted.reference @@ -0,0 +1,9 @@ +0 +3.5 4 +3.8 4.35 +3.8 4.35 +3.5 4 +4 6 +4 6 +4 6 +4 6 diff --git a/tests/queries_ported/0_stateless/99010_time_weighted.sql b/tests/queries_ported/0_stateless/99010_time_weighted.sql new file mode 100644 index 0000000000..29c9b4f5f9 --- /dev/null +++ b/tests/queries_ported/0_stateless/99010_time_weighted.sql @@ -0,0 +1,23 @@ +DROP STREAM IF EXISTS test_99010; + +CREATE STREAM test_99010 (val int, a DateTime, b Date, c Date32, d DateTime64); + +INSERT INTO test_99010(val, a, b, c, d) VALUES (1, to_datetime('2024-11-29 12:12:13'), '2024-11-29', '2024-11-29', to_datetime64('2024-11-29 12:12:13.123', 3)); +INSERT INTO test_99010(val, a, b, c, d) VALUES (2, to_datetime('2024-11-29 12:12:16'), '2024-11-30', '2024-11-30', to_datetime64('2024-11-29 12:12:13.126', 3)); +INSERT INTO test_99010(val, a, b, c, d) VALUES (3, to_datetime('2024-11-29 12:12:17'), '2024-12-01', '2024-12-01', to_datetime64('2024-11-29 12:12:13.127', 3)); +INSERT 
INTO test_99010(val, a, b, c, d) VALUES (4, to_datetime('2024-11-29 12:12:18'), '2024-12-03', '2024-12-03', to_datetime64('2024-11-29 12:12:13.128', 3)); +INSERT INTO test_99010(val, a, b, c, d) VALUES (5, to_datetime('2024-11-29 12:12:19'), '2024-12-28', '2024-12-28', to_datetime64('2024-11-29 12:12:13.129', 3)); +INSERT INTO test_99010(val, a, b, c, d) VALUES (6, to_datetime('2024-11-29 12:12:25'), '2024-12-29', '2024-12-29', to_datetime64('2024-11-29 12:12:13.135', 3)); +SELECT sleep(3); + +SELECT avg_time_weighted(val, a), avg_time_weighted(val, a, to_datetime('2024-11-29 12:12:28')) FROM (SELECT * FROM table(test_99010) ORDER BY a); +SELECT avg_time_weighted(val, b), avg_time_weighted(val, b, cast(to_date('2025-01-08'), 'date')) FROM (SELECT * FROM table(test_99010) ORDER BY b); +SELECT avg_time_weighted(val, c), avg_time_weighted(val, c, to_date('2025-01-08')) FROM (SELECT * FROM table(test_99010) ORDER BY c); +SELECT avg_time_weighted(val, d), avg_time_weighted(val, d, to_datetime64('2024-11-29 12:12:13.138', 3)) FROM (SELECT * FROM table(test_99010) ORDER BY d); +SELECT median_time_weighted(val, a), median_time_weighted(val, a, to_datetime('2024-11-29 12:13:28')) FROM (SELECT * FROM table(test_99010) ORDER BY a); +SELECT median_time_weighted(val, b), median_time_weighted(val, b, cast(to_date('2025-02-07'), 'date')) FROM (SELECT * FROM table(test_99010) ORDER BY b); +SELECT median_time_weighted(val, c), median_time_weighted(val, c, to_date('2025-02-07')) FROM (SELECT * FROM table(test_99010) ORDER BY c); +SELECT median_time_weighted(val, d), median_time_weighted(val, d, to_datetime64('2024-11-29 12:12:14.138', 3)) FROM (SELECT * FROM table(test_99010) ORDER BY d); + +DROP STREAM IF EXISTS test_99010; + diff --git a/tests/stream/test_stream_smoke/0035_streaming_func.json b/tests/stream/test_stream_smoke/0035_streaming_func.json new file mode 100644 index 0000000000..00b8f7cb9e --- /dev/null +++ b/tests/stream/test_stream_smoke/0035_streaming_func.json @@ -0,0 
+1,124 @@ +{ + "test_suite_name": "streaming_func", + "tag": "smoke", + "test_suite_config":{ + "setup": { + "statements": [ + ] + }, + "tests_2_run": {"ids_2_run": ["all"], "tags_2_run":[], "tags_2_skip":{"default":["todo", "to_support", "change", "bug", "sample"],"cluster": ["view", "cluster_table_bug"]}} + }, + "comments": "Tests covering query state checkpointing smoke test cases", + "tests": [ + { + "id": 1, + "tags": ["query_state"], + "name": "global_aggr_with_fun_avg_time_weighted", + "description": "global aggregation with function avg_time_weighted state checkpoint", + "steps":[ + { + "statements": [ + {"client":"python", "query_type": "table", "query":"drop stream if exists test35_state_stream1"}, + {"client":"python", "query_type": "table", "exist":"test35_state_stream1", "exist_wait":2, "wait":1, "query":"create stream test35_state_stream1 (val int32, timestamp datetime64(3) default now64(3))"}, + {"client":"python", "query_type": "stream", "query_id":"3600", "depends_on_stream":"test35_state_stream1", "wait":1, "terminate":"manual", "query":"subscribe to select avg_time_weighted(val, timestamp) from test35_state_stream1 emit periodic 1s settings checkpoint_interval=1"}, + {"client":"python", "query_type": "table", "depends_on":"3600", "kill":"3600", "kill_wait":5, "wait":3, "query": "insert into test35_state_stream1(val, timestamp) values (1, '2020-02-02 20:00:00'), (2, '2020-02-02 20:00:01'), (3, '2020-02-02 20:00:03'), (3, '2020-02-02 20:00:04'), (3, '2020-02-02 20:00:05')"}, + {"client":"python", "query_type": "table", "depends_on":"3600", "kill":"3600", "kill_wait":5, "wait":3, "query": "insert into test35_state_stream1(val, timestamp) values (6, '2020-02-02 20:00:10')"}, + {"client":"python", "query_type": "table", "wait":1, "query":"unsubscribe to '3600'"} + ] + } + ], + "expected_results": [ + { + "query_id":"3600", + "expected_results":[ + ["2.2"] + ] + } + ] + }, + { + "id": 2, + "tags": ["query_state"], + "name": 
"global_aggr_with_fun_median_time_weighted", + "description": "global aggregation with function median_time_weighted state checkpoint", + "steps":[ + { + "statements": [ + {"client":"python", "query_type": "table", "query":"drop stream if exists test35_state_stream1"}, + {"client":"python", "query_type": "table", "exist":"test35_state_stream1", "exist_wait":2, "wait":1, "query":"create stream test35_state_stream1 (val int32, timestamp datetime64(3) default now64(3))"}, + {"client":"python", "query_type": "stream", "query_id":"3602", "depends_on_stream":"test35_state_stream1", "wait":1, "terminate":"manual", "query":"subscribe to select median_time_weighted(val, timestamp) from test35_state_stream1 emit periodic 1s settings checkpoint_interval=1"}, + {"client":"python", "query_type": "table", "depends_on":"3602", "kill":"3602", "kill_wait":5, "wait":3, "query": "insert into test35_state_stream1(val, timestamp) values (1, '2020-02-02 20:00:00'), (2, '2020-02-02 20:00:01'), (3, '2020-02-02 20:00:03'), (3, '2020-02-02 20:00:04'), (3, '2020-02-02 20:00:05')"}, + {"client":"python", "query_type": "table", "wait":1, "query":"unsubscribe to '3602'"} + ] + } + ], + "expected_results": [ + { + "query_id":"3602", + "expected_results":[ + ["2"] + ] + } + ] + }, + { + "id": 3, + "tags": ["hopping window"], + "name": "stream hopping window avg_time_weighted(value) aggregate group by id, window_start, window_end, with emit_version()", + "description": "tumble window aggregate by avg_time_weighted and group by id, window_start, window_end", + "steps": [ + { + "statements": [ + {"client":"python", "query_type": "table", "query":"drop stream if exists test35_state_stream1"}, + {"client":"python", "query_type": "table", "exist":"test35_state_stream1", "exist_wait":2, "wait":1, "query":"create stream test35_state_stream1 (id string, value float64, timestamp datetime)"}, + {"client":"python","query_id":"3603", "query_type": "stream","depends_on_stream":"test35_state_stream1", 
"terminate":"manual","query":"subscribe to select avg_time_weighted(value, timestamp, window_end) from hop(test35_state_stream1, timestamp, interval 3 second, interval 5 second) group by id, window_start, window_end emit stream settings checkpoint_interval=1"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 10.0, '2020-02-02 20:00:00')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 10.5, '2020-02-02 20:00:01')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 12.0, '2020-02-02 20:00:02')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 15.2, '2020-02-02 20:00:03')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 18.3, '2020-02-02 20:00:04')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 21.0, '2020-02-02 20:00:05')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 28.0, '2020-02-02 20:00:07')"}, + {"client":"python", "query_type": "table", "depends_on":"3603", "kill":"3603", "kill_wait":3, "wait":1, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 30.0, '2020-02-02 20:00:08')"}, + {"client":"python", "query_type": "table", "wait":1, "query":"unsubscribe to '3603'"} + ] + } + ], + "expected_results": [ + { + "query_id":"3603", "expected_results":[ + ["10.25"], + ["13.2"], 
+ ["20.7"] + ] + } + ] + }, + { + "id": 4, + "tags": ["hopping window"], + "name": "stream hopping window median_time_weighted(value) aggregate group by id, window_start, window_end, with emit_version()", + "description": "hopping window aggregate by median_time_weighted and group by id, window_start, window_end", + "steps": [ + { + "statements": [ + {"client":"python", "query_type": "table", "query":"drop stream if exists test35_state_stream1"}, + {"client":"python", "query_type": "table", "exist":"test35_state_stream1", "exist_wait":2, "wait":1, "query":"create stream test35_state_stream1 (id string, value float64, timestamp datetime)"}, + {"client":"python","query_id":"3604", "query_type": "stream","depends_on_stream":"test35_state_stream1", "terminate":"manual","query":"subscribe to select median_time_weighted (value, timestamp, window_end) from hop(test35_state_stream1, timestamp, interval 3 second, interval 5 second) group by id, window_start, window_end emit stream settings checkpoint_interval=1"}, + {"client":"python", "query_type": "table", "depends_on":"3604", "kill":"3604", "kill_wait":8, "wait":5, "query": "insert into test35_state_stream1(id, value, timestamp) values ('dev1', 10, '2020-02-02 20:00:00'), ('dev1', 10.5, '2020-02-02 20:00:01'), ('dev1', 12.0, '2020-02-02 20:00:02'), ('dev1', 18.3, '2020-02-02 20:00:04'), ('dev1', 25.5, '2020-02-02 20:00:06'), ('dev1', 28.0, '2020-02-02 20:00:07'), ('dev1', 30.0, '2020-02-02 20:00:08')"}, + {"client":"python", "query_type": "table", "wait":1, "query":"unsubscribe to '3604'"} + ] + } + ], + "expected_results": [ + { + "query_id":"3604", "expected_results":[ + ["10"], + ["12"], + ["18.3"] + ] + } + ] + } + ] +}