Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ddtrace/contrib/internal/ray/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
RAY_ACTOR_ID = "ray.actor_id"
RAY_SUBMISSION_ID_TAG = "ray.submission_id"
RAY_HOSTNAME = "ray.hostname"
RAY_ENTRYPOINT = "ray.entrypoint"
RAY_ENTRYPOINT_SCRIPT = "ray.entrypoint_script"

# Default job name if not set by the user
DEFAULT_JOB_NAME = "unnamed.ray.job"
Expand Down Expand Up @@ -39,6 +41,7 @@
RAY_WAIT_TIMEOUT = "ray.wait.timeout_s"
RAY_WAIT_NUM_RETURNS = "ray.wait.num_returns"
RAY_WAIT_FETCH_LOCAL = "ray.wait.fetch_local"
RAY_METADATA_PREFIX = "ray.job.metadata"

# Error tag names
ERROR_MESSAGE = "error.message"
Expand Down
27 changes: 20 additions & 7 deletions ddtrace/contrib/internal/ray/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from .constants import DEFAULT_JOB_NAME
from .constants import RAY_ACTOR_METHOD_ARGS
from .constants import RAY_ACTOR_METHOD_KWARGS
from .constants import RAY_ENTRYPOINT
from .constants import RAY_ENTRYPOINT_SCRIPT
from .constants import RAY_JOB_NAME
from .constants import RAY_JOB_STATUS
from .constants import RAY_JOB_SUBMIT_STATUS
Expand All @@ -58,7 +60,7 @@
from .utils import _inject_ray_span_tags_and_metrics
from .utils import extract_signature
from .utils import get_dd_job_name_from_entrypoint
from .utils import get_dd_job_name_from_submission_id
from .utils import json_to_dot_paths
from .utils import set_tag_or_truncate


