Commit 24e9fc6
Merge remote-tracking branch 'vingov/apache_hudi_support' into 0.21.latest
2 parents 143c39a + 3671926
cadl committed Oct 13, 2021
Showing 20 changed files with 272 additions and 8 deletions.

9 changes: 9 additions & 0 deletions .circleci/config.yml
@@ -25,6 +25,15 @@ jobs:
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf spark.driver.userClassPathFirst=true
--conf spark.hadoop.datanucleus.autoCreateTables=true
--conf spark.hadoop.datanucleus.schema.autoCreateTables=true
--conf spark.hadoop.datanucleus.fixedDatastore=false
--conf spark.sql.hive.convertMetastoreParquet=false
--conf hive.metastore.schema.verification=false
- image: postgres:9.6.17-alpine
environment:
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,10 +6,12 @@

### Under the hood
- Add `unique_field` to better understand adapter adoption in anonymous usage tracking ([#211](https://github.com/dbt-labs/dbt-spark/pull/211))
- Add support for Apache Hudi (`hudi` file format), which enables the `merge` incremental strategy ([#187](https://github.com/dbt-labs/dbt-spark/issues/187), [#210](https://github.com/dbt-labs/dbt-spark/pull/210))

### Contributors
- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204))
- [@vingov](https://github.com/vingov) ([#210](https://github.com/dbt-labs/dbt-spark/pull/210))

## dbt-spark 0.21.0b2 (August 20, 2021)

13 changes: 13 additions & 0 deletions dbt/adapters/spark/impl.py
@@ -68,6 +68,13 @@ class SparkAdapter(SQLAdapter):
    INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
    INFORMATION_STATISTICS_REGEX = re.compile(
        r"^Statistics: (.*)$", re.MULTILINE)
    HUDI_METADATA_COLUMNS = [
        '_hoodie_commit_time',
        '_hoodie_commit_seqno',
        '_hoodie_record_key',
        '_hoodie_partition_path',
        '_hoodie_file_name'
    ]

    Relation = SparkRelation
    Column = SparkColumn
@@ -143,12 +150,14 @@ def list_relations_without_caching(
            rel_type = RelationType.View \
                if 'Type: VIEW' in information else RelationType.Table
            is_delta = 'Provider: delta' in information
            is_hudi = 'Provider: hudi' in information
            relation = self.Relation.create(
                schema=_schema,
                identifier=name,
                type=rel_type,
                information=information,
                is_delta=is_delta,
                is_hudi=is_hudi,
            )
            relations.append(relation)

@@ -222,6 +231,10 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
        # which would execute 'describe extended tablename' query
        rows: List[agate.Row] = super().get_columns_in_relation(relation)
        columns = self.parse_describe_extended(relation, rows)

        # strip hudi metadata columns.
        columns = [x for x in columns
                   if x.name not in self.HUDI_METADATA_COLUMNS]
        return columns

    def parse_columns_from_information(
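The stripped columns are Hudi's per-record bookkeeping fields, which every Hudi table stores alongside the model's own columns. A minimal sketch of the effect (table name hypothetical):

    -- a Hudi table created from: select 1 as id, 'hello' as msg
    describe my_hudi_table;
    -- Spark lists the model columns plus Hudi's metadata columns:
    --   _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
    --   _hoodie_partition_path, _hoodie_file_name, id, msg
    -- get_columns_in_relation now reports only: id, msg
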
1 change: 1 addition & 0 deletions dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
    include_policy: SparkIncludePolicy = SparkIncludePolicy()
    quote_character: str = '`'
    is_delta: Optional[bool] = None
    is_hudi: Optional[bool] = None
    information: str = None

    def __post_init__(self):
13 changes: 12 additions & 1 deletion dbt/include/spark/macros/adapters.sql
@@ -15,6 +15,17 @@

{% macro options_clause() -%}
{%- set options = config.get('options') -%}
{%- if config.get('file_format') == 'hudi' -%}
{%- set unique_key = config.get('unique_key') -%}
{%- if unique_key is not none and options is none -%}
{%- set options = {'primaryKey': config.get('unique_key')} -%}
{%- elif unique_key is not none and options is not none and 'primaryKey' not in options -%}
{%- set _ = options.update({'primaryKey': config.get('unique_key')}) -%}
{%- elif options is not none and 'primaryKey' in options and options['primaryKey'] != unique_key -%}
{{ exceptions.raise_compiler_error("unique_key and options('primaryKey') should be the same column(s).") }}
{%- endif %}
{%- endif %}

{%- if options is not none %}
options (
{%- for option in options -%}
@@ -181,7 +192,7 @@
{% endmacro %}

{% macro spark__alter_column_comment(relation, column_dict) %}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
{% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
{% for column_name in column_dict %}
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
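Taken together with the existing options rendering below the new branch, a hypothetical Hudi model that sets only `unique_key` now picks up a `primaryKey` option automatically:

    {{ config(
        materialized = 'table',
        file_format = 'hudi',
        unique_key = 'id',
    ) }}
    select 1 as id, 'hello' as msg

    -- compiles to roughly:
    --   create table analytics.my_model
    --   using hudi
    --   options (primaryKey "id")
    --   as select 1 as id, 'hello' as msg

If a model passes an explicit `options` `primaryKey` that disagrees with `unique_key`, the macro raises the compiler error shown above rather than silently preferring one of the two.
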
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,7 +26,7 @@

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format != 'delta' %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
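To make the relaxed validation concrete: a hypothetical incremental model like the sketch below now passes, while the same config with file_format='parquet' still raises the invalid-merge error above:

    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        file_format = 'hudi',
        unique_key = 'id',
    ) }}
    select 1 as id, 'hello' as msg
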
8 changes: 4 additions & 4 deletions dbt/include/spark/macros/materializations/snapshot.sql
@@ -82,18 +82,18 @@
identifier=target_table,
type='table') -%}

{%- if file_format != 'delta' -%}
{%- if file_format not in ['delta', 'hudi'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Snapshot functionality requires file_format be set to 'delta'
Snapshot functionality requires file_format be set to 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}

{%- if target_relation_exists -%}
{%- if not target_relation.is_delta -%}
{%- if not target_relation.is_delta and not target_relation.is_hudi -%}
{% set invalid_format_msg -%}
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta'
The existing table {{ model.schema }}.{{ target_table }} is in a format other than 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
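With the check relaxed, a snapshot can target a Hudi table directly. A sketch, where the schema, source, and column names are assumptions:

    {% snapshot users_snapshot %}
        {{ config(
            target_schema = 'snapshots',
            unique_key = 'id',
            strategy = 'timestamp',
            updated_at = 'updated_at',
            file_format = 'hudi',
        ) }}
        select * from {{ source('app', 'users') }}
    {% endsnapshot %}
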
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -14,6 +14,7 @@ services:
    volumes:
      - ./.spark-warehouse/:/spark-warehouse/
      - ./docker/hive-site.xml:/usr/spark/conf/hive-site.xml
      - ./docker/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf
    environment:
      - WAIT_FOR=dbt-hive-metastore:5432

4 changes: 4 additions & 0 deletions docker/hive-site.xml
@@ -39,4 +39,8 @@
    <value>dbt</value>
  </property>

  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
</configuration>
7 changes: 7 additions & 0 deletions docker/spark-defaults.conf
@@ -0,0 +1,7 @@
spark.hadoop.datanucleus.autoCreateTables true
spark.hadoop.datanucleus.schema.autoCreateTables true
spark.hadoop.datanucleus.fixedDatastore false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.userClassPathFirst true
19 changes: 19 additions & 0 deletions test/custom/incremental_strategies/models_hudi/append.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
19 changes: 19 additions & 0 deletions test/custom/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
20 changes: 20 additions & 0 deletions test/custom/incremental_strategies/models_hudi/insert_overwrite_partitions.sql
@@ -0,0 +1,20 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = 'id',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
19 changes: 19 additions & 0 deletions test/custom/incremental_strategies/models_hudi/merge_no_key.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
20 changes: 20 additions & 0 deletions test/custom/incremental_strategies/models_hudi/merge_unique_key.sql
@@ -0,0 +1,20 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
unique_key = 'id',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
24 changes: 24 additions & 0 deletions test/custom/incremental_strategies/test_incremental_strategies.py
@@ -16,6 +16,10 @@ def project_config(self):
            },
        }

    def seed_and_run_once(self):
        self.run_dbt(["seed"])
        self.run_dbt(["run"])

    def seed_and_run_twice(self):
        self.run_dbt(["seed"])
        self.run_dbt(["run"])
@@ -78,6 +82,26 @@ def test_delta_strategies_databricks_cluster(self):
        self.run_and_test()


class TestHudiStrategies(TestIncrementalStrategies):
    @property
    def models(self):
        return "models_hudi"

    def run_and_test(self):
        self.seed_and_run_once()
        self.assertTablesEqual("append", "expected_append")
        self.assertTablesEqual("merge_no_key", "expected_append")
        self.assertTablesEqual("merge_unique_key", "expected_upsert")
        self.assertTablesEqual(
            "insert_overwrite_no_partitions", "expected_overwrite")
        self.assertTablesEqual(
            "insert_overwrite_partitions", "expected_upsert")

    @use_profile("apache_spark")
    def test_hudi_strategies_apache_spark(self):
        self.run_and_test()


class TestBadStrategies(TestIncrementalStrategies):
    @property
    def models(self):
24 changes: 24 additions & 0 deletions test/custom/persist_docs/models/schema.yml
@@ -49,6 +49,30 @@ models:
        description: |
          Some stuff here and then a call to
          {{ doc('my_fun_doc')}}
  - name: table_hudi_model
    description: |
      Table model description "with double quotes"
      and with 'single quotes' as well as other;
      '''abc123'''
      reserved -- characters
      --
      /* comment */
      Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
    columns:
      - name: id
        description: |
          id Column description "with double quotes"
          and with 'single quotes' as well as other;
          '''abc123'''
          reserved -- characters
          --
          /* comment */
          Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
      - name: name
        description: |
          Some stuff here and then a call to
          {{ doc('my_fun_doc')}}
  - name: view_model
    description: |
2 changes: 2 additions & 0 deletions test/custom/persist_docs/models/table_hudi_model.sql
@@ -0,0 +1,2 @@
{{ config(materialized='table', file_format='hudi') }}
select 1 as id, 'Vino' as name
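
This model pairs with the table_hudi_model entry added to schema.yml above. With persist_docs enabled, the spark__alter_column_comment change earlier in this commit means dbt now emits a column-comment statement per documented column for hudi models too, roughly of this shape (schema name assumed):

    alter table analytics.table_hudi_model
    change column id
    comment 'id Column description "with double quotes" ...';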
34 changes: 34 additions & 0 deletions test/integration/spark-thrift-hudi.dbtspec
@@ -0,0 +1,34 @@
target:
  type: spark
  host: localhost
  user: dbt
  method: thrift
  port: 10000
  connect_retries: 5
  connect_timeout: 60
  schema: "analytics_{{ var('_dbt_random_suffix') }}"
projects:
  - overrides: snapshot_strategy_check_cols
    dbt_project_yml: &file_format_hudi
      # we're going to UPDATE the seed tables as part of testing, so we must make them hudi format
      seeds:
        dbt_test_project:
          file_format: hudi
      snapshots:
        dbt_test_project:
          file_format: hudi
  - overrides: snapshot_strategy_timestamp
    dbt_project_yml: *file_format_hudi
sequences:
  test_dbt_empty: empty
  test_dbt_base: base
  test_dbt_ephemeral: ephemeral
  test_dbt_incremental: incremental
  # snapshots require hudi format
  test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
  test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
  test_dbt_data_test: data_test
  test_dbt_schema_test: schema_test
  # the local cluster currently tests on spark 2.x, which does not support this
  # if we upgrade it to 3.x, we can enable this test
  # test_dbt_ephemeral_data_tests: data_test_ephemeral_models