Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-22366] Basic column lineage handling in consumer #155

Merged
merged 11 commits into from
Feb 19, 2025
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"tests.test_database.fixtures.alembic",
"tests.test_consumer.fixtures.consumer_app_settings",
"tests.test_consumer.fixtures.test_broker",
"tests.test_consumer.test_extractors.fixtures.extracted_dto",
"tests.test_consumer.test_extractors.fixtures.column_lineage_facets",
"tests.test_server.fixtures.server_app_settings",
"tests.test_server.fixtures.test_server_app",
"tests.test_server.fixtures.test_client",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_consumer/resources/events_spark.jsonl

Large diffs are not rendered by default.

196 changes: 196 additions & 0 deletions tests/test_consumer/test_extractors/fixtures/column_lineage_facets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from datetime import datetime, timezone

import pytest
from uuid6 import UUID

from data_rentgen.consumer.openlineage.dataset import (
OpenLineageInputDataset,
OpenLineageOutputDataset,
)
from data_rentgen.consumer.openlineage.dataset_facets import (
OpenLineageColumnLineageDatasetFacet,
OpenLineageColumnLineageDatasetFacetField,
OpenLineageColumnLineageDatasetFacetFieldRef,
OpenLineageColumnLineageDatasetFacetFieldTransformation,
OpenLineageDatasetFacets,
)
from data_rentgen.consumer.openlineage.job import OpenLineageJob
from data_rentgen.consumer.openlineage.job_facets import (
OpenLineageJobFacets,
OpenLineageJobIntegrationType,
OpenLineageJobProcessingType,
OpenLineageJobType,
OpenLineageJobTypeJobFacet,
)
from data_rentgen.consumer.openlineage.run import OpenLineageRun
from data_rentgen.consumer.openlineage.run_event import (
OpenLineageRunEvent,
OpenLineageRunEventType,
)
from data_rentgen.consumer.openlineage.run_facets import (
OpenLineageParentJob,
OpenLineageParentRun,
OpenLineageParentRunFacet,
OpenLineageRunFacets,
)


@pytest.fixture
def output_event_with_one_to_two_direct_column_lineage() -> OpenLineageOutputDataset:
    """
    Output dataset whose only target column, ``column_1``, has two input fields.

    Both input fields point at dataset ``mydb.mytable`` in namespace
    ``hive://test-hadoop:9083``:

    * ``source_col_1`` with a single ``DIRECT`` / ``AGGREGATION`` transformation;
    * ``source_col_2`` with a single ``DIRECT`` / ``TRANSFORMATION`` transformation.

    The column lineage facet is built without a ``dataset`` argument.
    """
    source_namespace = "hive://test-hadoop:9083"
    source_name = "mydb.mytable"

    aggregated_source = OpenLineageColumnLineageDatasetFacetFieldRef(
        namespace=source_namespace,
        name=source_name,
        field="source_col_1",
        transformations=[
            OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="AGGREGATION"),
        ],
    )
    transformed_source = OpenLineageColumnLineageDatasetFacetFieldRef(
        namespace=source_namespace,
        name=source_name,
        field="source_col_2",
        transformations=[
            OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="TRANSFORMATION"),
        ],
    )

    column_lineage = OpenLineageColumnLineageDatasetFacet(
        fields={
            "column_1": OpenLineageColumnLineageDatasetFacetField(
                inputFields=[aggregated_source, transformed_source],
            ),
        },
    )

    return OpenLineageOutputDataset(
        namespace="hdfs://test-hadoop:9820",
        name="/user/hive/warehouse/mydb.db/mytable",
        facets=OpenLineageDatasetFacets(columnLineage=column_lineage),
    )


