Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 88 additions & 68 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
import sys
import time
import types
import typing
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type

import wrapt

Expand All @@ -15,20 +21,19 @@
from ddtrace.profiling import collector
from ddtrace.profiling.collector import _task
from ddtrace.profiling.collector import _traceback
from ddtrace.profiling.event import DDFrame
from ddtrace.settings.profiling import config
from ddtrace.trace import Tracer


T = typing.TypeVar("T")


def _current_thread() -> typing.Tuple[int, str]:
thread_id = _thread.get_ident()
def _current_thread() -> Tuple[int, str]:
thread_id: int = _thread.get_ident()
return thread_id, _threading.get_thread_name(thread_id)


# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will
# appear in the stack trace and we need to hide it.
WRAPT_C_EXT: bool
if os.environ.get("WRAPT_DISABLE_EXTENSIONS"):
WRAPT_C_EXT = False
else:
Expand All @@ -42,10 +47,18 @@ def _current_thread() -> typing.Tuple[int, str]:


class _ProfiledLock(wrapt.ObjectProxy):
_self_tracer: Optional[Tracer]
_self_max_nframes: int
_self_capture_sampler: collector.CaptureSampler
_self_endpoint_collection_enabled: bool
_self_init_loc: str
_self_name: Optional[str]
_self_acquired_at: int # monotonic_ns timestamp

def __init__(
self,
wrapped: typing.Any,
tracer: typing.Optional[Tracer],
wrapped: Any,
tracer: Optional[Tracer],
max_nframes: int,
capture_sampler: collector.CaptureSampler,
endpoint_collection_enabled: bool,
Expand All @@ -55,44 +68,54 @@ def __init__(
self._self_max_nframes = max_nframes
self._self_capture_sampler = capture_sampler
self._self_endpoint_collection_enabled = endpoint_collection_enabled
frame = sys._getframe(2 if WRAPT_C_EXT else 3)
code = frame.f_code
frame: types.FrameType = sys._getframe(2 if WRAPT_C_EXT else 3)
code: types.CodeType = frame.f_code
self._self_init_loc = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno)
self._self_name: typing.Optional[str] = None
# self._self_name is already declared as class attribute

def __aenter__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
def __aenter__(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs)

def __aexit__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
def __aexit__(self, *args: Any, **kwargs: Any) -> Any:
return self._release(self.__wrapped__.__aexit__, *args, **kwargs)

def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any) -> T:
def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
if not self._self_capture_sampler.capture():
return inner_func(*args, **kwargs)

start = time.monotonic_ns()
start: int = time.monotonic_ns()
try:
return inner_func(*args, **kwargs)
finally:
try:
end = self._self_acquired_at = time.monotonic_ns()
end: int = time.monotonic_ns()
self._self_acquired_at = end
thread_id: int
thread_name: str
thread_id, thread_name = _current_thread()
task_id: Optional[int]
task_name: Optional[str]
task_frame: Optional[types.FrameType]
task_id, task_name, task_frame = _task.get_task(thread_id)
self._maybe_update_self_name()
lock_name = "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
lock_name: str = (
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
)

frame: types.FrameType
if task_frame is None:
# If we can't get the task frame, we use the caller frame. We expect acquire/release or
# __enter__/__exit__ to be on the stack, so we go back 2 frames.
frame = sys._getframe(2)
else:
frame = task_frame

frames: List[DDFrame]
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id = _threading.get_thread_native_id(thread_id)
thread_native_id: int = _threading.get_thread_native_id(thread_id)

handle = ddup.SampleHandle()
handle: ddup.SampleHandle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
handle.push_lock_name(lock_name)
handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here
Expand All @@ -102,26 +125,23 @@ def _acquire(self, inner_func: typing.Callable[..., T], *args: typing.Any, **kwa

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span())

for f in frames:
handle.push_frame(f.function_name, f.file_name, 0, f.lineno)

ddframe: DDFrame
for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
handle.flush_sample()
except Exception:
pass # nosec

def acquire(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
def acquire(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)

def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.Any, **kwargs: typing.Any) -> None:
# type (typing.Any, typing.Any) -> None

def _release(self, inner_func: Any, *args: Any, **kwargs: Any) -> None:
# The underlying threading.Lock class is implemented using C code, and
# it doesn't have the __dict__ attribute. So we can't do
# self.__dict__.pop("_self_acquired_at", None) to remove the attribute.
# Instead, we need to use the following workaround to retrieve and
# remove the attribute.
start = getattr(self, "_self_acquired_at", None)
start: Optional[int] = getattr(self, "_self_acquired_at", None)
try:
# Though it should generally be avoided to call release() from
# multiple threads, it is possible to do so. In that scenario, the
Expand All @@ -137,22 +157,31 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A
return inner_func(*args, **kwargs)
finally:
if start is not None:
end = time.monotonic_ns()
end: int = time.monotonic_ns()
thread_id: int
thread_name: str
thread_id, thread_name = _current_thread()
task_id: Optional[int]
task_name: Optional[str]
task_frame: Optional[types.FrameType]
task_id, task_name, task_frame = _task.get_task(thread_id)
lock_name = "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
lock_name: str = (
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
)

