Skip to content

Commit

Permalink
Merge pull request #349 from BentsiLeviav/main
Browse files Browse the repository at this point in the history
Fix multiple projections issue
  • Loading branch information
BentsiLeviav authored Sep 1, 2024
2 parents 150a827 + 656cc23 commit facacf9
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
6 changes: 3 additions & 3 deletions dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@
{{ create_table }}
{% endcall %}
{% if config.get('projections')%}
{% call statement('add_projections') %}
{{ projection_statement(relation) }}
{%endcall %}
{% endif %}


Expand All @@ -162,10 +160,12 @@
{%- set projections = config.get('projections', default=[]) -%}

{%- for projection in projections %}
ALTER TABLE {{ relation }} ADD PROJECTION {{ projection.get('name') }}
{% call statement('add_projections') %}
ALTER TABLE {{ relation }} ADD PROJECTION {{ projection.get('name') }}
(
{{ projection.get('query') }}
)
{%endcall %}
{%- endfor %}
{%- endmacro %}

Expand Down
83 changes: 83 additions & 0 deletions tests/integration/adapter/projections/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,29 @@
from {{ source('raw', 'people') }}
"""

# dbt model template that declares two aggregate projections on the same
# table. The single `%s` placeholder is filled with the materialization name
# when the fixture builds its models (e.g. `... % "table"`).
# Each projection aliases its aggregate after the function it computes, so the
# `sum(age)` projection exposes `sum_age` (it previously reused `avg_age` from
# the first projection, which contradicted the verification query in the test).
PEOPLE_MODEL_WITH_MULTIPLE_PROJECTIONS = """
{{ config(
materialized='%s',
projections=[
{
'name': 'projection_avg_age',
'query': 'SELECT department, avg(age) AS avg_age GROUP BY department'
},
{
'name': 'projection_sum_age',
'query': 'SELECT department, sum(age) AS sum_age GROUP BY department'
}
]
) }}
select
id,
name,
age,
department
from {{ source('raw', 'people') }}
"""

SEED_SCHEMA_YML = """
version: 2
Expand All @@ -58,6 +81,8 @@ def models(self):
"people_with_projection.sql": PEOPLE_MODEL_WITH_PROJECTION % "table",
"distributed_people_with_projection.sql": PEOPLE_MODEL_WITH_PROJECTION
% "distributed_table",
"people_with_multiple_projections.sql": PEOPLE_MODEL_WITH_MULTIPLE_PROJECTIONS
% "table",
}

def test_create_and_verify_projection(self, project):
Expand Down Expand Up @@ -90,6 +115,64 @@ def test_create_and_verify_projection(self, project):

assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_avg_age']

def test_create_and_verify_multiple_projections(self, project):
    """Build the multi-projection model and check each projection is used.

    For every projection declared on ``people_with_multiple_projections`` the
    test runs a matching aggregate query tagged with a unique comment, checks
    the returned rows, and then looks the tagged query up in
    ``system.query_log`` to confirm which projection is recorded for it.
    """
    run_dbt(["seed"])
    run_dbt()

    relation = relation_from_name(project.adapter, "people_with_multiple_projections")
    table = f"{project.test_schema}.{relation.name}"

    # (aggregate function, result alias, projection name, expected rows)
    cases = (
        (
            "avg",
            "avg_age",
            "projection_avg_age",
            [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)],
        ),
        (
            "sum",
            "sum_age",
            "projection_sum_age",
            [('engineering', 131), ('malware', 40), ('sales', 25)],
        ),
    )

    for aggregate, alias, projection_name, expected_rows in cases:
        # A fresh UUID in a leading SQL comment makes this exact query
        # identifiable in the query log afterwards.
        marker = str(uuid.uuid4())
        query = (
            f" -- {marker}\n"
            f"SELECT department, {aggregate}(age) AS {alias} FROM {table}\n"
            f"GROUP BY department ORDER BY department"
        )

        # The aggregate itself must return one row per department.
        rows = project.run_sql(query, fetch="all")
        assert len(rows) == 3  # We expect 3 departments in the result
        assert rows == expected_rows

        # Pause before inspecting system.query_log so the query just
        # executed has a chance to appear there.
        time.sleep(10)

        # Newest matching log entry first; the log lookup itself is excluded.
        log_rows = project.run_sql(
            "SELECT query, projections FROM clusterAllReplicas(default, 'system.query_log') "
            f"WHERE query like '%{marker}%' "
            "and query not like '%system.query_log%' and read_rows > 0 ORDER BY query_start_time DESC",
            fetch="all",
        )
        assert len(log_rows) > 0
        assert query in log_rows[0][0]

        assert log_rows[0][1] == [f'{table}.{projection_name}']

@pytest.mark.xfail
@pytest.mark.skipif(
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
Expand Down

0 comments on commit facacf9

Please sign in to comment.