From 904aaa13e84ec0300c75678f2c1d5929c58a60c8 Mon Sep 17 00:00:00 2001
From: Istvan Meszaros
Date: Wed, 30 Mar 2022 12:53:39 +0200
Subject: [PATCH 1/5] tests fixed

---
 .gitignore                             |  3 +-
 carte_cli/extractor/glue_extractor.py  | 47 +++++++++++-----
 pyproject.toml                         |  2 +-
 tests/extractor/test_glue_extractor.py | 76 +++++++++++++-------------
 4 files changed, 74 insertions(+), 54 deletions(-)

diff --git a/.gitignore b/.gitignore
index e95cd67..fd70d22 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 __pycache__
 .DS_Store
-mock_path
\ No newline at end of file
+mock_path
+dist/
\ No newline at end of file
diff --git a/carte_cli/extractor/glue_extractor.py b/carte_cli/extractor/glue_extractor.py
index 60dc610..7b7c382 100644
--- a/carte_cli/extractor/glue_extractor.py
+++ b/carte_cli/extractor/glue_extractor.py
@@ -6,7 +6,7 @@ from typing import Iterator, Union, Dict, Any, List
 from databuilder.extractor.base_extractor import Extractor
 
 from carte_cli.model.carte_table_model import TableMetadata, ColumnMetadata, TableType
-
+import json
 
 class GlueExtractor(Extractor):
     def __init__(self, connection_name: str):
@@ -41,6 +41,34 @@ def _get_column_type(self, column: Dict) -> str:
         else:
             return col_type
 
+    def _get_glue_table_columns(self, row: Dict[str, Any], table_name: str) -> List[ColumnMetadata]:
+        columns = []
+        if "spark.sql.sources.schema" in row["Parameters"]:
+          # For delta and parquet tables if the schema is not too big
+          schema = json.loads(row["Parameters"]["spark.sql.sources.schema"])
+        elif 'spark.sql.sources.schema.numParts' in row["Parameters"]:
+            # If the delta or parquet table's schema is very big, glue separates it to multiple properties ¯\_(ツ)_/¯
+            schema_parts_count = int(row["Parameters"]["spark.sql.sources.schema.numParts"])
+            schema_str = ""
+            for part in range(0,schema_parts_count):
+                schema_str = schema_str+row["Parameters"][f"spark.sql.sources.schema.part.{part}"]
+            schema = json.loads(schema_str)
+        else:
+            print(table_name)
+            print(row)
+            raise Exception("Unsupported glue table format")
+        fields = schema["fields"]
+        for column in fields:
+            columns.append(
+                ColumnMetadata(
+                    name=column["name"],
+                    column_type=self._get_column_type(column),
+                    description=None,
+                )
+            )
+        return columns
+
+
     def _get_extract_iter(self) -> Iterator[TableMetadata]:
         for row in self._get_raw_extract_iter():
             columns = []
@@ -56,6 +84,7 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
             is_view = row.get("TableType") == "VIRTUAL_VIEW"
 
             if is_view:
+                table_type = TableType.VIEW
                 for column in row["StorageDescriptor"]["Columns"]:
                     columns.append(
                         ColumnMetadata(
                             name=column["name"],
                             column_type=self._get_column_type(column),
                             description=None,
                         )
                     )
-            else:
-                # This solution is more robust for Delta and Parquet tables.
-                # Both table types has these fields
-                for column in row["Parameters"]["spark.sql.sources.schema"]["fields"]:
-                    columns.append(
-                        ColumnMetadata(
-                            name=column["name"],
-                            column_type=self._get_column_type(column),
-                            description=None,
-                        )
-                    )
-
-            table_type = TableType.VIEW if is_view else TableType.TABLE
+            else: 
+                columns = self._get_glue_table_columns(row, full_table_name)
+                table_type = TableType.TABLE
 
             yield TableMetadata(
                 name=table_name,
diff --git a/pyproject.toml b/pyproject.toml
index c028af7..d5e97c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "carte-cli"
-version = "0.3.20"
+version = "0.3.21"
 description = "A static site generator for data catalogs"
 authors = ["Balint Haller "]
 license = "GPL-3.0-or-later"
diff --git a/tests/extractor/test_glue_extractor.py b/tests/extractor/test_glue_extractor.py
index 4b5d8f3..7b9a3bf 100644
--- a/tests/extractor/test_glue_extractor.py
+++ b/tests/extractor/test_glue_extractor.py
@@ -62,46 +62,46 @@ def test_extraction_with_single_result(self) -> None:
                 ],
                 "TableType": "EXTERNAL_TABLE",
                 "Parameters": {
-                    "spark.sql.sources.schema": {
+                    "spark.sql.sources.schema": """{
                         "type": "struct",
                         "fields": [
                             {
                                 "name": "col_id1",
                                 "type": "bigint",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "col_id2",
                                 "type": "bigint",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "source",
                                 "type": "varchar",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "is_active",
                                 "type": "boolean",
-                                "nullable": True,
+                                "nullable": "True"
                             },
                             {
                                 "name": "etl_created_at",
                                 "type": "timestamp",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "partition_key1",
                                 "type": "string",
-                                "nullable": True,
-                                "metadata": {},
-                            },
-                        ],
-                    }
+                                "nullable": "True",
+                                "metadata": {}
+                            }
+                        ]
+                    }"""
                 },
             }
         ]
@@ -159,35 +159,35 @@ def test_extraction_with_multiple_result(self) -> None:
                 ],
                 "TableType": "EXTERNAL_TABLE",
                 "Parameters": {
-                    "spark.sql.sources.schema": {
+                    "spark.sql.sources.schema": """{
                         "type": "struct",
                         "fields": [
                             {
                                 "name": "col_id1",
                                 "type": "bigint",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "source",
                                 "type": "varchar",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "ds",
                                 "type": "varchar",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "partition_key1",
                                 "type": "string",
-                                "nullable": True,
-                                "metadata": {},
-                            },
-                        ],
-                    }
+                                "nullable": "True",
+                                "metadata": {}
+                            }
+                        ]
+                    }"""
                 },
             },
             {
@@ -216,34 +216,34 @@ def test_extraction_with_multiple_result(self) -> None:
                 },
                 "TableType": "EXTERNAL_TABLE",
                 "Parameters": {
-                    "spark.sql.sources.schema": {
+                    "spark.sql.sources.schema": """{
                         "type": "struct",
                         "fields": [
                             {
                                 "name": "col_name",
                                 "type": "varchar",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "col_name2",
                                 "type": "varchar",
-                                "nullable": True,
-                                "metadata": {},
+                                "nullable": "True",
+                                "metadata": {}
                             },
                             {
                                 "name": "event_properties",
                                 "type": {
                                     "type": "map",
                                     "keyType": "string",
                                     "valueType": "string",
-                                    "valueContainsNull": True,
+                                    "valueContainsNull": "True"
                                 },
-                                "nullable": True,
-                                "metadata": {},
-                            },
-                        ],
-                    }
+                                "nullable": "True",
+                                "metadata": {}
+                            }
+                        ]
+                    }"""
                },
            },
            {
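For context on what the new _get_glue_table_columns helper is dealing with: for Delta and Parquet tables, Glue returns the Spark schema as a JSON string in the spark.sql.sources.schema table parameter, and once the schema grows past a size limit it is split across numbered spark.sql.sources.schema.part.N parameters instead. This is also why the fixtures above switch from Python dicts to triple-quoted JSON strings: json.loads only accepts strings. A minimal standalone sketch of the reassembly, with made-up parameter values:

    import json

    # Hypothetical Glue table parameters; real values come from the Glue API response.
    params = {
        "spark.sql.sources.schema.numParts": "2",
        # Glue may cut the JSON at an arbitrary character boundary.
        "spark.sql.sources.schema.part.0": '{"type": "struct", "fields": [{"name": "id", "ty',
        "spark.sql.sources.schema.part.1": 'pe": "bigint", "nullable": true, "metadata": {}}]}',
    }

    # Concatenate the parts in order, then parse the reassembled JSON document.
    num_parts = int(params["spark.sql.sources.schema.numParts"])
    schema_str = "".join(
        params[f"spark.sql.sources.schema.part.{part}"] for part in range(num_parts)
    )
    schema = json.loads(schema_str)
    print([field["name"] for field in schema["fields"]])  # ['id']
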
From 72d8ea6aff269dab274dda6d26051853fc1a23f6 Mon Sep 17 00:00:00 2001
From: Istvan Meszaros
Date: Wed, 30 Mar 2022 12:55:43 +0200
Subject: [PATCH 2/5] formatting

---
 .gitignore                            |  1 +
 carte_cli/extractor/glue_extractor.py | 27 +++++++++++++++++----------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/.gitignore b/.gitignore
index fd70d22..636d240 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 __pycache__
 .DS_Store
+.vscode
 mock_path
 dist/
\ No newline at end of file
diff --git a/carte_cli/extractor/glue_extractor.py b/carte_cli/extractor/glue_extractor.py
index 7b7c382..3d88f28 100644
--- a/carte_cli/extractor/glue_extractor.py
+++ b/carte_cli/extractor/glue_extractor.py
@@ -8,6 +8,7 @@
 from carte_cli.model.carte_table_model import TableMetadata, ColumnMetadata, TableType
 import json
 
+
 class GlueExtractor(Extractor):
     def __init__(self, connection_name: str):
         super().__init__()
@@ -41,17 +42,24 @@ def _get_column_type(self, column: Dict) -> str:
         else:
             return col_type
 
-    def _get_glue_table_columns(self, row: Dict[str, Any], table_name: str) -> List[ColumnMetadata]:
+    def _get_glue_table_columns(
+        self, row: Dict[str, Any], table_name: str
+    ) -> List[ColumnMetadata]:
         columns = []
         if "spark.sql.sources.schema" in row["Parameters"]:
-          # For delta and parquet tables if the schema is not too big
-          schema = json.loads(row["Parameters"]["spark.sql.sources.schema"])
-        elif 'spark.sql.sources.schema.numParts' in row["Parameters"]:
+            # For delta and parquet tables if the schema is not too big
+            schema = json.loads(row["Parameters"]["spark.sql.sources.schema"])
+        elif "spark.sql.sources.schema.numParts" in row["Parameters"]:
             # If the delta or parquet table's schema is very big, glue separates it to multiple properties ¯\_(ツ)_/¯
-            schema_parts_count = int(row["Parameters"]["spark.sql.sources.schema.numParts"])
+            schema_parts_count = int(
+                row["Parameters"]["spark.sql.sources.schema.numParts"]
+            )
             schema_str = ""
-            for part in range(0,schema_parts_count):
-                schema_str = schema_str+row["Parameters"][f"spark.sql.sources.schema.part.{part}"]
+            for part in range(0, schema_parts_count):
+                schema_str = (
+                    schema_str
+                    + row["Parameters"][f"spark.sql.sources.schema.part.{part}"]
+                )
             schema = json.loads(schema_str)
         else:
             print(table_name)
@@ -67,8 +75,7 @@ def _get_glue_table_columns(self, row: Dict[str, Any], table_name: str) -> List[
                 )
             )
         return columns
-
-
+
     def _get_extract_iter(self) -> Iterator[TableMetadata]:
         for row in self._get_raw_extract_iter():
             columns = []
@@ -93,7 +100,7 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
                             description=None,
                         )
                     )
-            else: 
+            else:
                 columns = self._get_glue_table_columns(row, full_table_name)
                 table_type = TableType.TABLE
 
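The next patch adds a fixture for exactly the multi-part case, deliberately splitting the schema JSON mid-token ("Tr" / "ue") to mirror how Glue cuts parts at fixed character offsets rather than at token boundaries. If more fixtures like it are ever needed, the split could be generated instead of hand-crafted; a sketch, using a hypothetical helper of our own that is not part of this PR:

    def glue_schema_params(schema_str: str, num_parts: int) -> dict:
        # Split a schema JSON string into Glue-style numParts/part.N parameters,
        # cutting at fixed offsets the way Glue does (possibly mid-token).
        size = -(-len(schema_str) // num_parts)  # ceiling division
        params = {"spark.sql.sources.schema.numParts": str(num_parts)}
        for i in range(num_parts):
            params[f"spark.sql.sources.schema.part.{i}"] = schema_str[i * size:(i + 1) * size]
        return params
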
From 6c5c4994162b8eb14cc0f9467e51502fe3674914 Mon Sep 17 00:00:00 2001
From: Istvan Meszaros
Date: Wed, 30 Mar 2022 13:11:24 +0200
Subject: [PATCH 3/5] added another test for BIG schemas

---
 tests/extractor/test_glue_extractor.py | 57 ++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/tests/extractor/test_glue_extractor.py b/tests/extractor/test_glue_extractor.py
index 7b9a3bf..9dba980 100644
--- a/tests/extractor/test_glue_extractor.py
+++ b/tests/extractor/test_glue_extractor.py
@@ -246,6 +246,48 @@ def test_extraction_with_multiple_result(self) -> None:
                     }"""
                 },
             },
+            {
+                "Name": "big_schema_test_table",
+                "DatabaseName": "test_schema1",
+                "Description": "test table for big schema",
+                "TableType": "EXTERNAL_TABLE",
+                "StorageDescriptor": {
+                    "Location": "test_location3",
+                },
+                "Parameters": {
+                    "spark.sql.sources.schema.numParts": "2",
+                    "spark.sql.sources.schema.part.0": """{
+                        "type": "struct",
+                        "fields": [
+                            {
+                                "name": "col_name",
+                                "type": "varchar",
+                                "nullable": "Tr""",
+                    "spark.sql.sources.schema.part.1": """ue",
+                                "metadata": {}
+                            },
+                            {
+                                "name": "col_name2",
+                                "type": "varchar",
+                                "nullable": "True",
+                                "metadata": {}
+                            },
+                            {
+                                "name": "event_properties",
+                                "type": {
+                                    "type": "map",
+                                    "keyType": "string",
+                                    "valueType": "string",
+                                    "valueContainsNull": "True"
+                                },
+                                "nullable": "True",
+                                "metadata": {}
+                            }
+                        ]
+                    }
+                    """,
+                },
+            },
             {
                 "Name": "test_view1",
                 "DatabaseName": "test_schema1",
@@ -323,6 +365,21 @@ def test_extraction_with_multiple_result(self) -> None:
                 ColumnMetadata("event_properties", "map", None),
             ],
         )
+        self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
+
+        expected = TableMetadata(
+            name="big_schema_test_table",
+            connection="test-connection",
+            database="test_schema1",
+            description=None,
+            location="test_location3",
+            table_type=TableType.TABLE,
+            columns=[
+                ColumnMetadata("col_name", "varchar", None),
+                ColumnMetadata("col_name2", "varchar", None),
+                ColumnMetadata("event_properties", "map", None),
+            ],
+        )
         self.assertEqual(expected.__repr__(), extractor.extract().__repr__())
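The patch below looks like an applied review suggestion: the manual accumulation loop becomes a single str.join, which reads better and avoids re-copying the growing string on every iteration. The two forms are equivalent; roughly, with parameters standing in for row["Parameters"]:

    # Before: repeated concatenation copies the accumulated string each pass.
    schema_str = ""
    for part in range(0, schema_parts_count):
        schema_str = schema_str + parameters[f"spark.sql.sources.schema.part.{part}"]

    # After: join allocates the result once.
    schema_str = "".join(
        parameters[f"spark.sql.sources.schema.part.{part}"]
        for part in range(schema_parts_count)
    )
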
"spark.sql.sources.schema.part.0": """{ + "type": "struct", + "fields": [ + { + "name": "col_name", + "type": "varchar", + "nullable": "Tr""", + "spark.sql.sources.schema.part.1": """ue", + "metadata": {} + }, + { + "name": "col_name2", + "type": "varchar", + "nullable": "True", + "metadata": {} + }, + { + "name": "event_properties", + "type": { + "type": "map", + "keyType": "string", + "valueType": "string", + "valueContainsNull": "True" + }, + "nullable": "True", + "metadata": {} + } + ] + } + """, + }, + }, { "Name": "test_view1", "DatabaseName": "test_schema1", @@ -323,6 +365,21 @@ def test_extraction_with_multiple_result(self) -> None: ColumnMetadata("event_properties", "map", None), ], ) + self.assertEqual(expected.__repr__(), extractor.extract().__repr__()) + + expected = TableMetadata( + name="big_schema_test_table", + connection="test-connection", + database="test_schema1", + description=None, + location="test_location3", + table_type=TableType.TABLE, + columns=[ + ColumnMetadata("col_name", "varchar", None), + ColumnMetadata("col_name2", "varchar", None), + ColumnMetadata("event_properties", "map", None), + ], + ) self.assertEqual(expected.__repr__(), extractor.extract().__repr__()) From 48d4e55fef2fc906351102ba822bfd7fbbad7761 Mon Sep 17 00:00:00 2001 From: Istvan Meszaros Date: Wed, 30 Mar 2022 15:50:02 +0200 Subject: [PATCH 4/5] Update carte_cli/extractor/glue_extractor.py Co-authored-by: Balint Haller --- carte_cli/extractor/glue_extractor.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/carte_cli/extractor/glue_extractor.py b/carte_cli/extractor/glue_extractor.py index 3d88f28..c40ad5e 100644 --- a/carte_cli/extractor/glue_extractor.py +++ b/carte_cli/extractor/glue_extractor.py @@ -54,12 +54,7 @@ def _get_glue_table_columns( schema_parts_count = int( row["Parameters"]["spark.sql.sources.schema.numParts"] ) - schema_str = "" - for part in range(0, schema_parts_count): - schema_str = ( - schema_str - + row["Parameters"][f"spark.sql.sources.schema.part.{part}"] - ) +schema_str = ''.join([row["Parameters"][f"spark.sql.sources.schema.part.{part}"] for part in range(schema_parts_count)]) schema = json.loads(schema_str) else: print(table_name) From afdb29a950a1b5958cf5a2b1a27219351f2a28ea Mon Sep 17 00:00:00 2001 From: Istvan Meszaros Date: Wed, 30 Mar 2022 15:58:28 +0200 Subject: [PATCH 5/5] exception fix --- carte_cli/extractor/glue_extractor.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/carte_cli/extractor/glue_extractor.py b/carte_cli/extractor/glue_extractor.py index c40ad5e..40b61c6 100644 --- a/carte_cli/extractor/glue_extractor.py +++ b/carte_cli/extractor/glue_extractor.py @@ -9,6 +9,11 @@ import json +class GlueExtractorException(Exception): + def __init__(self, *args: object) -> None: + super().__init__(*args) + + class GlueExtractor(Extractor): def __init__(self, connection_name: str): super().__init__() @@ -54,12 +59,17 @@ def _get_glue_table_columns( schema_parts_count = int( row["Parameters"]["spark.sql.sources.schema.numParts"] ) -schema_str = ''.join([row["Parameters"][f"spark.sql.sources.schema.part.{part}"] for part in range(schema_parts_count)]) + schema_str = "".join( + [ + row["Parameters"][f"spark.sql.sources.schema.part.{part}"] + for part in range(schema_parts_count) + ] + ) schema = json.loads(schema_str) else: - print(table_name) - print(row) - raise Exception("Unsupported glue table format") + raise GlueExtractorException( + f"Unsupported glue table format for {table_name}", row + ) 
From afdb29a950a1b5958cf5a2b1a27219351f2a28ea Mon Sep 17 00:00:00 2001
From: Istvan Meszaros
Date: Wed, 30 Mar 2022 15:58:28 +0200
Subject: [PATCH 5/5] exception fix

---
 carte_cli/extractor/glue_extractor.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/carte_cli/extractor/glue_extractor.py b/carte_cli/extractor/glue_extractor.py
index c40ad5e..40b61c6 100644
--- a/carte_cli/extractor/glue_extractor.py
+++ b/carte_cli/extractor/glue_extractor.py
@@ -9,6 +9,11 @@
 import json
 
 
+class GlueExtractorException(Exception):
+    def __init__(self, *args: object) -> None:
+        super().__init__(*args)
+
+
 class GlueExtractor(Extractor):
     def __init__(self, connection_name: str):
         super().__init__()
@@ -54,12 +59,17 @@ def _get_glue_table_columns(
             schema_parts_count = int(
                 row["Parameters"]["spark.sql.sources.schema.numParts"]
             )
-schema_str = ''.join([row["Parameters"][f"spark.sql.sources.schema.part.{part}"] for part in range(schema_parts_count)])
+            schema_str = "".join(
+                [
+                    row["Parameters"][f"spark.sql.sources.schema.part.{part}"]
+                    for part in range(schema_parts_count)
+                ]
+            )
             schema = json.loads(schema_str)
         else:
-            print(table_name)
-            print(row)
-            raise Exception("Unsupported glue table format")
+            raise GlueExtractorException(
+                f"Unsupported glue table format for {table_name}", row
+            )
         fields = schema["fields"]
         for column in fields:
             columns.append(