frame: types.FrameType
if task_frame is None:
# See the comments in _acquire
frame = sys._getframe(2)
else:
frame = task_frame

frames: List[DDFrame]
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id = _threading.get_thread_native_id(thread_id)
thread_native_id: int = _threading.get_thread_native_id(thread_id)

handle = ddup.SampleHandle()
handle: ddup.SampleHandle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
handle.push_lock_name(lock_name)
handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here
Expand All @@ -162,29 +191,32 @@ def _release(self, inner_func: typing.Callable[..., typing.Any], *args: typing.A

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span())

for f in frames:
handle.push_frame(f.function_name, f.file_name, 0, f.lineno)
ddframe: DDFrame
for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
handle.flush_sample()

def release(self, *args: typing.Any, **kwargs: typing.Any) -> None:
def release(self, *args: Any, **kwargs: Any) -> Any:
return self._release(self.__wrapped__.release, *args, **kwargs)

acquire_lock = acquire

def __enter__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
def __enter__(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.__enter__, *args, **kwargs)

def __exit__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
def __exit__(self, *args: Any, **kwargs: Any) -> None:
self._release(self.__wrapped__.__exit__, *args, **kwargs)

def _find_self_name(self, var_dict: typing.Dict) -> typing.Optional[str]:
def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]:
name: str
value: Any
for name, value in var_dict.items():
if name.startswith("__") or isinstance(value, types.ModuleType):
continue
if value is self:
return name
if config.lock.name_inspect_dir:
attribute: str
for attribute in dir(value):
if not attribute.startswith("__") and getattr(value, attribute) is self:
self._self_name = attribute
Expand All @@ -202,7 +234,7 @@ def _maybe_update_self_name(self) -> None:
# 2: acquire/release (or __enter__/__exit__)
# 3: caller frame
if config.enable_asserts:
frame = sys._getframe(1)
frame: types.FrameType = sys._getframe(1)
if frame.f_code.co_name not in {"_acquire", "_release"}:
raise AssertionError("Unexpected frame %s" % frame.f_code.co_name)
frame = sys._getframe(2)
Expand All @@ -228,43 +260,36 @@ class FunctionWrapper(wrapt.FunctionWrapper):
# Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static"
# method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a
# builtin function. Override default wrapt behavior here that tries to detect bound method.
def __get__(
self,
instance: typing.Optional[typing.Any],
owner: typing.Optional[typing.Any] = None,
) -> "FunctionWrapper":
def __get__(self, instance: Any, owner: Optional[Type] = None) -> "FunctionWrapper":
return self


class LockCollector(collector.CaptureSamplerCollector):
"""Record lock usage."""

PROFILED_LOCK_CLASS: typing.Type[typing.Any]
PROFILED_LOCK_CLASS: Type[Any]

def __init__(
self,
nframes: int = config.max_frames,
endpoint_collection_enabled: bool = config.endpoint_collection,
tracer: typing.Optional[Tracer] = None,
*args: typing.Any,
**kwargs: typing.Any,
tracer: Optional[Tracer] = None,
*args: Any,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.nframes = nframes
self.endpoint_collection_enabled = endpoint_collection_enabled
self.tracer = tracer
self._original: typing.Optional[typing.Any] = None
self.nframes: int = nframes
self.endpoint_collection_enabled: bool = endpoint_collection_enabled
self.tracer: Optional[Tracer] = tracer
self._original: Optional[Any] = None

@abc.abstractmethod
def _get_patch_target(self) -> typing.Type[typing.Any]:
raise NotImplementedError
def _get_patch_target(self) -> Callable[..., Any]:
...

@abc.abstractmethod
def _set_patch_target(
self,
value: typing.Any,
) -> None:
raise NotImplementedError
def _set_patch_target(self, value: Any) -> None:
...

def _start_service(self) -> None:
"""Start collecting lock usage."""
Expand All @@ -282,14 +307,9 @@ def patch(self) -> None:
# Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
self._original = self._get_patch_target()

def _allocate_lock(
wrapped: typing.Any,
instance: typing.Any,
args: typing.Tuple[typing.Any, ...],
kwargs: typing.Dict[str, typing.Any],
) -> typing.Any:
lock = wrapped(*args, **kwargs)
return self.PROFILED_LOCK_CLASS(
def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock:
lock: Any = wrapped(*args, **kwargs)
return self.PROFILED_LOCK_CLASS( # type: ignore[attr-defined]
lock,
self.tracer,
self.nframes,
Expand Down
Loading