diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index 5a82ebd27c5..1f8daf35b0b 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -6,7 +6,13 @@ import sys import time import types -import typing +from typing import Any +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Type import wrapt @@ -15,20 +21,19 @@ from ddtrace.profiling import collector from ddtrace.profiling.collector import _task from ddtrace.profiling.collector import _traceback +from ddtrace.profiling.event import DDFrame from ddtrace.settings.profiling import config from ddtrace.trace import Tracer -T = typing.TypeVar("T") - - -def _current_thread() -> typing.Tuple[int, str]: - thread_id = _thread.get_ident() +def _current_thread() -> Tuple[int, str]: + thread_id: int = _thread.get_ident() return thread_id, _threading.get_thread_name(thread_id) # We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will # appear in the stack trace and we need to hide it. +WRAPT_C_EXT: bool if os.environ.get("WRAPT_DISABLE_EXTENSIONS"): WRAPT_C_EXT = False else: @@ -42,10 +47,18 @@ def _current_thread() -> typing.Tuple[int, str]: class _ProfiledLock(wrapt.ObjectProxy): + _self_tracer: Optional[Tracer] + _self_max_nframes: int + _self_capture_sampler: collector.CaptureSampler + _self_endpoint_collection_enabled: bool + _self_init_loc: str + _self_name: Optional[str] + _self_acquired_at: int # monotonic_ns timestamp + def __init__( self, - wrapped: typing.Any, - tracer: typing.Optional[Tracer], + wrapped: Any, + tracer: Optional[Tracer], max_nframes: int, capture_sampler: collector.CaptureSampler, endpoint_collection_enabled: bool, @@ -55,32 +68,41 @@ def __init__( self._self_max_nframes = max_nframes self._self_capture_sampler = capture_sampler self._self_endpoint_collection_enabled = endpoint_collection_enabled - frame = sys._getframe(2 if WRAPT_C_EXT else 3) - code = frame.f_code + frame: types.FrameType = sys._getframe(2 if WRAPT_C_EXT else 3) + code: types.CodeType = frame.f_code self._self_init_loc = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno) - self._self_name: typing.Optional[str] = None + # self._self_name is already declared as class attribute - def __aenter__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def __aenter__(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs) - def __aexit__(self, *args: typing.Any, **kwargs: typing.Any) -> None: + def __aexit__(self, *args: Any, **kwargs: Any) -> Any: return self._release(self.__wrapped__.__aexit__, *args, **kwargs) - def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any) -> T: + def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: if not self._self_capture_sampler.capture(): return inner_func(*args, **kwargs) - start = time.monotonic_ns() + start: int = time.monotonic_ns() try: return inner_func(*args, **kwargs) finally: try: - end = self._self_acquired_at = time.monotonic_ns() + end: int = time.monotonic_ns() + self._self_acquired_at = end + thread_id: int + thread_name: str thread_id, thread_name = _current_thread() + task_id: Optional[int] + task_name: Optional[str] + task_frame: Optional[types.FrameType] task_id, task_name, task_frame = _task.get_task(thread_id) self._maybe_update_self_name() - lock_name = "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + lock_name: str = ( + "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + ) + frame: types.FrameType if task_frame is None: # If we can't get the task frame, we use the caller frame. We expect acquire/release or # __enter__/__exit__ to be on the stack, so we go back 2 frames. @@ -88,11 +110,12 @@ def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwa else: frame = task_frame + frames: List[DDFrame] frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - thread_native_id = _threading.get_thread_native_id(thread_id) + thread_native_id: int = _threading.get_thread_native_id(thread_id) - handle = ddup.SampleHandle() + handle: ddup.SampleHandle = ddup.SampleHandle() handle.push_monotonic_ns(end) handle.push_lock_name(lock_name) handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here @@ -102,26 +125,23 @@ def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwa if self._self_tracer is not None: handle.push_span(self._self_tracer.current_span()) - - for f in frames: - handle.push_frame(f.function_name, f.file_name, 0, f.lineno) - + ddframe: DDFrame + for ddframe in frames: + handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) handle.flush_sample() except Exception: pass # nosec - def acquire(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def acquire(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.acquire, *args, **kwargs) - def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.Any, **kwargs: typing.Any) -> None: - # type (typing.Any, typing.Any) -> None - + def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None: # The underlying threading.Lock class is implemented using C code, and # it doesn't have the __dict__ attribute. So we can't do # self.__dict__.pop("_self_acquired_at", None) to remove the attribute. # Instead, we need to use the following workaround to retrieve and # remove the attribute. - start = getattr(self, "_self_acquired_at", None) + start: Optional[int] = getattr(self, "_self_acquired_at", None) try: # Though it should generally be avoided to call release() from # multiple threads, it is possible to do so. In that scenario, the @@ -137,22 +157,31 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A return inner_func(*args, **kwargs) finally: if start is not None: - end = time.monotonic_ns() + end: int = time.monotonic_ns() + thread_id: int + thread_name: str thread_id, thread_name = _current_thread() + task_id: Optional[int] + task_name: Optional[str] + task_frame: Optional[types.FrameType] task_id, task_name, task_frame = _task.get_task(thread_id) - lock_name = "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + lock_name: str = ( + "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc + ) + frame: types.FrameType if task_frame is None: # See the comments in _acquire frame = sys._getframe(2) else: frame = task_frame + frames: List[DDFrame] frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - thread_native_id = _threading.get_thread_native_id(thread_id) + thread_native_id: int = _threading.get_thread_native_id(thread_id) - handle = ddup.SampleHandle() + handle: ddup.SampleHandle = ddup.SampleHandle() handle.push_monotonic_ns(end) handle.push_lock_name(lock_name) handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here @@ -162,29 +191,32 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A if self._self_tracer is not None: handle.push_span(self._self_tracer.current_span()) - - for f in frames: - handle.push_frame(f.function_name, f.file_name, 0, f.lineno) + ddframe: DDFrame + for ddframe in frames: + handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) handle.flush_sample() - def release(self, *args: typing.Any, **kwargs: typing.Any) -> None: + def release(self, *args: Any, **kwargs: Any) -> Any: return self._release(self.__wrapped__.release, *args, **kwargs) acquire_lock = acquire - def __enter__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + def __enter__(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.__enter__, *args, **kwargs) - def __exit__(self, *args: typing.Any, **kwargs: typing.Any) -> None: + def __exit__(self, *args: Any, **kwargs: Any) -> None: self._release(self.__wrapped__.__exit__, *args, **kwargs) - def _find_self_name(self, var_dict: typing.Dict) -> typing.Optional[str]: + def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]: + name: str + value: Any for name, value in var_dict.items(): if name.startswith("__") or isinstance(value, types.ModuleType): continue if value is self: return name if config.lock.name_inspect_dir: + attribute: str for attribute in dir(value): if not attribute.startswith("__") and getattr(value, attribute) is self: self._self_name = attribute @@ -202,7 +234,7 @@ def _maybe_update_self_name(self) -> None: # 2: acquire/release (or __enter__/__exit__) # 3: caller frame if config.enable_asserts: - frame = sys._getframe(1) + frame: types.FrameType = sys._getframe(1) if frame.f_code.co_name not in {"_acquire", "_release"}: raise AssertionError("Unexpected frame %s" % frame.f_code.co_name) frame = sys._getframe(2) @@ -228,43 +260,36 @@ class FunctionWrapper(wrapt.FunctionWrapper): # Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static" # method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a # builtin function. Override default wrapt behavior here that tries to detect bound method. - def __get__( - self, - instance: typing.Optional[typing.Any], - owner: typing.Optional[typing.Any] = None, - ) -> "FunctionWrapper": + def __get__(self, instance: Any, owner: Optional[Type] = None) -> "FunctionWrapper": return self class LockCollector(collector.CaptureSamplerCollector): """Record lock usage.""" - PROFILED_LOCK_CLASS: typing.Type[typing.Any] + PROFILED_LOCK_CLASS: Type[Any] def __init__( self, nframes: int = config.max_frames, endpoint_collection_enabled: bool = config.endpoint_collection, - tracer: typing.Optional[Tracer] = None, - *args: typing.Any, - **kwargs: typing.Any, + tracer: Optional[Tracer] = None, + *args: Any, + **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) - self.nframes = nframes - self.endpoint_collection_enabled = endpoint_collection_enabled - self.tracer = tracer - self._original: typing.Optional[typing.Any] = None + self.nframes: int = nframes + self.endpoint_collection_enabled: bool = endpoint_collection_enabled + self.tracer: Optional[Tracer] = tracer + self._original: Optional[Any] = None @abc.abstractmethod - def _get_patch_target(self) -> typing.Type[typing.Any]: - raise NotImplementedError + def _get_patch_target(self) -> Callable[..., Any]: + ... @abc.abstractmethod - def _set_patch_target( - self, - value: typing.Any, - ) -> None: - raise NotImplementedError + def _set_patch_target(self, value: Any) -> None: + ... def _start_service(self) -> None: """Start collecting lock usage.""" @@ -282,13 +307,8 @@ def patch(self) -> None: # Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile. self._original = self._get_patch_target() - def _allocate_lock( - wrapped: typing.Any, - instance: typing.Any, - args: typing.Tuple[typing.Any, ...], - kwargs: typing.Dict[str, typing.Any], - ) -> typing.Any: - lock = wrapped(*args, **kwargs) + def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock: + lock: Any = wrapped(*args, **kwargs) return self.PROFILED_LOCK_CLASS( lock, self.tracer,