From 4e13a609b25d07e835d915ff107455562478a7f7 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 22 Sep 2025 18:00:17 -0400 Subject: [PATCH 1/5] Replace type hints with modern type annotations --- ddtrace/profiling/collector/_lock.py | 93 ++++++++++++---------------- 1 file changed, 41 insertions(+), 52 deletions(-) diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index 5a82ebd27c5..137a0766257 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -6,7 +6,13 @@ import sys import time import types -import typing +from typing import Any +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Type import wrapt @@ -15,14 +21,12 @@ from ddtrace.profiling import collector from ddtrace.profiling.collector import _task from ddtrace.profiling.collector import _traceback +from ddtrace.profiling.event import DDFrame from ddtrace.settings.profiling import config from ddtrace.trace import Tracer -T = typing.TypeVar("T") - - -def _current_thread() -> typing.Tuple[int, str]: +def _current_thread() -> Tuple[int, str]: thread_id = _thread.get_ident() return thread_id, _threading.get_thread_name(thread_id) @@ -44,8 +48,8 @@ def _current_thread() -> typing.Tuple[int, str]: class _ProfiledLock(wrapt.ObjectProxy): def __init__( self, - wrapped: typing.Any, - tracer: typing.Optional[Tracer], + wrapped: Any, + tracer: Optional[Tracer], max_nframes: int, capture_sampler: collector.CaptureSampler, endpoint_collection_enabled: bool, @@ -58,15 +62,15 @@ def __init__( frame = sys._getframe(2 if WRAPT_C_EXT else 3) code = frame.f_code self._self_init_loc = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno) - self._self_name: typing.Optional[str] = None + self._self_name: Optional[str] = None - def __aenter__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def __aenter__(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs) - def __aexit__(self, *args: typing.Any, **kwargs: typing.Any) -> None: + def __aexit__(self, *args: Any, **kwargs: Any) -> Any: return self._release(self.__wrapped__.__aexit__, *args, **kwargs) - def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any) -> T: + def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: if not self._self_capture_sampler.capture(): return inner_func(*args, **kwargs) @@ -88,6 +92,7 @@ def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwa else: frame = task_frame + frames: List[DDFrame] frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) thread_native_id = _threading.get_thread_native_id(thread_id) @@ -102,20 +107,16 @@ def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwa if self._self_tracer is not None: handle.push_span(self._self_tracer.current_span()) - - for f in frames: - handle.push_frame(f.function_name, f.file_name, 0, f.lineno) - + for ddframe in frames: + handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) handle.flush_sample() except Exception: pass # nosec - def acquire(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def acquire(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.acquire, *args, **kwargs) - def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.Any, **kwargs: typing.Any) -> None: - # type (typing.Any, typing.Any) -> None - + def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None: # The underlying threading.Lock class is implemented using C code, and # it doesn't have the __dict__ attribute. So we can't do # self.__dict__.pop("_self_acquired_at", None) to remove the attribute. @@ -148,6 +149,7 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A else: frame = task_frame + frames: List[DDFrame] frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) thread_native_id = _threading.get_thread_native_id(thread_id) @@ -162,23 +164,22 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A if self._self_tracer is not None: handle.push_span(self._self_tracer.current_span()) - - for f in frames: - handle.push_frame(f.function_name, f.file_name, 0, f.lineno) + for ddframe in frames: + handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) handle.flush_sample() - def release(self, *args: typing.Any, **kwargs: typing.Any) -> None: + def release(self, *args: Any, **kwargs: Any) -> Any: return self._release(self.__wrapped__.release, *args, **kwargs) acquire_lock = acquire - def __enter__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def __enter__(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.__enter__, *args, **kwargs) - def __exit__(self, *args: typing.Any, **kwargs: typing.Any) -> None: + def __exit__(self, *args: Any, **kwargs: Any) -> None: self._release(self.__wrapped__.__exit__, *args, **kwargs) - def _find_self_name(self, var_dict: typing.Dict) -> typing.Optional[str]: + def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]: for name, value in var_dict.items(): if name.startswith("__") or isinstance(value, types.ModuleType): continue @@ -228,11 +229,7 @@ class FunctionWrapper(wrapt.FunctionWrapper): # Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static" # method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a # builtin function. Override default wrapt behavior here that tries to detect bound method. - def __get__( - self, - instance: typing.Optional[typing.Any], - owner: typing.Optional[typing.Any] = None, - ) -> "FunctionWrapper": + def __get__(self, instance: Any, owner: Optional[Type] = None) -> "FunctionWrapper": return self @@ -245,26 +242,23 @@ def __init__( self, nframes: int = config.max_frames, endpoint_collection_enabled: bool = config.endpoint_collection, - tracer: typing.Optional[Tracer] = None, - *args: typing.Any, - **kwargs: typing.Any, + tracer: Optional[Tracer] = None, + *args: Any, + **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) - self.nframes = nframes - self.endpoint_collection_enabled = endpoint_collection_enabled - self.tracer = tracer - self._original: typing.Optional[typing.Any] = None + self.nframes: int = nframes + self.endpoint_collection_enabled: bool = endpoint_collection_enabled + self.tracer: Optional[Tracer] = tracer + self._original: Optional[Any] = None @abc.abstractmethod - def _get_patch_target(self) -> typing.Type[typing.Any]: - raise NotImplementedError + def _get_patch_target(self) -> Callable[..., Any]: + ... @abc.abstractmethod - def _set_patch_target( - self, - value: typing.Any, - ) -> None: - raise NotImplementedError + def _set_patch_target(self, value: Any) -> None: + ... def _start_service(self) -> None: """Start collecting lock usage.""" @@ -282,14 +276,9 @@ def patch(self) -> None: # Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile. self._original = self._get_patch_target() - def _allocate_lock( - wrapped: typing.Any, - instance: typing.Any, - args: typing.Tuple[typing.Any, ...], - kwargs: typing.Dict[str, typing.Any], - ) -> typing.Any: + def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock: lock = wrapped(*args, **kwargs) - return self.PROFILED_LOCK_CLASS( + return self.PROFILED_LOCK_CLASS( # type: ignore[attr-defined] lock, self.tracer, self.nframes, From 6ac66a169b8e1d7c4524969b9e5a46f0d7024bf3 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Mon, 29 Sep 2025 16:49:47 -0400 Subject: [PATCH 2/5] [dd-trace-py][Lock profiler] type-annotate variables (global, local, class members) --- ddtrace/profiling/collector/_lock.py | 81 +++++++++++++++++++--------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index 137a0766257..c0472bb3791 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -6,13 +6,15 @@ import sys import time import types -from typing import Any -from typing import Callable -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple -from typing import Type +from typing import ( # noqa: I001 + Any, + Callable, + Dict, + List, + Optional, + Tuple, + Type, +) import wrapt @@ -27,12 +29,13 @@ def _current_thread() -> Tuple[int, str]: - thread_id = _thread.get_ident() + thread_id: int = _thread.get_ident() return thread_id, _threading.get_thread_name(thread_id) # We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will # appear in the stack trace and we need to hide it. +WRAPT_C_EXT: bool if os.environ.get("WRAPT_DISABLE_EXTENSIONS"): WRAPT_C_EXT = False else: @@ -46,6 +49,14 @@ def _current_thread() -> Tuple[int, str]: class _ProfiledLock(wrapt.ObjectProxy): + _self_tracer: Optional[Tracer] + _self_max_nframes: int + _self_capture_sampler: collector.CaptureSampler + _self_endpoint_collection_enabled: bool + _self_init_loc: str + _self_name: Optional[str] + _self_acquired_at: int # monotonic_ns timestamp + def __init__( self, wrapped: Any, @@ -59,10 +70,10 @@ def __init__( self._self_max_nframes = max_nframes self._self_capture_sampler = capture_sampler self._self_endpoint_collection_enabled = endpoint_collection_enabled - frame = sys._getframe(2 if WRAPT_C_EXT else 3) - code = frame.f_code + frame: types.FrameType = sys._getframe(2 if WRAPT_C_EXT else 3) + code: types.CodeType = frame.f_code self._self_init_loc = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno) - self._self_name: Optional[str] = None + # self._self_name is already declared as class attribute def __aenter__(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs) @@ -74,17 +85,26 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> if not self._self_capture_sampler.capture(): return inner_func(*args, **kwargs) - start = time.monotonic_ns() + start: int = time.monotonic_ns() try: return inner_func(*args, **kwargs) finally: try: - end = self._self_acquired_at = time.monotonic_ns() + end: int = time.monotonic_ns() + self._self_acquired_at = end + thread_id: int + thread_name: str thread_id, thread_name = _current_thread() + task_id: Optional[int] + task_name: Optional[str] + task_frame: Optional[types.FrameType] task_id, task_name, task_frame = _task.get_task(thread_id) self._maybe_update_self_name() - lock_name = "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + lock_name: str = ( + "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + ) + frame: types.FrameType if task_frame is None: # If we can't get the task frame, we use the caller frame. We expect acquire/release or # __enter__/__exit__ to be on the stack, so we go back 2 frames. @@ -95,9 +115,9 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> frames: List[DDFrame] frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - thread_native_id = _threading.get_thread_native_id(thread_id) + thread_native_id: int = _threading.get_thread_native_id(thread_id) - handle = ddup.SampleHandle() + handle: ddup.SampleHandle = ddup.SampleHandle() handle.push_monotonic_ns(end) handle.push_lock_name(lock_name) handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here @@ -107,6 +127,7 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> if self._self_tracer is not None: handle.push_span(self._self_tracer.current_span()) + ddframe: DDFrame for ddframe in frames: handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) handle.flush_sample() @@ -122,7 +143,7 @@ def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None: # self.__dict__.pop("_self_acquired_at", None) to remove the attribute. # Instead, we need to use the following workaround to retrieve and # remove the attribute. - start = getattr(self, "_self_acquired_at", None) + start: Optional[int] = getattr(self, "_self_acquired_at", None) try: # Though it should generally be avoided to call release() from # multiple threads, it is possible to do so. In that scenario, the @@ -138,11 +159,19 @@ def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None: return inner_func(*args, **kwargs) finally: if start is not None: - end = time.monotonic_ns() + end: int = time.monotonic_ns() + thread_id: int + thread_name: str thread_id, thread_name = _current_thread() + task_id: Optional[int] + task_name: Optional[str] + task_frame: Optional[types.FrameType] task_id, task_name, task_frame = _task.get_task(thread_id) - lock_name = "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + lock_name: str = ( + "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + ) + frame: types.FrameType if task_frame is None: # See the comments in _acquire frame = sys._getframe(2) @@ -152,9 +181,9 @@ def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None: frames: List[DDFrame] frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - thread_native_id = _threading.get_thread_native_id(thread_id) + thread_native_id: int = _threading.get_thread_native_id(thread_id) - handle = ddup.SampleHandle() + handle: ddup.SampleHandle = ddup.SampleHandle() handle.push_monotonic_ns(end) handle.push_lock_name(lock_name) handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here @@ -164,6 +193,7 @@ def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None: if self._self_tracer is not None: handle.push_span(self._self_tracer.current_span()) + ddframe: DDFrame for ddframe in frames: handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) handle.flush_sample() @@ -180,12 +210,15 @@ def __exit__(self, *args: Any, **kwargs: Any) -> None: self._release(self.__wrapped__.__exit__, *args, **kwargs) def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]: + name: str + value: Any for name, value in var_dict.items(): if name.startswith("__") or isinstance(value, types.ModuleType): continue if value is self: return name if config.lock.name_inspect_dir: + attribute: str for attribute in dir(value): if not attribute.startswith("__") and getattr(value, attribute) is self: self._self_name = attribute @@ -203,7 +236,7 @@ def _maybe_update_self_name(self) -> None: # 2: acquire/release (or __enter__/__exit__) # 3: caller frame if config.enable_asserts: - frame = sys._getframe(1) + frame: types.FrameType = sys._getframe(1) if frame.f_code.co_name not in {"_acquire", "_release"}: raise AssertionError("Unexpected frame %s" % frame.f_code.co_name) frame = sys._getframe(2) @@ -236,7 +269,7 @@ def __get__(self, instance: Any, owner: Optional[Type] = None) -> "FunctionWrapp class LockCollector(collector.CaptureSamplerCollector): """Record lock usage.""" - PROFILED_LOCK_CLASS: typing.Type[typing.Any] + PROFILED_LOCK_CLASS: Type[Any] def __init__( self, @@ -277,7 +310,7 @@ def patch(self) -> None: self._original = self._get_patch_target() def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock: - lock = wrapped(*args, **kwargs) + lock: Any = wrapped(*args, **kwargs) return self.PROFILED_LOCK_CLASS( # type: ignore[attr-defined] lock, self.tracer, From 177e1539d754692a01ef491a9173c458e9875e11 Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Tue, 30 Sep 2025 12:50:13 -0400 Subject: [PATCH 3/5] [dd-trace-py][Lock profiler] ruff-ruff --- ddtrace/profiling/collector/_lock.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index c0472bb3791..88e7743808e 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -6,15 +6,13 @@ import sys import time import types -from typing import ( # noqa: I001 - Any, - Callable, - Dict, - List, - Optional, - Tuple, - Type, -) +from typing import Any +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Type import wrapt From 83841d665f33f25dce953fb9b11382fff0731beb Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Tue, 30 Sep 2025 14:18:45 -0400 Subject: [PATCH 4/5] restart CI From edde5cc7da616c674859782439e06a508f186a4a Mon Sep 17 00:00:00 2001 From: Vlad Scherbich Date: Tue, 30 Sep 2025 15:24:53 -0400 Subject: [PATCH 5/5] [dd-trace-py][Lock profiler] Fix ignore error --- ddtrace/profiling/collector/_lock.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index 88e7743808e..1f8daf35b0b 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -309,7 +309,7 @@ def patch(self) -> None: def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock: lock: Any = wrapped(*args, **kwargs) - return self.PROFILED_LOCK_CLASS( # type: ignore[attr-defined] + return self.PROFILED_LOCK_CLASS( lock, self.tracer, self.nframes,