|
1 | 1 | import csv
|
2 | 2 | import io
|
3 |
| -from concurrent.futures import Future |
4 | 3 | from dataclasses import dataclass
|
5 |
| -from typing import Any, Callable, Dict, List, Optional, Set, Union |
| 4 | +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union |
6 | 5 |
|
7 | 6 | import agate
|
8 | 7 | from dbt.adapters.base import AdapterConfig, available
|
9 |
| -from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport, catch_as_completed |
| 8 | +from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport |
10 | 9 | from dbt.adapters.base.relation import BaseRelation, InformationSchema
|
| 10 | +from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support |
11 | 11 | from dbt.adapters.sql import SQLAdapter
|
12 | 12 | from dbt.contracts.graph.manifest import Manifest
|
13 | 13 | from dbt.contracts.graph.nodes import ConstraintType, ModelLevelConstraint
|
14 |
| -from dbt.contracts.relation import RelationType |
| 14 | +from dbt.contracts.relation import Path, RelationType |
15 | 15 | from dbt.events.functions import warn_or_error
|
16 | 16 | from dbt.events.types import ConstraintNotSupported
|
17 | 17 | from dbt.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
|
18 |
| -from dbt.utils import executor, filter_null_values |
| 18 | +from dbt.utils import filter_null_values |
19 | 19 |
|
| 20 | +import dbt |
20 | 21 | from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
|
21 | 22 | from dbt.adapters.clickhouse.column import ClickHouseColumn
|
22 | 23 | from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
|
@@ -56,6 +57,13 @@ class ClickHouseAdapter(SQLAdapter):
|
56 | 57 | ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
|
57 | 58 | }
|
58 | 59 |
|
| 60 | + _capabilities: CapabilityDict = CapabilityDict( |
| 61 | + { |
| 62 | + Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Unsupported), |
| 63 | + Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Unsupported), |
| 64 | + } |
| 65 | + ) |
| 66 | + |
59 | 67 | def __init__(self, config):
|
60 | 68 | BaseAdapter.__init__(self, config)
|
61 | 69 | self.cache = ClickHouseRelationsCache()
|
@@ -295,37 +303,29 @@ def get_ch_database(self, schema: str):
|
295 | 303 | except DbtRuntimeError:
|
296 | 304 | return None
|
297 | 305 |
|
298 |
| - def get_catalog(self, manifest): |
299 |
| - schema_map = self._get_catalog_schemas(manifest) |
300 |
| - |
301 |
| - with executor(self.config) as tpe: |
302 |
| - futures: List[Future[agate.Table]] = [] |
303 |
| - for info, schemas in schema_map.items(): |
304 |
| - for schema in schemas: |
305 |
| - futures.append( |
306 |
| - tpe.submit_connected( |
307 |
| - self, |
308 |
| - schema, |
309 |
| - self._get_one_catalog, |
310 |
| - info, |
311 |
| - [schema], |
312 |
| - manifest, |
313 |
| - ) |
314 |
| - ) |
315 |
| - catalogs, exceptions = catch_as_completed(futures) |
316 |
| - return catalogs, exceptions |
317 |
| - |
318 |
| - def _get_one_catalog( |
319 |
| - self, |
320 |
| - information_schema: InformationSchema, |
321 |
| - schemas: Set[str], |
322 |
| - manifest: Manifest, |
323 |
| - ) -> agate.Table: |
324 |
| - if len(schemas) != 1: |
325 |
| - raise DbtRuntimeError( |
326 |
| - f"Expected only one schema in clickhouse _get_one_catalog, found ' f'{schemas}'" |
327 |
| - ) |
328 |
| - return super()._get_one_catalog(information_schema, schemas, manifest) |
| 306 | + def get_catalog(self, manifest) -> Tuple[agate.Table, List[Exception]]: |
| 307 | + relations = self._get_catalog_relations(manifest) |
| 308 | + schemas = set(relation.schema for relation in relations) |
| 309 | + if schemas: |
| 310 | + catalog = self._get_one_catalog(InformationSchema(Path()), schemas, manifest) |
| 311 | + else: |
| 312 | + catalog = dbt.clients.agate_helper.empty_table() |
| 313 | + return catalog, [] |
| 314 | + |
| 315 | + def get_filtered_catalog( |
| 316 | + self, manifest: Manifest, relations: Optional[Set[BaseRelation]] = None |
| 317 | + ): |
| 318 | + catalog, exceptions = self.get_catalog(manifest) |
| 319 | + if relations and catalog: |
| 320 | + relation_map = {(r.schema, r.identifier) for r in relations} |
| 321 | + |
| 322 | + def in_map(row: agate.Row): |
| 323 | + s = _expect_row_value("table_schema", row) |
| 324 | + i = _expect_row_value("table_name", row) |
| 325 | + return (s, i) in relation_map |
| 326 | + |
| 327 | + catalog = catalog.where(in_map) |
| 328 | + return catalog, exceptions |
329 | 329 |
|
330 | 330 | def get_rows_different_sql(
|
331 | 331 | self,
|
|
0 commit comments