Skip to content

Commit

Permalink
[DOP-16663] Add OpenLineage models
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jul 5, 2024
1 parent 6572f7b commit 1dc2219
Show file tree
Hide file tree
Showing 42 changed files with 1,861 additions and 28 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,4 @@ cython_debug/
.vscode/

.DS_Store
.python-version
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Secondly, create virtualenv and install dependencies:

.. code:: bash
make venv-init
make venv
If you already have venv, but need to install dependencies required for development:

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ help: ##@Help Show this help



venv-init: venv-cleanup venv-install##@Env Init venv and install poetry dependencies
venv: venv-cleanup venv-install##@Env Init venv and install poetry dependencies

venv-cleanup: ##@Env Cleanup venv
@rm -rf .venv || true
Expand Down
10 changes: 5 additions & 5 deletions data_rentgen/consumer/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from faststream import Logger
from faststream.kafka import KafkaRouter

from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent

# Router on which this module's handlers register themselves via ``@router.subscriber``.
router = KafkaRouter()


@router.subscriber("input")
@router.publisher("output")
async def base_handler(body: dict, logger: Logger):
    """Log the received message and return it wrapped under the ``handler`` key."""
    logger.info("Test handler, %s", body)
    response = {"handler": body}
    return response
@router.subscriber("input.runs")
async def runs_handler(msg: OpenLineageRunEvent, logger: Logger) -> None:
    """Log each message delivered by the ``input.runs`` subscription.

    The message arrives typed as ``OpenLineageRunEvent``; the handler only
    logs it and returns nothing.
    """
    logger.info("Successfully handled, %s", msg)
9 changes: 9 additions & 0 deletions data_rentgen/consumer/openlineage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
"""OpenLineage models & facets.
Currently, openlineage-python [does not support](https://github.com/OpenLineage/OpenLineage/issues/2629) deserialization from JSON.
So we have to write our own deserialization logic.
Also, FastStream supports only ``pydantic`` models, whereas openlineage-python provides ``attrs`` models.
"""
10 changes: 10 additions & 0 deletions data_rentgen/consumer/openlineage/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import BaseModel, ConfigDict


