Skip to content

Commit

Permalink
Merge pull request #7 from carte-data/delta-tables-from-glue
Browse files Browse the repository at this point in the history
Delta tables from glue
  • Loading branch information
Balint Haller authored Mar 29, 2022
2 parents 0884a82 + 884efcb commit 8c7d662
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 15 deletions.
46 changes: 32 additions & 14 deletions carte_cli/extractor/glue_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ def extract(self) -> Union[TableMetadata, None]:
def get_scope(self):
return "carte.extractor.glue"

def _get_column_type(self, column: Dict) -> str:
col_type = column["type"]
if type(col_type) == dict:
col_sub_type = col_type["type"]
if col_sub_type == "map":
return f"map<{col_type['keyType']},{col_type['valueType']}>"
return col_sub_type
else:
return col_type

def _get_extract_iter(self) -> Iterator[TableMetadata]:
for row in self._get_raw_extract_iter():
columns = []
Expand All @@ -43,22 +53,30 @@ def _get_extract_iter(self) -> Iterator[TableMetadata]:
):
continue

for column in row["StorageDescriptor"]["Columns"] + row.get(
"PartitionKeys", []
):
columns.append(
ColumnMetadata(
name=column["Name"],
column_type=column["Type"],
description=None,
is_view = row.get("TableType") == "VIRTUAL_VIEW"

if is_view:
for column in row["StorageDescriptor"]["Columns"]:
columns.append(
ColumnMetadata(
name=column["Name"],
column_type=column["Type"],
description=None,
)
)
else:
                # This solution is more robust for Delta and Parquet tables.
                # Both table types have these fields.
for column in row["Parameters"]["spark.sql.sources.schema"]["fields"]:
columns.append(
ColumnMetadata(
name=column["name"],
column_type=self._get_column_type(column),
description=None,
)
)
)

table_type = (
TableType.VIEW
if (row.get("TableType") == "VIRTUAL_VIEW")
else TableType.TABLE
)
table_type = TableType.VIEW if is_view else TableType.TABLE

yield TableMetadata(
name=table_name,
Expand Down
111 changes: 110 additions & 1 deletion tests/extractor/test_glue_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,48 @@ def test_extraction_with_single_result(self) -> None:
},
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"spark.sql.sources.schema": {
"type": "struct",
"fields": [
{
"name": "col_id1",
"type": "bigint",
"nullable": True,
"metadata": {},
},
{
"name": "col_id2",
"type": "bigint",
"nullable": True,
"metadata": {},
},
{
"name": "source",
"type": "varchar",
"nullable": True,
"metadata": {},
},
{
"name": "is_active",
"type": "boolean",
"nullable": True,
},
{
"name": "etl_created_at",
"type": "timestamp",
"nullable": True,
"metadata": {},
},
{
"name": "partition_key1",
"type": "string",
"nullable": True,
"metadata": {},
},
],
}
},
}
]

Expand All @@ -77,8 +119,8 @@ def test_extraction_with_single_result(self) -> None:
columns=[
ColumnMetadata("col_id1", "bigint", None),
ColumnMetadata("col_id2", "bigint", None),
ColumnMetadata("is_active", "boolean", None),
ColumnMetadata("source", "varchar", None),
ColumnMetadata("is_active", "boolean", None),
ColumnMetadata("etl_created_at", "timestamp", None),
ColumnMetadata("partition_key1", "string", None),
],
Expand Down Expand Up @@ -116,6 +158,37 @@ def test_extraction_with_multiple_result(self) -> None:
},
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"spark.sql.sources.schema": {
"type": "struct",
"fields": [
{
"name": "col_id1",
"type": "bigint",
"nullable": True,
"metadata": {},
},
{
"name": "source",
"type": "varchar",
"nullable": True,
"metadata": {},
},
{
"name": "ds",
"type": "varchar",
"nullable": True,
"metadata": {},
},
{
"name": "partition_key1",
"type": "string",
"nullable": True,
"metadata": {},
},
],
}
},
},
{
"Name": "test_table2",
Expand All @@ -133,10 +206,45 @@ def test_extraction_with_multiple_result(self) -> None:
"Type": "varchar",
"Comment": "description of col_name2",
},
{
"Name": "col_name3",
"Type": "map<string,string>",
"Comment": "description of col_name2",
},
],
"Location": "test_location2",
},
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"spark.sql.sources.schema": {
"type": "struct",
"fields": [
{
"name": "col_name",
"type": "varchar",
"nullable": True,
"metadata": {},
},
{
"name": "col_name2",
"type": "varchar",
"nullable": True,
"metadata": {},
},
{
"name": "event_properties",
"type": {
"type": "map",
"keyType": "string",
"valueType": "string",
"valueContainsNull": True,
},
"nullable": True,
"metadata": {},
},
],
}
},
},
{
"Name": "test_view1",
Expand Down Expand Up @@ -212,6 +320,7 @@ def test_extraction_with_multiple_result(self) -> None:
columns=[
ColumnMetadata("col_name", "varchar", None),
ColumnMetadata("col_name2", "varchar", None),
ColumnMetadata("event_properties", "map<string,string>", None),
],
)

Expand Down

0 comments on commit 8c7d662

Please sign in to comment.