diff --git a/.circleci/config.yml b/.circleci/config.yml
index 99154fb64..d77f70bc7 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -25,6 +25,15 @@ jobs:
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver
+ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
+ --conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0
+ --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+ --conf spark.driver.userClassPathFirst=true
+ --conf spark.hadoop.datanucleus.autoCreateTables=true
+ --conf spark.hadoop.datanucleus.schema.autoCreateTables=true
+ --conf spark.hadoop.datanucleus.fixedDatastore=false
+ --conf spark.sql.hive.convertMetastoreParquet=false
+ --conf hive.metastore.schema.verification=false
- image: postgres:9.6.17-alpine
environment:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7405e9e0e..335994cfe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,10 +6,12 @@
### Under the hood
- Add `unique_field` to better understand adapter adoption in anonymous usage tracking ([#211](https://github.com/dbt-labs/dbt-spark/pull/211))
+- Add support for Apache Hudi (the hudi file format), which enables the merge incremental strategy ([#187](https://github.com/dbt-labs/dbt-spark/issues/187))
### Contributors
- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204))
+- [@vingov](https://github.com/vingov) ([#210](https://github.com/dbt-labs/dbt-spark/pull/210))
## dbt-spark 0.21.0b2 (August 20, 2021)
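
(Illustrative, not part of the patch: a minimal incremental model exercising the new file format, mirroring the fixtures added under test/custom/incremental_strategies/models_hudi/.)

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    file_format = 'hudi',
    unique_key = 'id',
) }}

select cast(1 as bigint) as id, 'hello' as msg
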
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 6acbcd446..1025d9254 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -68,6 +68,13 @@ class SparkAdapter(SQLAdapter):
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(
r"^Statistics: (.*)$", re.MULTILINE)
+ HUDI_METADATA_COLUMNS = [
+ '_hoodie_commit_time',
+ '_hoodie_commit_seqno',
+ '_hoodie_record_key',
+ '_hoodie_partition_path',
+ '_hoodie_file_name'
+ ]
Relation = SparkRelation
Column = SparkColumn
@@ -143,12 +150,14 @@ def list_relations_without_caching(
rel_type = RelationType.View \
if 'Type: VIEW' in information else RelationType.Table
is_delta = 'Provider: delta' in information
+ is_hudi = 'Provider: hudi' in information
relation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
+ is_hudi=is_hudi,
)
relations.append(relation)
@@ -222,6 +231,10 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
# which would execute 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)
+
+ # strip hudi metadata columns.
+ columns = [x for x in columns
+ if x.name not in self.HUDI_METADATA_COLUMNS]
return columns
def parse_columns_from_information(
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 5fc096550..043cabfa0 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
include_policy: SparkIncludePolicy = SparkIncludePolicy()
quote_character: str = '`'
is_delta: Optional[bool] = None
+ is_hudi: Optional[bool] = None
information: str = None
def __post_init__(self):
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index fcdc46c6d..fe13c5159 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -15,6 +15,17 @@
{% macro options_clause() -%}
{%- set options = config.get('options') -%}
+ {%- if config.get('file_format') == 'hudi' -%}
+ {%- set unique_key = config.get('unique_key') -%}
+ {%- if unique_key is not none and options is none -%}
+ {%- set options = {'primaryKey': config.get('unique_key')} -%}
+ {%- elif unique_key is not none and options is not none and 'primaryKey' not in options -%}
+ {%- set _ = options.update({'primaryKey': config.get('unique_key')}) -%}
+ {%- elif options is not none and 'primaryKey' in options and options['primaryKey'] != unique_key -%}
+ {{ exceptions.raise_compiler_error("unique_key and options('primaryKey') should be the same column(s).") }}
+ {%- endif %}
+ {%- endif %}
+
{%- if options is not none %}
options (
{%- for option in options -%}
@@ -181,7 +192,7 @@
{% endmacro %}
{% macro spark__alter_column_comment(relation, column_dict) %}
- {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
+ {% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
{% for column_name in column_dict %}
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
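
(Illustrative, not part of the patch: with file_format = 'hudi' and unique_key = 'id', the updated options_clause is expected to render DDL along these lines, matching the assertions added in test/unit/test_macros.py.)

create table my_table using hudi options (primaryKey "id" ) as select 1 as id
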
diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql
index 400a2eee5..3e9de359b 100644
--- a/dbt/include/spark/macros/materializations/incremental/validate.sql
+++ b/dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}
- {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
+ {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,7 +26,7 @@
{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
- You can only choose this strategy when file_format is set to 'delta'
+ You can only choose this strategy when file_format is set to 'delta' or 'hudi'
{%- endset %}
{% set invalid_insert_overwrite_delta_msg -%}
@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
- {% if raw_strategy == 'merge' and file_format != 'delta' %}
+ {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 6dad51a02..c398b045e 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -82,18 +82,18 @@
identifier=target_table,
type='table') -%}
- {%- if file_format != 'delta' -%}
+ {%- if file_format not in ['delta', 'hudi'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
- Snapshot functionality requires file_format be set to 'delta'
+ Snapshot functionality requires file_format be set to 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
{%- if target_relation_exists -%}
- {%- if not target_relation.is_delta -%}
+    {%- if not target_relation.is_delta and not target_relation.is_hudi -%}
{% set invalid_format_msg -%}
- The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta'
+      The existing table {{ model.schema }}.{{ target_table }} is in a format other than 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
diff --git a/docker-compose.yml b/docker-compose.yml
index 869e4ecd2..8054dfd75 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -14,6 +14,7 @@ services:
volumes:
- ./.spark-warehouse/:/spark-warehouse/
- ./docker/hive-site.xml:/usr/spark/conf/hive-site.xml
+ - ./docker/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf
environment:
- WAIT_FOR=dbt-hive-metastore:5432
diff --git a/docker/hive-site.xml b/docker/hive-site.xml
index a92e87b76..457d04f31 100644
--- a/docker/hive-site.xml
+++ b/docker/hive-site.xml
@@ -39,4 +39,8 @@
    <value>dbt</value>
  </property>
+
+  <property>
+    <name>hive.metastore.schema.verification</name>
+    <value>false</value>
+  </property>
</configuration>
diff --git a/docker/spark-defaults.conf b/docker/spark-defaults.conf
new file mode 100644
index 000000000..48a0501c2
--- /dev/null
+++ b/docker/spark-defaults.conf
@@ -0,0 +1,7 @@
+spark.hadoop.datanucleus.autoCreateTables true
+spark.hadoop.datanucleus.schema.autoCreateTables true
+spark.hadoop.datanucleus.fixedDatastore false
+spark.serializer org.apache.spark.serializer.KryoSerializer
+spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0
+spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+spark.driver.userClassPathFirst true
diff --git a/test/custom/incremental_strategies/models_hudi/append.sql b/test/custom/incremental_strategies/models_hudi/append.sql
new file mode 100644
index 000000000..9be27bec3
--- /dev/null
+++ b/test/custom/incremental_strategies/models_hudi/append.sql
@@ -0,0 +1,19 @@
+{{ config(
+ materialized = 'incremental',
+ incremental_strategy = 'append',
+ file_format = 'hudi',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/test/custom/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql b/test/custom/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql
new file mode 100644
index 000000000..081374089
--- /dev/null
+++ b/test/custom/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql
@@ -0,0 +1,19 @@
+{{ config(
+ materialized = 'incremental',
+ incremental_strategy = 'insert_overwrite',
+ file_format = 'hudi',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/test/custom/incremental_strategies/models_hudi/insert_overwrite_partitions.sql b/test/custom/incremental_strategies/models_hudi/insert_overwrite_partitions.sql
new file mode 100644
index 000000000..0f74cfdb3
--- /dev/null
+++ b/test/custom/incremental_strategies/models_hudi/insert_overwrite_partitions.sql
@@ -0,0 +1,20 @@
+{{ config(
+ materialized = 'incremental',
+ incremental_strategy = 'insert_overwrite',
+ partition_by = 'id',
+ file_format = 'hudi',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/test/custom/incremental_strategies/models_hudi/merge_no_key.sql b/test/custom/incremental_strategies/models_hudi/merge_no_key.sql
new file mode 100644
index 000000000..8def11ddf
--- /dev/null
+++ b/test/custom/incremental_strategies/models_hudi/merge_no_key.sql
@@ -0,0 +1,19 @@
+{{ config(
+ materialized = 'incremental',
+ incremental_strategy = 'merge',
+ file_format = 'hudi',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/test/custom/incremental_strategies/models_hudi/merge_unique_key.sql b/test/custom/incremental_strategies/models_hudi/merge_unique_key.sql
new file mode 100644
index 000000000..ee72860d2
--- /dev/null
+++ b/test/custom/incremental_strategies/models_hudi/merge_unique_key.sql
@@ -0,0 +1,20 @@
+{{ config(
+ materialized = 'incremental',
+ incremental_strategy = 'merge',
+ file_format = 'hudi',
+ unique_key = 'id',
+) }}
+
+{% if not is_incremental() %}
+
+select cast(1 as bigint) as id, 'hello' as msg
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg
+
+{% else %}
+
+select cast(2 as bigint) as id, 'yo' as msg
+union all
+select cast(3 as bigint) as id, 'anyway' as msg
+
+{% endif %}
diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py
index 64966ece5..75b7ac45f 100644
--- a/test/custom/incremental_strategies/test_incremental_strategies.py
+++ b/test/custom/incremental_strategies/test_incremental_strategies.py
@@ -16,6 +16,10 @@ def project_config(self):
},
}
+ def seed_and_run_once(self):
+ self.run_dbt(["seed"])
+ self.run_dbt(["run"])
+
def seed_and_run_twice(self):
self.run_dbt(["seed"])
self.run_dbt(["run"])
@@ -78,6 +82,26 @@ def test_delta_strategies_databricks_cluster(self):
self.run_and_test()
+class TestHudiStrategies(TestIncrementalStrategies):
+ @property
+ def models(self):
+ return "models_hudi"
+
+ def run_and_test(self):
+ self.seed_and_run_once()
+ self.assertTablesEqual("append", "expected_append")
+ self.assertTablesEqual("merge_no_key", "expected_append")
+ self.assertTablesEqual("merge_unique_key", "expected_upsert")
+ self.assertTablesEqual(
+ "insert_overwrite_no_partitions", "expected_overwrite")
+ self.assertTablesEqual(
+ "insert_overwrite_partitions", "expected_upsert")
+
+ @use_profile("apache_spark")
+ def test_hudi_strategies_apache_spark(self):
+ self.run_and_test()
+
+
class TestBadStrategies(TestIncrementalStrategies):
@property
def models(self):
diff --git a/test/custom/persist_docs/models/schema.yml b/test/custom/persist_docs/models/schema.yml
index 78dcda799..35f478266 100644
--- a/test/custom/persist_docs/models/schema.yml
+++ b/test/custom/persist_docs/models/schema.yml
@@ -49,6 +49,30 @@ models:
description: |
Some stuff here and then a call to
{{ doc('my_fun_doc')}}
+
+ - name: table_hudi_model
+ description: |
+ Table model description "with double quotes"
+ and with 'single quotes' as welll as other;
+ '''abc123'''
+ reserved -- characters
+ --
+ /* comment */
+ Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
+ columns:
+ - name: id
+ description: |
+ id Column description "with double quotes"
+ and with 'single quotes' as welll as other;
+ '''abc123'''
+ reserved -- characters
+ --
+ /* comment */
+ Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
+ - name: name
+ description: |
+ Some stuff here and then a call to
+ {{ doc('my_fun_doc')}}
- name: view_model
description: |
diff --git a/test/custom/persist_docs/models/table_hudi_model.sql b/test/custom/persist_docs/models/table_hudi_model.sql
new file mode 100644
index 000000000..60b95c684
--- /dev/null
+++ b/test/custom/persist_docs/models/table_hudi_model.sql
@@ -0,0 +1,2 @@
+{{ config(materialized='table', file_format='hudi') }}
+select 1 as id, 'Vino' as name
diff --git a/test/integration/spark-thrift-hudi.dbtspec b/test/integration/spark-thrift-hudi.dbtspec
new file mode 100644
index 000000000..21546c97f
--- /dev/null
+++ b/test/integration/spark-thrift-hudi.dbtspec
@@ -0,0 +1,34 @@
+target:
+ type: spark
+ host: localhost
+ user: dbt
+ method: thrift
+ port: 10000
+ connect_retries: 5
+ connect_timeout: 60
+ schema: "analytics_{{ var('_dbt_random_suffix') }}"
+projects:
+ - overrides: snapshot_strategy_check_cols
+ dbt_project_yml: &file_format_hudi
+ # we're going to UPDATE the seed tables as part of testing, so we must make them hudi format
+ seeds:
+ dbt_test_project:
+ file_format: hudi
+ snapshots:
+ dbt_test_project:
+ file_format: hudi
+ - overrides: snapshot_strategy_timestamp
+    dbt_project_yml: *file_format_hudi
+sequences:
+ test_dbt_empty: empty
+ test_dbt_base: base
+ test_dbt_ephemeral: ephemeral
+ test_dbt_incremental: incremental
+ # snapshots require hudi format
+ test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
+ test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
+ test_dbt_data_test: data_test
+ test_dbt_schema_test: schema_test
+ # the local cluster currently tests on spark 2.x, which does not support this
+ # if we upgrade it to 3.x, we can enable this test
+ # test_dbt_ephemeral_data_tests: data_test_ephemeral_models
diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py
index 151631e08..06ce202a7 100644
--- a/test/unit/test_macros.py
+++ b/test/unit/test_macros.py
@@ -43,6 +43,10 @@ def test_macros_create_table_as_file_format(self):
sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
self.assertEqual(sql, "create or replace table my_table using delta as select 1")
+ self.config['file_format'] = 'hudi'
+ sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+ self.assertEqual(sql, "create table my_table using hudi as select 1")
+
def test_macros_create_table_as_options(self):
template = self.__get_template('adapters.sql')
@@ -51,6 +55,30 @@ def test_macros_create_table_as_options(self):
sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
self.assertEqual(sql, 'create or replace table my_table using delta options (compression "gzip" ) as select 1')
+ self.config['file_format'] = 'hudi'
+ sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+ self.assertEqual(sql, 'create table my_table using hudi options (compression "gzip" ) as select 1')
+
+ def test_macros_create_table_as_hudi_options(self):
+ template = self.__get_template('adapters.sql')
+
+ self.config['file_format'] = 'hudi'
+ self.config['unique_key'] = 'id'
+ sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1 as id').strip()
+ self.assertEqual(sql, 'create table my_table using hudi options (primaryKey "id" ) as select 1 as id')
+
+ self.config['file_format'] = 'hudi'
+ self.config['unique_key'] = 'id'
+ self.config['options'] = {'primaryKey': 'id'}
+ sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1 as id').strip()
+ self.assertEqual(sql, 'create table my_table using hudi options (primaryKey "id" ) as select 1 as id')
+
+ self.config['file_format'] = 'hudi'
+ self.config['unique_key'] = 'uuid'
+ self.config['options'] = {'primaryKey': 'id'}
+ sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1 as id')
+ self.assertIn('mock.raise_compiler_error()', sql)
+
def test_macros_create_table_as_partition(self):
template = self.__get_template('adapters.sql')
@@ -113,3 +141,10 @@ def test_macros_create_table_as_all(self):
sql,
"create or replace table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1"
)
+
+ self.config['file_format'] = 'hudi'
+ sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+ self.assertEqual(
+ sql,
+ "create table my_table using hudi partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1"
+ )