Commit 86dc1c7

Merge pull request #8 from carte-data/fixes-for-delta-extractor

Fixes for glue extractor - for tables with BIG schemas

Balint Haller authored Mar 31, 2022
2 parents 3c9fda2 + afdb29a commit 86dc1c7
Showing 4 changed files with 140 additions and 50 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,3 +1,5 @@
__pycache__
.DS_Store
mock_path
.vscode
mock_path
dist/
55 changes: 43 additions & 12 deletions carte_cli/extractor/glue_extractor.py
@@ -6,6 +6,12 @@
from typing import Iterator, Union, Dict, Any, List
from databuilder.extractor.base_extractor import Extractor
from carte_cli.model.carte_table_model import TableMetadata, ColumnMetadata, TableType
import json


class GlueExtractorException(Exception):
    def __init__(self, *args: object) -> None:
        super().__init__(*args)


class GlueExtractor(Extractor):
@@ -41,6 +47,40 @@ def _get_column_type(self, column: Dict) -> str:
        else:
            return col_type

    def _get_glue_table_columns(
        self, row: Dict[str, Any], table_name: str
    ) -> List[ColumnMetadata]:
        columns = []
        if "spark.sql.sources.schema" in row["Parameters"]:
            # For delta and parquet tables, if the schema is not too big
            schema = json.loads(row["Parameters"]["spark.sql.sources.schema"])
        elif "spark.sql.sources.schema.numParts" in row["Parameters"]:
            # If the delta or parquet table's schema is very big, Glue splits it into multiple properties ¯\_(ツ)_/¯
            schema_parts_count = int(
                row["Parameters"]["spark.sql.sources.schema.numParts"]
            )
            schema_str = "".join(
                [
                    row["Parameters"][f"spark.sql.sources.schema.part.{part}"]
                    for part in range(schema_parts_count)
                ]
            )
            schema = json.loads(schema_str)
        else:
            raise GlueExtractorException(
                f"Unsupported glue table format for {table_name}", row
            )
        fields = schema["fields"]
        for column in fields:
            columns.append(
                ColumnMetadata(
                    name=column["name"],
                    column_type=self._get_column_type(column),
                    description=None,
                )
            )
        return columns

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        for row in self._get_raw_extract_iter():
            columns = []
@@ -56,6 +96,7 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
            is_view = row.get("TableType") == "VIRTUAL_VIEW"

            if is_view:
                table_type = TableType.VIEW
                for column in row["StorageDescriptor"]["Columns"]:
                    columns.append(
                        ColumnMetadata(
@@ -65,18 +106,8 @@
                        )
                    )
            else:
                # This solution is more robust for Delta and Parquet tables.
                # Both table types have these fields
                for column in row["Parameters"]["spark.sql.sources.schema"]["fields"]:
                    columns.append(
                        ColumnMetadata(
                            name=column["name"],
                            column_type=self._get_column_type(column),
                            description=None,
                        )
                    )

            table_type = TableType.VIEW if is_view else TableType.TABLE
                columns = self._get_glue_table_columns(row, full_table_name)
                table_type = TableType.TABLE

            yield TableMetadata(
                name=table_name,
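The heart of the fix is the new _get_glue_table_columns helper above: when a schema is too large for the single spark.sql.sources.schema parameter, Glue stores a part count in spark.sql.sources.schema.numParts and the JSON fragments in spark.sql.sources.schema.part.N, and the helper simply concatenates the fragments in order before calling json.loads. A minimal standalone sketch of that reassembly, using hypothetical table parameters rather than anything from this repository:

import json

# Hypothetical Glue table Parameters illustrating the split-schema case
# handled by the new elif branch (values here are made up).
params = {
    "spark.sql.sources.schema.numParts": "2",
    # The JSON can be cut anywhere, even mid-token; concatenation restores it.
    "spark.sql.sources.schema.part.0": '{"type": "struct", "fields": [{"name": "id", "type": "big',
    "spark.sql.sources.schema.part.1": 'int", "nullable": true, "metadata": {}}]}',
}

num_parts = int(params["spark.sql.sources.schema.numParts"])
schema_str = "".join(
    params[f"spark.sql.sources.schema.part.{part}"] for part in range(num_parts)
)
schema = json.loads(schema_str)
print([(f["name"], f["type"]) for f in schema["fields"]])  # [('id', 'bigint')]

Because the fragments are joined in numeric order, it does not matter where Glue cuts the string.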
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carte-cli"
version = "0.3.20"
version = "0.3.21"
description = "A static site generator for data catalogs"
authors = ["Balint Haller <balint@hey.com>"]
license = "GPL-3.0-or-later"
129 changes: 93 additions & 36 deletions tests/extractor/test_glue_extractor.py
@@ -62,46 +62,46 @@ def test_extraction_with_single_result(self) -> None:
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"spark.sql.sources.schema": {
"spark.sql.sources.schema": """{
"type": "struct",
"fields": [
{
"name": "col_id1",
"type": "bigint",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "col_id2",
"type": "bigint",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "source",
"type": "varchar",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "is_active",
"type": "boolean",
"nullable": True,
"nullable": "True"
},
{
"name": "etl_created_at",
"type": "timestamp",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "partition_key1",
"type": "string",
"nullable": True,
"metadata": {},
},
],
}
"nullable": "True",
"metadata": {}
}
]
}"""
},
}
]
@@ -159,35 +159,35 @@ def test_extraction_with_multiple_result(self) -> None:
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"spark.sql.sources.schema": {
"spark.sql.sources.schema": """{
"type": "struct",
"fields": [
{
"name": "col_id1",
"type": "bigint",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "source",
"type": "varchar",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "ds",
"type": "varchar",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "partition_key1",
"type": "string",
"nullable": True,
"metadata": {},
},
],
}
"nullable": "True",
"metadata": {}
}
]
}"""
},
},
{
@@ -216,34 +216,76 @@ def test_extraction_with_multiple_result(self) -> None:
},
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"spark.sql.sources.schema": {
"spark.sql.sources.schema": """{
"type": "struct",
"fields": [
{
"name": "col_name",
"type": "varchar",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "col_name2",
"type": "varchar",
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
},
{
"name": "event_properties",
"type": {
"type": "map",
"keyType": "string",
"valueType": "string",
"valueContainsNull": True,
"valueContainsNull": "True"
},
"nullable": True,
"metadata": {},
"nullable": "True",
"metadata": {}
}
]
}"""
},
},
{
"Name": "big_schema_test_table",
"DatabaseName": "test_schema1",
"Description": "test table for big schema",
"TableType": "EXTERNAL_TABLE",
"StorageDescriptor": {
"Location": "test_location3",
},
"Parameters": {
"spark.sql.sources.schema.numParts": "2",
"spark.sql.sources.schema.part.0": """{
"type": "struct",
"fields": [
{
"name": "col_name",
"type": "varchar",
"nullable": "Tr""",
"spark.sql.sources.schema.part.1": """ue",
"metadata": {}
},
{
"name": "col_name2",
"type": "varchar",
"nullable": "True",
"metadata": {}
},
],
{
"name": "event_properties",
"type": {
"type": "map",
"keyType": "string",
"valueType": "string",
"valueContainsNull": "True"
},
"nullable": "True",
"metadata": {}
}
]
}
""",
},
},
{
@@ -323,6 +365,21 @@ def test_extraction_with_multiple_result(self) -> None:
ColumnMetadata("event_properties", "map<string,string>", None),
],
)
self.assertEqual(expected.__repr__(), extractor.extract().__repr__())

expected = TableMetadata(
name="big_schema_test_table",
connection="test-connection",
database="test_schema1",
description=None,
location="test_location3",
table_type=TableType.TABLE,
columns=[
ColumnMetadata("col_name", "varchar", None),
ColumnMetadata("col_name2", "varchar", None),
ColumnMetadata("event_properties", "map<string,string>", None),
],
)

self.assertEqual(expected.__repr__(), extractor.extract().__repr__())

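The big_schema_test_table fixture above deliberately splits its schema JSON mid-token ("nullable": "Tr / ue") across part.0 and part.1 to mimic Glue's arbitrary cut points; plain concatenation is enough to make the string parseable again. A quick standalone check of that round-trip idea, not part of the test suite and using an invented schema string:

import json

# Split a schema JSON string at an arbitrary offset (here inside a string value),
# then confirm that joining the parts round-trips through json.loads.
schema_json = '{"type": "struct", "fields": [{"name": "col_name", "type": "varchar", "nullable": true, "metadata": {}}]}'
split_at = 45  # arbitrary split point, lands inside the "col_name" value
part_0, part_1 = schema_json[:split_at], schema_json[split_at:]

assert json.loads(part_0 + part_1) == json.loads(schema_json)
print(json.loads(part_0 + part_1)["fields"][0]["name"])  # col_name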
