refactor to a more generic Aggregate_With_Helper_Expressions
radeusgd committed Aug 22, 2024
1 parent 997619c commit 9021a49
Showing 2 changed files with 98 additions and 78 deletions.
@@ -4,6 +4,7 @@ import Standard.Base.Errors.Deprecated.Deprecated
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument

import Standard.Table.Internal.Aggregate_Column_Helper
import Standard.Table.Internal.Aggregate_Column_Helper.Internal_Order_By_Column_Reference
import Standard.Table.Internal.Problem_Builder.Problem_Builder
from Standard.Table import Aggregate_Column
from Standard.Table.Aggregate_Column.Aggregate_Column import all
@@ -31,7 +32,7 @@ from project.Errors import Unsupported_Database_Operation
operation, list of input columns and a raw SQL IR Expression) and returns
the inferred type for the aggregation.
- problem_builder: A `Problem_Builder` instance used for reporting warnings.
make_aggregate_column : DB_Table -> Aggregate_Column -> Text -> Dialect -> (Any -> Any -> Any -> SQL_Type_Reference) -> Problem_Builder -> Internal_Column
make_aggregate_column : DB_Table -> Aggregate_Column -> Text -> Dialect -> (Text -> Vector -> SQL_Expression -> SQL_Type_Reference) -> Problem_Builder -> Internal_Column
make_aggregate_column table aggregate as dialect infer_return_type problem_builder -> Internal_Column =
simple_aggregate op_kind columns =
expression = dialect.cast_op_type op_kind columns (SQL_Expression.Operation op_kind (columns.map c->c.expression))
@@ -129,7 +130,7 @@ aggregate table:DB_Table group_by:(Vector | Text | Integer | Regex) columns:Vect
problem_builder = Problem_Builder.new
# If the dialect defines `custom_build_aggregate` we will use it, falling back to the default implementation if not defined.
aggregate_builder = Panic.catch No_Such_Method (dialect.custom_build_aggregate) _->
default_build_aggregate dialect
default_build_aggregate make_aggregate_column dialect
result = aggregate_builder base_table key_columns resolved_aggregates problem_builder
new_ctx = result.first
built_aggregates = result.second
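The `Panic.catch No_Such_Method` pattern above picks the dialect's `custom_build_aggregate` when the dialect defines one, and otherwise falls back to the default builder. As an illustrative sketch only (Python stand-ins, not the Enso API), the same dispatch can be written with `getattr`:

```python
def default_build_aggregate(dialect, table, keys, aggregates):
    # Hypothetical default implementation: tag results so the chosen path is visible.
    return ("default", dialect, len(aggregates))

class PlainDialect:
    # Defines no override, so the default builder is used.
    pass

class CustomDialect:
    def custom_build_aggregate(self, table, keys, aggregates):
        return ("custom", self, len(aggregates))

def build(dialect, table, keys, aggregates):
    # Use the dialect's override when it exists, else fall back -
    # mirroring the Panic.catch No_Such_Method pattern in the diff.
    custom = getattr(dialect, "custom_build_aggregate", None)
    if custom is not None:
        return custom(table, keys, aggregates)
    return default_build_aggregate(dialect, table, keys, aggregates)
```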
@@ -150,15 +151,14 @@ aggregate table:DB_Table group_by:(Vector | Text | Integer | Regex) columns:Vect
Warning.attach (Deprecated.Warning "Standard.Table.Aggregate_Column.Aggregate_Column" "Group_By" "Deprecated: `Group_By` constructor has been deprecated, use the `group_by` argument instead.") result

## PRIVATE
default_build_aggregate dialect base_table key_columns resolved_aggregates problem_builder =
default_build_aggregate build_aggregate dialect base_table key_columns resolved_aggregates problem_builder =
key_expressions = key_columns.map .expression
new_ctx = base_table.context.set_groups key_expressions
# TODO new_ctx fits here? I think it makes more sense than base_table.context that used to be here
infer_return_type = make_infer_return_type dialect base_table.connection new_ctx
results = resolved_aggregates.map p->
agg = p.second
as = p.first
result = make_aggregate_column base_table agg as dialect infer_return_type problem_builder
result = build_aggregate base_table agg as dialect infer_return_type problem_builder
## If the `result` contained an error, we catch it so that it can be
stored in a vector; the created columns and failures are then partitioned.
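The comment above describes wrapping failures so they can travel inside the results vector and be partitioned afterwards. A minimal Python sketch of that error-collection pattern (the `WrappedError` name is a hypothetical stand-in for `DB_Wrapped_Error`):

```python
class WrappedError:
    # Lets a failure be stored inside an ordinary list alongside successes.
    def __init__(self, error):
        self.error = error

def run_all(tasks):
    # Run every task; wrap exceptions instead of aborting, so all
    # successes are still produced even when some tasks fail.
    results = []
    for task in tasks:
        try:
            results.append(task())
        except Exception as e:
            results.append(WrappedError(e))
    return results

def partition(results):
    oks = [r for r in results if not isinstance(r, WrappedError)]
    errs = [r for r in results if isinstance(r, WrappedError)]
    return oks, errs
```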
@@ -182,93 +182,80 @@ make_infer_return_type dialect connection context =
infer_return_type

## PRIVATE
An implementation that can be used for `custom_build_aggregate` in dialects
that do not implement First/Last aggregations natively and instead need to
use a ROW_NUMBER sub-query trick. This is e.g. SQLite and Snowflake.
Setup for building aggregates that may require additional helper expressions.
The expressions are evaluated in the context of the base query, for each row
- so things like row number can be used. Then the actual aggregates are built
in the context of a subquery that can access these additional expressions as
fields of the parent query.

If a First/Last aggregation is requested, a sub-query containing a row number
with requested ordering is created, and that row number expression will be
passed to the aggregate instead of the ordering.
If several aggregations with different orderings are requested, a separate
row number column is created for each ordering. Aggregations with the same
ordering will re-use the same column.
The `create_helper_expressions` method takes one aggregate and the vector of
key columns, and should return a vector of expressions needed by that
aggregate, along with their name hints. The hints are used when generating
the temporary column names, making the queries a bit easier to debug.
Then, columns corresponding to the requested expressions (but transformed
to refer to the subquery) are passed to `make_aggregate` as its third
argument.

If no First/Last aggregations are requested, no sub-queries are created - in
fact we just delegate to the default implementation.
build_aggregate_with_row_number_for_first_last dialect base_table key_columns resolved_aggregates problem_builder =
if (has_first_last resolved_aggregates).not then default_build_aggregate dialect base_table key_columns resolved_aggregates problem_builder else
needed_orderings = resolved_aggregates.flat_map agg-> case agg.second of
Aggregate_Column.First _ _ _ order_by -> if is_non_empty_selector order_by then [order_by] else []
Aggregate_Column.Last _ _ _ order_by -> if is_non_empty_selector order_by then [order_by] else []
_ -> []
The expressions are deduplicated - if multiple aggregations rely on the same
expression, only one helper column is created for it, for efficiency.

# This function will infer return type at the context of the current query - used to extend it with row numbers.
# infer_return_type_in_orderings = make_infer_return_type dialect base_table.connection base_table.context
name_generator = base_table.column_naming_helper.create_unique_name_strategy
name_generator.mark_used base_table.column_names
key_expressions_for_orderings = key_columns.map .expression
numbering_columns = Dictionary.from_vector <| needed_orderings.distinct.map orderings->
name = name_generator.make_unique "row-number"
order_descriptors = orderings.map o-> dialect.prepare_order_descriptor o.column o.direction text_ordering=Nothing
expression = Row_Number_Helpers.make_row_number 1 1 order_descriptors key_expressions_for_orderings
# We can ignore the type for this column as it is used only internally.
column = Internal_Column.Value name SQL_Type_Reference.null expression
[orderings, column]
If no aggregation requires additional expressions, no additional subquery is
created.
type Aggregate_With_Helper_Expressions
## PRIVATE
Value (create_helper_expressions : Aggregate_Column -> Vector Internal_Column -> Vector (Pair Text SQL_Expression)) (make_aggregate : Aggregate_Column -> Text -> Vector Internal_Column -> DB_Table -> (Text -> Vector -> SQL_Expression -> SQL_Type_Reference) -> Problem_Builder -> Internal_Column)

subquery_setup = base_table.context.as_subquery base_table.name [base_table.internal_columns, numbering_columns.values]
updated_numbering_columns = numbering_columns.map subquery_setup.remap_column
## PRIVATE
To use this setup, the dialect's `custom_build_aggregate` should return this method, partially applied to the dialect.
build self dialect base_table key_columns resolved_aggregates problem_builder =
helper_expressions_for_aggregates = resolved_aggregates.map p-> self.create_helper_expressions p.second key_columns
needed_expressions = helper_expressions_for_aggregates.flatten.distinct
case needed_expressions.is_empty of
# If no special expressions are needed, we fall back to `default_build_aggregate`, but still use any overrides from the `make_aggregate` method.
True ->
adapted_make_aggregate base_table aggregate as _ infer_return_type problem_builder =
self.make_aggregate aggregate as [] base_table infer_return_type problem_builder
default_build_aggregate adapted_make_aggregate dialect base_table key_columns resolved_aggregates problem_builder
False ->
name_generator = base_table.column_naming_helper.create_unique_name_strategy
name_generator.mark_used base_table.column_names
helper_columns = needed_expressions.map p->
name = name_generator.make_unique p.first
Internal_Column.Value name SQL_Type_Reference.null p.second

# The context referring to the outer query that contains the subquery with additional columns and also is now grouped for the aggregation:
remapped_key_expressions = key_columns.map key_column->
subquery_setup.remap_column key_column . expression
new_ctx = (Context.for_subquery subquery_setup.subquery).set_groups remapped_key_expressions
subquery_setup = base_table.context.as_subquery base_table.name [base_table.internal_columns, helper_columns]
remapped_key_expressions = key_columns.map key_column->
subquery_setup.remap_column key_column . expression
new_ctx = (Context.for_subquery subquery_setup.subquery).set_groups remapped_key_expressions

# This function will infer return type at the context of the subquery with aggregation groupings.
infer_return_type_in_new_context = make_infer_return_type dialect base_table.connection new_ctx
results = resolved_aggregates.map p->
# TODO throw if ordering selector is empty...
original_aggregate = p.second
as = p.first
updated_aggregate = map_column_inputs subquery_setup.remap_column original_aggregate
# Mapping from a requested expression represented by (Pair Text SQL_Expression) to the column generated for that expression, in the subquery.
helper_columns_mapping = Dictionary.from_vector <| needed_expressions.zip subquery_setup.new_columns.second

make_first_last op_kind =
row_number = updated_numbering_columns.at (original_aggregate.order_by)
op = case updated_aggregate.ignore_nothing of
False -> op_kind
True -> op_kind + "_NOT_NULL"
# We just inherit the type of the source column, as the FIRST/LAST element of a column should have the same type.
sql_type_reference = updated_aggregate.column.sql_type_reference
Internal_Column.Value as sql_type_reference (SQL_Expression.Operation op [updated_aggregate.column.expression, row_number.expression])
# For each aggregate, we map the requested expressions to the columns generated in the subquery.
helper_columns_for_aggregates = helper_expressions_for_aggregates.map requested_expressions->
requested_expressions.map helper_columns_mapping.at

case updated_aggregate of
Aggregate_Column.First _ _ _ _ ->
make_first_last "FIRST"
Aggregate_Column.Last _ _ _ _ ->
make_first_last "LAST"
_ ->
# For all other aggregates we fall back to the default implementation.
result = make_aggregate_column base_table updated_aggregate as dialect infer_return_type_in_new_context problem_builder
infer_return_type_in_new_context = make_infer_return_type dialect base_table.connection new_ctx
results = resolved_aggregates.zip helper_columns_for_aggregates p-> helper_columns->
original_aggregate = p.second
as = p.first
updated_aggregate = map_column_inputs subquery_setup.remap_column original_aggregate
result = self.make_aggregate updated_aggregate as helper_columns dialect infer_return_type_in_new_context problem_builder
result.catch Any error->(DB_Wrapped_Error.Value error)
Pair.new new_ctx results


## PRIVATE
We check if aggregates requiring special handling are present.
has_first_last resolved_aggregates = resolved_aggregates.any agg->
case agg.second of
Aggregate_Column.First _ _ _ _ -> True
Aggregate_Column.Last _ _ _ _ -> True
_ -> False
Pair.new new_ctx results
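The `build` method above collects each aggregate's requested `(name_hint, expression)` pairs, deduplicates them, creates one uniquely named helper column per distinct expression, and hands each aggregate back the columns made for its own requests. A rough Python sketch of that flow (not the Enso API; `create_helpers` and `make_unique` are hypothetical stand-ins):

```python
def build_with_helpers(aggregates, create_helpers, make_unique):
    # 1. Ask each aggregate for the (name_hint, expression) pairs it needs.
    per_aggregate = [create_helpers(agg) for agg in aggregates]
    # 2. Deduplicate: aggregates sharing an expression share one helper column.
    needed = []
    for pairs in per_aggregate:
        for p in pairs:
            if p not in needed:
                needed.append(p)
    # 3. Create one uniquely-named helper column per distinct expression.
    mapping = {pair: (make_unique(pair[0]), pair[1]) for pair in needed}
    # 4. Hand each aggregate the columns created for its own expressions.
    return [[mapping[p] for p in pairs] for pairs in per_aggregate]
```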

## PRIVATE
Applies a mapping to column inputs of the aggregate.
It does not modify the columns inside of the `order_by` argument though.
map_column_inputs f:Function aggregate_column:Aggregate_Column -> Aggregate_Column =
update_order_by order_by = if order_by.is_nothing then Nothing else
order_by.map x-> case x of
Internal_Order_By_Column_Reference.Value c direction -> Internal_Order_By_Column_Reference.Value (f c) direction

case aggregate_column of
Group_By c as -> Group_By (f c) as
Count as -> Count as
Count_Distinct c as ignore_nothing ->
Count_Distinct (c.map f) as ignore_nothing
Count_Distinct ((c:Vector).map f) as ignore_nothing
Count_Not_Nothing c as -> Count_Not_Nothing (f c) as
Count_Nothing c as -> Count_Nothing (f c) as
Count_Not_Empty c as -> Count_Not_Empty (f c) as
@@ -280,8 +267,8 @@ map_column_inputs f:Function aggregate_column:Aggregate_Column -> Aggregate_Colu
Mode c as -> Mode (f c) as
Standard_Deviation c as population -> Standard_Deviation (f c) as population
Concatenate c as separator prefix suffix quote_char -> Concatenate (f c) as separator prefix suffix quote_char
First c as ignore_nothing order_by -> First (f c) as ignore_nothing order_by
Last c as ignore_nothing order_by -> Last (f c) as ignore_nothing order_by
First c as ignore_nothing order_by -> First (f c) as ignore_nothing (update_order_by order_by)
Last c as ignore_nothing order_by -> Last (f c) as ignore_nothing (update_order_by order_by)
Maximum c as -> Maximum (f c) as
Minimum c as -> Minimum (f c) as
Shortest c as -> Shortest (f c) as
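`map_column_inputs` rewrites every column reference inside an aggregate description, and after this change also the columns inside `order_by`. A small Python analogue using plain dicts instead of the `Aggregate_Column` constructors (field names hypothetical):

```python
def map_column_inputs(f, agg):
    # Return a copy of the aggregate with f applied to every column input,
    # including the column referenced by each order_by entry.
    out = dict(agg)
    if "column" in out:
        out["column"] = f(out["column"])
    if out.get("order_by"):
        out["order_by"] = [
            {"column": f(o["column"]), "direction": o["direction"]}
            for o in out["order_by"]
        ]
    return out
```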
@@ -18,9 +18,11 @@ import Standard.Database.DB_Column.DB_Column
import Standard.Database.DB_Table.DB_Table
import Standard.Database.Dialect
import Standard.Database.Internal.Aggregate_Helper
import Standard.Database.Internal.Aggregate_Helper.Aggregate_With_Helper_Expressions
import Standard.Database.Internal.Base_Generator
import Standard.Database.Internal.Common.Database_Distinct_Helper
import Standard.Database.Internal.Common.Database_Join_Helper
import Standard.Database.Internal.Common.Row_Number_Helpers
import Standard.Database.Internal.Error_Mapper.Error_Mapper
import Standard.Database.Internal.Internals_Access
import Standard.Database.Internal.IR.Context.Context
@@ -316,7 +318,38 @@ type Snowflake_Dialect
False

## PRIVATE
custom_build_aggregate self = Aggregate_Helper.build_aggregate_with_row_number_for_first_last self
custom_build_aggregate self =
create_row_number_for orderings key_columns =
order_descriptors = orderings.map o-> self.prepare_order_descriptor o.column o.direction text_ordering=Nothing
key_expressions_for_orderings = key_columns.map .expression
expression = Row_Number_Helpers.make_row_number 1 1 order_descriptors key_expressions_for_orderings
Pair.new "row-number" expression

create_helper_expressions aggregate_column key_columns =
make_selector_for_first_last order_by =
if Aggregate_Helper.is_non_empty_selector order_by then [create_row_number_for order_by key_columns] else []
case aggregate_column of
Aggregate_Column.First _ _ _ order_by -> make_selector_for_first_last order_by
Aggregate_Column.Last _ _ _ order_by -> make_selector_for_first_last order_by
_ -> []

make_aggregate aggregate_column as helper_expressions base_table infer_return_type problem_builder =
make_first_last op_kind = if Aggregate_Helper.is_non_empty_selector aggregate_column.order_by . not then Aggregate_Helper.throw_ordering_required (op_kind.to_case ..Title) else
Runtime.assert (helper_expressions.length == 1)
row_number = helper_expressions.first
op = case aggregate_column.ignore_nothing of
False -> op_kind
True -> op_kind + "_NOT_NULL"
# We just inherit the type of the source column, as the FIRST/LAST element of a column should have the same type.
sql_type_reference = aggregate_column.column.sql_type_reference
Internal_Column.Value as sql_type_reference (SQL_Expression.Operation op [aggregate_column.column.expression, row_number.expression])
case aggregate_column of
Aggregate_Column.First _ _ _ _ -> make_first_last "FIRST"
Aggregate_Column.Last _ _ _ _ -> make_first_last "LAST"
_ -> Aggregate_Helper.make_aggregate_column base_table aggregate_column as self infer_return_type problem_builder

setup = Aggregate_With_Helper_Expressions.Value create_helper_expressions make_aggregate
setup.build self

## PRIVATE
make_dialect_operations =
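The Snowflake `make_aggregate` above implements First/Last by aggregating over a precomputed row number (partitioned by the grouping keys, ordered by the requested ordering). The underlying trick can be simulated in plain Python (illustrative only, not Snowflake SQL or the Enso API):

```python
from itertools import groupby

def first_by_row_number(rows, key, order, value, ignore_none=False):
    # Emulate: ROW_NUMBER() OVER (PARTITION BY key ORDER BY order),
    # then take the value whose row number is 1 within each group.
    out = {}
    rows = sorted(rows, key=lambda r: (key(r),))
    for k, group in groupby(rows, key=key):
        candidates = sorted(group, key=order)
        if ignore_none:
            # The *_NOT_NULL variant skips missing values.
            candidates = [r for r in candidates if value(r) is not None]
        out[k] = value(candidates[0]) if candidates else None
    return out
```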
