diff --git a/ddtrace/contrib/internal/ray/__init__.py b/ddtrace/contrib/internal/ray/__init__.py index 144f026aab5..292aaa7fd00 100644 --- a/ddtrace/contrib/internal/ray/__init__.py +++ b/ddtrace/contrib/internal/ray/__init__.py @@ -36,6 +36,14 @@ - ``DD_TRACE_EXPERIMENTAL_LONG_RUNNING_FLUSH_INTERVAL``: Interval for resubmitting long-running spans (default: ``120.0`` seconds) +- ``DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME``: Whether to use the job entrypoint as the + service name (default: ``False``). If ``True``, the entrypoint will be used as the service + name if DD_SERVICE is not set and a job name is not specified in the metadata. + +- ``DD_TRACE_RAY_REDACT_ENTRYPOINT_PATHS``: Whether to redact file paths in the job entrypoint + (default: ``True``). If ``True``, file paths in the entrypoint will be redacted to avoid + leaking sensitive information. + Ray service name can be configured by: - specifying in submission ID using ``job:your-job-name`` during job submission:: @@ -48,7 +56,10 @@ - specifying ``DD_SERVICE`` when initializing your Ray cluster. -By default, the service name will be the name of your entrypoint +- setting ``DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME=True``. In this case, the service + name will be the name of your entrypoint script. + +By default, the service name will be ``unnamed.ray.job``. Notes ~~~~~ diff --git a/ddtrace/contrib/internal/ray/constants.py b/ddtrace/contrib/internal/ray/constants.py index 47a7e9cdcfb..4a7196ff2d7 100644 --- a/ddtrace/contrib/internal/ray/constants.py +++ b/ddtrace/contrib/internal/ray/constants.py @@ -12,6 +12,7 @@ RAY_ACTOR_ID = "ray.actor_id" RAY_SUBMISSION_ID_TAG = "ray.submission_id" RAY_HOSTNAME = "ray.hostname" +RAY_ENTRYPOINT = "ray.entrypoint" # Default job name if not set by the user DEFAULT_JOB_NAME = "unnamed.ray.job" @@ -39,6 +40,7 @@ RAY_WAIT_TIMEOUT = "ray.wait.timeout_s" RAY_WAIT_NUM_RETURNS = "ray.wait.num_returns" RAY_WAIT_FETCH_LOCAL = "ray.wait.fetch_local" +RAY_METADATA_PREFIX = "ray.job.metadata" # Long-running span metric names DD_PARTIAL_VERSION = "_dd.partial_version" @@ -46,3 +48,4 @@ # Special values REDACTED_VALUE = "" +REDACTED_PATH = "" diff --git a/ddtrace/contrib/internal/ray/patch.py b/ddtrace/contrib/internal/ray/patch.py index c2da7784095..e65ef3c6106 100644 --- a/ddtrace/contrib/internal/ray/patch.py +++ b/ddtrace/contrib/internal/ray/patch.py @@ -27,6 +27,7 @@ from .constants import DEFAULT_JOB_NAME from .constants import RAY_ACTOR_METHOD_ARGS from .constants import RAY_ACTOR_METHOD_KWARGS +from .constants import RAY_ENTRYPOINT from .constants import RAY_JOB_NAME from .constants import RAY_JOB_STATUS from .constants import RAY_JOB_SUBMIT_STATUS @@ -52,8 +53,9 @@ from .utils import _inject_dd_trace_ctx_kwarg from .utils import _inject_ray_span_tags_and_metrics from .utils import extract_signature +from .utils import flatten_metadata_dict from .utils import get_dd_job_name_from_entrypoint -from .utils import get_dd_job_name_from_submission_id +from .utils import redact_paths from .utils import set_tag_or_truncate @@ -72,6 +74,8 @@ "ray", dict( _default_service=schematize_service_name("ray"), + use_entrypoint_as_service_name=asbool(os.getenv("DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME", default=False)), + redact_entrypoint_paths=asbool(os.getenv("DD_TRACE_RAY_REDACT_ENTRYPOINT_PATHS", default=True)), trace_core_api=_get_config("DD_TRACE_RAY_CORE_API", default=False, modifier=asbool), trace_args_kwargs=_get_config("DD_TRACE_RAY_ARGS_KWARGS", default=False, modifier=asbool), ), @@ -190,17 +194,28 @@ def traced_submit_job(wrapped, instance, args, kwargs): submission_id = kwargs.get("submission_id") or generate_job_id() kwargs["submission_id"] = submission_id entrypoint = kwargs.get("entrypoint", "") - job_name = ( - config.service - or kwargs.get("metadata", {}).get("job_name", "") - or get_dd_job_name_from_submission_id(submission_id) - or get_dd_job_name_from_entrypoint(entrypoint) - ) + if entrypoint and config.ray.redact_entrypoint_paths: + entrypoint = redact_paths(entrypoint) + job_name = config.service or kwargs.get("metadata", {}).get("job_name", "") + + if not job_name: + if config.ray.use_entrypoint_as_service_name: + job_name = get_dd_job_name_from_entrypoint(entrypoint) or DEFAULT_JOB_NAME + else: + job_name = DEFAULT_JOB_NAME # Root span creation job_span = tracer.start_span("ray.job", service=job_name or DEFAULT_JOB_NAME, span_type=SpanTypes.RAY) _inject_ray_span_tags_and_metrics(job_span) job_span.set_tag_str(RAY_SUBMISSION_ID_TAG, submission_id) + if entrypoint: + job_span.set_tag_str(RAY_ENTRYPOINT, entrypoint) + + metadata = kwargs.get("metadata", {}) + dot_paths = flatten_metadata_dict(metadata) + for k, v in dot_paths.items(): + set_tag_or_truncate(job_span, k, v) + tracer.context_provider.activate(job_span) start_long_running_job(job_span) diff --git a/ddtrace/contrib/internal/ray/utils.py b/ddtrace/contrib/internal/ray/utils.py index 7afc32772c3..d42225adaae 100644 --- a/ddtrace/contrib/internal/ray/utils.py +++ b/ddtrace/contrib/internal/ray/utils.py @@ -1,6 +1,7 @@ import inspect from inspect import Parameter from inspect import Signature +import json import os import re import socket @@ -28,11 +29,13 @@ from .constants import RAY_COMPONENT from .constants import RAY_HOSTNAME from .constants import RAY_JOB_ID +from .constants import RAY_METADATA_PREFIX from .constants import RAY_NODE_ID from .constants import RAY_SUBMISSION_ID from .constants import RAY_SUBMISSION_ID_TAG from .constants import RAY_TASK_ID from .constants import RAY_WORKER_ID +from .constants import REDACTED_PATH from .constants import REDACTED_VALUE @@ -125,26 +128,98 @@ def set_tag_or_truncate(span: Span, tag_name: str, tag_value: Any = None) -> Non span.set_tag(tag_name, tag_value) -def get_dd_job_name_from_submission_id(submission_id: str) -> Optional[str]: +def get_dd_job_name_from_entrypoint(entrypoint: str): """ - Get the job name from the submission id. - If the submission id is set but not in a job:test,run:3 format, return the default job name. - If the submission id is not set, return None. + Get the job name from the entrypoint. """ - match = JOB_NAME_REGEX.match(submission_id) + match = ENTRY_POINT_REGEX.search(entrypoint) if match: return match.group(1) return None -def get_dd_job_name_from_entrypoint(entrypoint: str) -> Optional[str]: +def redact_paths(s: str) -> str: """ - Get the job name from the entrypoint. + Redact path-like substrings from an entry-point string. + Uses os.sep (and os.altsep if present) to detect paths; preserves spacing. """ - match = ENTRY_POINT_REGEX.search(entrypoint) - if match: - return match.group(1) - return None + + def _redact_pathlike(s): + """ + If s contains a path separator, replace the directory part with REDACTION, + preserving the final component (basename). Trailing separators are ignored. + Detects both os.sep and os.altsep if present. + """ + + # Pick the actual separator used in this token (prefer os.sep if both appear) + used_sep = os.sep if (os.sep in s) else (os.altsep if (os.altsep and os.altsep in s) else None) + if not used_sep: + return s + + core = s.rstrip(used_sep) + if not core: + return REDACTED_PATH + + basename = core.split(used_sep)[-1] + return f"{REDACTED_PATH}{used_sep}{basename}" + + def _redact_token(tok) -> str: + # key=value (value may be quoted) + if "=" in tok: + key, val = tok.split("=", 1) + if len(val) >= 2 and val[0] == val[-1] and val[0] in {"'", '"'}: + q = val[0] + inner = val[1:-1] + return f"{key}={q}{_redact_pathlike(inner)}{q}" + return f"{key}={_redact_pathlike(val)}" + + # Whole token may be quoted + if len(tok) >= 2 and tok[0] == tok[-1] and tok[0] in {"'", '"'}: + q = tok[0] + inner = tok[1:-1] + return f"{q}{_redact_pathlike(inner)}{q}" + + return _redact_pathlike(tok) + + parts = re.split(r"(\s+)", s) # keep whitespace + return "".join(part if part.strip() == "" else _redact_token(part) for part in parts) + + +def flatten_metadata_dict(data: dict) -> Dict[str, Any]: + """ + Converts a JSON (or Python dictionary) structure into a dict mapping + dot-notation paths to leaf values, with keys prefixed once by RAY_METADATA_PREFIX. + + - Assumes the top-level is a dictionary. If a list is encountered anywhere, + it is stringified with json.dumps and treated as a leaf (no recursion into list elements). + - Leaf values (str, int, float, bool, None) are returned as-is as the dict values. + - Returned dict keys are prefixed once with RAY_METADATA_PREFIX. + """ + + if not isinstance(data, dict): + return {} + + result = {} + + def _recurse(node, path): + if isinstance(node, dict): + for key, value in node.items(): + new_path = f"{path}.{key}" if path else key + _recurse(value, new_path) + elif isinstance(node, list): + # Treat any list as a leaf by stringifying it + try: + list_dump = json.dumps(node, ensure_ascii=False) + except Exception: + list_dump = "[]" + result[path] = list_dump + else: + # leaf node: store the accumulated path -> value + result[path] = node + + _recurse(data, RAY_METADATA_PREFIX) + + return result # ------------------------------------------------------------------------------------------- diff --git a/tests/contrib/ray/test_ray_utils.py b/tests/contrib/ray/test_ray_utils.py index 1dc2c30de88..e934ac0ed88 100644 --- a/tests/contrib/ray/test_ray_utils.py +++ b/tests/contrib/ray/test_ray_utils.py @@ -1,13 +1,8 @@ +from ddtrace.contrib.internal.ray.constants import RAY_METADATA_PREFIX +from ddtrace.contrib.internal.ray.constants import REDACTED_PATH +from ddtrace.contrib.internal.ray.utils import flatten_metadata_dict from ddtrace.contrib.internal.ray.utils import get_dd_job_name_from_entrypoint -from ddtrace.contrib.internal.ray.utils import get_dd_job_name_from_submission_id - - -def test_get_dd_job_name_from_submission_id(): - assert ( - get_dd_job_name_from_submission_id("job:frobnitzigate_idiosyncrasies,run:38") == "frobnitzigate_idiosyncrasies" - ) - assert get_dd_job_name_from_submission_id("joe.schmoe-cf32445c3b2842958956ba6b6225ad") is None - assert get_dd_job_name_from_submission_id("") is None +from ddtrace.contrib.internal.ray.utils import redact_paths def test_get_dd_job_name_from_entrypoint(): @@ -16,3 +11,52 @@ def test_get_dd_job_name_from_entrypoint(): assert get_dd_job_name_from_entrypoint("/Users/bits/.pyenv/shims/python3 woof.py") == "woof" assert get_dd_job_name_from_entrypoint("python3 woof.py --breed mutt") == "woof" assert get_dd_job_name_from_entrypoint("perl meow.pl") is None + + +def test_redact_paths(): + assert redact_paths("") == "" + assert redact_paths("my_script.py") == "my_script.py" + assert redact_paths("python my_script.py") == "python my_script.py" + assert redact_paths("python my_script.py arg1") == "python my_script.py arg1" + assert redact_paths("python my_script.py arg1 --kwarg1 value1") == "python my_script.py arg1 --kwarg1 value1" + assert redact_paths("python path/to/my_script.py") == f"python {REDACTED_PATH}/my_script.py" + assert redact_paths("python /path/to/my_script.py") == f"python {REDACTED_PATH}/my_script.py" + assert ( + redact_paths("path1/to1/python path2/to2/my_script.py") + == f"{REDACTED_PATH}/python {REDACTED_PATH}/my_script.py" + ) + assert ( + redact_paths("/path1/to1/python /path2/to2/my_script.py") + == f"{REDACTED_PATH}/python {REDACTED_PATH}/my_script.py" + ) + assert ( + redact_paths("/path1/to1/python /path2/to2/my_script.py /pathlike/arg1") + == f"{REDACTED_PATH}/python {REDACTED_PATH}/my_script.py {REDACTED_PATH}/arg1" + ) + assert ( + redact_paths("/path1/to1/python /path2/to2/my_script.py /pathlike/arg1 --kwarg1 /pathlike/value1") + == f"{REDACTED_PATH}/python {REDACTED_PATH}/my_script.py {REDACTED_PATH}/arg1 --kwarg1 {REDACTED_PATH}/value1" + ) + assert ( + redact_paths("/path1/to1/python /path2/to2/my_script.py /pathlike/arg1 --kwarg1=/pathlike/value1") + == f"{REDACTED_PATH}/python {REDACTED_PATH}/my_script.py {REDACTED_PATH}/arg1 --kwarg1={REDACTED_PATH}/value1" + ) + assert ( + redact_paths("/path1/to1/python /path2/to2/my_script.py /pathlike/arg1 --kwarg1='/pathlike/value1'") + == f"{REDACTED_PATH}/python {REDACTED_PATH}/my_script.py {REDACTED_PATH}/arg1 --kwarg1='{REDACTED_PATH}/value1'" + ) + + +def test_flatten_metadata_dict(): + assert flatten_metadata_dict({"a": {"b": {"c": 1}}}) == {f"{RAY_METADATA_PREFIX}.a.b.c": 1} + assert flatten_metadata_dict({"a": [1, 2, 3]}) == {f"{RAY_METADATA_PREFIX}.a": "[1, 2, 3]"} + assert flatten_metadata_dict({"a": {"b": 1}, "c": 2}) == { + f"{RAY_METADATA_PREFIX}.a.b": 1, + f"{RAY_METADATA_PREFIX}.c": 2, + } + assert flatten_metadata_dict({"a": {"b": {"c": 1, "d": [1, 2]}}}) == { + f"{RAY_METADATA_PREFIX}.a.b.c": 1, + f"{RAY_METADATA_PREFIX}.a.b.d": "[1, 2]", + } + assert flatten_metadata_dict(1) == {} + assert flatten_metadata_dict([1, 2, 3]) == {}