diff --git a/.gitignore b/.gitignore
index e95cd67..636d240 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
 __pycache__
 .DS_Store
-mock_path
\ No newline at end of file
+.vscode
+mock_path
+dist/
\ No newline at end of file
diff --git a/carte_cli/extractor/glue_extractor.py b/carte_cli/extractor/glue_extractor.py
index 60dc610..40b61c6 100644
--- a/carte_cli/extractor/glue_extractor.py
+++ b/carte_cli/extractor/glue_extractor.py
@@ -6,6 +6,12 @@ from typing import Iterator, Union, Dict, Any, List
 from databuilder.extractor.base_extractor import Extractor
 from carte_cli.model.carte_table_model import TableMetadata, ColumnMetadata, TableType
+import json
+
+
+class GlueExtractorException(Exception):
+    def __init__(self, *args: object) -> None:
+        super().__init__(*args)
 
 
 class GlueExtractor(Extractor):
@@ -41,6 +47,40 @@ def _get_column_type(self, column: Dict) -> str:
         else:
             return col_type
 
+    def _get_glue_table_columns(
+        self, row: Dict[str, Any], table_name: str
+    ) -> List[ColumnMetadata]:
+        columns = []
+        if "spark.sql.sources.schema" in row["Parameters"]:
+            # For Delta and Parquet tables whose schema fits in a single property
+            schema = json.loads(row["Parameters"]["spark.sql.sources.schema"])
+        elif "spark.sql.sources.schema.numParts" in row["Parameters"]:
+            # If the schema is very big, Glue splits it across multiple properties ¯\_(ツ)_/¯
+            schema_parts_count = int(
+                row["Parameters"]["spark.sql.sources.schema.numParts"]
+            )
+            schema_str = "".join(
+                [
+                    row["Parameters"][f"spark.sql.sources.schema.part.{part}"]
+                    for part in range(schema_parts_count)
+                ]
+            )
+            schema = json.loads(schema_str)
+        else:
+            raise GlueExtractorException(
+                f"Unsupported glue table format for {table_name}", row
+            )
+        fields = schema["fields"]
+        for column in fields:
+            columns.append(
+                ColumnMetadata(
+                    name=column["name"],
+                    column_type=self._get_column_type(column),
+                    description=None,
+                )
+            )
+        return columns
+
     def _get_extract_iter(self) -> Iterator[TableMetadata]:
         for row in self._get_raw_extract_iter():
             columns = []
@@ -56,6 +96,7 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
             is_view = row.get("TableType") == "VIRTUAL_VIEW"
 
             if is_view:
+                table_type = TableType.VIEW
                 for column in row["StorageDescriptor"]["Columns"]:
                     columns.append(
                         ColumnMetadata(
@@ -65,18 +106,8 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
                         )
                     )
             else:
-                # This solution is more robust for Delta and Parquet tables.
-                # Both table types has these fields
-                for column in row["Parameters"]["spark.sql.sources.schema"]["fields"]:
-                    columns.append(
-                        ColumnMetadata(
-                            name=column["name"],
-                            column_type=self._get_column_type(column),
-                            description=None,
-                        )
-                    )
-
-            table_type = TableType.VIEW if is_view else TableType.TABLE
+                columns = self._get_glue_table_columns(row, full_table_name)
+                table_type = TableType.TABLE
 
             yield TableMetadata(
                 name=table_name,
diff --git a/pyproject.toml b/pyproject.toml
index c028af7..d5e97c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "carte-cli"
-version = "0.3.20"
+version = "0.3.21"
 description = "A static site generator for data catalogs"
 authors = ["Balint Haller "]
 license = "GPL-3.0-or-later"
diff --git a/tests/extractor/test_glue_extractor.py b/tests/extractor/test_glue_extractor.py
index 4b5d8f3..9dba980 100644
--- a/tests/extractor/test_glue_extractor.py
+++ b/tests/extractor/test_glue_extractor.py
@@ -62,46 +62,46 @@ def test_extraction_with_single_result(self) -> None:
                         ],
                         "TableType": "EXTERNAL_TABLE",
                         "Parameters": {
-                            "spark.sql.sources.schema": {
+                            "spark.sql.sources.schema": """{
                                 "type": "struct",
                                 "fields": [
                                     {
                                         "name": "col_id1",
                                         "type": "bigint",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "col_id2",
                                         "type": "bigint",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "source",
                                         "type": "varchar",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "is_active",
                                         "type": "boolean",
-                                        "nullable": True,
+                                        "nullable": "True"
                                     },
                                     {
                                         "name": "etl_created_at",
                                         "type": "timestamp",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "partition_key1",
                                         "type": "string",
-                                        "nullable": True,
-                                        "metadata": {},
-                                    },
-                                ],
-                            }
+                                        "nullable": "True",
+                                        "metadata": {}
+                                    }
+                                ]
+                            }"""
                         },
                     }
                 ]