@pytest.fixture
def output_event_with_one_to_two_direct_and_indirect_column_lineage() -> OpenLineageOutputDataset:
    """
    Output dataset mixing direct and indirect column lineage.

    Target column ``column_1`` has three input fields, all pointing at dataset
    ``mydb.mytable`` in namespace ``hive://test-hadoop:9083``, in this order:

    * ``source_col_1`` with ``DIRECT`` / ``AGGREGATION``;
    * ``source_col_1`` again, with ``INDIRECT`` / ``JOIN``;
    * ``source_col_2`` with ``DIRECT`` / ``TRANSFORMATION``.

    In addition, the facet's ``dataset`` list holds one reference to
    ``source_col_2`` with ``INDIRECT`` / ``SORT`` and ``masking=False``.
    """
    source_namespace = "hive://test-hadoop:9083"
    source_name = "mydb.mytable"

    # Column-level references, listed in the order they appear in ``inputFields``.
    direct_aggregation = OpenLineageColumnLineageDatasetFacetFieldRef(
        namespace=source_namespace,
        name=source_name,
        field="source_col_1",
        transformations=[
            OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="AGGREGATION"),
        ],
    )
    indirect_join = OpenLineageColumnLineageDatasetFacetFieldRef(
        namespace=source_namespace,
        name=source_name,
        field="source_col_1",
        transformations=[
            OpenLineageColumnLineageDatasetFacetFieldTransformation(type="INDIRECT", subtype="JOIN"),
        ],
    )
    direct_transformation = OpenLineageColumnLineageDatasetFacetFieldRef(
        namespace=source_namespace,
        name=source_name,
        field="source_col_2",
        transformations=[
            OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="TRANSFORMATION"),
        ],
    )

    # Dataset-level reference: the only transformation that sets ``masking`` explicitly.
    dataset_level_sort = OpenLineageColumnLineageDatasetFacetFieldRef(
        namespace=source_namespace,
        name=source_name,
        field="source_col_2",
        transformations=[
            OpenLineageColumnLineageDatasetFacetFieldTransformation(
                type="INDIRECT",
                subtype="SORT",
                masking=False,
            ),
        ],
    )

    column_lineage = OpenLineageColumnLineageDatasetFacet(
        fields={
            "column_1": OpenLineageColumnLineageDatasetFacetField(
                inputFields=[direct_aggregation, indirect_join, direct_transformation],
            ),
        },
        dataset=[dataset_level_sort],
    )

    return OpenLineageOutputDataset(
        namespace="hdfs://test-hadoop:9820",
        name="/user/hive/warehouse/mydb.db/mytable",
        facets=OpenLineageDatasetFacets(columnLineage=column_lineage),
    )


def get_run_event_with_column_lineage(
    operation_id: UUID,
    column_lineage_facet: OpenLineageColumnLineageDatasetFacet,
) -> OpenLineageRunEvent:
    """
    Build a ``RUNNING`` run event carrying the given column lineage facet.

    One event corresponds to one operation: ``operation_id`` is used as the run id
    of the event, while the parent run (job ``mysession``) always has the same
    fixed run id. The event time is fixed as well.

    The event has exactly one input, a dataset in a ``postgres://`` namespace with
    empty facets, and exactly one output, a dataset in a ``hive://`` namespace whose
    facets contain ``column_lineage_facet`` as ``columnLineage``.

    Args:
        operation_id: run id to assign to the generated event.
        column_lineage_facet: facet attached to the single output dataset.

    Returns:
        The assembled run event.
    """
    parent_run_id = UUID("01908224-8410-79a2-8de6-a769ad6944c9")
    emitted_at = datetime(2024, 7, 5, 9, 7, 15, 642000, tzinfo=timezone.utc)

    job = OpenLineageJob(
        namespace="local://some.host.com",
        name="mysession.execute_some_command",
        facets=OpenLineageJobFacets(
            jobType=OpenLineageJobTypeJobFacet(
                jobType=OpenLineageJobType.JOB,
                processingType=OpenLineageJobProcessingType.BATCH,
                integration=OpenLineageJobIntegrationType.SPARK,
            ),
        ),
    )

    parent_facet = OpenLineageParentRunFacet(
        job=OpenLineageParentJob(namespace="local://some.host.com", name="mysession"),
        run=OpenLineageParentRun(runId=parent_run_id),
    )
    run = OpenLineageRun(
        runId=operation_id,
        facets=OpenLineageRunFacets(parent=parent_facet),
    )

    source_dataset = OpenLineageInputDataset(
        namespace="postgres://192.168.1.1:5432",
        name="mydb.myschema.mytable",
        facets=OpenLineageDatasetFacets(),
    )
    target_dataset = OpenLineageOutputDataset(
        namespace="hive://test-hadoop:9083",
        name="mydb.mytable",
        facets=OpenLineageDatasetFacets(columnLineage=column_lineage_facet),
    )

    return OpenLineageRunEvent(
        eventType=OpenLineageRunEventType.RUNNING,
        eventTime=emitted_at,
        job=job,
        run=run,
        inputs=[source_dataset],
        outputs=[target_dataset],
    )
Loading
Loading