Skip to content

Commit

Permalink
Add support for Iceberg table format in Dynamic Tables (#1183)
Browse files Browse the repository at this point in the history
* add support for iceberg dynamic tables
* remove is_dynamic-related guards as that is ga now
* simplify dynamic table testing
* add iceberg dynamic tables to existing dynamic table tests
* add standard incremental tables into the relation swap scenarios
* account for the fact that snowflake does not support renaming iceberg relations
* account for all scenarios when swapping relation types, including those which currently require a full refresh
* make it clearer which scenarios are included in each run and why by pulling the criteria into one function

---------

Co-authored-by: Mila Page <67295367+VersusFacit@users.noreply.github.com>
  • Loading branch information
mikealfare and VersusFacit authored Sep 27, 2024
1 parent 423111f commit 0521395
Show file tree
Hide file tree
Showing 15 changed files with 593 additions and 66 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240917-100505.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table format in Dynamic Tables
time: 2024-09-17T10:05:05.609859-04:00
custom:
Author: mikealfare
Issue: "1183"
15 changes: 15 additions & 0 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dbt_common.events.functions import fire_event, warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeCatalogConfigChange,
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
Expand Down Expand Up @@ -114,6 +115,12 @@ def dynamic_table_config_changeset(
context=new_dynamic_table.refresh_mode,
)

if new_dynamic_table.catalog != existing_dynamic_table.catalog:
config_change_collection.catalog = SnowflakeCatalogConfigChange(
action=RelationConfigChangeAction.create,
context=new_dynamic_table.catalog,
)

if config_change_collection.has_changes:
return config_change_collection
return None
Expand All @@ -132,6 +139,14 @@ def as_case_sensitive(self) -> "SnowflakeRelation":

return self.replace_path(**path_part_map)

@property
def can_be_renamed(self) -> bool:
    """
    Report whether this relation supports being renamed.

    A relation qualifies only when its type is listed in `renameable_relations`
    and it is not stored in the iceberg format. Snowflake does not support
    renaming iceberg relations; the iceberg standard itself does support
    renaming, so this restriction may be lifted in the future.
    """
    # Types outside the renameable set are rejected before the format is consulted.
    if self.type not in self.renameable_relations:
        return False
    return not self.is_iceberg_format

def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
"""
This macro renders the appropriate DDL prefix during the create_table_as
Expand Down
6 changes: 5 additions & 1 deletion dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
123 changes: 123 additions & 0 deletions dbt/adapters/snowflake/relation_configs/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, TYPE_CHECKING, Set

if TYPE_CHECKING:
import agate

from dbt.adapters.relation_configs import (
RelationConfigChange,
RelationResults,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtConfigError
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.formats import TableFormat


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidationMixin):
    """
    Catalog-related configuration for a Snowflake relation.

    This config follows the specs found here:
    https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
    https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table

    The following parameters are configurable by dbt:
    - table_format: format for interfacing with the table, e.g. default, iceberg
    - external_volume: name of the external volume in Snowflake
    - base_location: the directory within the external volume that contains the data
        *Note*: This directory cannot be changed after the table is created.

    The following parameters are not currently configurable by dbt:
    - name: snowflake
    """

    table_format: Optional[TableFormat] = TableFormat.default()
    name: Optional[str] = "SNOWFLAKE"
    external_volume: Optional[str] = None
    base_location: Optional[str] = None

    @property
    def validation_rules(self) -> Set[RelationConfigValidationRule]:
        """
        Build the rules this config must satisfy.

        A default-format config passes both rules unconditionally. An iceberg
        config must carry a `base_location` and must name the "SNOWFLAKE" catalog.
        Any other table format fails both rules.
        """
        is_default = self.table_format == "default"
        is_iceberg = self.table_format == "iceberg"

        base_location_ok = is_default or (is_iceberg and self.base_location is not None)
        catalog_name_ok = is_default or (is_iceberg and self.name == "SNOWFLAKE")

        return {
            RelationConfigValidationRule(
                base_location_ok,
                DbtConfigError("Please provide a `base_location` when using iceberg"),
            ),
            RelationConfigValidationRule(
                catalog_name_ok,
                DbtConfigError(
                    "Only Snowflake catalogs are currently supported when using iceberg"
                ),
            ),
        }

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
        """
        Create a catalog config from a parsed config dictionary.

        `name`, `external_volume` and `base_location` are always forwarded
        (as None when missing from `config_dict`). `table_format` is forwarded
        only when a truthy value is present, in which case it is converted to
        a `TableFormat`; otherwise the dataclass default applies.
        """
        passthrough_keys = ("name", "external_volume", "base_location")
        kwargs_dict: Dict[str, Any] = {key: config_dict.get(key) for key in passthrough_keys}

        table_format = config_dict.get("table_format")
        if table_format:
            kwargs_dict["table_format"] = TableFormat(table_format)

        return super().from_dict(kwargs_dict)

    @classmethod
    def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
        """
        Extract the catalog settings from a model's config.

        Returns an empty dict when the model does not set `table_format`.
        Otherwise returns the table format and the fixed catalog name, plus
        `external_volume` and `base_location` when the model provides truthy
        values for `external_volume` and `base_location_subpath` respectively.
        """
        extra = relation_config.config.extra

        table_format = extra.get("table_format")
        if table_format is None:
            return {}

        config_dict = {
            "table_format": table_format,
            "name": "SNOWFLAKE",  # this is not currently configurable
        }

        # (key in the model config, key in the resulting config dict)
        optional_settings = (
            ("external_volume", "external_volume"),
            ("base_location_subpath", "base_location"),
        )
        for model_key, config_key in optional_settings:
            if value := extra.get(model_key):
                config_dict[config_key] = value

        return config_dict

    @classmethod
    def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
        """
        Extract the catalog settings from the results describing an existing relation.

        Returns an empty dict when there is no "catalog" entry in the results or
        when that entry contains no rows; otherwise returns an iceberg config
        built from the first row.
        """
        # this try block can be removed once enable_iceberg_materializations is retired
        try:
            catalog_results: "agate.Table" = relation_results["catalog"]
        except KeyError:
            # this happens when `enable_iceberg_materializations` is turned off
            return {}

        if len(catalog_results) == 0:
            # this happens when the dynamic table is a standard dynamic table (e.g. not iceberg)
            return {}

        # for now, if we get catalog results, it's because this is an iceberg table
        # this is because we only run `show iceberg tables` to get catalog metadata
        # this will need to be updated once this is in `show objects`
        first_row: "agate.Row" = catalog_results.rows[0]
        return {
            "table_format": "iceberg",
            "name": first_row.get("catalog_name"),
            "external_volume": first_row.get("external_volume_name"),
            "base_location": first_row.get("base_location"),
        }


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfigChange(RelationConfigChange):
    """
    Describe a change to the catalog configuration of a relation.
    """

    # the catalog configuration associated with this change, if any
    context: Optional[SnowflakeCatalogConfig] = None

    @property
    def requires_full_refresh(self) -> bool:
        """
        Catalog changes are always reported as requiring a full refresh.
        """
        return True
24 changes: 17 additions & 7 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)


if TYPE_CHECKING:
import agate
Expand Down Expand Up @@ -55,11 +60,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
Expand All @@ -69,12 +75,12 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]),
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict)
return dynamic_table
return super().from_dict(kwargs_dict)

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
Expand All @@ -85,18 +91,19 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
config_dict.update(refresh_mode=refresh_mode.upper())
config_dict["refresh_mode"] = refresh_mode.upper()

if initialize := relation_config.config.extra.get("initialize"):
config_dict.update(initialize=initialize.upper())
config_dict["initialize"] = initialize.upper()

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
dynamic_table: "agate.Row" = relation_results["dynamic_table"].rows[0]

config_dict = {
Expand All @@ -106,6 +113,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_results(relation_results),
"refresh_mode": dynamic_table.get("refresh_mode"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}
Expand Down Expand Up @@ -145,6 +153,7 @@ class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None
catalog: Optional[SnowflakeCatalogConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
Expand All @@ -157,9 +166,10 @@ def requires_full_refresh(self) -> bool:
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
self.catalog.requires_full_refresh if self.catalog else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode])
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode, self.catalog])
5 changes: 5 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self


class TableFormat(StrEnum):
Expand All @@ -10,5 +11,9 @@ class TableFormat(StrEnum):
DEFAULT = "default"
ICEBERG = "iceberg"

@classmethod
def default(cls) -> Self:
return cls("default")

def __str__(self):
return self.value
79 changes: 73 additions & 6 deletions dbt/include/snowflake/macros/relations/dynamic_table/create.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
{% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%}
{#-
-- Produce DDL that creates a dynamic table
--
-- The dynamic table config is built from `config.model`, and its catalog's
-- table format selects which macro renders the DDL: the iceberg macro when
-- the format is 'iceberg', the standard macro otherwise.
--
-- Args:
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Globals:
-- - config: NodeConfig - contains the attributes required to produce a SnowflakeDynamicTableConfig
-- Returns:
-- A valid DDL statement which will result in a new dynamic table.
--
-- NOTE(review): `relation.from_config` is called below, which presumably
-- requires a SnowflakeRelation rather than a str - confirm against callers.
-#}

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
{%- endif -%}

{%- endmacro %}


{% macro _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that creates a standard dynamic table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic standard table.
-#}

create dynamic table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)

{%- endmacro %}


{% macro _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that creates a dynamic iceberg table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = {{ dynamic_table.catalog.base_location }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)
Expand Down
Loading

0 comments on commit 0521395

Please sign in to comment.