@@ -159,35 +159,35 @@ def test_extraction_with_multiple_result(self) -> None:
                         ],
                         "TableType": "EXTERNAL_TABLE",
                         "Parameters": {
-                            "spark.sql.sources.schema": {
+                            "spark.sql.sources.schema": """{
                                 "type": "struct",
                                 "fields": [
                                     {
                                         "name": "col_id1",
                                         "type": "bigint",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "source",
                                         "type": "varchar",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "ds",
                                         "type": "varchar",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "partition_key1",
                                         "type": "string",
-                                        "nullable": True,
-                                        "metadata": {},
-                                    },
-                                ],
-                            }
+                                        "nullable": "True",
+                                        "metadata": {}
+                                    }
+                                ]
+                            }"""
                         },
                     },
                     {
@@ -216,20 +216,20 @@ def test_extraction_with_multiple_result(self) -> None:
                         },
                         "TableType": "EXTERNAL_TABLE",
                         "Parameters": {
-                            "spark.sql.sources.schema": {
+                            "spark.sql.sources.schema": """{
                                 "type": "struct",
                                 "fields": [
                                     {
                                         "name": "col_name",
                                         "type": "varchar",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "col_name2",
                                         "type": "varchar",
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
                                     },
                                     {
                                         "name": "event_properties",
@@ -237,13 +237,55 @@ def test_extraction_with_multiple_result(self) -> None:
                                         "type": {
                                             "type": "map",
                                             "keyType": "string",
                                             "valueType": "string",
-                                            "valueContainsNull": True,
+                                            "valueContainsNull": "True"
                                         },
-                                        "nullable": True,
-                                        "metadata": {},
+                                        "nullable": "True",
+                                        "metadata": {}
+                                    }
+                                ]
+                            }"""
+                        },
+                    },
+                    {
+                        "Name": "big_schema_test_table",
+                        "DatabaseName": "test_schema1",
+                        "Description": "test table for big schema",
"test table for big schema", + "TableType": "EXTERNAL_TABLE", + "StorageDescriptor": { + "Location": "test_location3", + }, + "Parameters": { + "spark.sql.sources.schema.numParts": "2", + "spark.sql.sources.schema.part.0": """{ + "type": "struct", + "fields": [ + { + "name": "col_name", + "type": "varchar", + "nullable": "Tr""", + "spark.sql.sources.schema.part.1": """ue", + "metadata": {} + }, + { + "name": "col_name2", + "type": "varchar", + "nullable": "True", + "metadata": {} }, - ], + { + "name": "event_properties", + "type": { + "type": "map", + "keyType": "string", + "valueType": "string", + "valueContainsNull": "True" + }, + "nullable": "True", + "metadata": {} + } + ] } + """, }, }, { @@ -323,6 +365,21 @@ def test_extraction_with_multiple_result(self) -> None: ColumnMetadata("event_properties", "map", None), ], ) + self.assertEqual(expected.__repr__(), extractor.extract().__repr__()) + + expected = TableMetadata( + name="big_schema_test_table", + connection="test-connection", + database="test_schema1", + description=None, + location="test_location3", + table_type=TableType.TABLE, + columns=[ + ColumnMetadata("col_name", "varchar", None), + ColumnMetadata("col_name2", "varchar", None), + ColumnMetadata("event_properties", "map", None), + ], + ) self.assertEqual(expected.__repr__(), extractor.extract().__repr__())