Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add openlineage adapter #1123

Merged
merged 7 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
43 changes: 43 additions & 0 deletions docs/concepts/_snippets/simple_materializer_ctx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pandas as pd
import xgboost

from hamilton.function_modifiers import dataloader, datasaver
from hamilton.io import utils


@dataloader()
def raw_df(data_path: str) -> tuple[pd.DataFrame, dict]:
"""Load raw data from parquet file"""
df = pd.read_parquet(data_path)
return df, utils.get_file_and_dataframe_metadata(data_path, df)


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
"""preprocess raw data"""
return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
"""Train model on preprocessed data"""
return ...


@datasaver()
def save_model(model: xgboost.XGBModel, model_dir: str) -> dict:
"""Save trained model to JSON format"""
model.save_model(f"{model_dir}/model.json")
return utils.get_file_metadata(f"{model_dir}/model.json")


if __name__ == "__main__":
import __main__

from hamilton import driver

dr = driver.Builder().with_modules(__main__).build()
data_path = "..."
model_dir = "..."
inputs = dict(data_path=data_path, model_dir=model_dir)
final_vars = ["save_model"]
results = dr.execute(final_vars, inputs=inputs)
# results["save_model"] == None
8 changes: 7 additions & 1 deletion docs/concepts/_snippets/static_materializer_ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
path=f"{model_dir}/model.json",
),
]
dr = driver.Builder().with_modules(__main__).with_materializers(*materializers).build()
dr = (
driver.Builder()
.with_modules(__main__)
.with_materializers(*materializers)
.build()
)

results = dr.execute(["model", "model__json"])
# results["model"] <- the model
# results["model__json"] <- metadata from saving the model
34 changes: 22 additions & 12 deletions docs/concepts/materialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ On this page, you'll learn:
Different ways to write the same dataflow
-----------------------------------------

Below are 5 ways to write a dataflow that:
Below are 6 ways to write a dataflow that:

1. loads a dataframe from a parquet file
2. preprocesses the dataframe
3. trains a machine learning model
4. saves the trained model

The first two options don't use the concept of materialization and the next three do.
The first two options don't use the concept of materialization and the next four do.

Without materialization
-----------------------
Expand All @@ -49,7 +49,7 @@ Observations:
Limitations
~~~~~~~~~~~~

Materializations aims to solve 3 limitations:
Hamilton's approach to "materializations" aims to solve 3 limitations:

1. **Redundancy**: deduplicate loading & saving code to improve maintainability and debugging
2. **Observability**: include loading & saving in the dataflow for full observability and allow hooks
Expand All @@ -62,15 +62,25 @@ With materialization
.. table::
:align: left

+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------+
| 3) Static materializers | 4) Dynamic materializers | 5) Function modifiers |
+=============================================================+=============================================================+=================================================+
| .. literalinclude:: _snippets/static_materializer_ctx.py | .. literalinclude:: _snippets/dynamic_materializer_ctx.py | .. literalinclude:: _snippets/decorator_ctx.py |
| | | |
+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------+
| .. image:: _snippets/static_materializer_ctx.png | .. image:: _snippets/dynamic_materializer_ctx.png | .. image:: _snippets/decorator_ctx.png |
| :width: 500px | :width: 500px | :width: 500px |
+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------+
+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------+
| 3) Simple Materialization | 4) Static materializers | 5) Dynamic materializers | 6) Function modifiers |
+=============================================================+=============================================================+=============================================================+=================================================+
| .. literalinclude:: _snippets/simple_materializer_ctx.py | .. literalinclude:: _snippets/static_materializer_ctx.py | .. literalinclude:: _snippets/dynamic_materializer_ctx.py | .. literalinclude:: _snippets/decorator_ctx.py |
| | | | |
+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------+
| .. image:: _snippets/simple_materializer_ctx.png | .. image:: _snippets/static_materializer_ctx.png | .. image:: _snippets/dynamic_materializer_ctx.png | .. image:: _snippets/decorator_ctx.png |
| :width: 500px | :width: 500px | :width: 500px | :width: 500px |
+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------+

Simple Materialization
~~~~~~~~~~~~~~~~~~~~~~~
When you don't need to hide the implementation details of how you read and write, but you
want to track what was read and written, you need to expose extra metadata. This is where
the :doc:`@datasaver() <../reference/decorators/datasaver/>` and :doc:`@dataloader() <../reference/decorators/dataloader/>` decorators come in. They allow you to return
metadata about what was read and written, and this metadata is then used to track what
was read and written.

This is our recommended first step when you're starting to use materialization in Hamilton.


Static materializers
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ This section showcases how Hamilton integrates with popular frameworks.
Spark <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark>
Vaex <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/vaex>
Narwhals <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/narwhals>
OpenLineage <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/openlineage>
8 changes: 8 additions & 0 deletions docs/reference/lifecycle-hooks/OpenLineageAdapter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
========================================
plugins.h_openlineage.OpenLineageAdapter
========================================

.. autoclass:: hamilton.plugins.h_openlineage.OpenLineageAdapter
:special-members: __init__
:members:
:inherited-members:
1 change: 1 addition & 0 deletions docs/reference/lifecycle-hooks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ Recall to add lifecycle adapters, you just need to call the ``with_adapters`` me
Narwhals
MLFlowTracker
NoEdgeAndInputTypeChecking
OpenLineageAdapter
27 changes: 27 additions & 0 deletions examples/openlineage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# OpenLineage Adapter

This is an example of how to use the OpenLineage adapter that can be used to send metadata to an OpenLineage server.

## Motivation
OpenLineage is an open standard for data lineage.
With Hamilton you can read and write data, and with OpenLineage you can track the lineage of that data.

## Steps
1. Build your project with Hamilton.
2. Use one of the [materialization approaches](https://hamilton.dagworks.io/en/latest/concepts/materialization/) to surface metadata about what is loaded and saved.
3. Use the OpenLineage adapter to send metadata to an OpenLineage server.

## To run this example:

1. Install the requirements:
```bash
pip install -r requirements.txt
```
2. Run the example:
```bash
python run.py
```
Or run the example in a notebook:
```bash
jupyter notebook
```
Loading