Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor to enable RayGraphAdapter and HamiltonTracker to work well together

This is a squash commit:
- issue=#1079
- PR=#1103

Describes what to do in `graph_functions.py`
Adds comments to lifecycle base
Update h_ray.py with comments for ray tracking compatibility
Replicate previous error
Inline function, unsure if catching errors and exceptions to be handled differently
BaseDoRemoteExecute has the added Callable function that sandwiched lifecycle hooks
method fails, says AssertionError about ray.remote decorator
simple script for now to check telemetry, execution yield the ray.remote AssertionError
passing pointer through and arguments to lifecycle wrapper into ray.remote
post-execute hook for node not called
finally executed only when exception occurs, hamilton tracker not executed
atexit.register does not work, node keeps running in UI
added stop() method, but doesn't get called
Ray telemetry works for single node, problem with connected nodes
Fixes ray object dereferencing

Ray does not resolve nested arguments: https://docs.ray.io/en/latest/ray-core/objects.html#passing-object-arguments

So one option is to make them all top level:
- one way to do that is to make the other arguments not clash with any possible user parameters -- hence the `__` prefix. This is what I did.
- another way would be in the ray adapter, wrap the incoming function, and explicitly do a ray.get() on any ray object references in the kwargs arguments. i.e. keep the nested structure, but when the ray task starts wait for all inputs...

not sure which is best, but this now works correctly.

ray works checkpoint, pre-commit fixed
fixed graph level telemetry proposal
pinned ruff
Correct output, added option to start ray cluster
Unit test mimics the DoNodeExecute unit test
Refactored driver so all tests pass
Workaround to not break ray by calling init on an open cluster
raw_execute does not have post_graph_execute and is private now
Correct version for deprecation warning
all tests work
this looks better
ruff version comment
Refactored pre- and post-graph-execute hooks outside of raw_execute which now has deprecation warning
added readme, notebook and made script cli interactive
made cluster init optional through inserting config dict
User has option to shutdown ray cluster

Co-authored-by: Stefan Krawczyk <stefan@dagworks.io>
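The first option described in the commit message (making every argument top level, with a `__` prefix for adapter-owned values) can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: `flatten_call`, `split_call`, `remote_wrapper`, and the `node_callable` key are hypothetical names, and Ray itself is deliberately left out so the sketch runs with the standard library alone. With Ray, `remote_wrapper` would be the function passed to `ray.remote`, and any object references among its top-level keyword arguments would be resolved before the task body starts.

```python
from typing import Any, Callable, Dict, Tuple

RESERVED_PREFIX = "__"  # hypothetical marker for adapter-owned arguments


def flatten_call(user_kwargs: Dict[str, Any], **adapter_args: Any) -> Dict[str, Any]:
    """Merge adapter-owned arguments into the top-level kwargs under a `__` prefix."""
    clashes = [k for k in user_kwargs if k.startswith(RESERVED_PREFIX)]
    if clashes:
        raise ValueError(f"user parameters may not start with '__': {clashes}")
    flat = dict(user_kwargs)
    for name, value in adapter_args.items():
        flat[f"{RESERVED_PREFIX}{name}"] = value
    return flat


def split_call(flat_kwargs: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Inverse of flatten_call: separate user kwargs from adapter-owned ones."""
    user: Dict[str, Any] = {}
    adapter: Dict[str, Any] = {}
    for key, value in flat_kwargs.items():
        if key.startswith(RESERVED_PREFIX):
            adapter[key[len(RESERVED_PREFIX):]] = value
        else:
            user[key] = value
    return user, adapter


def remote_wrapper(**flat_kwargs: Any) -> Any:
    """Stand-in for the remote task body: every value arrives as a top-level argument."""
    user_kwargs, adapter = split_call(flat_kwargs)
    fn: Callable[..., Any] = adapter["node_callable"]
    return fn(**user_kwargs)


def add(a: int, b: int) -> int:
    return a + b


flat = flatten_call({"a": 1, "b": 2}, node_callable=add)
result = remote_wrapper(**flat)
```

The second option from the commit message (keeping the nested structure and calling `ray.get()` on references inside the wrapper) avoids reserving a prefix, at the cost of blocking inside the task until all inputs are available.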
1 parent 004ac5e, commit 4c48161
Showing 17 changed files with 609 additions and 84 deletions.
@@ -0,0 +1,29 @@
# Tracking telemetry in Hamilton UI for Ray clusters

This example shows how to combine the [RayGraphAdapter](https://hamilton.dagworks.io/en/latest/reference/graph-adapters/RayGraphAdapter/) and the [HamiltonTracker](https://hamilton.dagworks.io/en/latest/concepts/ui/) to run a dummy DAG.

# ray_lineage.py
Has three dummy functions:
- waiting 5s
- waiting 1s
- raising an error

Together they represent a basic DAG.

# run_lineage.py
Is where the driver code lives to create the DAG and exercise it.

To exercise it:
> Have an open instance of Hamilton UI: https://hamilton.dagworks.io/en/latest/concepts/ui/

```bash
python run_lineage.py --help
Usage: run_lineage.py [OPTIONS]

Options:
  --username TEXT       [required]
  --project_id INTEGER
  --help                Show this message and exit.

python run_lineage.py --username admin --project_id 2
```
examples/ray/ray_Hamilton_UI_tracking/hamilton_notebook.ipynb: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Hamilton UI Adapter\n",
    "\n",
    "Needs a running instance of Hamilton UI: https://hamilton.dagworks.io/en/latest/concepts/ui/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hamilton_sdk.adapters import HamiltonTracker\n",
    "\n",
    "# Inputs required to track into correct project in the UI\n",
    "project_id = 2\n",
    "username = \"admin\"\n",
    "\n",
    "tracker_ray = HamiltonTracker(\n",
    "    project_id=project_id,\n",
    "    username=username,\n",
    "    dag_name=\"telemetry_with_ray\",)\n",
    "\n",
    "tracker_without_ray = HamiltonTracker(\n",
    "    project_id=project_id,\n",
    "    username=username,\n",
    "    dag_name=\"telemetry_without_ray\",\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Ray adapter\n",
    "\n",
    "https://hamilton.dagworks.io/en/latest/reference/graph-adapters/RayGraphAdapter/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hamilton import base\n",
    "from hamilton.plugins.h_ray import RayGraphAdapter\n",
    "\n",
    "rga = RayGraphAdapter(result_builder=base.PandasDataFrameResult())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Importing Hamilton and the DAG modules"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hamilton import driver\n",
    "import ray_lineage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    dr_ray = driver.Builder().with_modules(ray_lineage).with_adapters(rga, tracker_ray).build()\n",
    "    result_ray = dr_ray.execute(\n",
    "        final_vars=[\n",
    "            \"node_5s\",\n",
    "            \"node_1s_error\",\n",
    "            \"add_1_to_previous\",\n",
    "        ]\n",
    "    )\n",
    "    print(result_ray)\n",
    "\n",
    "except ValueError:\n",
    "    print(\"UI should display failure.\")\n",
    "finally:\n",
    "    dr_without_ray = driver.Builder().with_modules(ray_lineage).with_adapters(tracker_without_ray).build()\n",
    "    result_without_ray = dr_without_ray.execute(final_vars=[\"node_5s\", \"add_1_to_previous\"])\n",
    "    print(result_without_ray)\n"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
@@ -0,0 +1,18 @@
import time


def node_5s() -> float:
    start = time.time()
    time.sleep(5)  # simulate 5s of work
    return time.time() - start  # elapsed seconds


def add_1_to_previous(node_5s: float) -> float:
    start = time.time()
    time.sleep(1)  # simulate 1s of work
    return node_5s + (time.time() - start)  # upstream elapsed time plus this node's


def node_1s_error(node_5s: float) -> float:
    time.sleep(1)
    raise ValueError("Does not break telemetry if executed through ray")
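As a rough illustration of why these three functions form a DAG: Hamilton names each node after its function and treats the function's parameter names as its upstream dependencies. The sketch below imitates only that naming convention with the standard library (`inspect` and `graphlib`, Python 3.9+). The `dependencies` helper is hypothetical, the function bodies are simplified stand-ins without the sleeps, and none of this is Hamilton's actual implementation.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable, Dict, Set


# Simplified stand-ins for the functions in ray_lineage.py (no sleeping).
def node_5s() -> float:
    return 5.0


def add_1_to_previous(node_5s: float) -> float:
    return node_5s + 1.0


def node_1s_error(node_5s: float) -> float:
    raise ValueError("intentional failure")


def dependencies(*functions: Callable) -> Dict[str, Set[str]]:
    """Map each function name to the set of its parameter names (hypothetical helper)."""
    return {fn.__name__: set(inspect.signature(fn).parameters) for fn in functions}


deps = dependencies(node_5s, add_1_to_previous, node_1s_error)

# TopologicalSorter expects {node: predecessors}; upstream nodes come out first.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Because both `add_1_to_previous` and `node_1s_error` take `node_5s` as a parameter, `node_5s` is always scheduled first; the relative order of the other two is not constrained.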
@@ -0,0 +1 @@
sf-hamilton[ray,sdk,ui]
@@ -0,0 +1,45 @@
import click
import ray_lineage

from hamilton import base, driver
from hamilton.plugins.h_ray import RayGraphAdapter
from hamilton_sdk import adapters


@click.command()
@click.option("--username", required=True, type=str)
@click.option("--project_id", default=1, type=int)
def run(project_id, username):
    try:
        tracker_ray = adapters.HamiltonTracker(
            project_id=project_id,
            username=username,
            dag_name="telemetry_with_ray",
        )
        rga = RayGraphAdapter(result_builder=base.PandasDataFrameResult())
        dr_ray = driver.Builder().with_modules(ray_lineage).with_adapters(rga, tracker_ray).build()
        result_ray = dr_ray.execute(
            final_vars=[
                "node_5s",
                "node_1s_error",
                "add_1_to_previous",
            ]
        )
        print(result_ray)

    except ValueError:  # node_1s_error raises on purpose
        print("UI should display failure.")
    finally:
        tracker = adapters.HamiltonTracker(
            project_id=project_id,  # modify this as needed
            username=username,
            dag_name="telemetry_without_ray",
        )
        dr_without_ray = driver.Builder().with_modules(ray_lineage).with_adapters(tracker).build()

        result_without_ray = dr_without_ray.execute(final_vars=["node_5s", "add_1_to_previous"])
        print(result_without_ray)


if __name__ == "__main__":
    run()
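The script above expects the UI to show the failure of `node_1s_error`, which only works if the post-node hook still fires when a node raises. A minimal sketch of that "sandwich" pattern follows, assuming a tracker-like object with pre/post callbacks. `RecordingHook` and `execute_with_hooks` are hypothetical names used for illustration; they are not Hamilton's lifecycle API and not the code added in this commit.

```python
from typing import Any, Callable, List, Optional, Tuple


class RecordingHook:
    """Hypothetical stand-in for a tracker: records node start and end events."""

    def __init__(self) -> None:
        self.events: List[Tuple] = []

    def pre_node_execute(self, node_name: str) -> None:
        self.events.append(("start", node_name))

    def post_node_execute(self, node_name: str, success: bool, error: Optional[Exception]) -> None:
        error_name = type(error).__name__ if error is not None else None
        self.events.append(("end", node_name, success, error_name))


def execute_with_hooks(hook: RecordingHook, node_name: str, fn: Callable[..., Any], **kwargs: Any) -> Any:
    """Run fn between the pre and post hooks; the post hook runs on success and on failure."""
    hook.pre_node_execute(node_name)
    error: Optional[Exception] = None
    try:
        return fn(**kwargs)
    except Exception as exc:
        error = exc
        raise  # re-raise so the caller still sees the original failure
    finally:
        hook.post_node_execute(node_name, success=error is None, error=error)


def failing_node() -> float:
    raise ValueError("intentional failure")


hook = RecordingHook()
ok_result = execute_with_hooks(hook, "ok_node", lambda x: x + 1, x=1)
try:
    execute_with_hooks(hook, "bad_node", failing_node)
except ValueError:
    pass  # the failure was recorded by the hook before propagating
```

Placing the post hook in `finally` is what lets a tracker record both outcomes; when the function runs as a remote task, the whole wrapper (hooks included) would need to run inside that task.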