
Commit 64174d9

Add parquet importer (datacontract#494)
* Add parquet importer - WIP
* Add parquet importer - WIP
* Add pyarrow mapping
* Update README
* Update CHANGELOG
* Make ruff order imports like in ci action
* Move pyarrow to parquet extra
1 parent d6d7e57 commit 64174d9

File tree

8 files changed: +169 −5 lines changed


CHANGELOG.md

+1
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
 ### Added
+- Support for model import from parquet file metadata.
 
 ### Changed

README.md

+8 −3
@@ -194,13 +194,16 @@ A list of available extras:
 | Avro Support            | `pip install datacontract-cli[avro]`       |
 | Google BigQuery         | `pip install datacontract-cli[bigquery]`   |
 | Databricks Integration  | `pip install datacontract-cli[databricks]` |
+| Iceberg                 | `pip install datacontract-cli[iceberg]`    |
 | Kafka Integration       | `pip install datacontract-cli[kafka]`      |
 | PostgreSQL Integration  | `pip install datacontract-cli[postgres]`   |
 | S3 Integration          | `pip install datacontract-cli[s3]`         |
 | Snowflake Integration   | `pip install datacontract-cli[snowflake]`  |
 | Microsoft SQL Server    | `pip install datacontract-cli[sqlserver]`  |
 | Trino                   | `pip install datacontract-cli[trino]`      |
 | Dbt                     | `pip install datacontract-cli[dbt]`        |
 | Dbml                    | `pip install datacontract-cli[dbml]`       |
+| Parquet                 | `pip install datacontract-cli[parquet]`    |

@@ -930,8 +933,8 @@ models:
 Create a data contract from the given source location. Prints to stdout.
 
 ╭─ Options ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
-│ *  --format            [sql|avro|dbt|glue|jsonschema|bigquery|odcs        The format of the source file.                     │
-│                        |unity|spark|iceberg]                              [default: None]                                    │
+│ *  --format            [sql|avro|dbt|dbml|glue|jsonschema|bigquery        The format of the source file.                     │
+│                        |odcs|unity|spark|iceberg|parquet]                 [default: None]                                    │
 │                                                                           [required]                                         │
 │    --source            TEXT                                               The path to the file or Glue Database that         │
 │                                                                           should be imported.                                │
@@ -962,7 +965,8 @@ models:
 │                                                                           empty for all tables in the file).                 │
 │                                                                           [default: None]                                    │
 │    --iceberg-table     TEXT                                               Table name to assign to the model created          │
-│                                                                           from the Iceberg schema. [default: None]           │
+│                                                                           from the Iceberg schema.                           │
+│                                                                           [default: None]                                    │
 │    --help                                                                 Show this message and exit.                        │
 ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
 ```
@@ -989,6 +993,7 @@ Available import options:
 | `dbml`             | Import from DBML models                       | ✅      |
 | `protobuf`         | Import from Protobuf schemas                  | TBD     |
 | `iceberg`          | Import from an Iceberg JSON Schema Definition | partial |
+| `parquet`          | Import from Parquet File Metadata             | ✅      |
 | Missing something? | Please create an issue on GitHub              | TBD     |
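As a quick illustration of the new option, a minimal programmatic import might look like the sketch below; the file name `orders.parquet` is made up for the example, and the CLI equivalent is `datacontract import --format parquet --source orders.parquet` (as exercised in the test added by this commit).

```python
# Minimal sketch using the Python API shown in the tests below.
# "orders.parquet" is an illustrative file name, not part of this commit.
from datacontract.data_contract import DataContract

data_contract = DataContract().import_from_source(format="parquet", source="orders.parquet")
print(data_contract.to_yaml())
```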

datacontract/imports/importer.py

+1
@@ -30,6 +30,7 @@ class ImportFormat(str, Enum):
     unity = "unity"
     spark = "spark"
     iceberg = "iceberg"
+    parquet = "parquet"
 
     @classmethod
     def get_supported_formats(cls):
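With the enum extended, the new format should surface wherever the supported formats are listed. A quick sanity-check sketch, assuming get_supported_formats exposes the enum values:

```python
from datacontract.imports.importer import ImportFormat

# ImportFormat is a str-based enum, so the member compares equal to its string value.
assert ImportFormat.parquet == "parquet"
# Assumes get_supported_formats() returns the format values (or members, which compare equal).
assert "parquet" in ImportFormat.get_supported_formats()
```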

datacontract/imports/importer_factory.py

+5
@@ -99,3 +99,8 @@ def load_module_class(module_path, class_name):
     module_path="datacontract.imports.iceberg_importer",
     class_name="IcebergImporter",
 )
+importer_factory.register_lazy_importer(
+    name=ImportFormat.parquet,
+    module_path="datacontract.imports.parquet_importer",
+    class_name="ParquetImporter",
+)
datacontract/imports/parquet_importer.py

+81
@@ -0,0 +1,81 @@
import os.path

import pyarrow
from pyarrow import parquet

from datacontract.imports.importer import Importer
from datacontract.model.data_contract_specification import (
    DataContractSpecification,
    Field,
    Model,
)
from datacontract.model.exceptions import DataContractException


class ParquetImporter(Importer):
    def import_source(
        self, data_contract_specification: DataContractSpecification, source: str, import_args: dict
    ) -> DataContractSpecification:
        return import_parquet(data_contract_specification, source)


def import_parquet(data_contract_specification: DataContractSpecification, source: str) -> DataContractSpecification:
    # use filename as schema name, remove .parquet suffix, avoid breaking the yaml output by replacing dots
    schema_name = os.path.basename(source).removesuffix(".parquet").replace(".", "_")

    fields: dict[str, Field] = {}

    arrow_schema = parquet.read_schema(source)
    for field_name in arrow_schema.names:
        parquet_field = arrow_schema.field(field_name)

        field = map_pyarrow_field_to_specification_field(parquet_field, "parquet")

        if not parquet_field.nullable:
            field.required = True

        fields[field_name] = field

    data_contract_specification.models[schema_name] = Model(fields=fields)

    return data_contract_specification


def map_pyarrow_field_to_specification_field(pyarrow_field: pyarrow.Field, file_format: str) -> Field:
    if pyarrow.types.is_boolean(pyarrow_field.type):
        return Field(type="boolean")
    if pyarrow.types.is_int32(pyarrow_field.type):
        return Field(type="int")
    if pyarrow.types.is_int64(pyarrow_field.type):
        return Field(type="long")
    if pyarrow.types.is_integer(pyarrow_field.type):
        return Field(type="number")
    if pyarrow.types.is_float32(pyarrow_field.type):
        return Field(type="float")
    if pyarrow.types.is_float64(pyarrow_field.type):
        return Field(type="double")
    if pyarrow.types.is_decimal(pyarrow_field.type):
        return Field(type="decimal", precision=pyarrow_field.type.precision, scale=pyarrow_field.type.scale)
    if pyarrow.types.is_timestamp(pyarrow_field.type):
        return Field(type="timestamp")
    if pyarrow.types.is_date(pyarrow_field.type):
        return Field(type="date")
    if pyarrow.types.is_null(pyarrow_field.type):
        return Field(type="null")
    if pyarrow.types.is_binary(pyarrow_field.type):
        return Field(type="bytes")
    if pyarrow.types.is_string(pyarrow_field.type):
        return Field(type="string")
    if pyarrow.types.is_map(pyarrow_field.type) or pyarrow.types.is_dictionary(pyarrow_field.type):
        return Field(type="map")
    if pyarrow.types.is_struct(pyarrow_field.type):
        return Field(type="struct")
    if pyarrow.types.is_list(pyarrow_field.type):
        return Field(type="array")

    raise DataContractException(
        type="schema",
        name=f"Parse {file_format} schema",
        reason=f"{pyarrow_field.type} currently not supported.",
        engine="datacontract",
    )
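To make the mapping above concrete, here is a small hand-built pyarrow schema run through map_pyarrow_field_to_specification_field; the field names are invented for the example, and a real run would read the schema from a parquet file via parquet.read_schema as the importer does.

```python
import pyarrow

from datacontract.imports.parquet_importer import map_pyarrow_field_to_specification_field

# Invented example schema; "id" is non-nullable, mirroring the required-field handling above.
schema = pyarrow.schema(
    [
        pyarrow.field("id", pyarrow.int64(), nullable=False),
        pyarrow.field("amount", pyarrow.decimal128(10, 2)),
        pyarrow.field("created_at", pyarrow.timestamp("ms")),
    ]
)

for name in schema.names:
    field = map_pyarrow_field_to_specification_field(schema.field(name), "parquet")
    print(name, field.type)  # id -> long, amount -> decimal, created_at -> timestamp
```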

pyproject.toml

+5 −2
@@ -94,8 +94,12 @@ dbml = [
     "pydbml>=1.1.1"
 ]
 
+parquet = [
+    "pyarrow>=12.0.0"
+]
+
 all = [
-    "datacontract-cli[kafka,bigquery,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg]"
+    "datacontract-cli[kafka,bigquery,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet]"
 ]
 
 dev = [
@@ -105,7 +109,6 @@ dev = [
     "moto==5.0.18",
     "pandas>=2.1.0",
     "pre-commit>=3.7.1,<3.9.0",
-    "pyarrow>=12.0.0",
     "pytest",
     "pytest-xdist",
     "pymssql==2.3.1",
tests/fixtures/parquet/data/combined_no_time.parquet

Binary file not shown.
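The fixture is a binary parquet file and cannot be rendered here. As a rough sketch, a file with this shape could be produced with pyarrow along the following lines; this script is not part of the commit and only covers a subset of the columns the test below expects.

```python
# Illustrative only: generate a parquet file with some of the column types the test expects.
import pyarrow
from pyarrow import parquet

table = pyarrow.table(
    {
        "string_field": pyarrow.array(["a"], type=pyarrow.string()),
        "blob_field": pyarrow.array([b"a"], type=pyarrow.binary()),
        "boolean_field": pyarrow.array([True], type=pyarrow.bool_()),
        "decimal_field": pyarrow.array([None], type=pyarrow.decimal128(10, 2)),
        "integer_field": pyarrow.array([1], type=pyarrow.int32()),
        "bigint_field": pyarrow.array([1], type=pyarrow.int64()),
        "date_field": pyarrow.array([None], type=pyarrow.date32()),
        "timestamp_field": pyarrow.array([None], type=pyarrow.timestamp("ms")),
    }
)
parquet.write_table(table, "combined_no_time.parquet")
```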

tests/test_import_parquet.py

+68
@@ -0,0 +1,68 @@
from typer.testing import CliRunner

from datacontract.cli import app
from datacontract.data_contract import DataContract

parquet_file_path = "fixtures/parquet/data/combined_no_time.parquet"


def test_cli():
    runner = CliRunner()
    result = runner.invoke(
        app,
        [
            "import",
            "--format",
            "parquet",
            "--source",
            parquet_file_path,
        ],
    )
    assert result.exit_code == 0


def test_import_parquet():
    result = DataContract().import_from_source(format="parquet", source=parquet_file_path)

    expected = """dataContractSpecification: 1.1.0
id: my-data-contract-id
info:
  title: My Data Contract
  version: 0.0.1
models:
  combined_no_time:
    fields:
      string_field:
        type: string
      blob_field:
        type: bytes
      boolean_field:
        type: boolean
      decimal_field:
        type: decimal
        precision: 10
        scale: 2
      float_field:
        type: float
      double_field:
        type: double
      integer_field:
        type: int
      bigint_field:
        type: long
      struct_field:
        type: struct
      array_field:
        type: array
      list_field:
        type: array
      map_field:
        type: map
      date_field:
        type: date
      timestamp_field:
        type: timestamp
"""

    assert result.to_yaml() == expected
    assert DataContract(data_contract_str=expected).lint(enabled_linters=set()).has_passed()
