Skip to content

Commit

Permalink
Prep for 1.8.0b2 (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Mar 11, 2024
1 parent 18560b8 commit 10ffa7f
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 12 deletions.
11 changes: 8 additions & 3 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,12 +818,17 @@ def _describe_relation(
)

kwargs = {"relation": relation}
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["information_schema.views"] = cls._get_information_schema_views(adapter, kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
return results

@staticmethod
def _get_information_schema_views(adapter: DatabricksAdapter, kwargs: Dict[str, Any]) -> Row:
    """Return the information_schema.views row describing a relation.

    Queries through the `get_view_description` macro first; when that row has
    no usable `view_definition` (missing key or null — presumably a catalog or
    permissions quirk, TODO confirm), retries via `get_view_description_alt`,
    which reads from `system.information_schema` instead.
    """
    row = get_first_row(adapter.execute_macro("get_view_description", kwargs=kwargs))
    definition_missing = (
        "view_definition" not in row.keys() or row["view_definition"] is None
    )
    if definition_missing:
        row = get_first_row(adapter.execute_macro("get_view_description_alt", kwargs=kwargs))
    return row


class StreamingTableAPI(RelationAPIBase[StreamingTableConfig]):
relation_type = DatabricksRelationType.StreamingTable
Expand Down
19 changes: 17 additions & 2 deletions dbt/include/databricks/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,23 @@

{% macro get_view_description(relation) %}
  {# Look up the information_schema.views row for this relation in its own catalog. #}
  {% call statement('get_view_description', fetch_result=True) -%}
    select *
    from {{ relation.information_schema() }}.`views`
    where table_schema = '{{ relation.schema }}'
      and table_name = '{{ relation.identifier }}'
  {%- endcall -%}

  {% do return(load_result('get_view_description').table) %}
{% endmacro %}

{% macro get_view_description_alt(relation) %}
  {# Fallback lookup via system.information_schema, additionally keyed by
     catalog, for cases where the catalog-local information_schema row is
     unusable. Uses {%- ... -%} trimming on endcall to match the sibling
     get_view_description macro and keep stray whitespace out of the statement. #}
  {% call statement('get_view_description_alt', fetch_result=True) -%}
    select *
    from `system`.`information_schema`.`views`
    where table_catalog = '{{ relation.database }}'
      and table_schema = '{{ relation.schema }}'
      and table_name = '{{ relation.identifier }}'
  {%- endcall -%}

  {% do return(load_result('get_view_description_alt').table) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
{%- set comment = materialized_view.config["comment"].comment -%}
{%- set refresh = materialized_view.config["refresh"] -%}
create materialized view {{ relation }}
{{ get_create_sql_partition_by(partition_by) }}
{{ get_create_sql_comment(comment) }}
{{ get_create_sql_tblproperties(tblproperties) }}
{{ get_create_sql_refresh_schedule(refresh.cron, refresh.time_zone_value) }}
as
{{ sql }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
{# Dispatch shim: resolves the adapter-specific implementation of
   get_refresh_materialized_view_sql via dbt's adapter.dispatch mechanism. #}
{% macro get_refresh_materialized_view_sql(relation) -%}
{{ adapter.dispatch('get_refresh_materialized_view_sql', 'dbt')(relation) }}
{%- endmacro %}

{# Databricks implementation: emit a REFRESH statement for the materialized view. #}
{% macro databricks__refresh_materialized_view(relation) -%}
refresh materialized view {{ relation }}
{% endmacro %}
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
databricks-sql-connector>=3.1.0, <3.2.0
dbt-spark>=1.8.0b1
dbt-spark~=1.8.0b1
databricks-sdk==0.17.0
keyring>=23.13.0
pandas<2.2.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _get_plugin_version() -> str:
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
include_package_data=True,
install_requires=[
"dbt-spark @ git+https://github.com/dbt-labs/dbt-spark",
"dbt-spark~=1.8.0b1",
"databricks-sql-connector>=3.1.0, <3.2.0",
"databricks-sdk==0.17.0",
"keyring>=23.13.0",
Expand Down
96 changes: 96 additions & 0 deletions tests/functional/adapter/materialized_view_tests/test_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from typing import Optional
from dbt.tests.adapter.materialized_view.changes import (
MaterializedViewChanges,
MaterializedViewChangesApplyMixin,
MaterializedViewChangesContinueMixin,
MaterializedViewChangesFailMixin,
)
from dbt.adapters.base import BaseRelation
from dbt.tests import util
import pytest
from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig

from tests.functional.adapter.materialized_view_tests import fixtures


def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict):
    """Assert that the relation's tblproperties match *expected*.

    Keys starting with "pipeline" are ignored before comparing — presumably
    these are injected by the platform rather than set by the model (TODO
    confirm).
    """
    observed = {
        key: value
        for key, value in tblproperties.tblproperties.items()
        if not key.startswith("pipeline")
    }
    assert observed == expected


class MaterializedViewChangesMixin(MaterializedViewChanges):
    """Databricks-specific hooks for dbt's upstream materialized-view change tests.

    Supplies the model fixture plus the state-inspection and model-mutation
    callbacks that the upstream ``MaterializedViewChanges`` suite invokes
    around its alter/replace scenarios.
    """

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        # Single materialized-view model shared by every test in the class.
        return {"my_materialized_view.sql": fixtures.materialized_view}

    @staticmethod
    def check_start_state(project, materialized_view):
        # The freshly created MV must reflect the fixture's configuration.
        with util.get_connection(project.adapter):
            config = project.adapter.get_relation_config(materialized_view)
            assert isinstance(config, MaterializedViewConfig)
            assert config.config["partition_by"].partition_by == ["id"]
            assert config.config["query"].query.startswith("select * from")
            _check_tblproperties(config.config["tblproperties"], {"key": "value"})
            assert config.config["refresh"].cron == "0 0 * * * ? *"
            assert config.config["refresh"].time_zone_value == "Etc/UTC"

    @staticmethod
    def change_config_via_alter(project, materialized_view):
        # Tweak only the refresh cron — a change that can be handled via ALTER.
        model_text = util.get_model_file(project, materialized_view)
        model_text = model_text.replace(
            "'cron': '0 0 * * * ? *'", "'cron': '0 5 * * * ? *'"
        )
        util.set_model_file(project, materialized_view, model_text)

    @staticmethod
    def check_state_alter_change_is_applied(project, materialized_view):
        with util.get_connection(project.adapter):
            config = project.adapter.get_relation_config(materialized_view)
            assert isinstance(config, MaterializedViewConfig)
            assert config.config["refresh"].cron == "0 5 * * * ? *"
            assert config.config["refresh"].time_zone_value == "Etc/UTC"

    @staticmethod
    def change_config_via_replace(project, materialized_view):
        # Change partitioning, the query, and tblproperties all at once —
        # a combination that requires fully replacing the relation.
        model_text = util.get_model_file(project, materialized_view)
        model_text = model_text.replace("partition_by='id',", "")
        model_text = model_text.replace("select *", "select id, value")
        model_text = model_text.replace("'key': 'value'", "'other': 'other'")
        util.set_model_file(project, materialized_view, model_text)

    @staticmethod
    def check_state_replace_change_is_applied(project, materialized_view):
        with util.get_connection(project.adapter):
            config = project.adapter.get_relation_config(materialized_view)
            assert isinstance(config, MaterializedViewConfig)
            assert config.config["partition_by"].partition_by == []
            assert config.config["query"].query.startswith("select id, value")
            _check_tblproperties(config.config["tblproperties"], {"other": "other"})

    @staticmethod
    def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
        # Delegate to the shared fixture helper so all MV suites agree on
        # how the live relation type is queried.
        return fixtures.query_relation_type(project, relation)


@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
class TestMaterializedViewApplyChanges(
    MaterializedViewChangesMixin, MaterializedViewChangesApplyMixin
):
    """Run the shared change scenarios under dbt's upstream ApplyMixin.

    NOTE(review): skipped on cluster profiles — presumably materialized views
    require a SQL warehouse; confirm against the profile definitions.
    """

    pass


@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
class TestMaterializedViewContinueOnChanges(
    MaterializedViewChangesMixin, MaterializedViewChangesContinueMixin
):
    """Run the shared change scenarios under dbt's upstream ContinueMixin.

    NOTE(review): skipped on cluster profiles — presumably materialized views
    require a SQL warehouse; confirm against the profile definitions.
    """

    pass


@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
class TestMaterializedViewFailOnChanges(
    MaterializedViewChangesMixin, MaterializedViewChangesFailMixin
):
    """Run the shared change scenarios under dbt's upstream FailMixin.

    NOTE(review): skipped on cluster profiles — presumably materialized views
    require a SQL warehouse; confirm against the profile definitions.
    """

    pass

0 comments on commit 10ffa7f

Please sign in to comment.