class OpenLineageBase(BaseModel):
    """Common parent of every OpenLineage model.

    The shared configuration makes instances immutable (``frozen``), drops
    keys that a model does not declare (``extra="ignore"``) and allows
    fields of arbitrary types.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="ignore",
        frozen=True,
    )
37 changes: 37 additions & 0 deletions data_rentgen/consumer/openlineage/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import Field

from data_rentgen.consumer.openlineage.base import OpenLineageBase
from data_rentgen.consumer.openlineage.dataset_facets import (
OpenLineageDatasetFacetsDict,
OpenLineageInputDatasetFacetsDict,
OpenLineageOutputDatasetFacetsDict,
)


class OpenLineageDataset(OpenLineageBase):
    """Dataset shared by inputs and outputs: namespace, name and generic facets.

    See [Dataset](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """

    # Advertised as a URI in the generated JSON schema.
    namespace: str = Field(
        json_schema_extra={"format": "uri"},
    )
    name: str
    # Each dataset gets its own empty facets dict when ``facets`` is omitted.
    facets: OpenLineageDatasetFacetsDict = Field(
        default_factory=OpenLineageDatasetFacetsDict,  # type: ignore[arg-type]
    )


class OpenLineageInputDataset(OpenLineageDataset):
    """Dataset read by a run; extends the generic dataset with input facets.

    See [InputDataset](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """

    # Each dataset gets its own empty dict when ``inputFacets`` is omitted.
    inputFacets: OpenLineageInputDatasetFacetsDict = Field(
        default_factory=OpenLineageInputDatasetFacetsDict,  # type: ignore[arg-type]
    )


class OpenLineageOutputDataset(OpenLineageDataset):
    """Dataset written by a run; extends the generic dataset with output facets.

    See [OutputDataset](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """

    # Each dataset gets its own empty dict when ``outputFacets`` is omitted.
    outputFacets: OpenLineageOutputDatasetFacetsDict = Field(
        default_factory=OpenLineageOutputDatasetFacetsDict,  # type: ignore[arg-type]
    )
88 changes: 88 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from typing import TypedDict

from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.dataquality_metrics import (
OpenLineageDataQualityMetricsInputDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.datasource import (
OpenLineageDatasourceDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.documentation import (
OpenLineageDocumentationDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.lifecycle_change import (
OpenLineageDatasetLifecycleStateChange,
OpenLineageDatasetPreviousIdentifier,
OpenLineageLifecycleStateChangeDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.output_statistics import (
OpenLineageOutputStatisticsOutputDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.schema import (
OpenLineageSchemaDatasetFacet,
OpenLineageSchemaField,
)
from data_rentgen.consumer.openlineage.dataset_facets.storage import (
OpenLineageStorageDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.symlinks import (
OpenLineageSymlinkIdentifier,
OpenLineageSymlinksDatasetFacet,
OpenLineageSymlinkType,
)

# Public names of this package: the facet models imported above plus the
# facet dictionaries defined in this module.
__all__ = [
    "OpenLineageDatasetFacet",
    "OpenLineageDataQualityMetricsInputDatasetFacet",
    "OpenLineageDatasourceDatasetFacet",
    "OpenLineageDocumentationDatasetFacet",
    "OpenLineageLifecycleStateChangeDatasetFacet",
    "OpenLineageDatasetPreviousIdentifier",
    "OpenLineageDatasetLifecycleStateChange",
    "OpenLineageOutputStatisticsOutputDatasetFacet",
    "OpenLineageSchemaDatasetFacet",
    "OpenLineageSchemaField",
    "OpenLineageStorageDatasetFacet",
    "OpenLineageSymlinksDatasetFacet",
    "OpenLineageSymlinkType",
    "OpenLineageSymlinkIdentifier",
    "OpenLineageDatasetFacetsDict",
    "OpenLineageInputDatasetFacetsDict",
    "OpenLineageOutputDatasetFacetsDict",
]


class OpenLineageDatasetFacetsDict(TypedDict, total=False):
    """All possible dataset facets.

    Declared with ``total=False``, so every key is optional and an empty
    dict is a valid value.

    See [Dataset](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """

    documentation: OpenLineageDocumentationDatasetFacet
    dataQualityMetrics: OpenLineageDataQualityMetricsInputDatasetFacet
    dataSource: OpenLineageDatasourceDatasetFacet
    lifecycleStateChange: OpenLineageLifecycleStateChangeDatasetFacet
    outputStatistics: OpenLineageOutputStatisticsOutputDatasetFacet
    schema: OpenLineageSchemaDatasetFacet
    storage: OpenLineageStorageDatasetFacet
    symlinks: OpenLineageSymlinksDatasetFacet


class OpenLineageInputDatasetFacetsDict(TypedDict, total=False):
    """All possible input dataset facets.

    Declared with ``total=False``, so the key is optional and an empty dict
    is a valid value.

    See [InputDataset](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """

    dataQualityMetrics: OpenLineageDataQualityMetricsInputDatasetFacet


class OpenLineageOutputDatasetFacetsDict(TypedDict, total=False):
    """All possible output dataset facets.

    Declared with ``total=False``, so the key is optional and an empty dict
    is a valid value.

    See [OutputDataset](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """

    outputStatistics: OpenLineageOutputStatisticsOutputDatasetFacet
22 changes: 22 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.openlineage.base import OpenLineageBase


class OpenLineageDatasetFacet(OpenLineageBase):
    """Base class for all dataset facets.

    Declares no fields of its own; it only marks a model as a dataset facet.

    See [DatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """


class OpenLineageInputDatasetFacet(OpenLineageDatasetFacet):
    """Base class for input dataset facets.

    Declares no fields of its own; it only marks a facet as input-specific.

    See [DatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """


class OpenLineageOutputDatasetFacet(OpenLineageDatasetFacet):
    """Base class for output dataset facets.

    Declares no fields of its own; it only marks a facet as output-specific.

    See [DatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json).
    """
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import Field

from data_rentgen.consumer.openlineage.dataset_facets.base import (
    OpenLineageInputDatasetFacet,
    OpenLineageOutputDatasetFacet,
)


class OpenLineageDataQualityMetricsInputDatasetFacet(OpenLineageInputDatasetFacet):
    """Input dataset facet describing data quality metrics.

    Derives from the *input* facet base, matching the facet's name and its
    use as an input facet; it previously derived from the output facet base.

    See [DataQualityMetricsInputDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/DataQualityMetricsInputDatasetFacet.json).

    Note: `columnMetrics` are ignored.
    """

    # Read from the ``rowCount`` key of the incoming facet.
    rows: int | None = Field(default=None, alias="rowCount")
    bytes: int | None = None
    # Read from the ``fileCount`` key of the incoming facet.
    files: int | None = Field(default=None, alias="fileCount")
15 changes: 15 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/datasource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageDatasourceDatasetFacet(OpenLineageDatasetFacet):
    """Dataset facet describing data source information.

    See [DatasourceDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/DatasourceDatasetFacet.json).
    """

    # The facet schema marks neither property as required, so a facet that
    # omits one of them must still validate instead of being rejected.
    name: str | None = None
    uri: str | None = None
14 changes: 14 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/documentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageDocumentationDatasetFacet(OpenLineageDatasetFacet):
    """Dataset facet describing documentation.

    See [DocumentationDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/DocumentationDatasetFacet.json).
    """

    # Required: validation fails when the facet has no ``description``.
    description: str
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from enum import Enum

from data_rentgen.consumer.openlineage.base import OpenLineageBase
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageDatasetLifecycleStateChange(str, Enum):
    """Kinds of lifecycle state change a dataset can go through.

    Members are also ``str`` instances, so they compare equal to their raw values.

    See [LifecycleStateChangeDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/LifecycleStateChangeDatasetFacet.json).
    """

    ALTER = "ALTER"
    CREATE = "CREATE"
    DROP = "DROP"
    OVERWRITE = "OVERWRITE"
    RENAME = "RENAME"
    TRUNCATE = "TRUNCATE"

    def __str__(self) -> str:
        """Return the member's raw string value."""
        return str(self.value)


class OpenLineageDatasetPreviousIdentifier(OpenLineageBase):
    """Previous identifier information. Used only if `lifecycleStateChange=RENAME`.

    Both ``namespace`` and ``name`` are required.

    See [LifecycleStateChangeDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/LifecycleStateChangeDatasetFacet.json).
    """

    namespace: str
    name: str


class OpenLineageLifecycleStateChangeDatasetFacet(OpenLineageDatasetFacet):
    """Dataset facet describing lifecycle state change.

    See [LifecycleStateChangeDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/LifecycleStateChangeDatasetFacet.json).
    """

    # Required; must be one of the enum's values.
    lifecycleStateChange: OpenLineageDatasetLifecycleStateChange
    # Optional; defaults to None when the facet does not carry it.
    previousIdentifier: OpenLineageDatasetPreviousIdentifier | None = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import Field

from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageOutputDatasetFacet,
)


class OpenLineageOutputStatisticsOutputDatasetFacet(OpenLineageOutputDatasetFacet):
    """Output dataset facet carrying row, byte and file counts.

    Every counter is optional and defaults to ``None``; each is read from
    the alias given on its field.

    See [OutputStatisticsOutputDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/OutputStatisticsOutputDatasetFacet.json).
    """

    rows: int | None = Field(
        default=None,
        alias="rowCount",
    )
    bytes: int | None = Field(
        default=None,
        alias="size",
    )
    files: int | None = Field(
        default=None,
        alias="fileCount",
    )
28 changes: 28 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from data_rentgen.consumer.openlineage.base import OpenLineageBase
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageSchemaField(OpenLineageBase):
    """Dataset field information.

    Only ``name`` is required. A field may carry nested ``fields`` of the
    same type, so a schema can be described recursively.

    See [SchemaDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/SchemaDatasetFacet.json).
    """

    name: str
    type: str | None = None
    description: str | None = None
    # Self-referencing: child entries are OpenLineageSchemaField items too.
    fields: list[OpenLineageSchemaField] | None = None


class OpenLineageSchemaDatasetFacet(OpenLineageDatasetFacet):
    """Dataset facet describing schema.

    See [SchemaDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/SchemaDatasetFacet.json).
    """

    # Required list of top-level fields; each may nest further fields.
    fields: list[OpenLineageSchemaField]
15 changes: 15 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageStorageDatasetFacet(OpenLineageDatasetFacet):
    """Dataset facet describing storage information.

    See [StorageDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/StorageDatasetFacet.json).
    """

    storageLayer: str
    # The facet schema requires only ``storageLayer``, so a facet without
    # ``fileFormat`` must still validate instead of being rejected.
    fileFormat: str | None = None
Loading

0 comments on commit 1dc2219

Please sign in to comment.