
Commit ea76678

garylvov and glvov-bdai authored
Clarifies Ray Documentation and Fixes Minor Issues (#1717)

# Description

This PR cleans up the Ray documentation to be clearer and fixes some small issues in the code.

- Moved local setup to an easier Docker-based workflow
- Added wget to the Docker container to fix an issue where the Ray head would fail its health check on GKE, leaving the workers unable to start
- Removed redundant information from the documentation
- Added a local quickstart
- Investigated whether HTTPS MLflow was possible; added the autoescape flag to the Jinja environment (@kellyguo11)
- Added better compatibility with other workflows to address #1703
- Avoided an early exit due to buffer overflow to address #1703 (thank you @giulioturrisi for helping find this)

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- This change requires a documentation update

![image](https://github.com/user-attachments/assets/eb38b3c8-8e9c-438d-9218-8b0662146f96)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with `./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] I have updated the changelog and the corresponding version in the extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already exists there

Signed-off-by: garylvov <67614381+garylvov@users.noreply.github.com>
Co-authored-by: Gary Lvov <glvov@theaiinstitute.com>
1 parent: 21173c3 · commit: ea76678

File tree: 12 files changed (+297 −230 lines)

docs/source/features/ray.rst

167 additions, 145 deletions (large diff not rendered by default)

source/standalone/workflows/ray/cluster_configs/Dockerfile

4 additions, 0 deletions:

```diff
@@ -1,5 +1,9 @@
 FROM isaac-lab-base:latest
 
+# WGet is needed so that GCS or other cloud providers can mark the container as ready.
+# Otherwise the Ray liveliness checks fail.
+RUN apt-get update && apt-get install wget
+
 # Set NVIDIA paths
 ENV PATH="/usr/local/nvidia/bin:$PATH"
 ENV LD_LIBRARY_PATH="/usr/local/nvidia/lib64"
```
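The wget dependency exists so the cloud provider's readiness/liveness probe can query the Ray head over HTTP. As a rough, hedged illustration of what such a probe reduces to — the URL is an assumption based on the `dashboard-port: "8265"` set in the cluster config below, not the actual probe GKE runs:

```python
# Hedged sketch: approximates a wget-style liveness probe against the Ray head.
# The URL is an assumption (dashboard-port 8265 from kuberay.yaml.jinja below).
import sys
import urllib.request


def ray_head_answers_http(url: str = "http://localhost:8265", timeout: float = 2.0) -> bool:
    """Return True if the Ray head responds, mimicking `wget -q -O- <url>`."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        return False


if __name__ == "__main__":
    sys.exit(0 if ray_head_answers_http() else 1)
```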

source/standalone/workflows/ray/cluster_configs/google_cloud/kuberay.yaml.jinja

2 additions, 3 deletions:

```diff
@@ -19,7 +19,6 @@ spec:
     block: "true"
     dashboard-host: 0.0.0.0
     dashboard-port: "8265"
-    node-ip-address: "0.0.0.0"
     port: "6379"
     include-dashboard: "true"
     ray-debugger-external: "true"
@@ -30,7 +29,7 @@ spec:
 apiVersion: v1
 kind: Service
 metadata:
-  name: head
+  name: {{ name }}-head
 spec:
   type: LoadBalancer
 template:
@@ -130,7 +129,7 @@ spec:
     volumeMounts:
       - mountPath: /tmp/ray
         name: ray-logs
-    command: ["/bin/bash", "-c", "ray start --address=head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
+    command: ["/bin/bash", "-c", "ray start --address={{name}}-head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
     - image: fluent/fluent-bit:1.9.6
       name: fluentbit
     resources:
```
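Renaming the head Service from a fixed `head` to `{{ name }}-head` (and pointing workers at the matching DNS name) lets several clusters coexist in one namespace without Service-name collisions. A minimal, hedged sketch of how the two changed lines render; the inline template below only mirrors those lines, and the `name`/`namespace` values are made up:

```python
# Hedged sketch: reproduces only the two changed template lines, not the full
# kuberay.yaml.jinja (which takes more variables, e.g. image and worker counts).
from jinja2 import Environment

env = Environment(keep_trailing_newline=True, autoescape=True)
template = env.from_string(
    "name: {{ name }}-head\n"
    "command: ray start --address={{ name }}-head.{{ namespace }}.svc.cluster.local:6379\n"
)
print(template.render(name="isaacray-0", namespace="default"))
# name: isaacray-0-head
# command: ray start --address=isaacray-0-head.default.svc.cluster.local:6379
```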

source/standalone/workflows/ray/grok_cluster_with_kubectl.py

8 additions, 8 deletions:

```diff
@@ -21,7 +21,7 @@
 
 .. code-block:: bash
 
-    ./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
+    python3 source/standalone/workflows/ray/grok_cluster_with_kubectl.py
     # For options, supply -h arg
 """
 
@@ -67,9 +67,10 @@ def get_clusters(pods: list, cluster_name_prefix: str) -> set:
 
         match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
         if match:
-            # Get base name without head/worker suffix
-            base_name = match.group(1).split("-head")[0].split("-worker")[0]
-            clusters.add(base_name)
+            # Get base name without head/worker suffix (skip workers)
+            if "head" in pod_name:
+                base_name = match.group(1).split("-head")[0]
+                clusters.add(base_name)
     return sorted(clusters)
 
 
@@ -90,9 +91,7 @@ def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") ->
     clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
     if len(clusters) > 1:
         raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")
-
-    base_name = cluster_prefix.split("-head")[0].split("-worker")[0]
-    mlflow_name = f"{base_name}-mlflow"
+    mlflow_name = f"{cluster_prefix}-mlflow"
 
     cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
     try:
@@ -102,7 +101,8 @@ def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") ->
         # Get cluster IP
         cluster_ip = fields[2]
         port = "5000"  # Default MLflow port
-
+        # This needs to be http to be resolved. HTTPS can't be resolved
+        # This should be fine as it is on a subnet on the cluster regardless
         return f"http://{cluster_ip}:{port}"
     except subprocess.CalledProcessError as e:
         raise ValueError(f"Could not grok MLflow: {e}")  # Fixed f-string
```

source/standalone/workflows/ray/launch.py

6 additions, 6 deletions:

```diff
@@ -8,29 +8,28 @@
 import subprocess
 import yaml
 
+import util
 from jinja2 import Environment, FileSystemLoader
 from kubernetes import config
 
-import source.standalone.workflows.ray.util as util
-
 """This script helps create one or more KubeRay clusters.
 
 Usage:
 
 .. code-block:: bash
     # If the head node is stuck on container creating, make sure to create a secret
-    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py -h
+    python3 source/standalone/workflows/ray/launch.py -h
 
     # Examples
 
     # The following creates 8 GPUx1 nvidia l4 workers
-    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
+    python3 source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
        --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
        --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1
 
    # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
    # and 2 GPUx4 nvidia-tesla-t4 GPU workers
-    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
+    python3 source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
        --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
        --num_workers 1 2 --num_clusters 1 \
        --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
@@ -53,7 +52,7 @@ def apply_manifest(args: argparse.Namespace) -> None:
     # Set up Jinja2 environment for loading templates
     templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
     file_loader = FileSystemLoader(str(templates_dir))
-    jinja_env = Environment(loader=file_loader, keep_trailing_newline=True)
+    jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)
 
     # Define template filename
     template_file = "kuberay.yaml.jinja"
@@ -79,6 +78,7 @@ def apply_manifest(args: argparse.Namespace) -> None:
 
     # Apply the Kubernetes manifest using kubectl
     try:
+        print(cleaned_yaml_string)
         subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
     except subprocess.CalledProcessError as e:
         exit(f"An error occurred while running `kubectl`: {e}")
```

source/standalone/workflows/ray/submit_job.py

4 additions, 4 deletions:

```diff
@@ -40,16 +40,16 @@
 .. code-block:: bash
 
     # Example; submitting a tuning job
-    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
+    python3 source/standalone/workflows/ray/submit_job.py \
         --aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
         --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
-        --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <ML_FLOW_URI>
+        --cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI>
 
     # Example: Submitting resource wrapped job
-    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras agent.params.config.max_epochs=150
+    python3 source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --test
 
     # For all command line arguments
-    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py -h
+    python3 source/standalone/workflows/ray/submit_job.py -h
 """
 script_directory = os.path.dirname(os.path.abspath(__file__))
 CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
```
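`CONFIG` pairs a `working_dir` with an executable prefix, which is the same shape Ray's job-submission runtime environment expects. For orientation only, a hedged sketch using Ray's public `JobSubmissionClient`; the address and entrypoint are placeholders, and this is not necessarily how `submit_job.py` is implemented internally:

```python
# Hedged sketch: Ray's public job submission API, for orientation only.
# <RAY_HEAD_IP> is a placeholder; the entrypoint mirrors the docstring example.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://<RAY_HEAD_IP>:8265")  # dashboard port from the cluster config
job_id = client.submit_job(
    entrypoint=(
        "/workspace/isaaclab/isaaclab.sh -p tuner.py "
        "--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py --cfg_class CartpoleTheiaJobCfg"
    ),
    runtime_env={"working_dir": "source/standalone/workflows/ray"},
)
print(job_id)
```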

source/standalone/workflows/ray/tuner.py

27 additions, 13 deletions:

```diff
@@ -17,8 +17,9 @@
 """
 This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
 into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
-By default, (unless combined as a sub-job in a resource-wrapped aggregate job), one worker is created
-for each GPU-enabled node in the cluster for each individual job.
+By default, one worker is created for each GPU-enabled node in the cluster for each individual job.
+To use more than one worker per node (likely the case for multi-GPU machines), supply the
+num_workers_per_node argument.
 
 Each hyperparameter sweep configuration should include the workflow,
 runner arguments, and hydra arguments to vary.
@@ -39,16 +40,15 @@
     ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py -h
 
     # Examples
-    # Local (not within a docker container, when within a local docker container, do not supply run_mode argument)
+    # Local
     ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local \
     --cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
-    --cfg_class CartpoleRGBNoTuneJobCfg
-    # Local docker: start the ray server and run above command in the same running container without run_mode arg
+    --cfg_class CartpoleTheiaJobCfg
     # Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
     ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
     --aggregate_jobs tuner.py \
     --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
-    --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>
+    --cfg_class CartpoleTheiaJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>
 
 """
@@ -74,7 +74,7 @@ def setup(self, config: dict) -> None:
         print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
         self.experiment = None
 
-    def reset_config(self, new_config):
+    def reset_config(self, new_config: dict):
         """Allow environments to be re-used by fetching a new invocation command"""
         self.setup(new_config)
         return True
@@ -95,15 +95,15 @@ def step(self) -> dict:
             self.proc = experiment["proc"]
             self.experiment_name = experiment["experiment_name"]
             self.isaac_logdir = experiment["logdir"]
-            self.tensorboard_logdir = self.isaac_logdir + f"/{self.experiment_name}/summaries"
+            self.tensorboard_logdir = self.isaac_logdir + "/" + self.experiment_name
             self.done = False
 
         if self.proc is None:
             raise ValueError("Could not start trial.")
-
-        if self.proc.poll() is not None:  # process finished, signal finish
+        proc_status = self.proc.poll()
+        if proc_status is not None:  # process finished, signal finish
             self.data["done"] = True
-            print("[INFO]: Process finished, returning...")
+            print(f"[INFO]: Process finished with {proc_status}, returning...")
         else:  # wait until the logs are ready or fresh
             data = util.load_tensorboard_logs(self.tensorboard_logdir)
 
@@ -220,10 +220,24 @@ class JobCfg:
     """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
     at a minimum, the tune job should inherit from this class."""
 
-    def __init__(self, cfg):
+    def __init__(self, cfg: dict):
+        """
+        Runner args include command line arguments passed to the task.
+        For example:
+        cfg["runner_args"]["headless_singleton"] = "--headless"
+        cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
+        """
         assert "runner_args" in cfg, "No runner arguments specified."
+        """
+        Task is the desired task to train on. For example:
+        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
+        """
         assert "--task" in cfg["runner_args"], "No task specified."
-        assert "hydra_args" in cfg, "No hypeparameters specified."
+        """
+        Hydra args define the hyperparameters varied within the sweep. For example:
+        cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
+        """
+        assert "hydra_args" in cfg, "No hyperparameters specified."
         self.cfg = cfg
```
source/standalone/workflows/ray/util.py

69 additions, 47 deletions:

```diff
@@ -6,35 +6,42 @@
 import os
 import re
 import subprocess
+import threading
 from datetime import datetime
 from math import isclose
 
 import ray
+from tensorboard.backend.event_processing.directory_watcher import DirectoryDeletedError
 from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
 
 
 def load_tensorboard_logs(directory: str) -> dict:
-    """From a tensorboard directory, get the latest scalar values.
+    """From a tensorboard directory, get the latest scalar values. If the logs can't be
+    found, check the summaries sublevel.
 
     Args:
         directory: The directory of the tensorboard logging.
 
     Returns:
         The latest available scalar values.
     """
+
     # Initialize the event accumulator with a size guidance for only the latest entry
-    size_guidance = {"scalars": 1}  # Load only the latest entry for scalars
-    event_acc = EventAccumulator(directory, size_guidance=size_guidance)
-    event_acc.Reload()  # Load all data from the directory
+    def get_latest_scalars(path: str) -> dict:
+        event_acc = EventAccumulator(path, size_guidance={"scalars": 1})
+        try:
+            event_acc.Reload()
+            if event_acc.Tags()["scalars"]:
+                return {
+                    tag: event_acc.Scalars(tag)[-1].value
+                    for tag in event_acc.Tags()["scalars"]
+                    if event_acc.Scalars(tag)
+                }
+        except (KeyError, OSError, RuntimeError, DirectoryDeletedError):
+            return {}
 
-    # Extract the latest scalars logged
-    latest_scalars = {}
-    for tag in event_acc.Tags()["scalars"]:
-        events = event_acc.Scalars(tag)
-        if events:  # Check if there is at least one entry
-            latest_event = events[-1]  # Get the latest entry
-            latest_scalars[tag] = latest_event.value
-    return latest_scalars
+    scalars = get_latest_scalars(directory)
+    return scalars or get_latest_scalars(os.path.join(directory, "summaries"))
 
 
 def get_invocation_command_from_cfg(
```
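With this change, a missing or not-yet-written log directory yields an empty dict instead of an exception, and the loader retries one level down in `summaries` (where some workflows place their event files). A short, hedged usage sketch; the log path is hypothetical:

```python
# Hedged sketch: how a caller such as tuner.py can consume the new behavior.
# The log directory below is hypothetical.
from util import load_tensorboard_logs

scalars = load_tensorboard_logs("/workspace/isaaclab/logs/rl_games/cartpole_direct")
if scalars:
    for tag, value in scalars.items():
        print(f"{tag}: {value}")
else:
    print("Logs not ready yet; poll again on the next step.")
```

The second hunk of util.py, below, reworks `execute_job`'s stream handling: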
```diff
@@ -190,47 +197,62 @@ def execute_job(
     experiment_info_pattern = re.compile("Exact experiment name requested from command line: (.+)")
     logdir_pattern = re.compile(r"\[INFO\] Logging experiment in directory: (.+)$")
     err_pattern = re.compile("There was an error (.+)$")
-    with process.stdout as stdout:
-        for line in iter(stdout.readline, ""):
+
+    def stream_reader(stream, identifier_string, result_details):
+        for line in iter(stream.readline, ""):
             line = line.strip()
-            result_details.append(f"{identifier_string}: {line} \n")
+            result_details.append(f"{identifier_string}: {line}\n")
             if log_all_output:
                 print(f"{identifier_string}: {line}")
 
-            if extract_experiment:
-                exp_match = experiment_info_pattern.search(line)
-                log_match = logdir_pattern.search(line)
-                err_match = err_pattern.search(line)
-                if err_match:
-                    raise ValueError(f"Encountered an error during trial run. {' '.join(result_details)}")
-
-                if exp_match:
-                    experiment_name = exp_match.group(1)
-                if log_match:
-                    logdir = log_match.group(1)
-
-                if experiment_name and logdir:
-                    result = {
-                        "experiment_name": experiment_name,
-                        "logdir": logdir,
-                        "proc": process,
-                        "result": " ".join(result_details),
-                    }
-                    return result
-
-    with process.stderr as stderr:
-        for line in iter(stderr.readline, ""):
-            line = line.strip()
-            result_details.append(f"{identifier_string}: {line}")
+    # Read stdout until we find experiment info
+    # Do some careful handling prevent overflowing the pipe reading buffer with error 141
+    for line in iter(process.stdout.readline, ""):
+        line = line.strip()
+        result_details.append(f"{identifier_string}: {line} \n")
+        if log_all_output:
             print(f"{identifier_string}: {line}")
 
-    process.wait()  # Wait for the subprocess to finish naturally if not exited early
-
-    now = datetime.now().strftime("%H:%M:%S.%f")
-    completion_info = f"\n[INFO]: {identifier_string}: Job Started at {start_time}, completed at {now}\n"
-    print(completion_info)
-    result_details.append(completion_info)
-    return " ".join(result_details)
+        if extract_experiment:
+            exp_match = experiment_info_pattern.search(line)
+            log_match = logdir_pattern.search(line)
+            err_match = err_pattern.search(line)
+
+            if err_match:
+                raise ValueError(f"Encountered an error during trial run. {' '.join(result_details)}")
+
+            if exp_match:
+                experiment_name = exp_match.group(1)
+            if log_match:
+                logdir = log_match.group(1)
+
+            if experiment_name and logdir:
+                # Start stderr reader after finding experiment info
+                stderr_thread = threading.Thread(
+                    target=stream_reader, args=(process.stderr, identifier_string, result_details)
+                )
+                stderr_thread.daemon = True
+                stderr_thread.start()
+
+                # Start stdout reader to continue reading to flush buffer
+                stdout_thread = threading.Thread(
+                    target=stream_reader, args=(process.stdout, identifier_string, result_details)
+                )
+                stdout_thread.daemon = True
+                stdout_thread.start()
+
+                return {
+                    "experiment_name": experiment_name,
+                    "logdir": logdir,
+                    "proc": process,
+                    "result": " ".join(result_details),
+                }
+    process.wait()
+    now = datetime.now().strftime("%H:%M:%S.%f")
+    completion_info = f"\n[INFO]: {identifier_string}: Job Started at {start_time}, completed at {now}\n"
+    print(completion_info)
+    result_details.append(completion_info)
+    return " ".join(result_details)
 
 
 def get_gpu_node_resources(
```
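This restructuring is the heart of the #1703 fix: `execute_job` now returns as soon as the experiment metadata appears, while daemon threads keep draining stdout and stderr. Without a reader, the child keeps writing into a full OS pipe buffer and exits early (the error 141 the comment mentions). A standalone, hedged sketch of the drain-in-a-daemon-thread pattern; the subprocess command is a stand-in:

```python
# Hedged sketch of the pipe-draining pattern util.py now uses.
# The command is a stand-in for a long-running, chatty training job.
import subprocess
import threading


def drain(stream, sink: list) -> None:
    """Keep reading so the OS pipe buffer never fills and blocks the child."""
    for line in iter(stream.readline, ""):
        sink.append(line.rstrip())


process = subprocess.Popen(
    ["python3", "-c", "print('x' * 100000)"],  # stand-in workload
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
captured: list[str] = []
for stream in (process.stdout, process.stderr):
    threading.Thread(target=drain, args=(stream, captured), daemon=True).start()

# The caller can return or poll the process without deadlocking on full pipes.
process.wait()
```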