Expand Down Expand Up @@ -190,17 +192,28 @@ def traced_submit_job(wrapped, instance, args, kwargs):
submission_id = kwargs.get("submission_id") or generate_job_id()
kwargs["submission_id"] = submission_id
entrypoint = kwargs.get("entrypoint", "")
job_name = (
config.service
or kwargs.get("metadata", {}).get("job_name", "")
or get_dd_job_name_from_submission_id(submission_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ok with completely removing that ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dubloom yes, killing this code was my request. I figured that --submission_id "job:foo,run:3" was a hacky way to specify the job name, and in all the conversations I've had about job names, I was always the only person that pushed for this. The more I think about this, the more I fear that no customer is going to use this mechanism. Between DD_SERVICE, the entrypoint, and metadata_json we already have three ways to specify the job name, I figured a fourth one was an unnecessary overkill. Let us know if you do not concur.

or get_dd_job_name_from_entrypoint(entrypoint)
)
job_name = config.service or kwargs.get("metadata", {}).get("job_name", "")

if not job_name:
if os.environ.get("DD_RAY_USE_ENTRYPOINT_AS_JOB_NAME", "false").lower() in ("true", "1"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be added as a config in the integration and be documented

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will also be easier to check rather than checking if is in ("true", "1")

job_name = get_dd_job_name_from_entrypoint(entrypoint) or DEFAULT_JOB_NAME
else:
job_name = DEFAULT_JOB_NAME

# Root span creation
job_span = tracer.start_span("ray.job", service=job_name or DEFAULT_JOB_NAME, span_type=SpanTypes.RAY)
_inject_ray_span_tags_and_metrics(job_span)
job_span.set_tag_str(RAY_SUBMISSION_ID_TAG, submission_id)
if entrypoint:
job_span.set_tag_str(RAY_ENTRYPOINT, entrypoint)
entrypoint_script = get_dd_job_name_from_entrypoint(entrypoint)
if entrypoint_script:
job_span.set_tag_str(RAY_ENTRYPOINT_SCRIPT, entrypoint_script)
Comment on lines +208 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two point on that:

  • I think it is redundant (I tested and one tag is python .telemetry-tests/playground.py and one is playground)
  • I think we should be careful with sending paths without redaction which could be considered as PII or might be super long

metadata = kwargs.get("metadata", {})
dot_pairs = json_to_dot_paths(metadata)
for k, v in dot_pairs.items():
set_tag_or_truncate(job_span, k, v)

tracer.context_provider.activate(job_span)
start_long_running_job(job_span)

Expand Down
64 changes: 53 additions & 11 deletions ddtrace/contrib/internal/ray/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
from inspect import Parameter
from inspect import Signature
import json
import os
import re
import socket
Expand All @@ -24,6 +25,7 @@
from .constants import RAY_COMPONENT
from .constants import RAY_HOSTNAME
from .constants import RAY_JOB_ID
from .constants import RAY_METADATA_PREFIX
from .constants import RAY_NODE_ID
from .constants import RAY_SUBMISSION_ID
from .constants import RAY_SUBMISSION_ID_TAG
Expand Down Expand Up @@ -121,26 +123,66 @@ def set_tag_or_truncate(span, tag_name, tag_value):
span.set_tag(tag_name, tag_value)


def get_dd_job_name_from_submission_id(submission_id: str):
def get_dd_job_name_from_entrypoint(entrypoint: str):
"""
Get the job name from the submission id.
If the submission id is set but not in a job:test,run:3 format, return the default job name.
If the submission id is not set, return None.
Get the job name from the entrypoint.
"""
match = JOB_NAME_REGEX.match(submission_id)
match = ENTRY_POINT_REGEX.search(entrypoint)
if match:
return match.group(1)
return None


def get_dd_job_name_from_entrypoint(entrypoint: str):
def json_to_dot_paths(data):
"""
Get the job name from the entrypoint.
Converts a JSON (or Python dictionary) structure into a dict mapping
dot-notation paths to leaf values, with keys prefixed once by RAY_METADATA_PREFIX.

- Assumes the top-level is a dictionary. If a list is encountered anywhere,
it is stringified with json.dumps and treated as a leaf (no recursion into list elements).
- Leaf values (str, int, float, bool, None) are returned as-is as the dict values.
- If the top-level value is a simple primitive (str, int, float, bool, None),
it is returned unchanged. If the top-level is a list, it is stringified.
- Returned dict keys are prefixed once with RAY_METADATA_PREFIX.
"""
match = ENTRY_POINT_REGEX.search(entrypoint)
if match:
return match.group(1)
return None

# If top-level is a list, stringify and return
if isinstance(data, list):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this happen ? Quoting ray doc:

JSON-serialized dictionary of metadata to attach to the job.
(https://docs.ray.io/en/latest/cluster/running-applications/job-submission/cli.html)

try:
return json.dumps(data, ensure_ascii=False)
except Exception:
return "[]"

flat = {}

def _recurse(node, path):
if isinstance(node, dict):
for key, value in node.items():
new_path = f"{path}.{key}" if path else key
_recurse(value, new_path)
elif isinstance(node, list):
# Treat any list as a leaf by stringifying it
try:
list_dump = json.dumps(node, ensure_ascii=False)
except Exception:
list_dump = "[]"
flat[path] = list_dump
else:
# leaf node: store the accumulated path -> value
flat[path] = node

_recurse(data, "")

# Prefix keys once with RAY_METADATA_PREFIX
result = {}
for k, v in flat.items():
if k:
result[f"{RAY_METADATA_PREFIX}.{k}"] = v
else:
# If an empty key (top-level primitive under a dict), attach prefix alone
result[RAY_METADATA_PREFIX] = v

return result
Comment on lines +174 to +185
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you use _recurse(data, RAY_METADATA_PREFIX) it would auto work I think without additional work.



# -------------------------------------------------------------------------------------------
Expand Down
25 changes: 16 additions & 9 deletions tests/contrib/ray/test_ray_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
from ddtrace.contrib.internal.ray.constants import RAY_METADATA_PREFIX
from ddtrace.contrib.internal.ray.utils import get_dd_job_name_from_entrypoint
from ddtrace.contrib.internal.ray.utils import get_dd_job_name_from_submission_id


def test_get_dd_job_name_from_submission_id():
assert (
get_dd_job_name_from_submission_id("job:frobnitzigate_idiosyncrasies,run:38") == "frobnitzigate_idiosyncrasies"
)
assert get_dd_job_name_from_submission_id("joe.schmoe-cf32445c3b2842958956ba6b6225ad") is None
assert get_dd_job_name_from_submission_id("") is None
from ddtrace.contrib.internal.ray.utils import json_to_dot_paths


def test_get_dd_job_name_from_entrypoint():
Expand All @@ -16,3 +9,17 @@ def test_get_dd_job_name_from_entrypoint():
assert get_dd_job_name_from_entrypoint("/Users/bits/.pyenv/shims/python3 woof.py") == "woof"
assert get_dd_job_name_from_entrypoint("python3 woof.py --breed mutt") == "woof"
assert get_dd_job_name_from_entrypoint("perl meow.pl") is None


def test_json_to_dot_paths():
assert json_to_dot_paths({"a": {"b": {"c": 1}}}) == {f"{RAY_METADATA_PREFIX}.a.b.c": 1}
assert json_to_dot_paths({"a": [1, 2, 3]}) == {f"{RAY_METADATA_PREFIX}.a": "[1, 2, 3]"}
assert json_to_dot_paths({"a": {"b": 1}, "c": 2}) == {
f"{RAY_METADATA_PREFIX}.a.b": 1,
f"{RAY_METADATA_PREFIX}.c": 2,
}
assert json_to_dot_paths({"a": {"b": {"c": 1, "d": [1, 2]}}}) == {
f"{RAY_METADATA_PREFIX}.a.b.c": 1,
f"{RAY_METADATA_PREFIX}.a.b.d": "[1, 2]",
}
assert json_to_dot_paths(1) == {f"{RAY_METADATA_PREFIX}": 1}
Loading