Skip to content

Commit b9f9191

Browse files
authored
Merge pull request #328 from benjamin-awd/add-force-on-cluster
Add force on cluster config option
2 parents d2a63b0 + 81def6c commit b9f9191

File tree

4 files changed

+124
-7
lines changed

4 files changed

+124
-7
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,23 @@ if `cluster` is set in profile, `on_cluster_clause` now will return cluster info
137137
- Distributed materializations
138138
- Models with Replicated engines
139139

140+
141+
By default, tables and incremental materializations with non-replicated engines will not be affected by the `cluster` setting (model would be created on the connected node only).
142+
143+
To force relations to be created on a cluster regardless of their engine or materialization, use the `force_on_cluster` argument:
144+
```sql
145+
{{ config(
146+
engine='Null',
147+
materialized='materialized_view',
148+
force_on_cluster='true'
149+
)
150+
}}
151+
```
152+
140153
table and incremental materializations with non-replicated engine will not be affected by `cluster` setting (model would
141154
be created on the connected node only).
142155

156+
143157
### Compatibility
144158

145159
If a model has been created without a `cluster` setting, dbt-clickhouse will detect the situation and run all DDL/DML

dbt/adapters/clickhouse/impl.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
@dataclass
5252
class ClickHouseConfig(AdapterConfig):
5353
engine: str = 'MergeTree()'
54+
force_on_cluster: Optional[bool] = False
5455
order_by: Optional[Union[List[str], str]] = 'tuple()'
5556
partition_by: Optional[Union[List[str], str]] = None
5657
sharding_key: Optional[Union[List[str], str]] = 'rand()'

dbt/adapters/clickhouse/relation.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,20 +112,20 @@ def create_from(
112112
# If the database is set, and the source schema is "defaulted" to the source.name, override the
113113
# schema with the database instead, since that's presumably what's intended for clickhouse
114114
schema = relation_config.schema
115+
116+
cluster = quoting.credentials.cluster or ''
115117
can_on_cluster = None
116118
# We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages
117119
if relation_config.resource_type == NODE_TYPE_SOURCE:
118120
if schema == relation_config.source_name and relation_config.database:
119121
schema = relation_config.database
120122

123+
if cluster and str(relation_config.config.get("force_on_cluster")).lower() == "true":
124+
can_on_cluster = True
125+
121126
else:
122-
cluster = quoting.credentials.cluster if quoting.credentials.cluster else ''
123-
materialized = (
124-
relation_config.config.materialized if relation_config.config.materialized else ''
125-
)
126-
engine = (
127-
relation_config.config.get('engine') if relation_config.config.get('engine') else ''
128-
)
127+
materialized = relation_config.config.get('materialized') or ''
128+
engine = relation_config.config.get('engine') or ''
129129
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)
130130

131131
return cls.create(

tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,105 @@ def test_base(self, project):
249249
assert len(results) == 1
250250

251251
self.assert_total_count_correct(project)
252+
253+
254+
class TestMergeTreeForceClusterMaterialization(BaseSimpleMaterializations):
255+
'''Test MergeTree materialized view is created across a cluster using the
256+
`force_on_cluster` config argument
257+
'''
258+
259+
@pytest.fixture(scope="class")
260+
def models(self):
261+
config_force_on_cluster = """
262+
{{ config(
263+
engine='MergeTree',
264+
materialized='materialized_view',
265+
force_on_cluster='true'
266+
)
267+
}}
268+
"""
269+
270+
return {
271+
"force_on_cluster.sql": config_force_on_cluster + model_base,
272+
"schema.yml": schema_base_yml,
273+
}
274+
275+
@pytest.fixture(scope="class")
276+
def seeds(self):
277+
return {
278+
"schema.yml": base_seeds_schema_yml,
279+
"base.csv": seeds_base_csv,
280+
}
281+
282+
def assert_total_count_correct(self, project):
283+
'''Check if table is created on cluster'''
284+
cluster = project.test_config['cluster']
285+
286+
# check if data is properly distributed/replicated
287+
table_relation = relation_from_name(project.adapter, "force_on_cluster")
288+
# ClickHouse cluster in the docker-compose file
289+
# under tests/integration is configured with 3 nodes
290+
host_count = project.run_sql(
291+
f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'",
292+
fetch="one",
293+
)
294+
assert host_count[0] > 1
295+
296+
table_count = project.run_sql(
297+
f"select count() From clusterAllReplicas('{cluster}', system.tables) "
298+
f"where database='{table_relation.schema}' and name='{table_relation.identifier}'",
299+
fetch="one",
300+
)
301+
302+
assert table_count[0] == 3
303+
304+
mv_count = project.run_sql(
305+
f"select count() From clusterAllReplicas('{cluster}', system.tables) "
306+
f"where database='{table_relation.schema}' and name='{table_relation.identifier}_mv'",
307+
fetch="one",
308+
)
309+
310+
assert mv_count[0] == 3
311+
312+
@pytest.mark.skipif(
313+
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
314+
)
315+
def test_base(self, project):
316+
# cluster setting must exist
317+
cluster = project.test_config['cluster']
318+
assert cluster
319+
320+
# seed command
321+
results = run_dbt(["seed"])
322+
# seed result length
323+
assert len(results) == 1
324+
325+
# run command
326+
results = run_dbt()
327+
# run result length
328+
assert len(results) == 1
329+
330+
# names exist in result nodes
331+
check_result_nodes_by_name(results, ["force_on_cluster"])
332+
333+
# check relation types
334+
expected = {
335+
"base": "table",
336+
"replicated": "table",
337+
}
338+
check_relation_types(project.adapter, expected)
339+
340+
relation = relation_from_name(project.adapter, "base")
341+
# table rowcount
342+
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
343+
assert result[0] == 10
344+
345+
# relations_equal
346+
self.assert_total_count_correct(project)
347+
348+
# run full refresh
349+
results = run_dbt(['--debug', 'run', '--full-refresh'])
350+
# run result length
351+
assert len(results) == 1
352+
353+
self.assert_total_count_correct(project)

0 commit comments

Comments
 (0)