Commit bf11cbe

Bug fixes related to model settings. (#214)
1 parent 3fec9a4 commit bf11cbe

File tree

12 files changed: +133 -38 lines changed

CHANGELOG.md

Lines changed: 19 additions & 2 deletions
@@ -1,9 +1,26 @@
 ### Release [1.5.2], 2023-11-28
-#### Bug Fix
+#### Bug Fixes
 - The `ON CLUSTER` clause was in the incorrect place for legacy incremental materializations. This has been fixed. Thanks to
 [Steven Reitsma](https://github.com/StevenReitsma) for the fix!
 - The `ON CLUSTER` DDL for drop tables did not include a SYNC modifier, which might be the cause of some "table already exists"
-errors
+errors. The `SYNC` modifier has been added to the `on_cluster` macro when dropping relations.
+- Fixed a bug where using table settings such as `allow_nullable_key` would break "legacy" incremental materializations. Closes
+https://github.com/ClickHouse/dbt-clickhouse/issues/209. Also see the new model `config` property `query_settings` described
+below.
+- Fixed an issue where incremental materializations would incorrectly exclude duplicated inserted elements due to "automatic"
+ClickHouse deduplication on replicated tables. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/213. The fix consists
+of always sending a `replicated_deduplication_window=0` table setting when creating the incremental relations. This
+behavior can be overridden by setting the new profile parameter `allow_automatic_deduplication` to `True`, although for
+general dbt operations this is probably not necessary and not recommended. Finally, thanks to [Andy](https://github.com/andy-miracl)
+for the report and debugging help!
+
+#### Improvements
+- Added a new profile property `allow_automatic_deduplication`, which defaults to `False`. ClickHouse Replicated deduplication is
+now disabled for incremental inserts, but this property can be set to `True` if for some reason the default ClickHouse behavior
+for inserted blocks is desired.
+- Added a new model `config` property `query_settings` for any ClickHouse settings that should be sent with the `INSERT INTO`
+or `DELETE FROM` queries used with materializations. Note this is distinct from the existing property `settings`, which is
+used for ClickHouse "table" settings in DDL statements like `CREATE TABLE ... AS`.

 ### Release [1.5.1], 2023-11-27
 #### Bug Fix
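As a rough illustration of the two new properties described above, a model might combine the existing `settings` property with the new `query_settings` property as in this minimal sketch (the model, source, and setting values are illustrative, not part of the commit):

```sql
-- models/events_incremental.sql (hypothetical model; setting values are examples only)
-- settings       -> table-level SETTINGS clause on the CREATE TABLE DDL
-- query_settings -> user-level SETTINGS clause on the INSERT/DELETE queries
{{ config(
    materialized='incremental',
    engine='MergeTree()',
    order_by='(event_time, id)',
    settings={'allow_nullable_key': 1},
    query_settings={'mutations_sync': 1}
) }}

select id, event_time, payload
from {{ source('raw', 'events') }}
{% if is_incremental() %}
  where event_time > (select max(event_time) from {{ this }})
{% endif %}
```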

README.md

Lines changed: 22 additions & 11 deletions
@@ -77,6 +77,7 @@ your_profile_name:
     use_lw_deletes: [False] # Use the strategy `delete+insert` as the default incremental strategy.
     check_exchange: [True] # Validate that ClickHouse supports the atomic EXCHANGE TABLES command. (Not needed for most ClickHouse versions)
     local_suffix [_local] # Table suffix of local tables on shards for distributed materializations.
+    allow_automatic_deduplication [False] # Enable ClickHouse automatic deduplication for Replicated tables
     custom_settings: [{}] # A dictionary/mapping of custom ClickHouse settings for the connection - default is empty.

     # Native (clickhouse-driver) connection settings

@@ -87,17 +88,27 @@ your_profile_name:

 ## Model Configuration

-| Option | Description | Required? |
-|------------------------|-------------|-----------|
-| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
-| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
-| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |
-| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | Optional (default: `rand()`) |
-| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
-| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | Optional |
-| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating an intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | Optional |
-| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | Optional (default: `default`) |
-| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | |
+| Option | Description | Default if any |
+|------------------------|-------------|----------------|
+| engine | The table engine (type of table) to use when creating tables | `MergeTree()` |
+| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | `tuple()` |
+| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | |
+| sharding_key | Sharding key determines the destination server when inserting into distributed engine table. The sharding key can be random or as an output of a hash function | `rand()` |
+| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
+| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | |
+| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating an intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way | |
+| incremental_strategy | Incremental model update strategy of `delete+insert` or `append`. See the following Incremental Model Strategies | `default` |
+| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | |
+| settings | A map/dictionary of "TABLE" settings to be used in DDL statements like `CREATE TABLE` with this model | |
+| query_settings | A map/dictionary of ClickHouse user level settings to be used with `INSERT` or `DELETE` statements in conjunction with this model | |
+
+## A Note on Model Settings
+ClickHouse has several types/levels of "settings". In the model configuration above, two of these types are configurable. `settings` means the `SETTINGS`
+clause used in `CREATE TABLE/VIEW` types of DDL statements, so these are generally settings specific to the underlying table engine. The new
+`query_settings` property is used to add a `SETTINGS` clause to the `INSERT` and `DELETE` queries used for model materialization (including incremental
+materializations). There are hundreds of ClickHouse settings, and it's not always clear which is a "table" setting and which is a "user" setting (although
+the latter are generally available in the `system.settings` table). In general the defaults are recommended, and any use of these properties should be
+carefully researched and tested.

 ## ClickHouse Cluster

 The `cluster` setting in the profile enables dbt-clickhouse to run against a ClickHouse cluster.
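To make the distinction in the note above concrete, here is roughly what each property turns into (a simplified sketch; the table and column names are invented, and the exact SQL is produced by the adapter macros):

```sql
-- settings={'allow_nullable_key': 1} lands on the DDL:
CREATE TABLE db.my_model
(
    id UInt32,
    val2 Nullable(UInt32)
)
ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS allow_nullable_key=1;

-- query_settings={'mutations_sync': 1} lands on the materialization queries:
INSERT INTO db.my_model (id, val2)
SELECT id, val2
FROM db.my_model__dbt_new_data
SETTINGS mutations_sync=1;
```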

dbt/adapters/clickhouse/credentials.py

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,7 @@ class ClickHouseCredentials(Credentials):
     custom_settings: Optional[Dict[str, Any]] = None
     use_lw_deletes: bool = False
     local_suffix: str = 'local'
+    allow_automatic_deduplication: bool = False

     @property
     def type(self):

@@ -73,4 +74,5 @@ def _connection_keys(self):
             'check_exchange',
             'custom_settings',
             'use_lw_deletes',
+            'allow_automatic_deduplication',
         )

dbt/adapters/clickhouse/dbclient.py

Lines changed: 10 additions & 0 deletions
@@ -1,5 +1,6 @@
 import uuid
 from abc import ABC, abstractmethod
+from typing import Dict

 from dbt.exceptions import DbtDatabaseError, FailedToConnectError

@@ -8,6 +9,7 @@
 LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
 ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
+DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'


 def get_db_client(credentials: ClickHouseCredentials):

@@ -79,6 +81,9 @@ def __init__(self, credentials: ClickHouseCredentials):
         except Exception as ex:
             self.close()
             raise ex
+        self._model_settings = {}
+        if not credentials.allow_automatic_deduplication:
+            self._model_settings[DEDUP_WINDOW_SETTING] = '0'

     @abstractmethod
     def query(self, sql: str, **kwargs):

@@ -115,6 +120,11 @@ def _set_client_database(self):
     def _server_version(self):
         pass

+    def update_model_settings(self, model_settings: Dict[str, str]):
+        for key, value in self._model_settings.items():
+            if key not in model_settings:
+                model_settings[key] = value
+
     def _check_lightweight_deletes(self, requested: bool):
         lw_deletes = self.get_ch_setting(LW_DELETE_SETTING)
         nd_mutations = self.get_ch_setting(ND_MUTATION_SETTING)
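The practical effect of the new default: because `get_model_settings` (see impl.py below) passes the model's table settings through `update_model_settings`, every relation created for a model gets `replicated_deduplication_window=0` merged in unless the profile sets `allow_automatic_deduplication: True` or the model's own `settings` already define it. On a replicated table the resulting DDL would look roughly like this sketch (table name, columns, and engine arguments are illustrative):

```sql
-- Sketch of the DDL produced with the default allow_automatic_deduplication=False;
-- replicated_deduplication_window=0 is injected by update_model_settings.
CREATE TABLE db.my_model__dbt_new_data
(
    id UInt32,
    event_time DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
ORDER BY (event_time, id)
SETTINGS replicated_deduplication_window=0;
```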

dbt/adapters/clickhouse/impl.py

Lines changed: 11 additions & 1 deletion
@@ -367,7 +367,17 @@ def run_sql_for_tests(self, sql, fetch, conn):

     @available
     def get_model_settings(self, model):
-        settings = model['config'].get('settings', dict())
+        settings = model['config'].get('settings', {})
+        conn = self.connections.get_if_exists()
+        conn.handle.update_model_settings(settings)
+        res = []
+        for key in settings:
+            res.append(f' {key}={settings[key]}')
+        return '' if len(res) == 0 else 'SETTINGS ' + ', '.join(res) + '\n'
+
+    @available
+    def get_model_query_settings(self, model):
+        settings = model['config'].get('query_settings', {})
         res = []
         for key in settings:
             res.append(f' {key}={settings[key]}')
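Both helpers render the configured dictionary into a trailing `SETTINGS` clause for the macros to splice into their SQL. For example, a model configured with `query_settings={'mutations_sync': 2, 'insert_deduplicate': 0}` would cause `get_model_query_settings(model)` to return approximately the following fragment (whitespace simplified):

```sql
SETTINGS mutations_sync=2, insert_deduplicate=0
```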

dbt/include/clickhouse/macros/materializations/incremental/incremental.sql

Lines changed: 5 additions & 4 deletions
@@ -178,15 +178,15 @@
         select {{ unique_key }}
         from {{ inserting_relation }}
       )
-      {{ adapter.get_model_settings(model) }}
+      {{ adapter.get_model_query_settings(model) }}
   {% endcall %}

   -- Insert all of the new data into the temporary table
   {% call statement('insert_new_data') %}
     insert into {{ inserted_relation }} ({{ dest_cols_csv }})
     select {{ dest_cols_csv }}
     from {{ inserting_relation }}
-    {{ adapter.get_model_settings(model) }}
+    {{ adapter.get_model_query_settings(model) }}
   {% endcall %}

   {% do adapter.drop_relation(new_data_relation) %}

@@ -228,13 +228,14 @@
     {% for predicate in incremental_predicates %}
       and {{ predicate }}
     {% endfor %}
-    {%- endif -%};
+    {%- endif -%}
+    {{ adapter.get_model_query_settings(model) }}
   {% endcall %}

   {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
   {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
   {% call statement('insert_new_data') %}
-    insert into {{ existing_relation }} select {{ dest_cols_csv }} from {{ inserting_relation }}
+    insert into {{ existing_relation }} {{ adapter.get_model_query_settings(model) }} select {{ dest_cols_csv }} from {{ inserting_relation }}
   {% endcall %}
   {% do adapter.drop_relation(new_data_relation) %}
   {{ drop_relation_if_exists(distributed_new_data_relation) }}
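Put together, a `delete+insert` incremental run for a model with `query_settings={'mutations_sync': 2}` would now issue statements shaped roughly like the following (relation and column names are placeholders, and the delete statement itself is generated by macro code outside this hunk):

```sql
-- Delete rows that will be replaced, with the model's query settings appended
DELETE FROM db.my_model
WHERE (id) IN (SELECT id FROM db.my_model__dbt_new_data)
SETTINGS mutations_sync=2;

-- Insert the new data, with the settings clause placed before the SELECT
INSERT INTO db.my_model SETTINGS mutations_sync=2
SELECT id, event_time FROM db.my_model__dbt_new_data;
```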

dbt/include/clickhouse/macros/materializations/seed.sql

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@

   {% set sql -%}
     insert into {{ this.render() }} ({{ cols_sql }})
-    {{ adapter.get_model_settings(model) }}
+    {{ adapter.get_model_query_settings(model) }}
     format CSV
     {{ data_sql }}
   {%- endset %}

dbt/include/clickhouse/macros/materializations/table.sql

Lines changed: 3 additions & 1 deletion
@@ -188,11 +188,13 @@
   {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
   {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

-  insert into {{ target_relation }} ({{ dest_cols_csv }})
+  insert into {{ target_relation }}
+  ({{ dest_cols_csv }})
   {%- if has_contract -%}
     -- Use a subquery to get columns in the right order
     SELECT {{ dest_cols_csv }} FROM ( {{ sql }} )
   {%- else -%}
     {{ sql }}
+    {{ adapter.get_model_query_settings(model) }}
   {%- endif -%}
 {%- endmacro %}

tests/integration/adapter/basic/test_basic.py

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,8 @@
   column_types:
     val2: Nullable(UInt32)
     str1: Nullable(String)
+  settings:
+    allow_nullable_key: 1
 """

 replicated_seeds_schema_yml = """

tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py

Lines changed: 8 additions & 3 deletions
@@ -18,8 +18,13 @@ class TestMergeTreeTableMaterialization(BaseSimpleMaterializations):
     @pytest.fixture(scope="class")
     def models(self):
         config_materialized_table = """
-            {{ config(order_by='(some_date, id, name)', engine='MergeTree()', materialized='table',
-                settings={'allow_nullable_key': 1}) }}
+            {{ config(
+                order_by='(some_date, id, name)',
+                engine='MergeTree()',
+                materialized='table',
+                settings={'allow_nullable_key': 1},
+                query_settings={'allow_nondeterministic_mutations': 1})
+            }}
         """
         base_table_sql = config_materialized_table + model_base
         return {

@@ -204,7 +209,7 @@ def assert_total_count_correct(self, project):
         os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
     )
     def test_base(self, project):
-        # cluster setting must exists
+        # cluster setting must exist
         cluster = project.test_config['cluster']
         assert cluster

tests/integration/adapter/constraints/test_constraints.py

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@ def test__contract_wrong_column_names(self, project):
         assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

     def test__contract_wrong_column_data_types(self, project, data_types):
-        for (sql_column_value, schema_data_type, error_data_type) in data_types:
+        for sql_column_value, schema_data_type, error_data_type in data_types:
             # Write parametrized data_type to sql file
             write_file(
                 my_model_data_type_sql.format(sql_value=sql_column_value),

@@ -91,7 +91,7 @@ def test__contract_wrong_column_data_types(self, project, data_types):
         assert all([(exp in log_output or exp.upper() in log_output) for exp in expected])

     def test__contract_correct_column_data_types(self, project, data_types):
-        for (sql_column_value, schema_data_type, _) in data_types:
+        for sql_column_value, schema_data_type, _ in data_types:
             # Write parametrized data_type to sql file
             write_file(
                 my_model_data_type_sql.format(sql_value=sql_column_value),
